Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(op-node): support multi clients to fetch blobs #199

Merged
merged 19 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ jobs:
version: latest
args: -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is"

op-service-lint:
runs-on: ubuntu-latest

steps:
- name: Check out code
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: go.mod

- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
working-directory: op-service
version: latest
args: -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is"

op-node-test:
runs-on: ubuntu-latest
needs: op-node-lint
Expand Down Expand Up @@ -151,6 +170,35 @@ jobs:
with:
report_paths: '/tmp/test-results/op-proposer.xml'

op-service-test:
runs-on: ubuntu-latest
needs: op-service-lint

steps:
- name: Check out code
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: go.mod

- name: Install gotestsum
uses: autero1/[email protected]
with:
gotestsum_version: 1.10.0

- name: Run tests
working-directory: op-service
run: |
gotestsum --format=testname --junitfile=/tmp/test-results/op-service.xml -- -parallel=2 -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out ./...

- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
if: success() || failure() # always run even if the previous step fails
with:
report_paths: '/tmp/test-results/op-service.xml'

op-e2e-http-test:
runs-on: ubuntu-latest
needs: [op-node-test, op-batcher-test, op-proposer-test]
Expand Down
3 changes: 2 additions & 1 deletion op-e2e/actions/fallback_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ func setupFallbackClientTest(t Testing, sd *e2eutils.SetupData, log log.Logger,
})
l1F, err := sources.NewL1Client(fallbackClient, log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err)
l1Blob := sources.NewBSCBlobClient([]client.RPC{rpc})
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)

sequencer := NewL2Sequencer(t, log, l1F, l1F, plasma.Disabled, l2Cl, sd.RollupCfg, 0)
sequencer := NewL2Sequencer(t, log, l1F, l1Blob, plasma.Disabled, l2Cl, sd.RollupCfg, 0)
return miner, l1_2, l1_3, engine, sequencer, fallbackClient.(*sources.FallbackClient)
}

Expand Down
2 changes: 1 addition & 1 deletion op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
configureL1(nodeCfg, sys.EthInstances["l1"])
configureL2(nodeCfg, sys.EthInstances[name], cfg.JWTSecret)
if sys.RollupConfig.EcotoneTime != nil {
nodeCfg.Beacon = &rollupNode.L1BeaconEndpointConfig{BeaconAddr: sys.L1BeaconAPIAddr}
nodeCfg.L1Blob = &rollupNode.L1BlobEndpointConfig{NodeAddrs: sys.L1BeaconAPIAddr}
}
}

Expand Down
24 changes: 24 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ var (
Value: time.Second * 3,
Category: L1RPCCategory,
}
L1ArchiveBlobRpcAddr = &cli.StringFlag{
Name: "l1.archive-blob-rpc",
Usage: "Optional address of L1 archive blob endpoint to use. Multiple alternative addresses are supported, separated by commas, and will rotate when error",
Required: false,
EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC"),
Category: RollupCategory,
}
L1BlobRpcRateLimit = &cli.Float64Flag{
Name: "l1.blob-rpc-rate-limit",
Usage: "Optional self-imposed global rate-limit on L1 blob RPC requests, specified in requests / second. Disabled if set to 0.",
EnvVars: prefixEnvVars("L1_BLOB_RPC_RATE_LIMIT"),
Value: 0,
Category: L1RPCCategory,
}
L1BlobRpcMaxBatchSize = &cli.IntFlag{
Name: "l1.blob-rpc-max-batch-size",
Usage: "Optional maximum number of L1 blob RPC requests to bundle",
EnvVars: prefixEnvVars("L1_BLOB_RPC_MAX_BATCH_SIZE"),
Value: 20,
Category: L1RPCCategory,
}
VerifierL1Confs = &cli.Uint64Flag{
Name: "verifier.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.",
Expand Down Expand Up @@ -382,6 +403,9 @@ var optionalFlags = []cli.Flag{
L1RPCMaxBatchSize,
L1RPCMaxConcurrency,
L1HTTPPollInterval,
L1ArchiveBlobRpcAddr,
L1BlobRpcRateLimit,
L1BlobRpcMaxBatchSize,
VerifierL1Confs,
SequencerEnabledFlag,
SequencerStoppedFlag,
Expand Down
85 changes: 47 additions & 38 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
service_client "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"

"github.com/ethereum/go-ethereum/log"
Expand All @@ -32,11 +31,8 @@ type L1EndpointSetup interface {
Check() error
}

type L1BeaconEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error)
// ShouldIgnoreBeaconCheck returns true if the Beacon-node version check should not halt startup.
ShouldIgnoreBeaconCheck() bool
ShouldFetchAllSidecars() bool
type L1BlobEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) ([]client.RPC, error)
Check() error
}

Expand Down Expand Up @@ -147,7 +143,7 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}

isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr)
isMultiUrl, urlList := client.MultiUrlParse(cfg.L1NodeAddr)
if isMultiUrl {
return fallbackClientWrap(ctx, log, urlList, cfg, rollupCfg, opts...)
}
Expand Down Expand Up @@ -197,47 +193,60 @@ func (cfg *PreparedL1Endpoint) Check() error {
return nil
}

type L1BeaconEndpointConfig struct {
BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required)
BeaconHeader string // Optional HTTP header for all requests to L1 Beacon
BeaconArchiverAddr string // Address of L1 User Beacon-API Archive endpoint to use for expired blobs (beacon namespace required)
BeaconCheckIgnore bool // When false, halt startup if the beacon version endpoint fails
BeaconFetchAllSidecars bool // Whether to fetch all blob sidecars and filter locally
type L1BlobEndpointConfig struct {
// Address of L1 blob node endpoint to use, multiple alternative addresses separated by commas are supported, and will rotate when error
NodeAddrs string

// RateLimit specifies a self-imposed rate-limit on L1 requests. 0 is no rate-limit.
RateLimit float64

// BatchSize specifies the maximum batch-size, which also applies as L1 rate-limit burst amount (if set).
BatchSize int
}

var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil)
var _ L1BlobEndpointSetup = (*L1BlobEndpointConfig)(nil)

func (cfg *L1BeaconEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error) {
var opts []client.BasicHTTPClientOption
if cfg.BeaconHeader != "" {
hdr, err := parseHTTPHeader(cfg.BeaconHeader)
if err != nil {
return nil, nil, fmt.Errorf("parsing beacon header: %w", err)
}
opts = append(opts, client.WithHeader(hdr))
func (cfg *L1BlobEndpointConfig) Check() error {
if cfg.NodeAddrs == "" {
return fmt.Errorf("empty L1 blob endpoint address")
}

a := client.NewBasicHTTPClient(cfg.BeaconAddr, log, opts...)
if cfg.BeaconArchiverAddr != "" {
b := client.NewBasicHTTPClient(cfg.BeaconArchiverAddr, log)
fb = append(fb, sources.NewBeaconHTTPClient(b))
if cfg.BatchSize < 1 || cfg.BatchSize > 500 {
return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize)
}
return sources.NewBeaconHTTPClient(a), fb, nil
}

func (cfg *L1BeaconEndpointConfig) Check() error {
if cfg.BeaconAddr == "" && !cfg.BeaconCheckIgnore {
return errors.New("expected L1 Beacon API endpoint, but got none")
if cfg.RateLimit < 0 {
return fmt.Errorf("rate limit cannot be negative")
}
return nil
}

func (cfg *L1BeaconEndpointConfig) ShouldIgnoreBeaconCheck() bool {
return cfg.BeaconCheckIgnore
}
func (cfg *L1BlobEndpointConfig) Setup(ctx context.Context, log log.Logger) ([]client.RPC, error) {
rpcClients := make([]client.RPC, 0)

opts := []client.RPCOption{
client.WithDialBackoff(10),
}
if cfg.RateLimit != 0 {
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}
isMultiUrl, urlList := client.MultiUrlParse(cfg.NodeAddrs)

if isMultiUrl {
for _, url := range urlList {
rpcClient, err := client.NewRPC(ctx, log, url, opts...)
if err != nil {
return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", url, err)
}
rpcClients = append(rpcClients, rpcClient)
}
} else {
rpcClient, err := client.NewRPC(ctx, log, cfg.NodeAddrs, opts...)
if err != nil {
return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", cfg.NodeAddrs, err)
}
rpcClients = append(rpcClients, rpcClient)
}

func (cfg *L1BeaconEndpointConfig) ShouldFetchAllSidecars() bool {
return cfg.BeaconFetchAllSidecars
return rpcClients, nil
}

func parseHTTPHeader(headerStr string) (http.Header, error) {
Expand Down
2 changes: 1 addition & 1 deletion op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Config struct {
L1 L1EndpointSetup
L2 L2EndpointSetup

Beacon L1BeaconEndpointSetup
L1Blob L1BlobEndpointSetup

Driver driver.Config

Expand Down
35 changes: 26 additions & 9 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ type OpNode struct {
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)

l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging
runCfg *RuntimeConfig // runtime configurables
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
l1Blob *sources.BSCBlobClient // L1 Blob Client to fetch blobs
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging
runCfg *RuntimeConfig // runtime configurables

safeDB closableSafeDB

Expand Down Expand Up @@ -123,6 +124,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1: %w", err)
}
if err := n.initL1Blob(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1 blob: %w", err)
}
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return fmt.Errorf("failed to init L2: %w", err)
}
Expand Down Expand Up @@ -304,6 +308,19 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
return nil
}

func (n *OpNode) initL1Blob(ctx context.Context, cfg *Config) error {
rpcClients, err := cfg.L1Blob.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L1 blob client: %w", err)
}
instrumentedClients := make([]client.RPC, 0)
for _, rpc := range rpcClients {
instrumentedClients = append(instrumentedClients, client.NewInstrumentedRPC(rpc, n.metrics))
}
n.l1Blob = sources.NewBSCBlobClient(instrumentedClients)
return nil
}

func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
Expand Down Expand Up @@ -342,7 +359,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Blob, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
return nil
}

Expand Down
14 changes: 6 additions & 8 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
L2: l2Endpoint,
Rollup: *rollupConfig,
Driver: *driverConfig,
Beacon: NewBeaconEndpointConfig(ctx),
L1Blob: NewL1BlobEndpointConfig(ctx),
RPC: node.RPCConfig{
ListenAddr: ctx.String(flags.RPCListenAddr.Name),
ListenPort: ctx.Int(flags.RPCListenPort.Name),
Expand Down Expand Up @@ -128,13 +128,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return cfg, nil
}

func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup {
return &node.L1BeaconEndpointConfig{
BeaconAddr: ctx.String(flags.BeaconAddr.Name),
BeaconHeader: ctx.String(flags.BeaconHeader.Name),
BeaconArchiverAddr: ctx.String(flags.BeaconArchiverAddr.Name),
BeaconCheckIgnore: ctx.Bool(flags.BeaconCheckIgnore.Name),
BeaconFetchAllSidecars: ctx.Bool(flags.BeaconFetchAllSidecars.Name),
func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup {
return &node.L1BlobEndpointConfig{
NodeAddrs: ctx.String(flags.L1NodeAddr.Name) + "," + ctx.String(flags.L1ArchiveBlobRpcAddr.Name),
RateLimit: ctx.Float64(flags.L1BlobRpcRateLimit.Name),
BatchSize: ctx.Int(flags.L1BlobRpcMaxBatchSize.Name),
}
}

Expand Down
3 changes: 2 additions & 1 deletion op-service/bsc/compat.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package bsc

import (
lru "github.com/hashicorp/golang-lru/v2"
"math/big"
"sort"

lru "github.com/hashicorp/golang-lru/v2"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down
2 changes: 1 addition & 1 deletion op-service/dial/static_l2_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, ethClientU
}
return &StaticL2EndpointProvider{
StaticL2RollupProvider: *rollupProvider,
ethClient: client.NewInstrumentedClient(ethClient, metrics),
ethClient: client.NewInstrumentedClient(ethClient, metrics),
}, nil
}

Expand Down
Loading
Loading