Skip to content

Commit

Permalink
multithreaded noble listener
Browse files Browse the repository at this point in the history
  • Loading branch information
bd21 committed Oct 5, 2023
1 parent 8cf505c commit ae4b0b1
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 38 deletions.
105 changes: 76 additions & 29 deletions cmd/noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"io"
"net/http"
"strconv"
"sync"
"time"
)

Expand All @@ -19,40 +21,85 @@ func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *t
cfg.Networks.Source.Noble.StartBlock,
cfg.Networks.Source.Noble.LookbackPeriod))

// TODO multithreading
// constantly query for blocks
var wg sync.WaitGroup
wg.Add(1)

// enqueue block heights
currentBlock := cfg.Networks.Source.Noble.StartBlock
for {
rawResponse, err := http.Get(fmt.Sprintf("https://rpc.testnet.noble.strange.love/tx_search?query=\"tx.height=%d\"", currentBlock))
if err != nil {
logger.Debug(fmt.Sprintf("unable to query Noble block %d", currentBlock))
time.Sleep(5 * time.Second)
}
if rawResponse.StatusCode != http.StatusOK {
logger.Debug(fmt.Sprintf("non 200 response received for Noble block %d", currentBlock))
}
chainTip := GetNobleChainTip(cfg)
blockQueue := make(chan uint64, 1000000)
for i := currentBlock; i < chainTip; i++ {
blockQueue <- i
}
currentBlock = chainTip

body, err := io.ReadAll(rawResponse.Body)
if err != nil {
logger.Debug(fmt.Sprintf("unable to parse Noble block %d", currentBlock))
time.Sleep(5 * time.Second)
go func() {
for {
chainTip = GetNobleChainTip(cfg)
if chainTip > currentBlock {
for i := currentBlock + 1; i <= chainTip; i++ {
blockQueue <- i
}
}
time.Sleep(6 * time.Second)
}
}()

response := types.GetBlockResultsResponse{}
err = json.Unmarshal(body, &response)
if err != nil {
logger.Debug(fmt.Sprintf("unable to unmarshal Noble block %d", currentBlock))
time.Sleep(5 * time.Second)
}
// constantly query for blocks
for i := 0; i < int(cfg.Networks.Source.Noble.Workers); i++ {
go func() {
for {
currentBlock := <-blockQueue
logger.Debug(fmt.Sprintf("Querying Noble block %d", currentBlock))
rawResponse, err := http.Get(fmt.Sprintf("https://rpc.testnet.noble.strange.love/tx_search?query=\"tx.height=%d\"", currentBlock))
if err != nil {
logger.Debug(fmt.Sprintf("unable to query Noble block %d", currentBlock))
continue
}
if rawResponse.StatusCode != http.StatusOK {
logger.Debug(fmt.Sprintf("non 200 response received for Noble block %d", currentBlock))
time.Sleep(5 * time.Second)
blockQueue <- currentBlock
continue
}

for _, tx := range response.Result.Txs {
parsedMsg, err := types.NobleLogToMessageState(tx)
if err != nil {
logger.Error("unable to parse Noble log into MessageState, skipping")
continue
body, err := io.ReadAll(rawResponse.Body)
if err != nil {
logger.Debug(fmt.Sprintf("unable to parse Noble block %d", currentBlock))
continue
}

response := types.BlockResultsResponse{}
err = json.Unmarshal(body, &response)
if err != nil {
logger.Debug(fmt.Sprintf("unable to unmarshal Noble block %d", currentBlock))
continue
}

for _, tx := range response.Result.Txs {
parsedMsg, err := types.NobleLogToMessageState(tx)
if err != nil {
logger.Debug("unable to parse Noble log into MessageState, skipping")
continue
}
processingQueue <- parsedMsg
}
}
processingQueue <- parsedMsg
}
currentBlock += 1
}()
}

wg.Wait()
}

func GetNobleChainTip(cfg config.Config) uint64 {
rawResponse, _ := http.Get(cfg.Networks.Source.Noble.RPC + "/block")
body, _ := io.ReadAll(rawResponse.Body)

response := types.BlockResponse{}
err := json.Unmarshal(body, &response)
if err != nil {
fmt.Println(err.Error())
}
res, _ := strconv.ParseInt(response.Result.Block.Header.Height, 10, 0)
return uint64(res)
}
4 changes: 2 additions & 2 deletions cmd/noble/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var processingQueue chan *types.MessageState
func init() {
cfg = config.Parse("../../.ignore/unit_tests.yaml")

logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.ErrorLevel))
logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.DebugLevel))
processingQueue = make(chan *types.MessageState, 10000)
}

Expand All @@ -27,7 +27,7 @@ func TestStartListener(t *testing.T) {
cfg.Networks.Source.Noble.LookbackPeriod = 0
go StartListener(cfg, logger, processingQueue)

time.Sleep(5 * time.Second)
time.Sleep(30 * time.Second)

msg := <-processingQueue

Expand Down
15 changes: 9 additions & 6 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cmd

import (
"context"
"encoding/json"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/gin-gonic/gin"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"io"
"net/http"
"os"
"strconv"
Expand Down Expand Up @@ -61,12 +63,13 @@ func init() {
}

// if Noble start block not set, default to latest
if Cfg.Networks.Source.Ethereum.StartBlock == 0 {
// TODO set to latest block
client, _ := ethclient.Dial(Cfg.Networks.Source.Ethereum.RPC)
defer client.Close()
header, _ := client.HeaderByNumber(context.Background(), nil)
Cfg.Networks.Source.Ethereum.StartBlock = header.Number.Uint64()
if Cfg.Networks.Source.Noble.StartBlock == 0 {
// todo refactor to use listener's function GetNobleChainTip
rawResponse, _ := http.Get(Cfg.Networks.Source.Noble.RPC + "/block")
body, _ := io.ReadAll(rawResponse.Body)
response := types.BlockResponse{}
_ = json.Unmarshal(body, &response)
Cfg.Networks.Source.Noble.StartBlock = uint64(response.Result.Block.Height)
}

// start api server
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
RequestQueueSize uint32 `yaml:"request-queue-size"`
StartBlock uint64 `yaml:"start-block"`
LookbackPeriod uint64 `yaml:"lookback-period"`
Workers uint32 `yaml:"workers"`
Enabled bool `yaml:"enabled"`
} `yaml:"noble"`
} `yaml:"source"`
Expand Down
12 changes: 11 additions & 1 deletion types/block.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
package types

type GetBlockResultsResponse struct {
type BlockResponse struct {
Result struct {
Block struct {
Header struct {
Height string `json:"height"`
} `json:"header"`
} `json:"block"`
} `json:"result"`
}

type BlockResultsResponse struct {
Result struct {
Txs []Tx `json:"txs"`
} `json:"result"`
Expand Down

0 comments on commit ae4b0b1

Please sign in to comment.