Skip to content

Commit

Permalink
blockchain: add chasing head for DA check;
Browse files Browse the repository at this point in the history
  • Loading branch information
galaio committed Mar 7, 2024
1 parent 9cde534 commit a93500b
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 5 deletions.
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type ChainHeaderReader interface {

// GetHighestVerifiedHeader retrieves the highest header verified.
GetHighestVerifiedHeader() *types.Header

// ChasingHead return the best chain head of peers.
ChasingHead() *types.Header
}

type VotePool interface {
Expand Down
8 changes: 6 additions & 2 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,12 @@ func (p *Parlia) IsDataAvailable(chain consensus.ChainHeaderReader, block *types
return nil
}
// only required to check within BlobReserveThreshold block's DA
currentHeader := chain.CurrentHeader()
if block.NumberU64() < currentHeader.Number.Uint64()-params.BlobReserveThreshold {
highest := chain.ChasingHead()
current := chain.CurrentHeader()
if highest == nil || highest.Number.Cmp(current.Number) < 0 {
highest = current
}
if block.NumberU64() < highest.Number.Uint64()-params.BlobReserveThreshold {
return nil
}

Expand Down
19 changes: 16 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ type BlockChain struct {
highestVerifiedHeader atomic.Pointer[types.Header]
currentBlock atomic.Pointer[types.Header] // Current head of the chain
currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync
chasingHead atomic.Pointer[types.Header]

bodyCache *lru.Cache[common.Hash, *types.Body]
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
Expand Down Expand Up @@ -392,6 +393,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.highestVerifiedHeader.Store(nil)
bc.currentBlock.Store(nil)
bc.currentSnapBlock.Store(nil)
bc.chasingHead.Store(nil)

// Update chain info data metrics
chainInfoGauge.Update(metrics.GaugeInfoValue{"chain_id": bc.chainConfig.ChainID.String()})
Expand Down Expand Up @@ -1040,6 +1042,16 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
return nil
}

// UpdateChasingHead update remote best chain head, used by DA check now.
func (bc *BlockChain) UpdateChasingHead(head *types.Header) {
bc.chasingHead.Store(head)
}

// ChasingHead return the best chain head of peers.
func (bc *BlockChain) ChasingHead() *types.Header {
return bc.chasingHead.Load()
}

// Reset purges the entire blockchain, restoring it to its genesis state.
func (bc *BlockChain) Reset() error {
return bc.ResetWithGenesisBlock(bc.genesisBlock)
Expand Down Expand Up @@ -1383,8 +1395,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
// TODO(GalaIO): when sync the history block, it needs store blobs too.
// TODO(GalaIO): when sync the history block, it needs check DA & store blobs too.
//if isCancun() {
// posa.IsDataAvailable()
// writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td, blobs)
//}
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
Expand Down Expand Up @@ -1465,8 +1478,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
// TODO(GalaIO): if enable cancun, need write blobs
// TODO(GalaIO): if enable cancun, need check DA & write blobs
//if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
// posa.IsDataAvailable()
// rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), blobs)
//}

Expand Down Expand Up @@ -1825,7 +1839,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
// TODO(GalaIO): if enable cancun, it must set received blob cache for check, remove cache when failed
func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
// If the chain is terminating, don't even bother starting up.
if bc.insertStopped() {
Expand Down
4 changes: 4 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,7 @@ func (cm *chainMaker) GetTd(hash common.Hash, number uint64) *big.Int {
func (cm *chainMaker) GetHighestVerifiedHeader() *types.Header {
panic("not supported")
}

func (cm *chainMaker) ChasingHead() *types.Header {
panic("not supported")
}
4 changes: 4 additions & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header {
return nil
}

func (hc *HeaderChain) ChasingHead() *types.Header {
return nil
}

// GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or
// a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the
// number of blocks to be individually checked before we reach the canonical chain.
Expand Down
5 changes: 5 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ type BlockChain interface {
// TrieDB retrieves the low level trie database used for interacting
// with trie nodes.
TrieDB() *trie.Database

// UpdateChasingHead update remote best chain head, used by DA check now.
UpdateChasingHead(head *types.Header)
}

type DownloadOption func(downloader *Downloader) *Downloader
Expand Down Expand Up @@ -590,6 +593,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
} else if mode == FullSync {
fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) })
}
// update the chasing head
d.blockchain.UpdateChasingHead(remoteHeader)
return d.spawnSync(fetchers)
}

Expand Down

0 comments on commit a93500b

Please sign in to comment.