From 2cf869708d0bdf56c58dfb1fe5d9ecf76f62fb13 Mon Sep 17 00:00:00 2001 From: failfmi Date: Wed, 10 Nov 2021 10:19:23 +0200 Subject: [PATCH] fix(evm): Split logs filtering into chunks (#328) Signed-off-by: failfmi --- app/process/watcher/evm/watcher.go | 24 +++++++++++++++++++++--- app/process/watcher/evm/watcher_test.go | 3 ++- cmd/main.go | 3 ++- config/node.go | 1 + config/parser/node.go | 1 + docs/configuration.md | 1 + 6 files changed, 28 insertions(+), 5 deletions(-) diff --git a/app/process/watcher/evm/watcher.go b/app/process/watcher/evm/watcher.go index 918b86857..e137198a9 100644 --- a/app/process/watcher/evm/watcher.go +++ b/app/process/watcher/evm/watcher.go @@ -55,7 +55,15 @@ type Watcher struct { filterConfig FilterConfig } -var defaultSleepDuration = 15 * time.Second +// Certain node providers (Alchemy, Infura) have a limitation on how many blocks +// eth_getLogs can process at once. For this to be mitigated, a maximum amount of blocks +// is introduced, splitting the request into chunks with a range of N. +// For example, a query for events with a range of 5 000 blocks, will be split into 10 queries, each having +// a range of 500 blocks +const defaultMaxLogsBlocks = int64(500) + +// The default polling interval (in seconds) when querying for upcoming events/logs +const defaultSleepDuration = 15 * time.Second type FilterConfig struct { abi abi.ABI @@ -64,6 +72,7 @@ type FilterConfig struct { burnHash common.Hash lockHash common.Hash memberUpdatedHash common.Hash + maxLogsBlocks int64 } func NewWatcher( @@ -73,7 +82,8 @@ func NewWatcher( mappings c.Assets, startBlock int64, validator bool, - pollingInterval time.Duration) *Watcher { + pollingInterval time.Duration, + maxLogsBlocks int64) *Watcher { currentBlock, err := evmClient.BlockNumber(context.Background()) if err != nil { log.Fatalf("Could not retrieve latest block. Error: [%s].", err) @@ -101,6 +111,10 @@ func NewWatcher( contracts.Address(), } + if maxLogsBlocks == 0 { + maxLogsBlocks = defaultMaxLogsBlocks + } + filterConfig := FilterConfig{ abi: abi, topics: topics, @@ -108,6 +122,7 @@ func NewWatcher( burnHash: burnHash, lockHash: lockHash, memberUpdatedHash: memberUpdatedHash, + maxLogsBlocks: maxLogsBlocks, } if pollingInterval == 0 { @@ -187,6 +202,10 @@ func (ew Watcher) beginWatching(queue qi.Queue) { continue } + if toBlock-fromBlock > ew.filterConfig.maxLogsBlocks { + toBlock = fromBlock + ew.filterConfig.maxLogsBlocks + } + err = ew.processLogs(fromBlock, toBlock, queue) if err != nil { ew.logger.Errorf("Failed to process logs. Error: [%s].", err) @@ -199,7 +218,6 @@ func (ew Watcher) beginWatching(queue qi.Queue) { } func (ew Watcher) processLogs(fromBlock, endBlock int64, queue qi.Queue) error { - query := ðereum.FilterQuery{ FromBlock: new(big.Int).SetInt64(fromBlock), ToBlock: new(big.Int).SetInt64(endBlock), diff --git a/app/process/watcher/evm/watcher_test.go b/app/process/watcher/evm/watcher_test.go index 420b2aba0..2edb647e0 100644 --- a/app/process/watcher/evm/watcher_test.go +++ b/app/process/watcher/evm/watcher_test.go @@ -489,6 +489,7 @@ func TestNewWatcher(t *testing.T) { burnHash: burnHash, lockHash: lockHash, memberUpdatedHash: memberUpdatedHash, + maxLogsBlocks: 220, } assets := config.LoadAssets(networks) @@ -504,7 +505,7 @@ func TestNewWatcher(t *testing.T) { filterConfig: filterConfig, } - assert.EqualValues(t, w, NewWatcher(mocks.MStatusRepository, mocks.MBridgeContractService, mocks.MEVMClient, assets, 0, true, 15)) + assert.EqualValues(t, w, NewWatcher(mocks.MStatusRepository, mocks.MBridgeContractService, mocks.MEVMClient, assets, 0, true, 15, 220)) } // TODO: Test_NewWatcher_Fails diff --git a/cmd/main.go b/cmd/main.go index d9e5cde35..0a860cbb2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -137,7 +137,8 @@ func initializeServerPairs(server *server.Server, services *Services, repositori configuration.Bridge.Assets, configuration.Node.Clients.Evm[chain.Int64()].StartBlock, configuration.Node.Validator, - configuration.Node.Clients.Evm[chain.Int64()].PollingInterval)) + configuration.Node.Clients.Evm[chain.Int64()].PollingInterval, + configuration.Node.Clients.Evm[chain.Int64()].MaxLogsBlocks)) } // Register read-only handlers diff --git a/config/node.go b/config/node.go index 3134666e6..b1b65916f 100644 --- a/config/node.go +++ b/config/node.go @@ -49,6 +49,7 @@ type Evm struct { PrivateKey string StartBlock int64 PollingInterval time.Duration + MaxLogsBlocks int64 } type Hedera struct { diff --git a/config/parser/node.go b/config/parser/node.go index af350f7d3..c5ee709d9 100644 --- a/config/parser/node.go +++ b/config/parser/node.go @@ -49,6 +49,7 @@ type Evm struct { PrivateKey string `yaml:"private_key"` StartBlock int64 `yaml:"start_block"` PollingInterval time.Duration `yaml:"polling_interval"` + MaxLogsBlocks int64 `yaml:"max_logs_blocks"` } type Hedera struct { diff --git a/docs/configuration.md b/docs/configuration.md index 3d12d1096..7f952063f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -28,6 +28,7 @@ Name | Default `node.clients.evm[].private_key` | "" | The private key for the given EVM network. `node.clients.evm[].start_block` | 0 | The block from which the application will monitor for events for the given network. If specified, it will start in its primary mode (check `node.validator`) from the given block. If not specified, it will start in read-only mode from the latest saved block in the database to the current block at runtime (`now`) and then continue in its primary mode. `node.clients.evm[].polling_interval` | 15 | How often (in seconds) the evm client will poll the network for upcoming events. +`node.clients.evm[].max_logs_blocks` | 500 | The maximum amount of blocks range per query when filtering events. `node.clients.hedera.operator.account_id` | "" | The operator's Hedera account id. `node.clients.hedera.operator.private_key` | "" | The operator's Hedera private key. `node.clients.hedera.network` | testnet | Which Hedera network to use. Can be either `mainnet`, `previewnet`, `testnet`.