Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(op-node): support multi clients to fetch blobs #199

Merged
merged 19 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type L1EndpointSetup interface {
// The results of the RPC client may be trusted for faster processing, or strictly validated.
// The kind of the RPC may be non-basic, to optimize RPC usage.
Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.L1ClientConfig, err error)
SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error)
Check() error
}

Expand Down Expand Up @@ -162,6 +163,37 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
return l1Node, rpcCfg, nil
}

func (cfg *L1EndpointConfig) SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) {
rpcClients := make([]client.RPC, 0)

opts := []client.RPCOption{
client.WithHttpPollInterval(cfg.HttpPollInterval),
client.WithDialBackoff(10),
}
if cfg.RateLimit != 0 {
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}
isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr)

if isMultiUrl {
for _, url := range urlList {
rpcClient, err := client.NewRPC(ctx, log, url, opts...)
if err != nil {
return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", url, err)
}
rpcClients = append(rpcClients, rpcClient)
}
} else {
rpcClient, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...)
if err != nil {
return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
}
rpcClients = append(rpcClients, rpcClient)
}

return rpcClients, nil
}

func fallbackClientWrap(ctx context.Context, logger log.Logger, urlList []string, cfg *L1EndpointConfig, rollupCfg *rollup.Config, opts ...client.RPCOption) (client.RPC, *sources.L1ClientConfig, error) {
l1Node, err := client.NewRPC(ctx, logger, urlList[0], opts...)
if err != nil {
Expand Down Expand Up @@ -189,6 +221,11 @@ func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger, rollupCf
return p.Client, sources.L1ClientDefaultConfig(rollupCfg, p.TrustRPC, p.RPCProviderKind), nil
}

func (p *PreparedL1Endpoint) SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) {
// TODO add test
return nil, nil
owen-reorg marked this conversation as resolved.
Show resolved Hide resolved
}

func (cfg *PreparedL1Endpoint) Check() error {
if cfg.Client == nil {
return errors.New("rpc client cannot be nil")
Expand Down
20 changes: 19 additions & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
return nil
}

func (n *OpNode) initL1BlobClient(ctx context.Context, cfg *Config) (*sources.BSCBlobClient, error) {
rpcClients, err := cfg.L1.SetupBlobClient(ctx, n.log)
if err != nil {
return nil, fmt.Errorf("failed to setup L1 blob client: %w", err)
}
instrumentedClients := make([]client.RPC, 0)
for _, rpc := range rpcClients {
instrumentedClients = append(instrumentedClients, client.NewInstrumentedRPC(rpc, n.metrics))
}
return sources.NewBSCBlobClient(instrumentedClients), nil
}

func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error {
// BSC use L1 client to fetch blobs
return nil
Expand Down Expand Up @@ -412,7 +424,13 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)

bscBlobClient, err := n.initL1BlobClient(ctx, cfg)
owen-reorg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to init bsc blob client: %w", err)
}

n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, bscBlobClient, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
return nil
}

Expand Down
103 changes: 103 additions & 0 deletions op-service/sources/blob_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package sources

import (
"context"
"errors"
"fmt"
"math/big"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
)

type BSCBlobClient struct {
// BSCBlobClient will rotate client.RPC in pool whenever a client runs into an error while fetching blobs
pool *ClientPool[client.RPC]
}

func NewBSCBlobClient(clients []client.RPC) *BSCBlobClient {
return &BSCBlobClient{
pool: NewClientPool[client.RPC](clients...),
}
}

func (s *BSCBlobClient) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
if len(hashes) == 0 {
return []*eth.Blob{}, nil
}

blobSidecars, err := s.GetBlobSidecars(ctx, ref)
if err != nil {
return nil, fmt.Errorf("failed to get blob sidecars for L1BlockRef %s: %w", ref, err)
}

validatedBlobs, err := validateBlobSidecars(blobSidecars, ref)
if err != nil {
return nil, fmt.Errorf("failed to validate blob sidecars for L1BlockRef %s: %w", ref, err)
}

blobs := make([]*eth.Blob, len(hashes))
for i, indexedBlobHash := range hashes {
blob, ok := validatedBlobs[indexedBlobHash.Hash]
if !ok {
return nil, fmt.Errorf("blob sidecars fetched from rpc mismatched with expected hash %s for L1BlockRef %s", indexedBlobHash.Hash, ref)
}
blobs[i] = blob
}
return blobs, nil
}

func (s *BSCBlobClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef) (eth.BSCBlobSidecars, error) {
var errs []error
for i := 0; i < s.pool.Len(); i++ {
var blobSidecars eth.BSCBlobSidecars

f := s.pool.Get()
owen-reorg marked this conversation as resolved.
Show resolved Hide resolved
err := f.CallContext(ctx, &blobSidecars, "eth_getBlobSidecars", numberID(ref.Number).Arg())
if err != nil {
s.pool.MoveToNext()
errs = append(errs, err)
} else {
if blobSidecars == nil || len(blobSidecars) == 0 {
err = ethereum.NotFound
errs = append(errs, err)
s.pool.MoveToNext()
} else {
return blobSidecars, nil
}
}
}
return nil, errors.Join(errs...)
}

func validateBlobSidecars(blobSidecars eth.BSCBlobSidecars, ref eth.L1BlockRef) (map[common.Hash]*eth.Blob, error) {
if len(blobSidecars) == 0 {
return nil, fmt.Errorf("invalidate api response, blob sidecars of block %s are empty", ref.Hash)
}
blobsMap := make(map[common.Hash]*eth.Blob)
for _, blobSidecar := range blobSidecars {
if blobSidecar.BlockNumber.ToInt().Cmp(big.NewInt(0).SetUint64(ref.Number)) != 0 {
return nil, fmt.Errorf("invalidate api response of tx %s, expect block number %d, got %d", blobSidecar.TxHash, ref.Number, blobSidecar.BlockNumber.ToInt().Uint64())
}
if blobSidecar.BlockHash.Cmp(ref.Hash) != 0 {
return nil, fmt.Errorf("invalidate api response of tx %s, expect block hash %s, got %s", blobSidecar.TxHash, ref.Hash, blobSidecar.BlockHash)
}
if len(blobSidecar.Blobs) == 0 || len(blobSidecar.Blobs) != len(blobSidecar.Commitments) || len(blobSidecar.Blobs) != len(blobSidecar.Proofs) {
return nil, fmt.Errorf("invalidate api response of tx %s,idx:%d, len of blobs(%d)/commitments(%d)/proofs(%d) is not equal or is 0", blobSidecar.TxHash, blobSidecar.TxIndex, len(blobSidecar.Blobs), len(blobSidecar.Commitments), len(blobSidecar.Proofs))
}

for i := 0; i < len(blobSidecar.Blobs); i++ {
// confirm blob data is valid by verifying its proof against the commitment
if err := eth.VerifyBlobProof(&blobSidecar.Blobs[i], kzg4844.Commitment(blobSidecar.Commitments[i]), kzg4844.Proof(blobSidecar.Proofs[i])); err != nil {
return nil, fmt.Errorf("blob of tx %s at index %d failed verification: %w", blobSidecar.TxHash, i, err)
}
// the blob's kzg commitment hashes
hash := eth.KZGToVersionedHash(kzg4844.Commitment(blobSidecar.Commitments[i]))
blobsMap[hash] = &blobSidecar.Blobs[i]
}
}
return blobsMap, nil
}
69 changes: 0 additions & 69 deletions op-service/sources/l1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package sources
import (
"context"
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/rollup"
Expand Down Expand Up @@ -257,73 +255,6 @@ func (s *L1Client) ClearReceiptsCacheBefore(blockNumber uint64) {
s.recProvider.GetReceiptsCache().RemoveLessThan(blockNumber)
}

func (s *L1Client) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
if len(hashes) == 0 {
return []*eth.Blob{}, nil
}

blobSidecars, err := s.getBlobSidecars(ctx, ref)
if err != nil {
return nil, fmt.Errorf("failed to get blob sidecars for L1BlockRef %s: %w", ref, err)
}

validatedBlobs, err := validateBlobSidecars(blobSidecars, ref)
if err != nil {
return nil, fmt.Errorf("failed to validate blob sidecars for L1BlockRef %s: %w", ref, err)
}

blobs := make([]*eth.Blob, len(hashes))
for i, indexedBlobHash := range hashes {
blob, ok := validatedBlobs[indexedBlobHash.Hash]
if !ok {
return nil, fmt.Errorf("blob sidecars fetched from rpc mismatched with expected hash %s for L1BlockRef %s", indexedBlobHash.Hash, ref)
}
blobs[i] = blob
}
return blobs, nil
}

func (s *L1Client) getBlobSidecars(ctx context.Context, ref eth.L1BlockRef) (eth.BSCBlobSidecars, error) {
var blobSidecars eth.BSCBlobSidecars
err := s.client.CallContext(ctx, &blobSidecars, "eth_getBlobSidecars", numberID(ref.Number).Arg())
if err != nil {
return nil, err
}
if blobSidecars == nil {
return nil, ethereum.NotFound
}
return blobSidecars, nil
}

func validateBlobSidecars(blobSidecars eth.BSCBlobSidecars, ref eth.L1BlockRef) (map[common.Hash]*eth.Blob, error) {
if len(blobSidecars) == 0 {
return nil, fmt.Errorf("invalidate api response, blob sidecars of block %s are empty", ref.Hash)
}
blobsMap := make(map[common.Hash]*eth.Blob)
for _, blobSidecar := range blobSidecars {
if blobSidecar.BlockNumber.ToInt().Cmp(big.NewInt(0).SetUint64(ref.Number)) != 0 {
return nil, fmt.Errorf("invalidate api response of tx %s, expect block number %d, got %d", blobSidecar.TxHash, ref.Number, blobSidecar.BlockNumber.ToInt().Uint64())
}
if blobSidecar.BlockHash.Cmp(ref.Hash) != 0 {
return nil, fmt.Errorf("invalidate api response of tx %s, expect block hash %s, got %s", blobSidecar.TxHash, ref.Hash, blobSidecar.BlockHash)
}
if len(blobSidecar.Blobs) == 0 || len(blobSidecar.Blobs) != len(blobSidecar.Commitments) || len(blobSidecar.Blobs) != len(blobSidecar.Proofs) {
return nil, fmt.Errorf("invalidate api response of tx %s,idx:%d, len of blobs(%d)/commitments(%d)/proofs(%d) is not equal or is 0", blobSidecar.TxHash, blobSidecar.TxIndex, len(blobSidecar.Blobs), len(blobSidecar.Commitments), len(blobSidecar.Proofs))
}

for i := 0; i < len(blobSidecar.Blobs); i++ {
// confirm blob data is valid by verifying its proof against the commitment
if err := eth.VerifyBlobProof(&blobSidecar.Blobs[i], kzg4844.Commitment(blobSidecar.Commitments[i]), kzg4844.Proof(blobSidecar.Proofs[i])); err != nil {
return nil, fmt.Errorf("blob of tx %s at index %d failed verification: %w", blobSidecar.TxHash, i, err)
}
// the blob's kzg commitment hashes
hash := eth.KZGToVersionedHash(kzg4844.Commitment(blobSidecar.Commitments[i]))
blobsMap[hash] = &blobSidecar.Blobs[i]
}
}
return blobsMap, nil
}

func (s *L1Client) Close() {
close(s.done)
s.EthClient.Close()
Expand Down
Loading