Skip to content

Commit

Permalink
fix(evm): Split logs filtering into chunks (#328)
Browse files Browse the repository at this point in the history
Signed-off-by: failfmi <[email protected]>
  • Loading branch information
failfmi authored Nov 10, 2021
1 parent 5c8c56e commit 2cf8697
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 5 deletions.
24 changes: 21 additions & 3 deletions app/process/watcher/evm/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,15 @@ type Watcher struct {
filterConfig FilterConfig
}

var defaultSleepDuration = 15 * time.Second
// Certain node providers (Alchemy, Infura) have a limitation on how many blocks
// eth_getLogs can process at once. For this to be mitigated, a maximum amount of blocks
// is introduced, splitting the request into chunks with a range of N.
// For example, a query for events with a range of 5 000 blocks, will be split into 10 queries, each having
// a range of 500 blocks
const defaultMaxLogsBlocks = int64(500)

// The default polling interval (in seconds) when querying for upcoming events/logs
const defaultSleepDuration = 15 * time.Second

type FilterConfig struct {
abi abi.ABI
Expand All @@ -64,6 +72,7 @@ type FilterConfig struct {
burnHash common.Hash
lockHash common.Hash
memberUpdatedHash common.Hash
maxLogsBlocks int64
}

func NewWatcher(
Expand All @@ -73,7 +82,8 @@ func NewWatcher(
mappings c.Assets,
startBlock int64,
validator bool,
pollingInterval time.Duration) *Watcher {
pollingInterval time.Duration,
maxLogsBlocks int64) *Watcher {
currentBlock, err := evmClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("Could not retrieve latest block. Error: [%s].", err)
Expand Down Expand Up @@ -101,13 +111,18 @@ func NewWatcher(
contracts.Address(),
}

if maxLogsBlocks == 0 {
maxLogsBlocks = defaultMaxLogsBlocks
}

filterConfig := FilterConfig{
abi: abi,
topics: topics,
addresses: addresses,
burnHash: burnHash,
lockHash: lockHash,
memberUpdatedHash: memberUpdatedHash,
maxLogsBlocks: maxLogsBlocks,
}

if pollingInterval == 0 {
Expand Down Expand Up @@ -187,6 +202,10 @@ func (ew Watcher) beginWatching(queue qi.Queue) {
continue
}

if toBlock-fromBlock > ew.filterConfig.maxLogsBlocks {
toBlock = fromBlock + ew.filterConfig.maxLogsBlocks
}

err = ew.processLogs(fromBlock, toBlock, queue)
if err != nil {
ew.logger.Errorf("Failed to process logs. Error: [%s].", err)
Expand All @@ -199,7 +218,6 @@ func (ew Watcher) beginWatching(queue qi.Queue) {
}

func (ew Watcher) processLogs(fromBlock, endBlock int64, queue qi.Queue) error {

query := &ethereum.FilterQuery{
FromBlock: new(big.Int).SetInt64(fromBlock),
ToBlock: new(big.Int).SetInt64(endBlock),
Expand Down
3 changes: 2 additions & 1 deletion app/process/watcher/evm/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ func TestNewWatcher(t *testing.T) {
burnHash: burnHash,
lockHash: lockHash,
memberUpdatedHash: memberUpdatedHash,
maxLogsBlocks: 220,
}

assets := config.LoadAssets(networks)
Expand All @@ -504,7 +505,7 @@ func TestNewWatcher(t *testing.T) {
filterConfig: filterConfig,
}

assert.EqualValues(t, w, NewWatcher(mocks.MStatusRepository, mocks.MBridgeContractService, mocks.MEVMClient, assets, 0, true, 15))
assert.EqualValues(t, w, NewWatcher(mocks.MStatusRepository, mocks.MBridgeContractService, mocks.MEVMClient, assets, 0, true, 15, 220))
}

// TODO: Test_NewWatcher_Fails
Expand Down
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func initializeServerPairs(server *server.Server, services *Services, repositori
configuration.Bridge.Assets,
configuration.Node.Clients.Evm[chain.Int64()].StartBlock,
configuration.Node.Validator,
configuration.Node.Clients.Evm[chain.Int64()].PollingInterval))
configuration.Node.Clients.Evm[chain.Int64()].PollingInterval,
configuration.Node.Clients.Evm[chain.Int64()].MaxLogsBlocks))
}

// Register read-only handlers
Expand Down
1 change: 1 addition & 0 deletions config/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Evm struct {
PrivateKey string
StartBlock int64
PollingInterval time.Duration
MaxLogsBlocks int64
}

type Hedera struct {
Expand Down
1 change: 1 addition & 0 deletions config/parser/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Evm struct {
PrivateKey string `yaml:"private_key"`
StartBlock int64 `yaml:"start_block"`
PollingInterval time.Duration `yaml:"polling_interval"`
MaxLogsBlocks int64 `yaml:"max_logs_blocks"`
}

type Hedera struct {
Expand Down
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Name | Default
`node.clients.evm[].private_key` | "" | The private key for the given EVM network.
`node.clients.evm[].start_block` | 0 | The block from which the application will monitor for events for the given network. If specified, it will start in its primary mode (check `node.validator`) from the given block. If not specified, it will start in read-only mode from the latest saved block in the database to the current block at runtime (`now`) and then continue in its primary mode.
`node.clients.evm[].polling_interval` | 15 | How often (in seconds) the evm client will poll the network for upcoming events.
`node.clients.evm[].max_logs_blocks` | 500 | The maximum amount of blocks range per query when filtering events.
`node.clients.hedera.operator.account_id` | "" | The operator's Hedera account id.
`node.clients.hedera.operator.private_key` | "" | The operator's Hedera private key.
`node.clients.hedera.network` | testnet | Which Hedera network to use. Can be either `mainnet`, `previewnet`, `testnet`.
Expand Down

0 comments on commit 2cf8697

Please sign in to comment.