Skip to content

Commit

Permalink
height tracker use websocket + cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 20, 2024
1 parent 2107822 commit ef0904b
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 60 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ Running the relayer
noble-cctp-relayer start --config ./config/sample-app-config.yaml
```
Sample configs can be found in [config](config).
### Promethius Metrics

### Flush Interval

Using the `--flush-interval` flag will run a flush on all paths every `duration`; ex `--flush-interval 5m`

The relayer will keep track of the latest flushed block. The first time the flush is run, the flush will start at the chains latest height - lookback period and flush up until height of the chain when the flush started. It will then store the height the flush ended on.

After that, it will flush from the last stored height - lookback period up until the latest height of the chain.

### Prometheus Metrics

By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.

Expand Down
4 changes: 2 additions & 2 deletions cmd/appstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (a *AppState) loadConfigFile() {
}
config, err := ParseConfig(a.ConfigPath)
if err != nil {
a.Logger.Error("unable to parse config file", "location", a.ConfigPath, "err", err)
a.Logger.Error("Unable to parse config file", "location", a.ConfigPath, "err", err)
os.Exit(1)
}
a.Logger.Info("successfully parsed config file", "location", a.ConfigPath)
a.Logger.Info("Successfully parsed config file", "location", a.ConfigPath)
a.Config = config

}
3 changes: 1 addition & 2 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

updateLatestHeight := 1 * time.Second
go c.TrackLatestBlockHeight(cmd.Context(), logger, updateLatestHeight)
go c.TrackLatestBlockHeight(cmd.Context(), logger)

// wait until height is available
for c.LatestBlock() == 0 {
Expand Down
72 changes: 34 additions & 38 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ var (
// errSignal allows broadcasting an error value to multiple receivers.
type errSignal struct {
Ready chan struct{}

Err error
Next *errSignal
}

func (e *Ethereum) StartListener(
Expand All @@ -48,12 +45,12 @@ func (e *Ethereum) StartListener(

messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json")
if err != nil {
logger.Error("unable to read MessageTransmitter abi", "err", err)
logger.Error("Unable to read MessageTransmitter abi", "err", err)
os.Exit(1)
}
messageTransmitterABI, err = abi.JSON(bytes.NewReader(messageTransmitter))
if err != nil {
logger.Error("unable to parse MessageTransmitter abi", "err", err)
logger.Error("Unable to parse MessageTransmitter abi", "err", err)
os.Exit(1)
}

Expand Down Expand Up @@ -82,17 +79,17 @@ func (e *Ethereum) startListenerRoutines(
go e.consumeStream(ctx, logger, stream, sig)
consumeHistory(logger, history)

// get history from start-lookback up until latest block
// get history from (start block - lookback) up until latest block
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
startLookback := start - e.lookbackPeriod
logger.Info(fmt.Sprintf("getting history from %d: starting at:%d and looking back %d blocks", startLookback, start, e.lookbackPeriod))
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, startLookback, latestBlock)

logger.Info("finished getting history")
logger.Info("Finished getting history")

if flushInterval > 0 {
go e.flushMechanism(ctx, logger, sig)
Expand All @@ -105,7 +102,7 @@ func (e *Ethereum) startListenerRoutines(
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("websocket disconnected. Reconnecting...", "err", err)
logger.Error("Websocket disconnected. Reconnecting...", "err", err)
close(sig.Ready)

// restart
Expand Down Expand Up @@ -143,7 +140,7 @@ func (e *Ethereum) startMainStream(
// https://github.com/ethereum/go-ethereum/issues/15063
stream, sub, history, err = etherReader.QueryWithHistory(ctx, &query)
if err != nil {
logger.Error("unable to subscribe to logs", "attempt", queryAttempt, "err", err)
logger.Error("Unable to subscribe to logs", "attempt", queryAttempt, "err", err)
queryAttempt++
time.Sleep(1 * time.Second)
continue
Expand Down Expand Up @@ -178,7 +175,7 @@ func (e *Ethereum) getAndConsumeHistory(
toBlock = end
}

logger.Debug(fmt.Sprintf("looking back in chunks of %d: chunk: %d/%d start-block: %d end-block: %d", chunkSize, chunk, totalChunksNeeded, fromBlock, toBlock))
logger.Debug(fmt.Sprintf("Looking back in chunks of %d: chunk: %d/%d start-block: %d end-block: %d", chunkSize, chunk, totalChunksNeeded, fromBlock, toBlock))

etherReader := etherstream.Reader{Backend: e.wsClient}

Expand All @@ -192,7 +189,8 @@ func (e *Ethereum) getAndConsumeHistory(
for {
_, toUnSub, history, err = etherReader.QueryWithHistory(ctx, &query)
if err != nil {
logger.Error("unable to query history from %d to %d. attempt: %d", start, end, queryAttempt)
// TODO: add metrics for this log
logger.Error(fmt.Sprintf("Unable to query history from %d to %d. attempt: %d", start, end, queryAttempt), "err", err)
queryAttempt++
time.Sleep(1 * time.Second)
continue
Expand Down Expand Up @@ -233,14 +231,14 @@ func (e *Ethereum) consumeStream(
stream <-chan ethtypes.Log,
sig *errSignal,
) {
logger.Debug("consuming incoming messages")
logger.Info("Starting consumption of incoming stream")
var txState *types.TxState
for {
select {
case <-ctx.Done():
return
case <-sig.Ready:
logger.Debug("websocket disconnected...stopped consuming stream")
logger.Debug("Websocket disconnected... Stopped consuming stream. Will restart after websocket is re-established")
return
case streamLog := <-stream:
parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog)
Expand Down Expand Up @@ -272,7 +270,7 @@ func (e *Ethereum) flushMechanism(
logger log.Logger,
sig *errSignal,
) {
logger.Debug(fmt.Sprintf("flush mechanism started. Will flush every %v", flushInterval))
logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval))

for {
timer := time.NewTimer(flushInterval)
Expand All @@ -286,18 +284,18 @@ func (e *Ethereum) flushMechanism(

start := e.lastFlushedBlock - e.lookbackPeriod

logger.Info(fmt.Sprintf("flush started from %d to %d", start, latestBlock))
logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock))

e.getAndConsumeHistory(ctx, logger, start, latestBlock)

e.lastFlushedBlock = latestBlock

logger.Info("flush complete")
logger.Info("Flush complete")

// if main websocket stream is disconnected, stop flush. It will be restarted once websocket is reconnected
case <-sig.Ready:
timer.Stop()
logger.Debug("websocket disconnected. Flush stopped. Will restart after websocket is re-established")
logger.Debug("Websocket disconnected... Flush stopped. Will restart after websocket is re-established")
return
case <-ctx.Done():
timer.Stop()
Expand All @@ -306,33 +304,31 @@ func (e *Ethereum) flushMechanism(
}
}

func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, loop time.Duration) {
func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) {
logger.With("routine", "TrackLatestBlockHeight", "chain", e.name, "domain", e.domain)

// first time
header, err := e.rpcClient.HeaderByNumber(ctx, nil)
headers := make(chan *ethtypes.Header)

sub, err := e.wsClient.SubscribeNewHead(ctx, headers)
if err != nil {
logger.Error(fmt.Sprintf("error getting latest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
}
if err == nil {
e.SetLatestBlock(header.Number.Uint64())
logger.Error("Failed to connect to websocket to track height. Will retry...", "err", err)
time.Sleep(1 * time.Second)
e.TrackLatestBlockHeight(ctx, logger)
return
}

// then start loop on a timer
logger.Info("Height tracking websocket subscritpiton connected")

for {
timer := time.NewTimer(loop)
select {
case <-timer.C:
header, err := e.rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
logger.Debug(fmt.Sprintf("error getting latest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
continue
}
e.SetLatestBlock(header.Number.Uint64())

case <-ctx.Done():
timer.Stop()
return
case err := <-sub.Err():
logger.Error("Height tracker websocket subscritpiton error. Attempting to reconnect...", "err", err)
e.TrackLatestBlockHeight(ctx, logger)
return
case header := <-headers:
e.SetLatestBlock(header.Number.Uint64())
}
}
}
Expand All @@ -356,7 +352,7 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
timer.Stop()
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
logger.Error(fmt.Sprintf("error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err)
logger.Error(fmt.Sprintf("Error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err)
continue
}

Expand All @@ -367,7 +363,7 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
case <-timer.C:
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
logger.Error(fmt.Sprintf("error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err)
logger.Error(fmt.Sprintf("Error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err)
continue
}

Expand Down
5 changes: 0 additions & 5 deletions noble/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ func (n *Noble) attemptBroadcast(

if used {
msg.Status = types.Complete
// bm, _ := new(cctptypes.BurnMessage).Parse(msg.MsgBody)
// x, err := hex.DecodeString(string(bm.MintRecipient))
// fmt.Println("err", err)
// y := common.HexToAddress(string(x))
// fmt.Println("ERRR", err)
logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used.", msg.Nonce), "src-tx", msg.SourceTxHash)
continue
}
Expand Down
22 changes: 11 additions & 11 deletions noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ func (n *Noble) StartListener(
block := <-blockQueue
res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "")
if err != nil || res == nil {
logger.Debug(fmt.Sprintf("unable to query Noble block %d. Will retry.", block), "error:", err)
logger.Debug(fmt.Sprintf("Unable to query Noble block %d. Will retry.", block), "error:", err)
blockQueue <- block
continue
}

for _, tx := range res.Txs {
parsedMsgs, err := txToMessageState(tx)
if err != nil {
logger.Error("unable to parse Noble log to message state", "err", err.Error())
logger.Error("Unable to parse Noble log to message state", "err", err.Error())
continue
}
for _, parsedMsg := range parsedMsgs {
Expand All @@ -128,7 +128,7 @@ func (n *Noble) flushMechanism(
blockQueue chan uint64,
) {

logger.Debug(fmt.Sprintf("flush mechanism started. Will flush every %v", flushInterval))
logger.Debug(fmt.Sprintf("Flush mechanism started. Will flush every %v", flushInterval))

for {
timer := time.NewTimer(flushInterval)
Expand All @@ -139,11 +139,11 @@ func (n *Noble) flushMechanism(
// test to see that the rpc is available before attempting flush
res, err := n.cc.RPCClient.Status(ctx)
if err != nil {
logger.Error(fmt.Sprintf("skipping flush... error reaching out to rpc, will retry flush in %v", flushInterval))
logger.Error(fmt.Sprintf("Skipping flush... error reaching out to rpc, will retry flush in %v", flushInterval))
continue
}
if res.SyncInfo.CatchingUp {
logger.Error(fmt.Sprintf("skipping flush... rpc still catching, will retry flush in %v", flushInterval))
logger.Error(fmt.Sprintf("Skipping flush... rpc still catching, will retry flush in %v", flushInterval))
continue
}

Expand All @@ -154,14 +154,14 @@ func (n *Noble) flushMechanism(

flushStart := lastFlushedBlock - n.lookbackPeriod

logger.Info(fmt.Sprintf("flush started from: %d to: %d", flushStart, latestBlock))
logger.Info(fmt.Sprintf("Flush started from: %d to: %d", flushStart, latestBlock))

for i := flushStart; i <= latestBlock; i++ {
blockQueue <- i
}
n.lastFlushedBlock = latestBlock

logger.Info("flush complete")
logger.Info("Flush complete")

case <-ctx.Done():
timer.Stop()
Expand All @@ -170,24 +170,24 @@ func (n *Noble) flushMechanism(
}
}

func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, loop time.Duration) {
func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) {
logger.With("routine", "TrackLatestBlockHeight", "chain", n.Name(), "domain", n.Domain())

// first time
res, err := n.cc.RPCClient.Status(ctx)
if err != nil {
logger.Error("unable to query Nobles latest height", "err", err)
logger.Error("Unable to query Nobles latest height", "err", err)
}
n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight))

// then start loop on a timer
for {
timer := time.NewTimer(loop)
timer := time.NewTimer(6 * time.Second)
select {
case <-timer.C:
res, err := n.cc.RPCClient.Status(ctx)
if err != nil {
logger.Error("unable to query Nobles latest height", "err", err)
logger.Error("Unable to query Nobles latest height", "err", err)
continue
}
n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight))
Expand Down
1 change: 0 additions & 1 deletion types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type Chain interface {
TrackLatestBlockHeight(
ctx context.Context,
logger log.Logger,
loop time.Duration,
)

WalletBalanceMetric(
Expand Down

0 comments on commit ef0904b

Please sign in to comment.