Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blob storage & miscs; #2229

Merged
merged 12 commits into from
Mar 4, 2024
2 changes: 1 addition & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,5 @@ type PoSA interface {
GetFinalizedHeader(chain ChainHeaderReader, header *types.Header) *types.Header
VerifyVote(chain ChainHeaderReader, vote *types.VoteEnvelope) error
IsActiveValidatorAt(chain ChainHeaderReader, header *types.Header, checkVoteKeyFn func(bLSPublicKey *types.BLSPublicKey) bool) bool
IsDataAvailable(chain ChainHeaderReader, block *types.Block, blobs types.BlobTxSidecars) error
IsDataAvailable(chain ChainHeaderReader, block *types.Block) error
}
3 changes: 2 additions & 1 deletion consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (p *Parlia) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*typ
}

// IsDataAvailable it checks that the blobTx block has available blob data
func (p *Parlia) IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block, blobs types.BlobTxSidecars) error {
func (p *Parlia) IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) error {
if !p.chainConfig.IsCancun(block.Number(), block.Time()) {
return nil
}
Expand All @@ -366,6 +366,7 @@ func (p *Parlia) IsDataAvailable(chain consensus.ChainHeaderReader, block *types
for _, tx := range block.Transactions() {
versionedHashes = append(versionedHashes, tx.BlobHashes())
}
blobs := block.Blobs()
if len(versionedHashes) != len(blobs) {
return errors.New("blobs do not match the versionedHashes length")
}
Expand Down
26 changes: 3 additions & 23 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ type BlockChain struct {
// Cache for the blocks that failed to pass MPT root verification
badBlockCache *lru.Cache[common.Hash, time.Time]

// blobs
receivedBlobsCache sync.Map // it saves received blobs for validation & storage

// trusted diff layers
diffLayerCache *exlru.Cache // Cache for the diffLayers
diffLayerChanCache *exlru.Cache // Cache for the difflayer channel
Expand Down Expand Up @@ -651,11 +648,6 @@ func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, diffLayerCh cha

func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
bc.blockCache.Add(hash, block)
// try cache blob too
blob, ok := bc.receivedBlobsCache.Load(hash)
if ok {
bc.blobsCache.Add(hash, blob.(types.BlobTxSidecars))
}
}

// empty returns an indicator whether the blockchain is empty.
Expand Down Expand Up @@ -1545,9 +1537,7 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
rawdb.WriteBlock(batch, block)
// if enable cancun, it needs to write blobs too
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
blobs, _ := bc.receivedBlobsCache.Load(block.Hash())
rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), blobs.(types.BlobTxSidecars))
bc.receivedBlobsCache.Delete(block.Hash())
rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), block.Blobs())
}
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
Expand Down Expand Up @@ -1593,13 +1583,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
// if enable cancun, it needs to write blobs too
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
blobs, exist := bc.receivedBlobsCache.Load(block.Hash())
if exist {
rawdb.WriteBlobs(blockBatch, block.Hash(), block.NumberU64(), blobs.(types.BlobTxSidecars))
} else {
rawdb.WriteBlobs(blockBatch, block.Hash(), block.NumberU64(), nil)
bc.receivedBlobsCache.Delete(block.Hash())
}
rawdb.WriteBlobs(blockBatch, block.Hash(), block.NumberU64(), block.Blobs())
}
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
Expand Down Expand Up @@ -2026,11 +2010,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// TODO(GalaIO): move IsDataAvailable combine into verifyHeaders?
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
if posa, ok := bc.engine.(consensus.PoSA); ok {
blobs, exist := bc.receivedBlobsCache.Load(block.Hash())
if !exist {
return it.index, fmt.Errorf("cannot find the target block's blob info, block: %v, hash: %v", block.NumberU64(), block.Hash())
}
if err = posa.IsDataAvailable(bc, block, blobs.(types.BlobTxSidecars)); err != nil {
if err = posa.IsDataAvailable(bc, block); err != nil {
return it.index, err
}
}
Expand Down
18 changes: 18 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ type Block struct {
// inter-peer block relay.
ReceivedAt time.Time
ReceivedFrom interface{}

// blobs provides DA check
blobs BlobTxSidecars
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weren't sidecars supposed to be together with block only for propagation purposes? Why it got added inside the block itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The block struct may be a good glue struct, we can set blobs into it, and transfer them to the related component. There are other extra fields in here to record metadata.

We also need another extra blob with the block for p2p messages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if there's any process that is done with blocks passing apart from broadcasting. If there is any such process then it will slow down the work due to block becoming very heavy due to blobs.

We also need another extra blob with the block for p2p messages.

If we are including sidecar inside block definition itself then we won't need to make any changes for the p2p messages and the current mechanism only will need to check for Cancun or not and act accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, we can discuss more about the definition. The block could transfer to InsertChain, IsDataAvailbility for checking block and blob, block is a glue struct, it contains header, block body, blobs, and other metadata. So we can remove receovedBlobCache too.

Block is passed by pointer, so it will cost little in most scenarios. It is only be used in NewBlockMsg, but only header, body will be encoded.

func (b *Block) EncodeRLP(w io.Writer) error {
	return rlp.Encode(w, &extblock{
		Header:      b.header,
		Txs:         b.transactions,
		Uncles:      b.uncles,
		Withdrawals: b.withdrawals,
	})
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only be used in NewBlockMsg, but only header, body will be encoded.

If we encode the sidecars as well then we may not even need to modify BlockWithExtraData and have this definition of block everywhere except when it comes to storing. As storing of sidecars and blocks will be separate. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a good idea.

}

// "external" block encoding. used for eth protocol, etc.
Expand Down Expand Up @@ -451,6 +454,10 @@ func (b *Block) SanityCheck() error {
return b.header.SanityCheck()
}

func (b *Block) Blobs() BlobTxSidecars {
return b.blobs
}

type writeCounter uint64

func (c *writeCounter) Write(b []byte) (int, error) {
Expand Down Expand Up @@ -512,6 +519,17 @@ func (b *Block) WithWithdrawals(withdrawals []*Withdrawal) *Block {
return block
}

// WithBlobs returns a block containing the given blobs.
func (b *Block) WithBlobs(blobs BlobTxSidecars) *Block {
block := &Block{
header: b.header,
transactions: b.transactions,
uncles: b.uncles,
blobs: blobs,
}
return block
}

// Hash returns the keccak256 hash of b's header.
// The hash is computed on the first call and cached thereafter.
func (b *Block) Hash() common.Hash {
Expand Down
62 changes: 47 additions & 15 deletions core/types/tx_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"crypto/ecdsa"
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -93,23 +94,54 @@ func createEmptyBlobTx(key *ecdsa.PrivateKey, withSidecar bool) *Transaction {
}

func TestBlobTxSidecars_Encode(t *testing.T) {
bs := BlobTxSidecars{
&BlobTxSidecar{
Blobs: []kzg4844.Blob{emptyBlob},
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
tests := []struct {
raw BlobTxSidecars
err bool
}{
{
raw: BlobTxSidecars{
&BlobTxSidecar{
Blobs: []kzg4844.Blob{emptyBlob},
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
},
&BlobTxSidecar{
Blobs: []kzg4844.Blob{emptyBlob},
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
},
},
err: false,
},
&BlobTxSidecar{
Blobs: []kzg4844.Blob{emptyBlob},
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
{
raw: BlobTxSidecars{
&BlobTxSidecar{
Blobs: []kzg4844.Blob{emptyBlob},
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
},
nil,
},
err: true,
},
{
raw: BlobTxSidecars{},
err: false,
},
}

enc, err := rlp.EncodeToBytes(bs)
require.NoError(t, err)
var nbs BlobTxSidecars
err = rlp.DecodeBytes(enc, &nbs)
require.NoError(t, err)
require.Equal(t, bs, nbs)
for i, item := range tests {
t.Run(fmt.Sprintf("case%d", i), func(t *testing.T) {
enc, err := rlp.EncodeToBytes(item.raw)
require.NoError(t, err)
var nbs BlobTxSidecars
err = rlp.DecodeBytes(enc, &nbs)
if item.err {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, item.raw, nbs)
})
}
}
Loading