From 3d7dbd24c9f29a68bc5b16f3f6a6e798fdad56b6 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 14 May 2024 19:28:56 +0800 Subject: [PATCH 01/16] feat(op-node): support multi clients to fetch blobs --- op-node/node/client.go | 37 +++++++++++ op-node/node/node.go | 20 +++++- op-service/sources/blob_client.go | 100 ++++++++++++++++++++++++++++++ op-service/sources/l1_client.go | 69 --------------------- 4 files changed, 156 insertions(+), 70 deletions(-) create mode 100644 op-service/sources/blob_client.go diff --git a/op-node/node/client.go b/op-node/node/client.go index d6776d8599..9bb8a0f9c3 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -29,6 +29,7 @@ type L1EndpointSetup interface { // The results of the RPC client may be trusted for faster processing, or strictly validated. // The kind of the RPC may be non-basic, to optimize RPC usage. Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.L1ClientConfig, err error) + SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) Check() error } @@ -162,6 +163,37 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf return l1Node, rpcCfg, nil } +func (cfg *L1EndpointConfig) SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) { + rpcClients := make([]client.RPC, 0) + + opts := []client.RPCOption{ + client.WithHttpPollInterval(cfg.HttpPollInterval), + client.WithDialBackoff(10), + } + if cfg.RateLimit != 0 { + opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) + } + isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr) + + if isMultiUrl { + for _, url := range urlList { + rpcClient, err := client.NewRPC(ctx, log, url, opts...) + if err != nil { + return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", url, err) + } + rpcClients = append(rpcClients, rpcClient) + } + } else { + rpcClient, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...) + if err != nil { + return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) + } + rpcClients = append(rpcClients, rpcClient) + } + + return rpcClients, nil +} + func fallbackClientWrap(ctx context.Context, logger log.Logger, urlList []string, cfg *L1EndpointConfig, rollupCfg *rollup.Config, opts ...client.RPCOption) (client.RPC, *sources.L1ClientConfig, error) { l1Node, err := client.NewRPC(ctx, logger, urlList[0], opts...) if err != nil { @@ -189,6 +221,11 @@ func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger, rollupCf return p.Client, sources.L1ClientDefaultConfig(rollupCfg, p.TrustRPC, p.RPCProviderKind), nil } +func (p *PreparedL1Endpoint) SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) { + // TODO add test + return nil, nil +} + func (cfg *PreparedL1Endpoint) Check() error { if cfg.Client == nil { return errors.New("rpc client cannot be nil") diff --git a/op-node/node/node.go b/op-node/node/node.go index 3da88cbdad..7447e84047 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -309,6 +309,18 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error { return nil } +func (n *OpNode) initL1BlobClient(ctx context.Context, cfg *Config) (*sources.BSCBlobClient, error) { + rpcClients, err := cfg.L1.SetupBlobClient(ctx, n.log) + if err != nil { + return nil, fmt.Errorf("failed to setup L1 blob client: %w", err) + } + instrumentedClients := make([]client.RPC, 0) + for _, rpc := range rpcClients { + instrumentedClients = append(instrumentedClients, client.NewInstrumentedRPC(rpc, n.metrics)) + } + return sources.NewBSCBlobClient(instrumentedClients), nil +} + func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error { // BSC use L1 client to fetch blobs return nil @@ -412,7 +424,13 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger } else { n.safeDB = safedb.Disabled } - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA) + + bscBlobClient, err := n.initL1BlobClient(ctx, cfg) + if err != nil { + return fmt.Errorf("failed to init bsc blob client: %w", err) + } + + n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, bscBlobClient, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA) return nil } diff --git a/op-service/sources/blob_client.go b/op-service/sources/blob_client.go new file mode 100644 index 0000000000..97d5336c08 --- /dev/null +++ b/op-service/sources/blob_client.go @@ -0,0 +1,100 @@ +package sources + +import ( + "context" + "errors" + "fmt" + "math/big" + + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto/kzg4844" +) + +type BSCBlobClient struct { + // BSCBlobClient will rotate client.RPC in pool whenever a client runs into an error while fetching blobs + pool *ClientPool[client.RPC] +} + +func NewBSCBlobClient(clients []client.RPC) *BSCBlobClient { + return &BSCBlobClient{ + pool: NewClientPool[client.RPC](clients...), + } +} + +func (s *BSCBlobClient) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) { + if len(hashes) == 0 { + return []*eth.Blob{}, nil + } + + blobSidecars, err := s.GetBlobSidecars(ctx, ref) + if err != nil { + return nil, fmt.Errorf("failed to get blob sidecars for L1BlockRef %s: %w", ref, err) + } + + validatedBlobs, err := validateBlobSidecars(blobSidecars, ref) + if err != nil { + return nil, fmt.Errorf("failed to validate blob sidecars for L1BlockRef %s: %w", ref, err) + } + + blobs := make([]*eth.Blob, len(hashes)) + for i, indexedBlobHash := range hashes { + blob, ok := validatedBlobs[indexedBlobHash.Hash] + if !ok { + return nil, fmt.Errorf("blob sidecars fetched from rpc mismatched with expected hash %s for L1BlockRef %s", indexedBlobHash.Hash, ref) + } + blobs[i] = blob + } + return blobs, nil +} + +func (s *BSCBlobClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef) (eth.BSCBlobSidecars, error) { + var errs []error + for i := 0; i < s.pool.Len(); i++ { + var blobSidecars eth.BSCBlobSidecars + + f := s.pool.Get() + err := f.CallContext(ctx, &blobSidecars, "eth_getBlobSidecars", numberID(ref.Number).Arg()) + if err != nil { + s.pool.MoveToNext() + errs = append(errs, err) + } else { + if blobSidecars == nil || len(blobSidecars) == 0 { + return nil, ethereum.NotFound + } + return blobSidecars, nil + } + } + return nil, errors.Join(errs...) +} + +func validateBlobSidecars(blobSidecars eth.BSCBlobSidecars, ref eth.L1BlockRef) (map[common.Hash]*eth.Blob, error) { + if len(blobSidecars) == 0 { + return nil, fmt.Errorf("invalidate api response, blob sidecars of block %s are empty", ref.Hash) + } + blobsMap := make(map[common.Hash]*eth.Blob) + for _, blobSidecar := range blobSidecars { + if blobSidecar.BlockNumber.ToInt().Cmp(big.NewInt(0).SetUint64(ref.Number)) != 0 { + return nil, fmt.Errorf("invalidate api response of tx %s, expect block number %d, got %d", blobSidecar.TxHash, ref.Number, blobSidecar.BlockNumber.ToInt().Uint64()) + } + if blobSidecar.BlockHash.Cmp(ref.Hash) != 0 { + return nil, fmt.Errorf("invalidate api response of tx %s, expect block hash %s, got %s", blobSidecar.TxHash, ref.Hash, blobSidecar.BlockHash) + } + if len(blobSidecar.Blobs) == 0 || len(blobSidecar.Blobs) != len(blobSidecar.Commitments) || len(blobSidecar.Blobs) != len(blobSidecar.Proofs) { + return nil, fmt.Errorf("invalidate api response of tx %s,idx:%d, len of blobs(%d)/commitments(%d)/proofs(%d) is not equal or is 0", blobSidecar.TxHash, blobSidecar.TxIndex, len(blobSidecar.Blobs), len(blobSidecar.Commitments), len(blobSidecar.Proofs)) + } + + for i := 0; i < len(blobSidecar.Blobs); i++ { + // confirm blob data is valid by verifying its proof against the commitment + if err := eth.VerifyBlobProof(&blobSidecar.Blobs[i], kzg4844.Commitment(blobSidecar.Commitments[i]), kzg4844.Proof(blobSidecar.Proofs[i])); err != nil { + return nil, fmt.Errorf("blob of tx %s at index %d failed verification: %w", blobSidecar.TxHash, i, err) + } + // the blob's kzg commitment hashes + hash := eth.KZGToVersionedHash(kzg4844.Commitment(blobSidecar.Commitments[i])) + blobsMap[hash] = &blobSidecar.Blobs[i] + } + } + return blobsMap, nil +} diff --git a/op-service/sources/l1_client.go b/op-service/sources/l1_client.go index fc69d061c4..a347a17272 100644 --- a/op-service/sources/l1_client.go +++ b/op-service/sources/l1_client.go @@ -3,14 +3,12 @@ package sources import ( "context" "fmt" - "math/big" "strings" "sync" "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-node/rollup" @@ -257,73 +255,6 @@ func (s *L1Client) ClearReceiptsCacheBefore(blockNumber uint64) { s.recProvider.GetReceiptsCache().RemoveLessThan(blockNumber) } -func (s *L1Client) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) { - if len(hashes) == 0 { - return []*eth.Blob{}, nil - } - - blobSidecars, err := s.getBlobSidecars(ctx, ref) - if err != nil { - return nil, fmt.Errorf("failed to get blob sidecars for L1BlockRef %s: %w", ref, err) - } - - validatedBlobs, err := validateBlobSidecars(blobSidecars, ref) - if err != nil { - return nil, fmt.Errorf("failed to validate blob sidecars for L1BlockRef %s: %w", ref, err) - } - - blobs := make([]*eth.Blob, len(hashes)) - for i, indexedBlobHash := range hashes { - blob, ok := validatedBlobs[indexedBlobHash.Hash] - if !ok { - return nil, fmt.Errorf("blob sidecars fetched from rpc mismatched with expected hash %s for L1BlockRef %s", indexedBlobHash.Hash, ref) - } - blobs[i] = blob - } - return blobs, nil -} - -func (s *L1Client) getBlobSidecars(ctx context.Context, ref eth.L1BlockRef) (eth.BSCBlobSidecars, error) { - var blobSidecars eth.BSCBlobSidecars - err := s.client.CallContext(ctx, &blobSidecars, "eth_getBlobSidecars", numberID(ref.Number).Arg()) - if err != nil { - return nil, err - } - if blobSidecars == nil { - return nil, ethereum.NotFound - } - return blobSidecars, nil -} - -func validateBlobSidecars(blobSidecars eth.BSCBlobSidecars, ref eth.L1BlockRef) (map[common.Hash]*eth.Blob, error) { - if len(blobSidecars) == 0 { - return nil, fmt.Errorf("invalidate api response, blob sidecars of block %s are empty", ref.Hash) - } - blobsMap := make(map[common.Hash]*eth.Blob) - for _, blobSidecar := range blobSidecars { - if blobSidecar.BlockNumber.ToInt().Cmp(big.NewInt(0).SetUint64(ref.Number)) != 0 { - return nil, fmt.Errorf("invalidate api response of tx %s, expect block number %d, got %d", blobSidecar.TxHash, ref.Number, blobSidecar.BlockNumber.ToInt().Uint64()) - } - if blobSidecar.BlockHash.Cmp(ref.Hash) != 0 { - return nil, fmt.Errorf("invalidate api response of tx %s, expect block hash %s, got %s", blobSidecar.TxHash, ref.Hash, blobSidecar.BlockHash) - } - if len(blobSidecar.Blobs) == 0 || len(blobSidecar.Blobs) != len(blobSidecar.Commitments) || len(blobSidecar.Blobs) != len(blobSidecar.Proofs) { - return nil, fmt.Errorf("invalidate api response of tx %s,idx:%d, len of blobs(%d)/commitments(%d)/proofs(%d) is not equal or is 0", blobSidecar.TxHash, blobSidecar.TxIndex, len(blobSidecar.Blobs), len(blobSidecar.Commitments), len(blobSidecar.Proofs)) - } - - for i := 0; i < len(blobSidecar.Blobs); i++ { - // confirm blob data is valid by verifying its proof against the commitment - if err := eth.VerifyBlobProof(&blobSidecar.Blobs[i], kzg4844.Commitment(blobSidecar.Commitments[i]), kzg4844.Proof(blobSidecar.Proofs[i])); err != nil { - return nil, fmt.Errorf("blob of tx %s at index %d failed verification: %w", blobSidecar.TxHash, i, err) - } - // the blob's kzg commitment hashes - hash := eth.KZGToVersionedHash(kzg4844.Commitment(blobSidecar.Commitments[i])) - blobsMap[hash] = &blobSidecar.Blobs[i] - } - } - return blobsMap, nil -} - func (s *L1Client) Close() { close(s.done) s.EthClient.Close() From 2612d686029542580e2dd7f480ba8dbd9424774e Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 15 May 2024 16:20:07 +0800 Subject: [PATCH 02/16] fix: GetBlobSidecars move to next client if not found blobs --- op-service/sources/blob_client.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/op-service/sources/blob_client.go b/op-service/sources/blob_client.go index 97d5336c08..2f6227120d 100644 --- a/op-service/sources/blob_client.go +++ b/op-service/sources/blob_client.go @@ -62,9 +62,12 @@ func (s *BSCBlobClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef) errs = append(errs, err) } else { if blobSidecars == nil || len(blobSidecars) == 0 { - return nil, ethereum.NotFound + err = ethereum.NotFound + errs = append(errs, err) + s.pool.MoveToNext() + } else { + return blobSidecars, nil } - return blobSidecars, nil } } return nil, errors.Join(errs...) From 3f698ed3234ee4c0bdc87faf92a0b7ea703779d3 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 21 May 2024 18:13:00 +0800 Subject: [PATCH 03/16] fix(blob_client): return ethereum.NotFound when received mismatched blob response --- op-node/node/node.go | 12 ++++++++++++ op-service/sources/blob_client.go | 6 +++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/op-node/node/node.go b/op-node/node/node.go index 3cf4a45a3a..2c990c069f 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -304,6 +304,18 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error { return nil } +func (n *OpNode) initL1BlobClient(ctx context.Context, cfg *Config) (*sources.BSCBlobClient, error) { + rpcClients, err := cfg.L1.SetupBlobClient(ctx, n.log) + if err != nil { + return nil, fmt.Errorf("failed to setup L1 blob client: %w", err) + } + instrumentedClients := make([]client.RPC, 0) + for _, rpc := range rpcClients { + instrumentedClients = append(instrumentedClients, client.NewInstrumentedRPC(rpc, n.metrics)) + } + return sources.NewBSCBlobClient(instrumentedClients), nil +} + func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup) if err != nil { diff --git a/op-service/sources/blob_client.go b/op-service/sources/blob_client.go index 2f6227120d..6a9342f1de 100644 --- a/op-service/sources/blob_client.go +++ b/op-service/sources/blob_client.go @@ -43,7 +43,7 @@ func (s *BSCBlobClient) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes for i, indexedBlobHash := range hashes { blob, ok := validatedBlobs[indexedBlobHash.Hash] if !ok { - return nil, fmt.Errorf("blob sidecars fetched from rpc mismatched with expected hash %s for L1BlockRef %s", indexedBlobHash.Hash, ref) + return nil, fmt.Errorf("blob sidecars fetched from rpc mismatched with expected hash %s for L1BlockRef %s :%w", indexedBlobHash.Hash, ref, ethereum.NotFound) } blobs[i] = blob } @@ -75,7 +75,7 @@ func (s *BSCBlobClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef) func validateBlobSidecars(blobSidecars eth.BSCBlobSidecars, ref eth.L1BlockRef) (map[common.Hash]*eth.Blob, error) { if len(blobSidecars) == 0 { - return nil, fmt.Errorf("invalidate api response, blob sidecars of block %s are empty", ref.Hash) + return nil, fmt.Errorf("invalidate api response, blob sidecars of block %s are empty: %w", ref.Hash, ethereum.NotFound) } blobsMap := make(map[common.Hash]*eth.Blob) for _, blobSidecar := range blobSidecars { @@ -83,7 +83,7 @@ func validateBlobSidecars(blobSidecars eth.BSCBlobSidecars, ref eth.L1BlockRef) return nil, fmt.Errorf("invalidate api response of tx %s, expect block number %d, got %d", blobSidecar.TxHash, ref.Number, blobSidecar.BlockNumber.ToInt().Uint64()) } if blobSidecar.BlockHash.Cmp(ref.Hash) != 0 { - return nil, fmt.Errorf("invalidate api response of tx %s, expect block hash %s, got %s", blobSidecar.TxHash, ref.Hash, blobSidecar.BlockHash) + return nil, fmt.Errorf("invalidate api response of tx %s, expect block hash %s, got %s :%w", blobSidecar.TxHash, ref.Hash, blobSidecar.BlockHash, ethereum.NotFound) } if len(blobSidecar.Blobs) == 0 || len(blobSidecar.Blobs) != len(blobSidecar.Commitments) || len(blobSidecar.Blobs) != len(blobSidecar.Proofs) { return nil, fmt.Errorf("invalidate api response of tx %s,idx:%d, len of blobs(%d)/commitments(%d)/proofs(%d) is not equal or is 0", blobSidecar.TxHash, blobSidecar.TxIndex, len(blobSidecar.Blobs), len(blobSidecar.Commitments), len(blobSidecar.Proofs)) From 1afdeb82c53b181d2e23775e4807b32d018484a3 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 22 May 2024 10:11:00 +0800 Subject: [PATCH 04/16] refactor: change L1 blob client setup and add related flags --- op-node/flags/flags.go | 20 +++++++ op-node/node/client.go | 117 +++++++++++++++++------------------------ op-node/node/config.go | 2 +- op-node/node/node.go | 36 ++++++------- op-node/service.go | 14 +++-- 5 files changed, 92 insertions(+), 97 deletions(-) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 06e6455ab5..71bc727da2 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -68,6 +68,12 @@ var ( EnvVars: prefixEnvVars("L1_BEACON"), Category: RollupCategory, } + L1BlobNodeAddr = &cli.StringFlag{ + Name: "l1.blob", + Usage: "Address of L1 blob endpoint to use. Multiple alternative addresses are supported, separated by commas, and will rotate when error", + EnvVars: prefixEnvVars("L1_BLOB_RPC"), + Category: RollupCategory, + } /* Optional Flags */ BeaconHeader = &cli.StringFlag{ Name: "l1.beacon-header", @@ -187,6 +193,20 @@ var ( Value: time.Second * 3, Category: L1RPCCategory, } + L1BlobRPCRateLimit = &cli.Float64Flag{ + Name: "l1.blob-rpc-rate-limit", + Usage: "Optional self-imposed global rate-limit on L1 blob RPC requests, specified in requests / second. Disabled if set to 0.", + EnvVars: prefixEnvVars("L1_BLOB_RPC_RATE_LIMIT"), + Value: 0, + Category: L1RPCCategory, + } + L1BlobRPCMaxBatchSize = &cli.IntFlag{ + Name: "l1.blob-rpc-max-batch-size", + Usage: "Maximum number of RPC requests to bundle", + EnvVars: prefixEnvVars("L1_BLOB_RPC_MAX_BATCH_SIZE"), + Value: 20, + Category: L1RPCCategory, + } VerifierL1Confs = &cli.Uint64Flag{ Name: "verifier.l1-confs", Usage: "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.", diff --git a/op-node/node/client.go b/op-node/node/client.go index 9bb8a0f9c3..d2395bc48d 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -10,7 +10,6 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/client" - service_client "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum/go-ethereum/log" @@ -29,15 +28,11 @@ type L1EndpointSetup interface { // The results of the RPC client may be trusted for faster processing, or strictly validated. // The kind of the RPC may be non-basic, to optimize RPC usage. Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.L1ClientConfig, err error) - SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) Check() error } -type L1BeaconEndpointSetup interface { - Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error) - // ShouldIgnoreBeaconCheck returns true if the Beacon-node version check should not halt startup. - ShouldIgnoreBeaconCheck() bool - ShouldFetchAllSidecars() bool +type L1BlobEndpointSetup interface { + Setup(ctx context.Context, log log.Logger) ([]client.RPC, error) Check() error } @@ -148,7 +143,7 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) } - isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr) + isMultiUrl, urlList := client.MultiUrlParse(cfg.L1NodeAddr) if isMultiUrl { return fallbackClientWrap(ctx, log, urlList, cfg, rollupCfg, opts...) } @@ -163,37 +158,6 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf return l1Node, rpcCfg, nil } -func (cfg *L1EndpointConfig) SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) { - rpcClients := make([]client.RPC, 0) - - opts := []client.RPCOption{ - client.WithHttpPollInterval(cfg.HttpPollInterval), - client.WithDialBackoff(10), - } - if cfg.RateLimit != 0 { - opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) - } - isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr) - - if isMultiUrl { - for _, url := range urlList { - rpcClient, err := client.NewRPC(ctx, log, url, opts...) - if err != nil { - return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", url, err) - } - rpcClients = append(rpcClients, rpcClient) - } - } else { - rpcClient, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...) - if err != nil { - return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) - } - rpcClients = append(rpcClients, rpcClient) - } - - return rpcClients, nil -} - func fallbackClientWrap(ctx context.Context, logger log.Logger, urlList []string, cfg *L1EndpointConfig, rollupCfg *rollup.Config, opts ...client.RPCOption) (client.RPC, *sources.L1ClientConfig, error) { l1Node, err := client.NewRPC(ctx, logger, urlList[0], opts...) if err != nil { @@ -234,47 +198,60 @@ func (cfg *PreparedL1Endpoint) Check() error { return nil } -type L1BeaconEndpointConfig struct { - BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required) - BeaconHeader string // Optional HTTP header for all requests to L1 Beacon - BeaconArchiverAddr string // Address of L1 User Beacon-API Archive endpoint to use for expired blobs (beacon namespace required) - BeaconCheckIgnore bool // When false, halt startup if the beacon version endpoint fails - BeaconFetchAllSidecars bool // Whether to fetch all blob sidecars and filter locally +type L1BlobEndpointConfig struct { + // Address of L1 blob node endpoint to use, multiple alternative addresses separated by commas are supported, and will rotate when error + NodeAddrs string + + // RateLimit specifies a self-imposed rate-limit on L1 requests. 0 is no rate-limit. + RateLimit float64 + + // BatchSize specifies the maximum batch-size, which also applies as L1 rate-limit burst amount (if set). + BatchSize int } -var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil) +var _ L1BlobEndpointSetup = (*L1BlobEndpointConfig)(nil) -func (cfg *L1BeaconEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error) { - var opts []client.BasicHTTPClientOption - if cfg.BeaconHeader != "" { - hdr, err := parseHTTPHeader(cfg.BeaconHeader) - if err != nil { - return nil, nil, fmt.Errorf("parsing beacon header: %w", err) - } - opts = append(opts, client.WithHeader(hdr)) +func (cfg *L1BlobEndpointConfig) Check() error { + if cfg.NodeAddrs == "" { + return fmt.Errorf("empty L1 blob endpoint address") } - - a := client.NewBasicHTTPClient(cfg.BeaconAddr, log, opts...) - if cfg.BeaconArchiverAddr != "" { - b := client.NewBasicHTTPClient(cfg.BeaconArchiverAddr, log) - fb = append(fb, sources.NewBeaconHTTPClient(b)) + if cfg.BatchSize < 1 || cfg.BatchSize > 500 { + return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize) } - return sources.NewBeaconHTTPClient(a), fb, nil -} - -func (cfg *L1BeaconEndpointConfig) Check() error { - if cfg.BeaconAddr == "" && !cfg.BeaconCheckIgnore { - return errors.New("expected L1 Beacon API endpoint, but got none") + if cfg.RateLimit < 0 { + return fmt.Errorf("rate limit cannot be negative") } return nil } -func (cfg *L1BeaconEndpointConfig) ShouldIgnoreBeaconCheck() bool { - return cfg.BeaconCheckIgnore -} +func (cfg *L1BlobEndpointConfig) Setup(ctx context.Context, log log.Logger) ([]client.RPC, error) { + rpcClients := make([]client.RPC, 0) -func (cfg *L1BeaconEndpointConfig) ShouldFetchAllSidecars() bool { - return cfg.BeaconFetchAllSidecars + opts := []client.RPCOption{ + client.WithDialBackoff(10), + } + if cfg.RateLimit != 0 { + opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) + } + isMultiUrl, urlList := client.MultiUrlParse(cfg.NodeAddrs) + + if isMultiUrl { + for _, url := range urlList { + rpcClient, err := client.NewRPC(ctx, log, url, opts...) + if err != nil { + return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", url, err) + } + rpcClients = append(rpcClients, rpcClient) + } + } else { + rpcClient, err := client.NewRPC(ctx, log, cfg.NodeAddrs, opts...) + if err != nil { + return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", cfg.NodeAddrs, err) + } + rpcClients = append(rpcClients, rpcClient) + } + + return rpcClients, nil } func parseHTTPHeader(headerStr string) (http.Header, error) { diff --git a/op-node/node/config.go b/op-node/node/config.go index f0582acfa4..82a3435b0a 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -21,7 +21,7 @@ type Config struct { L1 L1EndpointSetup L2 L2EndpointSetup - Beacon L1BeaconEndpointSetup + L1Blob L1BlobEndpointSetup Driver driver.Config diff --git a/op-node/node/node.go b/op-node/node/node.go index 2c990c069f..c789aac1fe 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -51,14 +51,15 @@ type OpNode struct { l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling) l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling) - l1Source *sources.L1Client // L1 Client to fetch data from - l2Driver *driver.Driver // L2 Engine to Sync - l2Source *sources.EngineClient // L2 Execution Engine RPC bindings - server *rpcServer // RPC server hosting the rollup-node API - p2pNode *p2p.NodeP2P // P2P node functionality - p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer - tracer Tracer // tracer to get events for testing/debugging - runCfg *RuntimeConfig // runtime configurables + l1Source *sources.L1Client // L1 Client to fetch data from + l2Driver *driver.Driver // L2 Engine to Sync + l2Source *sources.EngineClient // L2 Execution Engine RPC bindings + l1Blob *sources.BSCBlobClient // L1 Blob Client to fetch blobs + server *rpcServer // RPC server hosting the rollup-node API + p2pNode *p2p.NodeP2P // P2P node functionality + p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer + tracer Tracer // tracer to get events for testing/debugging + runCfg *RuntimeConfig // runtime configurables safeDB closableSafeDB @@ -123,6 +124,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) if err := n.initL1(ctx, cfg); err != nil { return fmt.Errorf("failed to init L1: %w", err) } + if err := n.initL1Blob(ctx, cfg); err != nil { + return fmt.Errorf("failed to init L1 blob: %w", err) + } if err := n.initL2(ctx, cfg, snapshotLog); err != nil { return fmt.Errorf("failed to init L2: %w", err) } @@ -304,16 +308,17 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error { return nil } -func (n *OpNode) initL1BlobClient(ctx context.Context, cfg *Config) (*sources.BSCBlobClient, error) { - rpcClients, err := cfg.L1.SetupBlobClient(ctx, n.log) +func (n *OpNode) initL1Blob(ctx context.Context, cfg *Config) error { + rpcClients, err := cfg.L1Blob.Setup(ctx, n.log) if err != nil { - return nil, fmt.Errorf("failed to setup L1 blob client: %w", err) + return fmt.Errorf("failed to setup L1 blob client: %w", err) } instrumentedClients := make([]client.RPC, 0) for _, rpc := range rpcClients { instrumentedClients = append(instrumentedClients, client.NewInstrumentedRPC(rpc, n.metrics)) } - return sources.NewBSCBlobClient(instrumentedClients), nil + n.l1Blob = sources.NewBSCBlobClient(instrumentedClients) + return nil } func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { @@ -355,12 +360,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger n.safeDB = safedb.Disabled } - bscBlobClient, err := n.initL1BlobClient(ctx, cfg) - if err != nil { - return fmt.Errorf("failed to init bsc blob client: %w", err) - } - - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, bscBlobClient, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA) + n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Blob, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA) return nil } diff --git a/op-node/service.go b/op-node/service.go index dc38981e6f..844b8f0f3b 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -79,7 +79,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { L2: l2Endpoint, Rollup: *rollupConfig, Driver: *driverConfig, - Beacon: NewBeaconEndpointConfig(ctx), + L1Blob: NewL1BlobEndpointConfig(ctx), RPC: node.RPCConfig{ ListenAddr: ctx.String(flags.RPCListenAddr.Name), ListenPort: ctx.Int(flags.RPCListenPort.Name), @@ -128,13 +128,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { return cfg, nil } -func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup { - return &node.L1BeaconEndpointConfig{ - BeaconAddr: ctx.String(flags.BeaconAddr.Name), - BeaconHeader: ctx.String(flags.BeaconHeader.Name), - BeaconArchiverAddr: ctx.String(flags.BeaconArchiverAddr.Name), - BeaconCheckIgnore: ctx.Bool(flags.BeaconCheckIgnore.Name), - BeaconFetchAllSidecars: ctx.Bool(flags.BeaconFetchAllSidecars.Name), +func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup { + return &node.L1BlobEndpointConfig{ + NodeAddrs: ctx.String(flags.L1BlobNodeAddr.Name), + RateLimit: ctx.Float64(flags.L1BlobRPCRateLimit.Name), + BatchSize: ctx.Int(flags.L1BlobRPCMaxBatchSize.Name), } } From 738ab281b5d17083f173f30ef6d538b73d2f9fe6 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 22 May 2024 16:03:55 +0800 Subject: [PATCH 05/16] fix: l1 blob related flags --- op-node/flags/flags.go | 3 +++ op-node/node/client.go | 5 ----- op-node/node/node.go | 1 - op-service/sources/blob_client.go | 2 +- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 71bc727da2..9001847e19 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -383,6 +383,7 @@ var ( var requiredFlags = []cli.Flag{ L1NodeAddr, + L1BlobNodeAddr, L2EngineAddr, L2EngineJWTSecret, } @@ -402,6 +403,8 @@ var optionalFlags = []cli.Flag{ L1RPCMaxBatchSize, L1RPCMaxConcurrency, L1HTTPPollInterval, + L1BlobRPCRateLimit, + L1BlobRPCMaxBatchSize, VerifierL1Confs, SequencerEnabledFlag, SequencerStoppedFlag, diff --git a/op-node/node/client.go b/op-node/node/client.go index d2395bc48d..3de665bb21 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -185,11 +185,6 @@ func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger, rollupCf return p.Client, sources.L1ClientDefaultConfig(rollupCfg, p.TrustRPC, p.RPCProviderKind), nil } -func (p *PreparedL1Endpoint) SetupBlobClient(ctx context.Context, log log.Logger) ([]client.RPC, error) { - // TODO add test - return nil, nil -} - func (cfg *PreparedL1Endpoint) Check() error { if cfg.Client == nil { return errors.New("rpc client cannot be nil") diff --git a/op-node/node/node.go b/op-node/node/node.go index c789aac1fe..bbcf7637ce 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -359,7 +359,6 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger } else { n.safeDB = safedb.Disabled } - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Blob, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA) return nil } diff --git a/op-service/sources/blob_client.go b/op-service/sources/blob_client.go index 6a9342f1de..f04016e7aa 100644 --- a/op-service/sources/blob_client.go +++ b/op-service/sources/blob_client.go @@ -14,7 +14,7 @@ import ( ) type BSCBlobClient struct { - // BSCBlobClient will rotate client.RPC in pool whenever a client runs into an error while fetching blobs + // BSCBlobClient will rotate client.RPC in pool whenever a client runs into an error or return nil while fetching blobs pool *ClientPool[client.RPC] } From 8477792773f4f0bd10401561ccaee5c1f5f8a541 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Thu, 23 May 2024 11:58:10 +0800 Subject: [PATCH 06/16] fix: op-service test build error --- op-node/flags/flags.go | 6 +- op-node/service.go | 2 +- .../{blob_client.go => bsc_blob_client.go} | 0 op-service/sources/bsc_blob_client_test.go | 60 +++++++++++++++++++ op-service/sources/eth_client_test.go | 2 +- op-service/sources/l1_client_test.go | 52 ++++++++-------- op-service/sources/receipts_basic_test.go | 4 +- op-service/sources/receipts_caching_test.go | 2 +- 8 files changed, 94 insertions(+), 34 deletions(-) rename op-service/sources/{blob_client.go => bsc_blob_client.go} (100%) create mode 100644 op-service/sources/bsc_blob_client_test.go diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 9001847e19..147a334911 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -68,8 +68,8 @@ var ( EnvVars: prefixEnvVars("L1_BEACON"), Category: RollupCategory, } - L1BlobNodeAddr = &cli.StringFlag{ - Name: "l1.blob", + L1BlobRpcAddr = &cli.StringFlag{ + Name: "l1.blob-rpc", Usage: "Address of L1 blob endpoint to use. Multiple alternative addresses are supported, separated by commas, and will rotate when error", EnvVars: prefixEnvVars("L1_BLOB_RPC"), Category: RollupCategory, @@ -383,7 +383,7 @@ var ( var requiredFlags = []cli.Flag{ L1NodeAddr, - L1BlobNodeAddr, + L1BlobRpcAddr, L2EngineAddr, L2EngineJWTSecret, } diff --git a/op-node/service.go b/op-node/service.go index 844b8f0f3b..34994b7193 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -130,7 +130,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup { return &node.L1BlobEndpointConfig{ - NodeAddrs: ctx.String(flags.L1BlobNodeAddr.Name), + NodeAddrs: ctx.String(flags.L1BlobRpcAddr.Name), RateLimit: ctx.Float64(flags.L1BlobRPCRateLimit.Name), BatchSize: ctx.Int(flags.L1BlobRPCMaxBatchSize.Name), } diff --git a/op-service/sources/blob_client.go b/op-service/sources/bsc_blob_client.go similarity index 100% rename from op-service/sources/blob_client.go rename to op-service/sources/bsc_blob_client.go diff --git a/op-service/sources/bsc_blob_client_test.go b/op-service/sources/bsc_blob_client_test.go new file mode 100644 index 0000000000..d671ca9341 --- /dev/null +++ b/op-service/sources/bsc_blob_client_test.go @@ -0,0 +1,60 @@ +package sources + +import ( + "testing" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto/kzg4844" +) + +func makeTestBSCBlobSidecar(blockHash common.Hash, blobs []eth.Blob) ([]eth.IndexedBlobHash, *eth.BSCBlobSidecar) { + commitments := []eth.Bytes48{} + proofs := []eth.Bytes48{} + hashs := []common.Hash{} + ibhs := []eth.IndexedBlobHash{} + for i, blob := range blobs { + commit, _ := kzg4844.BlobToCommitment(kzg4844.Blob(blob)) + proof, _ := kzg4844.ComputeBlobProof(kzg4844.Blob(blob), commit) + hash := eth.KZGToVersionedHash(commit) + commitments = append(commitments, eth.Bytes48(commit)) + proofs = append(proofs, eth.Bytes48(proof)) + hashs = append(hashs, hash) + ibhs = append(ibhs, eth.IndexedBlobHash{ + Index: uint64(i), + Hash: hash, + }) + } + + sidecar := eth.BSCBlobSidecar{ + BlockHash: blockHash, + BlockNumber: &hexutil.Big{}, + BSCBlobTxSidecar: eth.BSCBlobTxSidecar{ + Blobs: blobs, + Commitments: commitments, + Proofs: proofs, + }, + } + return ibhs, &sidecar +} + +func TestValidateBlobSidecars(t *testing.T) { + blockHash := common.BytesToHash([]byte{1}) + blobs := []eth.Blob{} + blob1 := eth.Blob{} + blob1[0] = 1 + blob2 := eth.Blob{} + blob2[0] = 2 + blobs = append(blobs, blob1) + blobs = append(blobs, blob2) + _, sidecar := makeTestBSCBlobSidecar(blockHash, blobs) + + sidecars := eth.BSCBlobSidecars{sidecar} + ref := eth.L1BlockRef{ + Hash: blockHash, + } + validateBlobSidecars(sidecars, ref) + + +} diff --git a/op-service/sources/eth_client_test.go b/op-service/sources/eth_client_test.go index a437f32085..93440a9c83 100644 --- a/op-service/sources/eth_client_test.go +++ b/op-service/sources/eth_client_test.go @@ -220,7 +220,7 @@ func TestReceiptValidation(t *testing.T) { ethcl := newEthClientWithCaches(nil, numTxs) ethcl.client = mrpc - ethcl.recProvider = rp + ethcl.recProvider = NewCachingReceiptsProvider(rp, nil, 1) ethcl.trustRPC = true _, _, err := ethcl.FetchReceipts(ctx, block.Hash) diff --git a/op-service/sources/l1_client_test.go b/op-service/sources/l1_client_test.go index 496bd9a14e..f2894a77f4 100644 --- a/op-service/sources/l1_client_test.go +++ b/op-service/sources/l1_client_test.go @@ -21,7 +21,7 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { m := new(mockRPC) ctx := context.Background() clientLog := testlog.Logger(t, log.LvlDebug) - latestHead := &rpcHeader{ + latestHead := &RPCHeader{ ParentHash: randHash(), UncleHash: common.Hash{}, Coinbase: common.Address{}, @@ -41,12 +41,12 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { WithdrawalsRoot: nil, Hash: randHash(), } - m.On("CallContext", ctx, new(*rpcHeader), + m.On("CallContext", ctx, new(*RPCHeader), "eth_getBlockByNumber", []any{"latest", false}).Run(func(args mock.Arguments) { - *args[1].(**rpcHeader) = latestHead + *args[1].(**RPCHeader) = latestHead }).Return([]error{nil}) for i := 81; i <= 90; i++ { - currentHead := &rpcHeader{ + currentHead := &RPCHeader{ ParentHash: randHash(), UncleHash: common.Hash{}, Coinbase: common.Address{}, @@ -66,21 +66,21 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { WithdrawalsRoot: nil, Hash: randHash(), } - currentBlock := &rpcBlock{ - rpcHeader: *currentHead, + currentBlock := &RPCBlock{ + RPCHeader: *currentHead, Transactions: []*types.Transaction{}, } - m.On("CallContext", ctx, new(*rpcHeader), + m.On("CallContext", ctx, new(*RPCHeader), "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { - *args[1].(**rpcHeader) = currentHead + *args[1].(**RPCHeader) = currentHead }).Return([]error{nil}) - m.On("CallContext", ctx, new(*rpcBlock), + m.On("CallContext", ctx, new(*RPCBlock), "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { - *args[1].(**rpcBlock) = currentBlock + *args[1].(**RPCBlock) = currentBlock }).Return([]error{nil}) } for i := 91; i <= 100; i++ { - currentHead := &rpcHeader{ + currentHead := &RPCHeader{ ParentHash: randHash(), UncleHash: common.Hash{}, Coinbase: common.Address{}, @@ -100,23 +100,23 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { WithdrawalsRoot: nil, Hash: randHash(), } - m.On("CallContext", ctx, new(*rpcHeader), + m.On("CallContext", ctx, new(*RPCHeader), "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { - *args[1].(**rpcHeader) = currentHead + *args[1].(**RPCHeader) = currentHead }).Return([]error{nil}) - currentBlock := &rpcBlock{ - rpcHeader: *currentHead, + currentBlock := &RPCBlock{ + RPCHeader: *currentHead, Transactions: []*types.Transaction{}, } - m.On("CallContext", ctx, new(*rpcBlock), + m.On("CallContext", ctx, new(*RPCBlock), "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { - *args[1].(**rpcBlock) = currentBlock + *args[1].(**RPCBlock) = currentBlock }).Return([]error{nil}) } var lastParentHeader common.Hash var real100Hash common.Hash for i := 76; i <= 100; i++ { - currentHead := &rpcHeader{ + currentHead := &RPCHeader{ ParentHash: lastParentHeader, UncleHash: common.Hash{}, Coinbase: common.Address{}, @@ -140,17 +140,17 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { real100Hash = currentHead.Hash } lastParentHeader = currentHead.Hash - m.On("CallContext", ctx, new(*rpcHeader), + m.On("CallContext", ctx, new(*RPCHeader), "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { - *args[1].(**rpcHeader) = currentHead + *args[1].(**RPCHeader) = currentHead }).Return([]error{nil}) - currentBlock := &rpcBlock{ - rpcHeader: *currentHead, + currentBlock := &RPCBlock{ + RPCHeader: *currentHead, Transactions: []*types.Transaction{}, } - m.On("CallContext", ctx, new(*rpcBlock), + m.On("CallContext", ctx, new(*RPCBlock), "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { - *args[1].(**rpcBlock) = currentBlock + *args[1].(**RPCBlock) = currentBlock }).Return([]error{nil}) } s, err := NewL1Client(m, clientLog, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 1000}, true, RPCKindBasic)) @@ -158,10 +158,10 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { err2 := s.GoOrUpdatePreFetchReceipts(ctx, 81) require.NoError(t, err2) time.Sleep(1 * time.Second) - pair, ok := s.receiptsCache.Get(100, false) + pair, ok := s.recProvider.GetReceiptsCache().Get(100, false) require.True(t, ok, "100 cache miss") require.Equal(t, real100Hash, pair.blockHash, "block 100 hash is different,want:%s,but:%s", real100Hash, pair.blockHash) - _, ok2 := s.receiptsCache.Get(76, false) + _, ok2 := s.recProvider.GetReceiptsCache().Get(76, false) require.True(t, ok2, "76 cache miss") }) } diff --git a/op-service/sources/receipts_basic_test.go b/op-service/sources/receipts_basic_test.go index 3a0bbdf221..402623b348 100644 --- a/op-service/sources/receipts_basic_test.go +++ b/op-service/sources/receipts_basic_test.go @@ -123,7 +123,7 @@ func TestBasicRPCReceiptsFetcher_Concurrency(t *testing.T) { }). Return([]error{nil}) - runConcurrentFetchingTest(t, rp, numFetchers, receipts, block) + runConcurrentFetchingTest(t, NewCachingReceiptsProvider(rp, nil, 1), numFetchers, receipts, block) mrpc.AssertExpectations(t) finalNumCalls := int(numCalls.Load()) @@ -148,7 +148,7 @@ func runConcurrentFetchingTest(t *testing.T, rp ReceiptsProvider, numFetchers in for i := 0; i < numFetchers; i++ { go func() { <-barrier - recs, err := rp.FetchReceipts(ctx, bInfo, txHashes) + recs, err, _ := rp.FetchReceipts(ctx, bInfo, txHashes, false) fetchResults <- fetchResult{rs: recs, err: err} }() } diff --git a/op-service/sources/receipts_caching_test.go b/op-service/sources/receipts_caching_test.go index e7891b9e34..e9b53a4001 100644 --- a/op-service/sources/receipts_caching_test.go +++ b/op-service/sources/receipts_caching_test.go @@ -38,7 +38,7 @@ func TestCachingReceiptsProvider_Caching(t *testing.T) { bInfo, _, _ := block.Info(true, true) for i := 0; i < 4; i++ { - gotRecs, err := rp.FetchReceipts(ctx, bInfo, txHashes) + gotRecs, err, _ := rp.FetchReceipts(ctx, bInfo, txHashes, false) require.NoError(t, err) for i, gotRec := range gotRecs { requireEqualReceipt(t, receipts[i], gotRec) From f4495da05c81afce71712d460ebbd966f74e8137 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 24 May 2024 16:14:38 +0800 Subject: [PATCH 07/16] op-service: add unit test for bsc blob client --- op-node/node/node.go | 10 +-- op-service/sources/bsc_blob_client_test.go | 76 ++++++++++++++++++++-- op-service/txmgr/metrics/noop.go | 1 + 3 files changed, 78 insertions(+), 9 deletions(-) diff --git a/op-node/node/node.go b/op-node/node/node.go index bbcf7637ce..7c5e4a0ebf 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -55,11 +55,11 @@ type OpNode struct { l2Driver *driver.Driver // L2 Engine to Sync l2Source *sources.EngineClient // L2 Execution Engine RPC bindings l1Blob *sources.BSCBlobClient // L1 Blob Client to fetch blobs - server *rpcServer // RPC server hosting the rollup-node API - p2pNode *p2p.NodeP2P // P2P node functionality - p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer - tracer Tracer // tracer to get events for testing/debugging - runCfg *RuntimeConfig // runtime configurables + server *rpcServer // RPC server hosting the rollup-node API + p2pNode *p2p.NodeP2P // P2P node functionality + p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer + tracer Tracer // tracer to get events for testing/debugging + runCfg *RuntimeConfig // runtime configurables safeDB closableSafeDB diff --git a/op-service/sources/bsc_blob_client_test.go b/op-service/sources/bsc_blob_client_test.go index d671ca9341..caf479db0e 100644 --- a/op-service/sources/bsc_blob_client_test.go +++ b/op-service/sources/bsc_blob_client_test.go @@ -1,18 +1,22 @@ package sources import ( + "context" "testing" + "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func makeTestBSCBlobSidecar(blockHash common.Hash, blobs []eth.Blob) ([]eth.IndexedBlobHash, *eth.BSCBlobSidecar) { commitments := []eth.Bytes48{} proofs := []eth.Bytes48{} - hashs := []common.Hash{} ibhs := []eth.IndexedBlobHash{} for i, blob := range blobs { commit, _ := kzg4844.BlobToCommitment(kzg4844.Blob(blob)) @@ -20,7 +24,6 @@ func makeTestBSCBlobSidecar(blockHash common.Hash, blobs []eth.Blob) ([]eth.Inde hash := eth.KZGToVersionedHash(commit) commitments = append(commitments, eth.Bytes48(commit)) proofs = append(proofs, eth.Bytes48(proof)) - hashs = append(hashs, hash) ibhs = append(ibhs, eth.IndexedBlobHash{ Index: uint64(i), Hash: hash, @@ -48,13 +51,78 @@ func TestValidateBlobSidecars(t *testing.T) { blob2[0] = 2 blobs = append(blobs, blob1) blobs = append(blobs, blob2) - _, sidecar := makeTestBSCBlobSidecar(blockHash, blobs) + ibhs, sidecar := makeTestBSCBlobSidecar(blockHash, blobs) sidecars := eth.BSCBlobSidecars{sidecar} ref := eth.L1BlockRef{ Hash: blockHash, } - validateBlobSidecars(sidecars, ref) + validatedSidecars, err := validateBlobSidecars(sidecars, ref) + require.NoError(t, err) + vBlob1, ok := validatedSidecars[ibhs[0].Hash] + require.Equal(t, *vBlob1, blob1) + require.Equal(t, ok, true) + vBlob2, ok := validatedSidecars[ibhs[1].Hash] + require.Equal(t, *vBlob2, blob2) + require.Equal(t, ok, true) + _, ok = validatedSidecars[common.Hash{}] + require.Equal(t, ok, false) + // mangle block hash to make sure it's detected + ref = eth.L1BlockRef{} + _, err = validateBlobSidecars(sidecars, ref) + require.ErrorIs(t, err, ethereum.NotFound) + // mangle blob to make sure it's detected + sidecars[0].BSCBlobTxSidecar.Blobs[0][11]++ + _, err = validateBlobSidecars(sidecars, ref) + require.Error(t, err) + // mangle commitment to make sure it's detected + sidecars[0].BSCBlobTxSidecar.Commitments[0][11]++ + _, err = validateBlobSidecars(sidecars, ref) + require.Error(t, err) + // mangle proof to make sure it's detected + sidecars[0].BSCBlobTxSidecar.Proofs[0][11]++ + _, err = validateBlobSidecars(sidecars, ref) + require.Error(t, err) +} + + +func TestBSCBlobClient(t *testing.T) { + blockHash := common.BytesToHash([]byte{1}) + blobs := []eth.Blob{} + blob1 := eth.Blob{} + blob1[0] = 1 + blob2 := eth.Blob{} + blob2[0] = 2 + blobs = append(blobs, blob1) + blobs = append(blobs, blob2) + ibhs, sidecar := makeTestBSCBlobSidecar(blockHash, blobs) + sidecars := eth.BSCBlobSidecars{sidecar} + ref := eth.L1BlockRef{ + Hash: blockHash, + } + + m := new(mockRPC) + ctx := context.Background() + m.On("CallContext", ctx, new(eth.BSCBlobSidecars), + "eth_getBlobSidecars", []any{"0x0"}).Run(func(args mock.Arguments) { + *args[1].(*eth.BSCBlobSidecars) = sidecars + }).Return([]error{nil}) + bscBlobClient := NewBSCBlobClient([]client.RPC{m}) + + gotBlobs, err := bscBlobClient.GetBlobs(ctx, ref, ibhs) + require.NoError(t, err) + require.Equal(t, len(gotBlobs), 2) + require.Equal(t, *gotBlobs[0], blob1) + require.Equal(t, *gotBlobs[1], blob2) + + // mangle block hash to make sure it's detected + _, err = bscBlobClient.GetBlobs(ctx, eth.L1BlockRef{}, ibhs) + println(err) + require.ErrorIs(t, err, ethereum.NotFound) + // mangle blob hash to make sure it's detected + ibhs[0].Hash[10]++ + _, err = bscBlobClient.GetBlobs(ctx, eth.L1BlockRef{}, ibhs) + require.ErrorIs(t, err, ethereum.NotFound) } diff --git a/op-service/txmgr/metrics/noop.go b/op-service/txmgr/metrics/noop.go index 01c01ceb5c..71c045bd22 100644 --- a/op-service/txmgr/metrics/noop.go +++ b/op-service/txmgr/metrics/noop.go @@ -23,3 +23,4 @@ func (*NoopTxMetrics) RecordBlobBaseFee(*big.Int) {} func (*NoopTxMetrics) RecordTipCap(*big.Int) {} func (*NoopTxMetrics) RPCError() {} func (m *NoopTxMetrics) RecordL1UrlSwitchEvt(url string) {} +func (m *NoopTxMetrics) RecordBlobsNumber(number int) {} From 02eaa42bf6fd0504cf5a463d4886f0c0ea3fdd6e Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 24 May 2024 18:03:44 +0800 Subject: [PATCH 08/16] chore: add op-service lint test --- .github/workflows/ci.yml | 48 ++++++++++++++++++++++++++ op-e2e/actions/fallback_client_test.go | 3 +- op-e2e/setup.go | 2 +- op-service/txmgr/txmgr_test.go | 10 ++++++ 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 300a32e947..ef40ec01f3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,6 +64,25 @@ jobs: version: latest args: -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is" + op-service-lint: + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version-file: go.mod + + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + working-directory: op-service + version: latest + args: -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is" + op-node-test: runs-on: ubuntu-latest needs: op-node-lint @@ -151,6 +170,35 @@ jobs: with: report_paths: '/tmp/test-results/op-proposer.xml' + op-service-test: + runs-on: ubuntu-latest + needs: op-service-lint + + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version-file: go.mod + + - name: Install gotestsum + uses: autero1/action-gotestsum@v2.0.0 + with: + gotestsum_version: 1.10.0 + + - name: Run tests + working-directory: op-service + run: | + gotestsum --format=testname --junitfile=/tmp/test-results/op-service.xml -- -parallel=2 -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out ./... + + - name: Publish Test Report + uses: mikepenz/action-junit-report@v3 + if: success() || failure() # always run even if the previous step fails + with: + report_paths: '/tmp/test-results/op-service.xml' + op-e2e-http-test: runs-on: ubuntu-latest needs: [op-node-test, op-batcher-test, op-proposer-test] diff --git a/op-e2e/actions/fallback_client_test.go b/op-e2e/actions/fallback_client_test.go index cd1af829c4..91d8ceea3e 100644 --- a/op-e2e/actions/fallback_client_test.go +++ b/op-e2e/actions/fallback_client_test.go @@ -37,11 +37,12 @@ func setupFallbackClientTest(t Testing, sd *e2eutils.SetupData, log log.Logger, }) l1F, err := sources.NewL1Client(fallbackClient, log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic)) require.NoError(t, err) + l1Blob := sources.NewBSCBlobClient([]client.RPC{rpc}) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) - sequencer := NewL2Sequencer(t, log, l1F, l1F, plasma.Disabled, l2Cl, sd.RollupCfg, 0) + sequencer := NewL2Sequencer(t, log, l1F, l1Blob, plasma.Disabled, l2Cl, sd.RollupCfg, 0) return miner, l1_2, l1_3, engine, sequencer, fallbackClient.(*sources.FallbackClient) } diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 6715a385b4..0ad6e2a1d7 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -601,7 +601,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste configureL1(nodeCfg, sys.EthInstances["l1"]) configureL2(nodeCfg, sys.EthInstances[name], cfg.JWTSecret) if sys.RollupConfig.EcotoneTime != nil { - nodeCfg.Beacon = &rollupNode.L1BeaconEndpointConfig{BeaconAddr: sys.L1BeaconAPIAddr} + nodeCfg.L1Blob = &rollupNode.L1BlobEndpointConfig{NodeAddrs: sys.L1BeaconAPIAddr} } } diff --git a/op-service/txmgr/txmgr_test.go b/op-service/txmgr/txmgr_test.go index e404ea9de7..5e54f9e8b0 100644 --- a/op-service/txmgr/txmgr_test.go +++ b/op-service/txmgr/txmgr_test.go @@ -412,6 +412,7 @@ func TestAlreadyReserved(t *testing.T) { // TestTxMgrConfirmsAtMaxGasPrice asserts that Send properly returns the max gas // price receipt if none of the lower gas price txs were mined. func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarness(t) @@ -442,6 +443,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) { // TestTxMgrConfirmsBlobTxAtMaxGasPrice asserts that Send properly returns the max gas price // receipt if none of the lower gas price txs were mined when attempting to send a blob tx. func TestTxMgrConfirmsBlobTxAtHigherGasPrice(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarness(t) @@ -508,6 +510,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) { // TestTxMgr_CraftTx ensures that the tx manager will create transactions as expected. func TestTxMgr_CraftTx(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarness(t) candidate := h.createTxCandidate() @@ -532,6 +535,7 @@ func TestTxMgr_CraftTx(t *testing.T) { // TestTxMgr_CraftBlobTx ensures that the tx manager will create blob transactions as expected. func TestTxMgr_CraftBlobTx(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarness(t) candidate := h.createBlobTxCandidate() @@ -661,6 +665,7 @@ func TestTxMgr_SigningFails(t *testing.T) { // receipt so long as at least one of the publications is able to succeed with a // simulated rpc failure. func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarness(t) @@ -696,6 +701,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) { // with the minimum gas price, and asserts that its receipt is returned even // though if the gas price has been bumped in other goroutines. func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarness(t) @@ -728,6 +734,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) { // TestTxMgrDoesntAbortNonceTooLowAfterMiningTx func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Parallel() h := newTestHarnessWithConfig(t, configWithNumConfs(2)) @@ -1008,6 +1015,7 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int } func TestIncreaseGasPrice(t *testing.T) { + t.Skip("due to 0 base fee of bsc") // t.Parallel() require.Equal(t, int64(10), priceBump, "test must be updated if priceBump is adjusted") tests := []struct { @@ -1112,6 +1120,7 @@ func TestIncreaseGasPrice(t *testing.T) { // TestIncreaseGasPriceLimits asserts that if the L1 base fee & tip remain the // same, repeated calls to IncreaseGasPrice eventually hit a limit. func TestIncreaseGasPriceLimits(t *testing.T) { + t.Skip("due to 0 base fee of bsc") t.Run("no-threshold", func(t *testing.T) { testIncreaseGasPriceLimit(t, gasPriceLimitTest{ expTipCap: 46, @@ -1273,6 +1282,7 @@ func TestNonceReset(t *testing.T) { } func TestMinFees(t *testing.T) { + t.Skip("due to 0 base fee of bsc") for _, tt := range []struct { desc string minBaseFee *big.Int From d3e5e519c10cc7a23bb00600b881be73a6d9ad78 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 24 May 2024 18:07:25 +0800 Subject: [PATCH 09/16] chore: goimports fmt op-service --- op-service/bsc/compat.go | 3 ++- op-service/dial/static_l2_provider.go | 2 +- op-service/sources/bsc_blob_client_test.go | 9 ++++----- op-service/sources/caching/cache.go | 2 +- op-service/sources/receipts.go | 2 +- op-service/sources/receipts_caching.go | 4 ++-- op-service/txmgr/metrics/noop.go | 2 +- op-service/txmgr/txmgr.go | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/op-service/bsc/compat.go b/op-service/bsc/compat.go index 06b7b243d9..e72303e45c 100644 --- a/op-service/bsc/compat.go +++ b/op-service/bsc/compat.go @@ -1,10 +1,11 @@ package bsc import ( - lru "github.com/hashicorp/golang-lru/v2" "math/big" "sort" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" diff --git a/op-service/dial/static_l2_provider.go b/op-service/dial/static_l2_provider.go index c6fbb64845..09a95cf307 100644 --- a/op-service/dial/static_l2_provider.go +++ b/op-service/dial/static_l2_provider.go @@ -35,7 +35,7 @@ func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, ethClientU } return &StaticL2EndpointProvider{ StaticL2RollupProvider: *rollupProvider, - ethClient: client.NewInstrumentedClient(ethClient, metrics), + ethClient: client.NewInstrumentedClient(ethClient, metrics), }, nil } diff --git a/op-service/sources/bsc_blob_client_test.go b/op-service/sources/bsc_blob_client_test.go index caf479db0e..bd84dd2407 100644 --- a/op-service/sources/bsc_blob_client_test.go +++ b/op-service/sources/bsc_blob_client_test.go @@ -26,15 +26,15 @@ func makeTestBSCBlobSidecar(blockHash common.Hash, blobs []eth.Blob) ([]eth.Inde proofs = append(proofs, eth.Bytes48(proof)) ibhs = append(ibhs, eth.IndexedBlobHash{ Index: uint64(i), - Hash: hash, + Hash: hash, }) } sidecar := eth.BSCBlobSidecar{ - BlockHash: blockHash, + BlockHash: blockHash, BlockNumber: &hexutil.Big{}, BSCBlobTxSidecar: eth.BSCBlobTxSidecar{ - Blobs: blobs, + Blobs: blobs, Commitments: commitments, Proofs: proofs, }, @@ -86,7 +86,6 @@ func TestValidateBlobSidecars(t *testing.T) { require.Error(t, err) } - func TestBSCBlobClient(t *testing.T) { blockHash := common.BytesToHash([]byte{1}) blobs := []eth.Blob{} @@ -106,7 +105,7 @@ func TestBSCBlobClient(t *testing.T) { ctx := context.Background() m.On("CallContext", ctx, new(eth.BSCBlobSidecars), "eth_getBlobSidecars", []any{"0x0"}).Run(func(args mock.Arguments) { - *args[1].(*eth.BSCBlobSidecars) = sidecars + *args[1].(*eth.BSCBlobSidecars) = sidecars }).Return([]error{nil}) bscBlobClient := NewBSCBlobClient([]client.RPC{m}) diff --git a/op-service/sources/caching/cache.go b/op-service/sources/caching/cache.go index 02cffa51dd..352a4ad98c 100644 --- a/op-service/sources/caching/cache.go +++ b/op-service/sources/caching/cache.go @@ -22,7 +22,7 @@ func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) { return value, ok } -func (c *LRUCache[K,V]) GetOrPeek(key K, usePeek bool, recordMetrics bool) (value V, ok bool) { +func (c *LRUCache[K, V]) GetOrPeek(key K, usePeek bool, recordMetrics bool) (value V, ok bool) { if usePeek { value, ok = c.inner.Peek(key) } else { diff --git a/op-service/sources/receipts.go b/op-service/sources/receipts.go index a64f070f13..fcc85c5ec3 100644 --- a/op-service/sources/receipts.go +++ b/op-service/sources/receipts.go @@ -13,7 +13,7 @@ import ( type ReceiptsHashPair struct { blockHash common.Hash - receipts types.Receipts + receipts types.Receipts } type ReceiptsProvider interface { diff --git a/op-service/sources/receipts_caching.go b/op-service/sources/receipts_caching.go index a7127ef22b..261756aa55 100644 --- a/op-service/sources/receipts_caching.go +++ b/op-service/sources/receipts_caching.go @@ -56,7 +56,7 @@ func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, blockInfo e block := eth.ToBlockID(blockInfo) var isFull bool - if v, ok := p.cache.Get(block.Number, !isForPreFetch); ok && v.blockHash == block.Hash { + if v, ok := p.cache.Get(block.Number, !isForPreFetch); ok && v.blockHash == block.Hash { return v.receipts, nil, isFull } @@ -64,7 +64,7 @@ func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, blockInfo e mu.Lock() defer mu.Unlock() // Other routine might have fetched in the meantime - if v, ok := p.cache.Get(block.Number, !isForPreFetch); ok && v.blockHash == block.Hash { + if v, ok := p.cache.Get(block.Number, !isForPreFetch); ok && v.blockHash == block.Hash { // we might have created a new lock above while the old // fetching job completed. p.deleteFetchingLock(block.Hash) diff --git a/op-service/txmgr/metrics/noop.go b/op-service/txmgr/metrics/noop.go index 71c045bd22..32fe17712a 100644 --- a/op-service/txmgr/metrics/noop.go +++ b/op-service/txmgr/metrics/noop.go @@ -23,4 +23,4 @@ func (*NoopTxMetrics) RecordBlobBaseFee(*big.Int) {} func (*NoopTxMetrics) RecordTipCap(*big.Int) {} func (*NoopTxMetrics) RPCError() {} func (m *NoopTxMetrics) RecordL1UrlSwitchEvt(url string) {} -func (m *NoopTxMetrics) RecordBlobsNumber(number int) {} +func (m *NoopTxMetrics) RecordBlobsNumber(number int) {} diff --git a/op-service/txmgr/txmgr.go b/op-service/txmgr/txmgr.go index 87f174cdec..ad67550e99 100644 --- a/op-service/txmgr/txmgr.go +++ b/op-service/txmgr/txmgr.go @@ -512,7 +512,7 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, l.Warn("nonce too high", "err", err) m.metr.TxPublished("nonce_too_high") bumpFeesImmediately = false // retry without fee bump - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) continue case errStringMatch(err, context.Canceled): m.metr.RPCError() From 00d5bb3dbee6bd21b86b763e77d50c7b2936f639 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 24 May 2024 18:48:03 +0800 Subject: [PATCH 10/16] chore: fix op-service ci --- op-service/sources/bsc_blob_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-service/sources/bsc_blob_client.go b/op-service/sources/bsc_blob_client.go index f04016e7aa..9858178d34 100644 --- a/op-service/sources/bsc_blob_client.go +++ b/op-service/sources/bsc_blob_client.go @@ -61,7 +61,7 @@ func (s *BSCBlobClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef) s.pool.MoveToNext() errs = append(errs, err) } else { - if blobSidecars == nil || len(blobSidecars) == 0 { + if len(blobSidecars) == 0 { err = ethereum.NotFound errs = append(errs, err) s.pool.MoveToNext() From 1b878f7187357ca34014f3e4807904f9359f68b3 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 24 May 2024 19:20:20 +0800 Subject: [PATCH 11/16] chore: fix op-service ci --- op-service/txmgr/txmgr.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/op-service/txmgr/txmgr.go b/op-service/txmgr/txmgr.go index ad67550e99..5e00b1a9f6 100644 --- a/op-service/txmgr/txmgr.go +++ b/op-service/txmgr/txmgr.go @@ -746,8 +746,6 @@ func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *b if err != nil { m.metr.RPCError() return nil, nil, nil, fmt.Errorf("failed to fetch the suggested base fee: %w", err) - } else if head.BaseFee == nil { - //return nil, nil, errors.New("txmgr does not support pre-london blocks that do not have a basefee") } // basefee of BSC block is 0 From 45bc4295494823fa44d3e618f37fd54565308ae8 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 28 May 2024 17:57:02 +0800 Subject: [PATCH 12/16] refactor: change --l1.blob-rpc to --l1.archive-blob-rpc --- op-node/flags/flags.go | 35 ++++++++++++++++++----------------- op-node/service.go | 6 +++--- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 147a334911..74020b0eac 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -68,12 +68,6 @@ var ( EnvVars: prefixEnvVars("L1_BEACON"), Category: RollupCategory, } - L1BlobRpcAddr = &cli.StringFlag{ - Name: "l1.blob-rpc", - Usage: "Address of L1 blob endpoint to use. Multiple alternative addresses are supported, separated by commas, and will rotate when error", - EnvVars: prefixEnvVars("L1_BLOB_RPC"), - Category: RollupCategory, - } /* Optional Flags */ BeaconHeader = &cli.StringFlag{ Name: "l1.beacon-header", @@ -193,17 +187,24 @@ var ( Value: time.Second * 3, Category: L1RPCCategory, } - L1BlobRPCRateLimit = &cli.Float64Flag{ - Name: "l1.blob-rpc-rate-limit", - Usage: "Optional self-imposed global rate-limit on L1 blob RPC requests, specified in requests / second. Disabled if set to 0.", - EnvVars: prefixEnvVars("L1_BLOB_RPC_RATE_LIMIT"), + L1ArchiveBlobRpcAddr = &cli.StringFlag{ + Name: "l1.archive-blob-rpc", + Usage: "Optional address of L1 archive blob endpoint to use. Multiple alternative addresses are supported, separated by commas, and will rotate when error", + Required: false, + EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC"), + Category: RollupCategory, + } + L1ArchiveBlobRpcRateLimit = &cli.Float64Flag{ + Name: "l1.archive-blob-rpc-rate-limit", + Usage: "Optional self-imposed global rate-limit on L1 archive blob RPC requests, specified in requests / second. Disabled if set to 0.", + EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC_RATE_LIMIT"), Value: 0, Category: L1RPCCategory, } - L1BlobRPCMaxBatchSize = &cli.IntFlag{ - Name: "l1.blob-rpc-max-batch-size", - Usage: "Maximum number of RPC requests to bundle", - EnvVars: prefixEnvVars("L1_BLOB_RPC_MAX_BATCH_SIZE"), + L1ArchiveBlobRpcMaxBatchSize = &cli.IntFlag{ + Name: "l1.archive-blob-rpc-max-batch-size", + Usage: "Optional maximum number of L1 archive blob RPC requests to bundle", + EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC_MAX_BATCH_SIZE"), Value: 20, Category: L1RPCCategory, } @@ -383,7 +384,6 @@ var ( var requiredFlags = []cli.Flag{ L1NodeAddr, - L1BlobRpcAddr, L2EngineAddr, L2EngineJWTSecret, } @@ -403,8 +403,9 @@ var optionalFlags = []cli.Flag{ L1RPCMaxBatchSize, L1RPCMaxConcurrency, L1HTTPPollInterval, - L1BlobRPCRateLimit, - L1BlobRPCMaxBatchSize, + L1ArchiveBlobRpcAddr, + L1ArchiveBlobRpcRateLimit, + L1ArchiveBlobRpcMaxBatchSize, VerifierL1Confs, SequencerEnabledFlag, SequencerStoppedFlag, diff --git a/op-node/service.go b/op-node/service.go index 34994b7193..d6760de797 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -130,9 +130,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup { return &node.L1BlobEndpointConfig{ - NodeAddrs: ctx.String(flags.L1BlobRpcAddr.Name), - RateLimit: ctx.Float64(flags.L1BlobRPCRateLimit.Name), - BatchSize: ctx.Int(flags.L1BlobRPCMaxBatchSize.Name), + NodeAddrs: ctx.String(flags.L1NodeAddr.Name) + "," + ctx.String(flags.L1ArchiveBlobRpcAddr.Name), + RateLimit: ctx.Float64(flags.L1ArchiveBlobRpcRateLimit.Name), + BatchSize: ctx.Int(flags.L1ArchiveBlobRpcMaxBatchSize.Name), } } From 6f4b57a1dc127660111d19c8c0d88d12969bc45a Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 28 May 2024 18:46:27 +0800 Subject: [PATCH 13/16] refactor: rename blob related flags --- op-node/flags/flags.go | 20 ++++++++++---------- op-node/service.go | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 74020b0eac..30e98a6f6d 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -194,17 +194,17 @@ var ( EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC"), Category: RollupCategory, } - L1ArchiveBlobRpcRateLimit = &cli.Float64Flag{ - Name: "l1.archive-blob-rpc-rate-limit", - Usage: "Optional self-imposed global rate-limit on L1 archive blob RPC requests, specified in requests / second. Disabled if set to 0.", - EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC_RATE_LIMIT"), + L1BlobRpcRateLimit = &cli.Float64Flag{ + Name: "l1.blob-rpc-rate-limit", + Usage: "Optional self-imposed global rate-limit on L1 blob RPC requests, specified in requests / second. Disabled if set to 0.", + EnvVars: prefixEnvVars("L1_BLOB_RPC_RATE_LIMIT"), Value: 0, Category: L1RPCCategory, } - L1ArchiveBlobRpcMaxBatchSize = &cli.IntFlag{ - Name: "l1.archive-blob-rpc-max-batch-size", - Usage: "Optional maximum number of L1 archive blob RPC requests to bundle", - EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC_MAX_BATCH_SIZE"), + L1BlobRpcMaxBatchSize = &cli.IntFlag{ + Name: "l1.blob-rpc-max-batch-size", + Usage: "Optional maximum number of L1 blob RPC requests to bundle", + EnvVars: prefixEnvVars("L1_BLOB_RPC_MAX_BATCH_SIZE"), Value: 20, Category: L1RPCCategory, } @@ -404,8 +404,8 @@ var optionalFlags = []cli.Flag{ L1RPCMaxConcurrency, L1HTTPPollInterval, L1ArchiveBlobRpcAddr, - L1ArchiveBlobRpcRateLimit, - L1ArchiveBlobRpcMaxBatchSize, + L1BlobRpcRateLimit, + L1BlobRpcMaxBatchSize, VerifierL1Confs, SequencerEnabledFlag, SequencerStoppedFlag, diff --git a/op-node/service.go b/op-node/service.go index d6760de797..dc770446ba 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -131,8 +131,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup { return &node.L1BlobEndpointConfig{ NodeAddrs: ctx.String(flags.L1NodeAddr.Name) + "," + ctx.String(flags.L1ArchiveBlobRpcAddr.Name), - RateLimit: ctx.Float64(flags.L1ArchiveBlobRpcRateLimit.Name), - BatchSize: ctx.Int(flags.L1ArchiveBlobRpcMaxBatchSize.Name), + RateLimit: ctx.Float64(flags.L1BlobRpcRateLimit.Name), + BatchSize: ctx.Int(flags.L1BlobRpcMaxBatchSize.Name), } } From b30322d426796e440a37e690b7207f285f0c33ad Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 29 May 2024 10:19:46 +0800 Subject: [PATCH 14/16] chore: fix e2e ci --- op-e2e/setup.go | 2 +- op-node/node/client.go | 71 ++++++++++++++++++++++++++++++++++++------ op-node/node/config.go | 1 + op-node/service.go | 11 +++++++ 4 files changed, 74 insertions(+), 11 deletions(-) diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 0ad6e2a1d7..6715a385b4 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -601,7 +601,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste configureL1(nodeCfg, sys.EthInstances["l1"]) configureL2(nodeCfg, sys.EthInstances[name], cfg.JWTSecret) if sys.RollupConfig.EcotoneTime != nil { - nodeCfg.L1Blob = &rollupNode.L1BlobEndpointConfig{NodeAddrs: sys.L1BeaconAPIAddr} + nodeCfg.Beacon = &rollupNode.L1BeaconEndpointConfig{BeaconAddr: sys.L1BeaconAPIAddr} } } diff --git a/op-node/node/client.go b/op-node/node/client.go index 3de665bb21..3f2bf80b05 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -31,6 +31,14 @@ type L1EndpointSetup interface { Check() error } +type L1BeaconEndpointSetup interface { + Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error) + // ShouldIgnoreBeaconCheck returns true if the Beacon-node version check should not halt startup. + ShouldIgnoreBeaconCheck() bool + ShouldFetchAllSidecars() bool + Check() error +} + type L1BlobEndpointSetup interface { Setup(ctx context.Context, log log.Logger) ([]client.RPC, error) Check() error @@ -193,6 +201,59 @@ func (cfg *PreparedL1Endpoint) Check() error { return nil } +type L1BeaconEndpointConfig struct { + BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required) + BeaconHeader string // Optional HTTP header for all requests to L1 Beacon + BeaconArchiverAddr string // Address of L1 User Beacon-API Archive endpoint to use for expired blobs (beacon namespace required) + BeaconCheckIgnore bool // When false, halt startup if the beacon version endpoint fails + BeaconFetchAllSidecars bool // Whether to fetch all blob sidecars and filter locally +} + +var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil) + +func (cfg *L1BeaconEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error) { + var opts []client.BasicHTTPClientOption + if cfg.BeaconHeader != "" { + hdr, err := parseHTTPHeader(cfg.BeaconHeader) + if err != nil { + return nil, nil, fmt.Errorf("parsing beacon header: %w", err) + } + opts = append(opts, client.WithHeader(hdr)) + } + + a := client.NewBasicHTTPClient(cfg.BeaconAddr, log, opts...) + if cfg.BeaconArchiverAddr != "" { + b := client.NewBasicHTTPClient(cfg.BeaconArchiverAddr, log) + fb = append(fb, sources.NewBeaconHTTPClient(b)) + } + return sources.NewBeaconHTTPClient(a), fb, nil +} + +func (cfg *L1BeaconEndpointConfig) Check() error { + if cfg.BeaconAddr == "" && !cfg.BeaconCheckIgnore { + return errors.New("expected L1 Beacon API endpoint, but got none") + } + return nil +} + +func (cfg *L1BeaconEndpointConfig) ShouldIgnoreBeaconCheck() bool { + return cfg.BeaconCheckIgnore +} + +func (cfg *L1BeaconEndpointConfig) ShouldFetchAllSidecars() bool { + return cfg.BeaconFetchAllSidecars +} + +func parseHTTPHeader(headerStr string) (http.Header, error) { + h := make(http.Header, 1) + s := strings.SplitN(headerStr, ": ", 2) + if len(s) != 2 { + return nil, errors.New("invalid header format") + } + h.Add(s[0], s[1]) + return h, nil +} + type L1BlobEndpointConfig struct { // Address of L1 blob node endpoint to use, multiple alternative addresses separated by commas are supported, and will rotate when error NodeAddrs string @@ -248,13 +309,3 @@ func (cfg *L1BlobEndpointConfig) Setup(ctx context.Context, log log.Logger) ([]c return rpcClients, nil } - -func parseHTTPHeader(headerStr string) (http.Header, error) { - h := make(http.Header, 1) - s := strings.SplitN(headerStr, ": ", 2) - if len(s) != 2 { - return nil, errors.New("invalid header format") - } - h.Add(s[0], s[1]) - return h, nil -} diff --git a/op-node/node/config.go b/op-node/node/config.go index 82a3435b0a..4f67d0f786 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -21,6 +21,7 @@ type Config struct { L1 L1EndpointSetup L2 L2EndpointSetup + Beacon L1BeaconEndpointSetup L1Blob L1BlobEndpointSetup Driver driver.Config diff --git a/op-node/service.go b/op-node/service.go index dc770446ba..0c8f713850 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -79,6 +79,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { L2: l2Endpoint, Rollup: *rollupConfig, Driver: *driverConfig, + Beacon: NewBeaconEndpointConfig(ctx), L1Blob: NewL1BlobEndpointConfig(ctx), RPC: node.RPCConfig{ ListenAddr: ctx.String(flags.RPCListenAddr.Name), @@ -128,6 +129,16 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { return cfg, nil } +func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup { + return &node.L1BeaconEndpointConfig{ + BeaconAddr: ctx.String(flags.BeaconAddr.Name), + BeaconHeader: ctx.String(flags.BeaconHeader.Name), + BeaconArchiverAddr: ctx.String(flags.BeaconArchiverAddr.Name), + BeaconCheckIgnore: ctx.Bool(flags.BeaconCheckIgnore.Name), + BeaconFetchAllSidecars: ctx.Bool(flags.BeaconFetchAllSidecars.Name), + } +} + func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup { return &node.L1BlobEndpointConfig{ NodeAddrs: ctx.String(flags.L1NodeAddr.Name) + "," + ctx.String(flags.L1ArchiveBlobRpcAddr.Name), From a3609cf1119768c076baea4a3fb21c1d8555ea47 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 29 May 2024 17:14:14 +0800 Subject: [PATCH 15/16] chore: fix e2e ci --- op-e2e/setup.go | 1 + 1 file changed, 1 insertion(+) diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 6715a385b4..0be50744c2 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -602,6 +602,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste configureL2(nodeCfg, sys.EthInstances[name], cfg.JWTSecret) if sys.RollupConfig.EcotoneTime != nil { nodeCfg.Beacon = &rollupNode.L1BeaconEndpointConfig{BeaconAddr: sys.L1BeaconAPIAddr} + nodeCfg.L1Blob = &rollupNode.L1BlobEndpointConfig{NodeAddrs: sys.NodeEndpoint("l1")} } } From 527a280d51955e768b23c6c92d28a6cda0d56621 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 29 May 2024 17:23:18 +0800 Subject: [PATCH 16/16] fix: do not init L1 blob client if Ecotone is not active --- op-node/node/node.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/op-node/node/node.go b/op-node/node/node.go index 7c5e4a0ebf..8dd64e88d7 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -309,6 +309,14 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error { } func (n *OpNode) initL1Blob(ctx context.Context, cfg *Config) error { + // If Ecotone upgrade is not scheduled yet, then there is no need for a Blob API. + if cfg.Rollup.EcotoneTime == nil { + return nil + } + // Once the Ecotone upgrade is scheduled, we must have initialized the Blob API settings. + if cfg.L1Blob == nil { + return fmt.Errorf("missing L1 Blob Endpoint configuration: this API is mandatory for Ecotone upgrade at t=%d", *cfg.Rollup.EcotoneTime) + } rpcClients, err := cfg.L1Blob.Setup(ctx, n.log) if err != nil { return fmt.Errorf("failed to setup L1 blob client: %w", err)