Skip to content

Commit

Permalink
core/state: revert the interface of StateDB.Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 committed Nov 27, 2024
1 parent c49ea92 commit f05dae4
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 90 deletions.
157 changes: 74 additions & 83 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ type BlockChain struct {
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
Expand Down Expand Up @@ -1766,6 +1765,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Note all the components of block(td, hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
wg := sync.WaitGroup{}
defer wg.Wait()
wg.Add(1)
go func() {
blockBatch := bc.db.BlockStore().NewBatch()
Expand Down Expand Up @@ -1793,88 +1793,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
wg.Done()
}()

tryCommitTrieDB := func() error {
bc.commitLock.Lock()
defer bc.commitLock.Unlock()

// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}

triedb := bc.statedb.TrieDB()
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return triedb.Commit(block.Root(), false)
}
// Full but not archive node, do proper garbage collection
triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(block.Root(), -int64(block.NumberU64()))

// Flush limits are not considered for the first TriesInMemory blocks.
current := block.NumberU64()
if current <= state.TriesInMemory {
return nil
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, _, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - state.TriesInMemory
flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
canWrite := true
if posa, ok := bc.engine.(consensus.PoSA); ok {
if !posa.EnoughDistance(bc, block.Header()) {
canWrite = false
}
}
if canWrite {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(state.TriesInMemory))
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true)
rawdb.WriteSafePointBlockNumber(bc.db, chosen)
bc.lastWrite = chosen
bc.gcproc = 0
}
}
}
// Garbage collect anything below our required write retention
wg2 := sync.WaitGroup{}
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
wg2.Add(1)
go func() {
triedb.Dereference(root)
wg2.Done()
}()
}
wg2.Wait()
return nil
}
// Commit all cached state changes into underlying memory database.
_, diffLayer, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), tryCommitTrieDB)
root, diffLayer, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return err
}
Expand All @@ -1894,7 +1814,78 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

go bc.cacheDiffLayer(diffLayer, diffLayerCh)
}
wg.Wait()

// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false)
}
// Full but not archive node, do proper garbage collection
bc.triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(block.Root(), -int64(block.NumberU64()))

// Flush limits are not considered for the first TriesInMemory blocks.
current := block.NumberU64()
if current <= state.TriesInMemory {
return nil
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, _, imgs = bc.triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - state.TriesInMemory
flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
canWrite := true
if posa, ok := bc.engine.(consensus.PoSA); ok {
if !posa.EnoughDistance(bc, block.Header()) {
canWrite = false
}
}
if canWrite {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(state.TriesInMemory))
}
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true)
rawdb.WriteSafePointBlockNumber(bc.db, chosen)
bc.lastWrite = chosen
bc.gcproc = 0
}
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
wg.Add(1)
go func() {
bc.triedb.Dereference(root)
wg.Done()
}()
}

return nil
}

Expand Down
8 changes: 1 addition & 7 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,17 +1488,11 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool) (*stateU
//
// The associated block number of the state transition is also provided
// for more chain context.
func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool, postCommitFuncs ...func() error) (common.Hash, *types.DiffLayer, error) {
func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) {
ret, err := s.commitAndFlush(block, deleteEmptyObjects)
if err != nil {
return common.Hash{}, nil, err
}
for _, postFunc := range postCommitFuncs {
err := postFunc()
if err != nil {
return common.Hash{}, nil, err
}
}
return ret.root, ret.diffLayer, nil
}

Expand Down

0 comments on commit f05dae4

Please sign in to comment.