From f05dae47db889954143808fd3fce3fe5b08a00ce Mon Sep 17 00:00:00 2001 From: buddh0 Date: Wed, 27 Nov 2024 14:54:44 +0800 Subject: [PATCH] core/state: revert the interface of StateDB.Commit --- core/blockchain.go | 157 ++++++++++++++++++++---------------------- core/state/statedb.go | 8 +-- 2 files changed, 75 insertions(+), 90 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 41323f4578..743d7ef318 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -251,7 +251,6 @@ type BlockChain struct { snaps *snapshot.Tree // Snapshot tree for fast trie leaf access triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently lastWrite uint64 // Last block when the state was flushed flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state triedb *triedb.Database // The database handler for maintaining trie nodes. @@ -1766,6 +1765,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Note all the components of block(td, hash->number map, header, body, receipts) // should be written atomically. BlockBatch is used for containing all components. wg := sync.WaitGroup{} + defer wg.Wait() wg.Add(1) go func() { blockBatch := bc.db.BlockStore().NewBatch() @@ -1793,88 +1793,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. wg.Done() }() - tryCommitTrieDB := func() error { - bc.commitLock.Lock() - defer bc.commitLock.Unlock() - - // If node is running in path mode, skip explicit gc operation - // which is unnecessary in this mode. - if bc.triedb.Scheme() == rawdb.PathScheme { - return nil - } - - triedb := bc.statedb.TrieDB() - // If we're running an archive node, always flush - if bc.cacheConfig.TrieDirtyDisabled { - return triedb.Commit(block.Root(), false) - } - // Full but not archive node, do proper garbage collection - triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive - bc.triegc.Push(block.Root(), -int64(block.NumberU64())) - - // Flush limits are not considered for the first TriesInMemory blocks. - current := block.NumberU64() - if current <= state.TriesInMemory { - return nil - } - // If we exceeded our memory allowance, flush matured singleton nodes to disk - var ( - _, nodes, _, imgs = triedb.Size() - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 - ) - if nodes > limit || imgs > 4*1024*1024 { - triedb.Cap(limit - ethdb.IdealBatchSize) - } - // Find the next state trie we need to commit - chosen := current - state.TriesInMemory - flushInterval := time.Duration(bc.flushInterval.Load()) - // If we exceeded out time allowance, flush an entire trie to disk - if bc.gcproc > flushInterval { - canWrite := true - if posa, ok := bc.engine.(consensus.PoSA); ok { - if !posa.EnoughDistance(bc, block.Header()) { - canWrite = false - } - } - if canWrite { - // If the header is missing (canonical chain behind), we're reorging a low - // diff sidechain. Suspend committing until this operation is completed. - header := bc.GetHeaderByNumber(chosen) - if header == nil { - log.Warn("Reorg in progress, trie commit postponed", "number", chosen) - } else { - // If we're exceeding limits but haven't reached a large enough memory gap, - // warn the user that the system is becoming unstable. - if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*flushInterval { - log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(state.TriesInMemory)) - } - // Flush an entire trie and restart the counters - triedb.Commit(header.Root, true) - rawdb.WriteSafePointBlockNumber(bc.db, chosen) - bc.lastWrite = chosen - bc.gcproc = 0 - } - } - } - // Garbage collect anything below our required write retention - wg2 := sync.WaitGroup{} - for !bc.triegc.Empty() { - root, number := bc.triegc.Pop() - if uint64(-number) > chosen { - bc.triegc.Push(root, number) - break - } - wg2.Add(1) - go func() { - triedb.Dereference(root) - wg2.Done() - }() - } - wg2.Wait() - return nil - } // Commit all cached state changes into underlying memory database. - _, diffLayer, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), tryCommitTrieDB) + root, diffLayer, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number())) if err != nil { return err } @@ -1894,7 +1814,78 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. go bc.cacheDiffLayer(diffLayer, diffLayerCh) } - wg.Wait() + + // If node is running in path mode, skip explicit gc operation + // which is unnecessary in this mode. + if bc.triedb.Scheme() == rawdb.PathScheme { + return nil + } + // If we're running an archive node, always flush + if bc.cacheConfig.TrieDirtyDisabled { + return bc.triedb.Commit(root, false) + } + // Full but not archive node, do proper garbage collection + bc.triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(block.Root(), -int64(block.NumberU64())) + + // Flush limits are not considered for the first TriesInMemory blocks. + current := block.NumberU64() + if current <= state.TriesInMemory { + return nil + } + // If we exceeded our memory allowance, flush matured singleton nodes to disk + var ( + _, nodes, _, imgs = bc.triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + ) + if nodes > limit || imgs > 4*1024*1024 { + bc.triedb.Cap(limit - ethdb.IdealBatchSize) + } + // Find the next state trie we need to commit + chosen := current - state.TriesInMemory + flushInterval := time.Duration(bc.flushInterval.Load()) + // If we exceeded out time allowance, flush an entire trie to disk + if bc.gcproc > flushInterval { + canWrite := true + if posa, ok := bc.engine.(consensus.PoSA); ok { + if !posa.EnoughDistance(bc, block.Header()) { + canWrite = false + } + } + if canWrite { + // If the header is missing (canonical chain behind), we're reorging a low + // diff sidechain. Suspend committing until this operation is completed. + header := bc.GetHeaderByNumber(chosen) + if header == nil { + log.Warn("Reorg in progress, trie commit postponed", "number", chosen) + } else { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*flushInterval { + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(state.TriesInMemory)) + } + // Flush an entire trie and restart the counters + bc.triedb.Commit(header.Root, true) + rawdb.WriteSafePointBlockNumber(bc.db, chosen) + bc.lastWrite = chosen + bc.gcproc = 0 + } + } + } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + wg.Add(1) + go func() { + bc.triedb.Dereference(root) + wg.Done() + }() + } + return nil } diff --git a/core/state/statedb.go b/core/state/statedb.go index ebe16900c4..14b50fec3b 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1488,17 +1488,11 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool) (*stateU // // The associated block number of the state transition is also provided // for more chain context. -func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool, postCommitFuncs ...func() error) (common.Hash, *types.DiffLayer, error) { +func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) { ret, err := s.commitAndFlush(block, deleteEmptyObjects) if err != nil { return common.Hash{}, nil, err } - for _, postFunc := range postCommitFuncs { - err := postFunc() - if err != nil { - return common.Hash{}, nil, err - } - } return ret.root, ret.diffLayer, nil }