Skip to content

Commit

Permalink
enable fast node
Browse files Browse the repository at this point in the history
  • Loading branch information
blxdyx committed Nov 26, 2024
1 parent 6afc003 commit c0b2ecf
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 33 deletions.
19 changes: 12 additions & 7 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,18 @@ func (rs *StateV3) ApplyState4(ctx context.Context, txTask *TxTask) error {
}

if (txTask.TxNum+1)%rs.domains.StepSize() == 0 /*&& txTask.TxNum > 0 */ {
// We do not update txNum before commitment cuz otherwise committed state will be in the beginning of next file, not in the latest.
// That's why we need to make txnum++ on SeekCommitment to get exact txNum for the latest committed state.
//fmt.Printf("[commitment] running due to txNum reached aggregation step %d\n", txNum/rs.domains.StepSize())
_, err := rs.domains.ComputeCommitment(ctx, true, txTask.BlockNum,
fmt.Sprintf("applying step %d", txTask.TxNum/rs.domains.StepSize()))
if err != nil {
return fmt.Errorf("StateV3.ComputeCommitment: %w", err)
if dbg.DiscardCommitment() {
rs.domains.ResetCommitment()
_ = rs.domains.SaveCommitment(txTask.BlockNum, txTask.Header.Root.Bytes())
} else {
// We do not update txNum before commitment cuz otherwise committed state will be in the beginning of next file, not in the latest.
// That's why we need to make txnum++ on SeekCommitment to get exact txNum for the latest committed state.
//fmt.Printf("[commitment] running due to txNum reached aggregation step %d\n", txNum/rs.domains.StepSize())
_, err := rs.domains.ComputeCommitment(ctx, true, txTask.BlockNum,
fmt.Sprintf("applying step %d", txTask.TxNum/rs.domains.StepSize()))
if err != nil {
return fmt.Errorf("StateV3.ComputeCommitment: %w", err)
}
}
}

Expand Down
36 changes: 24 additions & 12 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB
}
sd.SetBlockNum(bn)
sd.SetTxNum(txn)
if dbg.DiscardCommitment() {
return 0, nil
}
newRh, err := sd.rebuildCommitment(ctx, tx, bn)
if err != nil {
return 0, err
Expand Down Expand Up @@ -385,6 +388,14 @@ func (sd *SharedDomains) ClearRam(resetCommitment bool) {
sd.estSize = 0
}

func (sd *SharedDomains) ResetCommitment() {
sd.sdCtx.updates.Reset()
}

func (sd *SharedDomains) SaveCommitment(blockNum uint64, rootHash []byte) error {
return sd.sdCtx.storeCommitmentState(blockNum, rootHash)
}

func (sd *SharedDomains) put(domain kv.Domain, key string, val []byte) {
// disable mutex - because work on parallel execution postponed after E3 release.
//sd.muMaps.Lock()
Expand Down Expand Up @@ -907,13 +918,18 @@ func (sd *SharedDomains) Flush(ctx context.Context, tx kv.RwTx) error {
sd.pastChangesAccumulator = make(map[string]*StateChangeSet)

defer mxFlushTook.ObserveDuration(time.Now())
fh, err := sd.ComputeCommitment(ctx, true, sd.BlockNum(), "flush-commitment")
if err != nil {
return err
}
if sd.trace {
_, f, l, _ := runtime.Caller(1)
fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l)
var err error
if dbg.DiscardCommitment() {
sd.ResetCommitment()
} else {
fh, err := sd.ComputeCommitment(ctx, true, sd.BlockNum(), "flush-commitment")
if err != nil {
return err
}
if sd.trace {
_, f, l, _ := runtime.Caller(1)
fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l)
}
}
for _, w := range sd.domainWriters {
if w == nil {
Expand Down Expand Up @@ -1119,7 +1135,7 @@ func (sdc *SharedDomainsCommitmentContext) SetLimitReadAsOfTxNum(txNum uint64) {
func NewSharedDomainsCommitmentContext(sd *SharedDomains, mode commitment.Mode, trieVariant commitment.TrieVariant) *SharedDomainsCommitmentContext {
ctx := &SharedDomainsCommitmentContext{
sharedDomains: sd,
discard: dbg.DiscardCommitment(),
discard: false,
branches: make(map[string]cachedBranch),
keccak: sha3.NewLegacyKeccak256().(cryptozerocopy.KeccakState),
}
Expand Down Expand Up @@ -1295,10 +1311,6 @@ func (sdc *SharedDomainsCommitmentContext) TouchKey(d kv.Domain, key string, val

// Evaluates commitment for processed state.
func (sdc *SharedDomainsCommitmentContext) ComputeCommitment(ctx context.Context, saveState bool, blockNum uint64, logPrefix string) (rootHash []byte, err error) {
if dbg.DiscardCommitment() {
sdc.updates.Reset()
return nil, nil
}
sdc.ResetBranchCache()
defer sdc.ResetBranchCache()

Expand Down
35 changes: 21 additions & 14 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,12 @@ Loop:
aggTx := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx)
aggTx.RestrictSubsetFileDeletions(true)
start := time.Now()
if _, err := doms.ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil {
return err
if dbg.DiscardCommitment() {
_ = doms.SaveCommitment(blockNum, b.Root().Bytes())
} else {
if _, err := doms.ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil {
return err
}
}
ts += time.Since(start)
aggTx.RestrictSubsetFileDeletions(false)
Expand Down Expand Up @@ -923,27 +927,30 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
return false, errors.New("header is nil")
}

if dbg.DiscardCommitment() {
return true, nil
}
if doms.BlockNum() != header.Number.Uint64() {
panic(fmt.Errorf("%d != %d", doms.BlockNum(), header.Number.Uint64()))
}

rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix())
if err != nil {
return false, fmt.Errorf("StateV3.Apply: %w", err)
}
if cfg.blockProduction {
header.Root = common.BytesToHash(rh)
return true, nil
var rh []byte
if dbg.DiscardCommitment() {
doms.ResetCommitment()
_ = doms.SaveCommitment(doms.BlockNum(), header.Root.Bytes())
} else {
rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix())
if err != nil {
return false, fmt.Errorf("StateV3.Apply: %w", err)
}
if cfg.blockProduction {
header.Root = common.BytesToHash(rh)
return true, nil
}
}
if bytes.Equal(rh, header.Root.Bytes()) {
if bytes.Equal(rh, header.Root.Bytes()) || dbg.DiscardCommitment() {
if !inMemExec {
if err := doms.Flush(ctx, applyTx); err != nil {
return false, err
}
if err = applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneCommitHistory(ctx, applyTx, nil); err != nil {
if err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneCommitHistory(ctx, applyTx, nil); err != nil {
return false, err
}
}
Expand Down

0 comments on commit c0b2ecf

Please sign in to comment.