From c0b2ecf7029d1e6b238962c2278ca69ae84a3c46 Mon Sep 17 00:00:00 2001 From: blxdyx Date: Tue, 26 Nov 2024 15:46:07 +0800 Subject: [PATCH] enable fast node --- core/state/rw_v3.go | 19 ++++++++++------ erigon-lib/state/domain_shared.go | 36 ++++++++++++++++++++----------- eth/stagedsync/exec3.go | 35 ++++++++++++++++++------------ 3 files changed, 57 insertions(+), 33 deletions(-) diff --git a/core/state/rw_v3.go b/core/state/rw_v3.go index e31a3730509..d6d3a1b1cdd 100644 --- a/core/state/rw_v3.go +++ b/core/state/rw_v3.go @@ -194,13 +194,18 @@ func (rs *StateV3) ApplyState4(ctx context.Context, txTask *TxTask) error { } if (txTask.TxNum+1)%rs.domains.StepSize() == 0 /*&& txTask.TxNum > 0 */ { - // We do not update txNum before commitment cuz otherwise committed state will be in the beginning of next file, not in the latest. - // That's why we need to make txnum++ on SeekCommitment to get exact txNum for the latest committed state. - //fmt.Printf("[commitment] running due to txNum reached aggregation step %d\n", txNum/rs.domains.StepSize()) - _, err := rs.domains.ComputeCommitment(ctx, true, txTask.BlockNum, - fmt.Sprintf("applying step %d", txTask.TxNum/rs.domains.StepSize())) - if err != nil { - return fmt.Errorf("StateV3.ComputeCommitment: %w", err) + if dbg.DiscardCommitment() { + rs.domains.ResetCommitment() + _ = rs.domains.SaveCommitment(txTask.BlockNum, txTask.Header.Root.Bytes()) + } else { + // We do not update txNum before commitment cuz otherwise committed state will be in the beginning of next file, not in the latest. + // That's why we need to make txnum++ on SeekCommitment to get exact txNum for the latest committed state. + //fmt.Printf("[commitment] running due to txNum reached aggregation step %d\n", txNum/rs.domains.StepSize()) + _, err := rs.domains.ComputeCommitment(ctx, true, txTask.BlockNum, + fmt.Sprintf("applying step %d", txTask.TxNum/rs.domains.StepSize())) + if err != nil { + return fmt.Errorf("StateV3.ComputeCommitment: %w", err) + } } } diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index 6252d890e7f..17ba04d7386 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -353,6 +353,9 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB } sd.SetBlockNum(bn) sd.SetTxNum(txn) + if dbg.DiscardCommitment() { + return 0, nil + } newRh, err := sd.rebuildCommitment(ctx, tx, bn) if err != nil { return 0, err @@ -385,6 +388,14 @@ func (sd *SharedDomains) ClearRam(resetCommitment bool) { sd.estSize = 0 } +func (sd *SharedDomains) ResetCommitment() { + sd.sdCtx.updates.Reset() +} + +func (sd *SharedDomains) SaveCommitment(blockNum uint64, rootHash []byte) error { + return sd.sdCtx.storeCommitmentState(blockNum, rootHash) +} + func (sd *SharedDomains) put(domain kv.Domain, key string, val []byte) { // disable mutex - because work on parallel execution postponed after E3 release. //sd.muMaps.Lock() @@ -907,13 +918,18 @@ func (sd *SharedDomains) Flush(ctx context.Context, tx kv.RwTx) error { sd.pastChangesAccumulator = make(map[string]*StateChangeSet) defer mxFlushTook.ObserveDuration(time.Now()) - fh, err := sd.ComputeCommitment(ctx, true, sd.BlockNum(), "flush-commitment") - if err != nil { - return err - } - if sd.trace { - _, f, l, _ := runtime.Caller(1) - fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l) + var err error + if dbg.DiscardCommitment() { + sd.ResetCommitment() + } else { + fh, err := sd.ComputeCommitment(ctx, true, sd.BlockNum(), "flush-commitment") + if err != nil { + return err + } + if sd.trace { + _, f, l, _ := runtime.Caller(1) + fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l) + } } for _, w := range sd.domainWriters { if w == nil { @@ -1119,7 +1135,7 @@ func (sdc *SharedDomainsCommitmentContext) SetLimitReadAsOfTxNum(txNum uint64) { func NewSharedDomainsCommitmentContext(sd *SharedDomains, mode commitment.Mode, trieVariant commitment.TrieVariant) *SharedDomainsCommitmentContext { ctx := &SharedDomainsCommitmentContext{ sharedDomains: sd, - discard: dbg.DiscardCommitment(), + discard: false, branches: make(map[string]cachedBranch), keccak: sha3.NewLegacyKeccak256().(cryptozerocopy.KeccakState), } @@ -1295,10 +1311,6 @@ func (sdc *SharedDomainsCommitmentContext) TouchKey(d kv.Domain, key string, val // Evaluates commitment for processed state. func (sdc *SharedDomainsCommitmentContext) ComputeCommitment(ctx context.Context, saveState bool, blockNum uint64, logPrefix string) (rootHash []byte, err error) { - if dbg.DiscardCommitment() { - sdc.updates.Reset() - return nil, nil - } sdc.ResetBranchCache() defer sdc.ResetBranchCache() diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 5b6b88e0149..b570e52e5b5 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -687,8 +687,12 @@ Loop: aggTx := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx) aggTx.RestrictSubsetFileDeletions(true) start := time.Now() - if _, err := doms.ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil { - return err + if dbg.DiscardCommitment() { + _ = doms.SaveCommitment(blockNum, b.Root().Bytes()) + } else { + if _, err := doms.ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil { + return err + } } ts += time.Since(start) aggTx.RestrictSubsetFileDeletions(false) @@ -923,27 +927,30 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT return false, errors.New("header is nil") } - if dbg.DiscardCommitment() { - return true, nil - } if doms.BlockNum() != header.Number.Uint64() { panic(fmt.Errorf("%d != %d", doms.BlockNum(), header.Number.Uint64())) } - rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix()) - if err != nil { - return false, fmt.Errorf("StateV3.Apply: %w", err) - } - if cfg.blockProduction { - header.Root = common.BytesToHash(rh) - return true, nil + var rh []byte + if dbg.DiscardCommitment() { + doms.ResetCommitment() + _ = doms.SaveCommitment(doms.BlockNum(), header.Root.Bytes()) + } else { + rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix()) + if err != nil { + return false, fmt.Errorf("StateV3.Apply: %w", err) + } + if cfg.blockProduction { + header.Root = common.BytesToHash(rh) + return true, nil + } } - if bytes.Equal(rh, header.Root.Bytes()) { + if bytes.Equal(rh, header.Root.Bytes()) || dbg.DiscardCommitment() { if !inMemExec { if err := doms.Flush(ctx, applyTx); err != nil { return false, err } - if err = applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneCommitHistory(ctx, applyTx, nil); err != nil { + if err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneCommitHistory(ctx, applyTx, nil); err != nil { return false, err } }