Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/state: revert the interface of StateDB.Commit #2774

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 74 additions & 83 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ type BlockChain struct {
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
Expand Down Expand Up @@ -1766,6 +1765,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Note all the components of block(td, hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
wg := sync.WaitGroup{}
defer wg.Wait()
wg.Add(1)
go func() {
blockBatch := bc.db.BlockStore().NewBatch()
Expand Down Expand Up @@ -1793,88 +1793,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
wg.Done()
}()

tryCommitTrieDB := func() error {
bc.commitLock.Lock()
defer bc.commitLock.Unlock()

// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}

triedb := bc.statedb.TrieDB()
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return triedb.Commit(block.Root(), false)
}
// Full but not archive node, do proper garbage collection
triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(block.Root(), -int64(block.NumberU64()))

// Flush limits are not considered for the first TriesInMemory blocks.
current := block.NumberU64()
if current <= state.TriesInMemory {
return nil
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, _, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - state.TriesInMemory
flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
canWrite := true
if posa, ok := bc.engine.(consensus.PoSA); ok {
if !posa.EnoughDistance(bc, block.Header()) {
canWrite = false
}
}
if canWrite {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(state.TriesInMemory))
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true)
rawdb.WriteSafePointBlockNumber(bc.db, chosen)
bc.lastWrite = chosen
bc.gcproc = 0
}
}
}
// Garbage collect anything below our required write retention
wg2 := sync.WaitGroup{}
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
wg2.Add(1)
go func() {
triedb.Dereference(root)
wg2.Done()
}()
}
wg2.Wait()
return nil
}
// Commit all cached state changes into underlying memory database.
_, diffLayer, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), tryCommitTrieDB)
root, diffLayer, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return err
}
Expand All @@ -1894,7 +1814,78 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

go bc.cacheDiffLayer(diffLayer, diffLayerCh)
}
wg.Wait()

// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false)
}
// Full but not archive node, do proper garbage collection
bc.triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(block.Root(), -int64(block.NumberU64()))

// Flush limits are not considered for the first TriesInMemory blocks.
current := block.NumberU64()
if current <= state.TriesInMemory {
return nil
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, _, imgs = bc.triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - state.TriesInMemory
flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
canWrite := true
if posa, ok := bc.engine.(consensus.PoSA); ok {
if !posa.EnoughDistance(bc, block.Header()) {
canWrite = false
}
}
if canWrite {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < bc.lastWrite+state.TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(state.TriesInMemory))
}
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true)
rawdb.WriteSafePointBlockNumber(bc.db, chosen)
bc.lastWrite = chosen
bc.gcproc = 0
}
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
wg.Add(1)
go func() {
bc.triedb.Dereference(root)
wg.Done()
}()
}

return nil
}

Expand Down
8 changes: 1 addition & 7 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,17 +1488,11 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool) (*stateU
//
// The associated block number of the state transition is also provided
// for more chain context.
func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool, postCommitFuncs ...func() error) (common.Hash, *types.DiffLayer, error) {
func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) {
ret, err := s.commitAndFlush(block, deleteEmptyObjects)
if err != nil {
return common.Hash{}, nil, err
}
for _, postFunc := range postCommitFuncs {
err := postFunc()
if err != nil {
return common.Hash{}, nil, err
}
}
return ret.root, ret.diffLayer, nil
}

Expand Down