Skip to content

Commit

Permalink
dont use package variables
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 20, 2024
1 parent 056a74b commit a4b7bc4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 38 deletions.
73 changes: 36 additions & 37 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,46 @@ import (
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

var (
messageTransmitterABI abi.ABI
messageSent abi.Event
messageTransmitterAddress common.Address
processingQueue chan *types.TxState
flushInterval time.Duration
)

// errSignal allows broadcasting an error value to multiple receivers.
type errSignal struct {
Ready chan struct{}
}

// StartListener starts the ethereum websocket subscription, queries history pertaining to the lookback period,
// and starts the reoccurring flush
//
// If an error occurs in websocket stream, this function will handle relevant sub routines and then re-run itself.
func (e *Ethereum) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue_ chan *types.TxState,
flushInterval_ time.Duration,
processingQueue chan *types.TxState,
flushInterval time.Duration,
) {
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)

processingQueue = processingQueue_
flushInterval = flushInterval_

messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json")
if err != nil {
logger.Error("Unable to read MessageTransmitter abi", "err", err)
os.Exit(1)
}
messageTransmitterABI, err = abi.JSON(bytes.NewReader(messageTransmitter))
messageTransmitterABI, err := abi.JSON(bytes.NewReader(messageTransmitter))
if err != nil {
logger.Error("Unable to parse MessageTransmitter abi", "err", err)
os.Exit(1)
}

messageSent = messageTransmitterABI.Events["MessageSent"]
messageTransmitterAddress = common.HexToAddress(e.messageTransmitterAddress)

e.startListenerRoutines(ctx, logger)
}

// startListenerRoutines starts the ethereum websocket subscription, queries history pertaining to the lookback period,
// and starts the reoccurring flush
//
// If an error occurs in websocket stream, this function will handle relevant sub routines and then re-run itself.
func (e *Ethereum) startListenerRoutines(
ctx context.Context,
logger log.Logger,
) {
messageSent := messageTransmitterABI.Events["MessageSent"]
messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress)

sig := &errSignal{
Ready: make(chan struct{}),
}

// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger)
stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress)

go e.consumeStream(ctx, logger, stream, sig)
consumeHistory(logger, history)
go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig)
consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI)

// get history from (start block - lookback) up until latest block
latestBlock := e.LatestBlock()
Expand All @@ -87,12 +68,12 @@ func (e *Ethereum) startListenerRoutines(
}
startLookback := start - e.lookbackPeriod
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, startLookback, latestBlock)
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock)

logger.Info("Finished getting history")

if flushInterval > 0 {
go e.flushMechanism(ctx, logger, sig)
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushInterval, sig)
}

// listen for errors in the main websocket stream
Expand All @@ -108,15 +89,17 @@ func (e *Ethereum) startListenerRoutines(
// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.startListenerRoutines(ctx, logger)
e.StartListener(ctx, logger, processingQueue, flushInterval)
return
}

}

func (e *Ethereum) startMainStream(
ctx context.Context,
logger log.Logger,
messageSent abi.Event,
messageTransmitterAddress common.Address,

) (stream <-chan ethtypes.Log, sub ethereum.Subscription, history []ethtypes.Log) {

var err error
Expand Down Expand Up @@ -154,6 +137,10 @@ func (e *Ethereum) startMainStream(
func (e *Ethereum) getAndConsumeHistory(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
messageSent abi.Event,
messageTransmitterAddress common.Address,
messageTransmitterABI abi.ABI,
start, end uint64) {

var toUnSub ethereum.Subscription
Expand Down Expand Up @@ -198,7 +185,7 @@ func (e *Ethereum) getAndConsumeHistory(
break
}
toUnSub.Unsubscribe()
consumeHistory(logger, history)
consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI)

start += chunkSize
chunk++
Expand All @@ -210,6 +197,9 @@ func (e *Ethereum) getAndConsumeHistory(
func consumeHistory(
logger log.Logger,
history []ethtypes.Log,
processingQueue chan *types.TxState,
messageSent abi.Event,
messageTransmitterABI abi.ABI,
) {
for _, historicalLog := range history {
parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog)
Expand All @@ -228,8 +218,12 @@ func consumeHistory(
func (e *Ethereum) consumeStream(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
messageSent abi.Event,
messageTransmitterABI abi.ABI,
stream <-chan ethtypes.Log,
sig *errSignal,

) {
logger.Info("Starting consumption of incoming stream")
var txState *types.TxState
Expand Down Expand Up @@ -268,6 +262,11 @@ func (e *Ethereum) consumeStream(
func (e *Ethereum) flushMechanism(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
messageSent abi.Event,
messageTransmitterAddress common.Address,
messageTransmitterABI abi.ABI,
flushInterval time.Duration,
sig *errSignal,
) {
logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval))
Expand All @@ -286,7 +285,7 @@ func (e *Ethereum) flushMechanism(

logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock))

e.getAndConsumeHistory(ctx, logger, start, latestBlock)
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, start, latestBlock)

e.lastFlushedBlock = latestBlock

Expand Down
2 changes: 1 addition & 1 deletion noble/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (n *Noble) Broadcast(
txBuilder := sdkContext.TxConfig.NewTxBuilder()

// sign and broadcast txn
for attempt := 0; attempt <= n.maxRetries; attempt++ {
for attempt := 1; attempt <= n.maxRetries; attempt++ {
err := n.attemptBroadcast(ctx, logger, msgs, sequenceMap, sdkContext, txBuilder)
if err == nil {
return nil
Expand Down

0 comments on commit a4b7bc4

Please sign in to comment.