From 35288d57041b99ad80119bd2eca13564bcfdccf6 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 26 Jan 2024 13:14:43 -0700 Subject: [PATCH] Cleanup mutexes --- ethereum/broadcast.go | 203 +++++++++++++++++++++++++ ethereum/chain.go | 292 ----------------------------------- ethereum/listener.go | 131 ++++++++++++++++ noble/broadcast.go | 223 +++++++++++++++++++++++++++ noble/chain.go | 344 ------------------------------------------ noble/listener.go | 127 ++++++++++++++++ 6 files changed, 684 insertions(+), 636 deletions(-) create mode 100644 ethereum/broadcast.go create mode 100644 ethereum/listener.go create mode 100644 noble/broadcast.go create mode 100644 noble/listener.go diff --git a/ethereum/broadcast.go b/ethereum/broadcast.go new file mode 100644 index 0000000..acd54e9 --- /dev/null +++ b/ethereum/broadcast.go @@ -0,0 +1,203 @@ +package ethereum + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "math/big" + "regexp" + "strconv" + "time" + + "cosmossdk.io/log" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +func (e *Ethereum) InitializeBroadcaster( + ctx context.Context, + logger log.Logger, + sequenceMap *types.SequenceMap, +) error { + nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress) + if err != nil { + return fmt.Errorf("unable to retrieve evm account nonce: %w", err) + } + sequenceMap.Put(e.Domain(), uint64(nextNonce)) + + return nil +} + +func (e *Ethereum) Broadcast( + ctx context.Context, + logger log.Logger, + msgs []*types.MessageState, + sequenceMap *types.SequenceMap, +) error { + + // set up eth client + client, err := ethclient.Dial(e.rpcURL) + if err != nil { + return fmt.Errorf("unable to dial ethereum client: %w", err) + } + defer client.Close() + + backend := NewContractBackendWrapper(client) + + auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID)) + if err != nil { + return fmt.Errorf("unable to create auth: %w", err) + } + + messageTransmitter, err := contracts.NewMessageTransmitter(common.HexToAddress(e.messageTransmitterAddress), backend) + if err != nil { + return fmt.Errorf("unable to create message transmitter: %w", err) + } + + var broadcastErrors error +MsgLoop: + for _, msg := range msgs { + if msg.Status == types.Complete { + continue MsgLoop + } + + attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) + if err != nil { + return errors.New("unable to decode message attestation") + } + + for attempt := 0; attempt <= e.maxRetries; attempt++ { + if err := e.attemptBroadcast( + ctx, + logger, + msg, + sequenceMap, + auth, + messageTransmitter, + attestationBytes, + ); err == nil { + continue MsgLoop + } + + // if it's not the last attempt, retry + // TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints. + if attempt != e.maxRetries { + logger.Info(fmt.Sprintf("Retrying in %d seconds", e.retryIntervalSeconds)) + time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second) + } + } + // retried max times with failure + msg.Status = types.Failed + broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts")) + } + return broadcastErrors +} + +func (e *Ethereum) attemptBroadcast( + ctx context.Context, + logger log.Logger, + msg *types.MessageState, + sequenceMap *types.SequenceMap, + auth *bind.TransactOpts, + messageTransmitter *contracts.MessageTransmitter, + attestationBytes []byte, +) error { + logger.Info(fmt.Sprintf( + "Broadcasting message from %d to %d: with source tx hash %s", + msg.SourceDomain, + msg.DestDomain, + msg.SourceTxHash)) + + nonce := sequenceMap.Next(e.domain) + auth.Nonce = big.NewInt(int64(nonce)) + + e.mu.Lock() + defer e.mu.Unlock() + + // TODO remove + nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress) + if err != nil { + logger.Error("unable to retrieve account number") + } else { + auth.Nonce = big.NewInt(nextNonce) + } + // TODO end remove + + // check if nonce already used + co := &bind.CallOpts{ + Pending: true, + Context: ctx, + } + + logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce) + + key := append( + common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4), + common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)..., + ) + + response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key))) + if nonceErr != nil { + logger.Debug("Error querying whether nonce was used. Continuing...") + } else { + fmt.Printf("received used nonce response: %d\n", response) + if response.Uint64() == uint64(1) { + // nonce has already been used, mark as complete + logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", + msg.SourceDomain, msg.Nonce)) + msg.Status = types.Complete + return nil + } + } + + // broadcast txn + tx, err := messageTransmitter.ReceiveMessage( + auth, + msg.MsgSentBytes, + attestationBytes, + ) + if err == nil { + msg.Status = types.Complete + + fullLog, err := tx.MarshalJSON() + if err != nil { + logger.Error("error marshalling eth tx log", err) + } + + msg.DestTxHash = tx.Hash().Hex() + + logger.Info(fmt.Sprintf("Successfully broadcast %s to Ethereum. Tx hash: %s, FULL LOG: %s", msg.SourceTxHash, msg.DestTxHash, string(fullLog))) + + return nil + } + + logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error())) + if parsedErr, ok := err.(JsonError); ok { + if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" { + msg.Status = types.Complete + logger.Error(fmt.Sprintf("This account nonce has already been used: %d", nonce)) + + return nil + } + + match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error()) + if match { + numberRegex := regexp.MustCompile("[0-9]+") + nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0) + if err != nil { + nextNonce, err = GetEthereumAccountNonce(e.rpcURL, e.minterAddress) + if err != nil { + logger.Error("unable to retrieve account number") + } + } + sequenceMap.Put(e.domain, uint64(nextNonce)) + } + } + + return err +} diff --git a/ethereum/chain.go b/ethereum/chain.go index 749006f..74aa4a7 100644 --- a/ethereum/chain.go +++ b/ethereum/chain.go @@ -2,29 +2,12 @@ package ethereum import ( "bytes" - "context" "crypto/ecdsa" "embed" "encoding/hex" - "errors" - "fmt" - "math/big" - "os" - "regexp" - "strconv" "strings" "sync" - "time" - "cosmossdk.io/log" - ethereum "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/pascaldekloe/etherstream" - "github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) @@ -103,278 +86,3 @@ func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool { return bytes.Equal(destinationCaller, zeroByteArr) || bytes.Equal(destinationCaller, decodedMinterPadded) } - -func (e *Ethereum) InitializeBroadcaster( - ctx context.Context, - logger log.Logger, - sequenceMap *types.SequenceMap, -) error { - nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress) - if err != nil { - return fmt.Errorf("unable to retrieve evm account nonce: %w", err) - } - sequenceMap.Put(e.Domain(), uint64(nextNonce)) - - return nil -} - -func (e *Ethereum) StartListener( - ctx context.Context, - logger log.Logger, - processingQueue chan *types.TxState, -) { - logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) - - // set up client - messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json") - if err != nil { - logger.Error("unable to read MessageTransmitter abi", "err", err) - os.Exit(1) - } - messageTransmitterABI, err := abi.JSON(bytes.NewReader(messageTransmitter)) - if err != nil { - logger.Error("unable to parse MessageTransmitter abi", "err", err) - } - - messageSent := messageTransmitterABI.Events["MessageSent"] - - ethClient, err := ethclient.DialContext(ctx, e.wsURL) - if err != nil { - logger.Error("unable to initialize ethereum client", "err", err) - os.Exit(1) - } - - // defer ethClient.Close() - - messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress) - etherReader := etherstream.Reader{Backend: ethClient} - - if e.startBlock == 0 { - header, err := ethClient.HeaderByNumber(ctx, nil) - if err != nil { - logger.Error("unable to retrieve latest eth block header", "err", err) - os.Exit(1) - } - - e.startBlock = header.Number.Uint64() - } - - query := ethereum.FilterQuery{ - Addresses: []common.Address{messageTransmitterAddress}, - Topics: [][]common.Hash{{messageSent.ID}}, - FromBlock: big.NewInt(int64(e.startBlock - e.lookbackPeriod)), - } - - logger.Info(fmt.Sprintf( - "Starting Ethereum listener at block %d looking back %d blocks", - e.startBlock, - e.lookbackPeriod)) - - // websockets do not query history - // https://github.com/ethereum/go-ethereum/issues/15063 - stream, sub, history, err := etherReader.QueryWithHistory(ctx, &query) - if err != nil { - logger.Error("unable to subscribe to logs", "err", err) - os.Exit(1) - } - - // process history - for _, historicalLog := range history { - parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog) - if err != nil { - logger.Error("Unable to parse history log into MessageState, skipping", "err", err) - continue - } - logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - - processingQueue <- &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - - // It might help to wait a small amount of time between sending messages into the processing queue - // so that account sequences / nonces are set correctly - // time.Sleep(10 * time.Millisecond) - } - - // consume stream - go func() { - var txState *types.TxState - for { - select { - case <-ctx.Done(): - ethClient.Close() - return - case err := <-sub.Err(): - logger.Error("connection closed", "err", err) - ethClient.Close() - os.Exit(1) - case streamLog := <-stream: - parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) - if err != nil { - logger.Error("Unable to parse ws log into MessageState, skipping") - continue - } - logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - if txState == nil { - txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - } else if parsedMsg.SourceTxHash != txState.TxHash { - processingQueue <- txState - txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - } else { - txState.Msgs = append(txState.Msgs, parsedMsg) - - } - default: - if txState != nil { - processingQueue <- txState - txState = nil - } - } - } - }() -} - -func (e *Ethereum) Broadcast( - ctx context.Context, - logger log.Logger, - msgs []*types.MessageState, - sequenceMap *types.SequenceMap, -) error { - - // set up eth client - client, err := ethclient.Dial(e.rpcURL) - if err != nil { - return fmt.Errorf("unable to dial ethereum client: %w", err) - } - defer client.Close() - - backend := NewContractBackendWrapper(client) - - auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID)) - if err != nil { - return fmt.Errorf("unable to create auth: %w", err) - } - - messageTransmitter, err := contracts.NewMessageTransmitter(common.HexToAddress(e.messageTransmitterAddress), backend) - if err != nil { - return fmt.Errorf("unable to create message transmitter: %w", err) - } - - var broadcastErrors error -MsgLoop: - for _, msg := range msgs { - - if msg.Status == types.Complete { - continue MsgLoop - } - - attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) - if err != nil { - return errors.New("unable to decode message attestation") - } - - for attempt := 0; attempt <= e.maxRetries; attempt++ { - logger.Info(fmt.Sprintf( - "Broadcasting message from %d to %d: with source tx hash %s", - msg.SourceDomain, - msg.DestDomain, - msg.SourceTxHash)) - - nonce := sequenceMap.Next(e.domain) - auth.Nonce = big.NewInt(int64(nonce)) - - e.mu.Lock() - - // TODO remove - nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress) - if err != nil { - logger.Error("unable to retrieve account number") - } else { - auth.Nonce = big.NewInt(nextNonce) - } - // TODO end remove - - // check if nonce already used - co := &bind.CallOpts{ - Pending: true, - Context: ctx, - } - - logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce) - - key := append( - common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4), - common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)..., - ) - - response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key))) - if nonceErr != nil { - logger.Debug("Error querying whether nonce was used. Continuing...") - } else { - fmt.Printf("received used nonce response: %d\n", response) - if response.Uint64() == uint64(1) { - // nonce has already been used, mark as complete - logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", - msg.SourceDomain, msg.Nonce)) - msg.Status = types.Complete - e.mu.Unlock() - continue MsgLoop - } - } - - // broadcast txn - tx, err := messageTransmitter.ReceiveMessage( - auth, - msg.MsgSentBytes, - attestationBytes, - ) - if err == nil { - msg.Status = types.Complete - - fullLog, err := tx.MarshalJSON() - if err != nil { - logger.Error("error marshalling eth tx log", err) - } - - msg.DestTxHash = tx.Hash().Hex() - - logger.Info(fmt.Sprintf("Successfully broadcast %s to Ethereum. Tx hash: %s, FULL LOG: %s", msg.SourceTxHash, msg.DestTxHash, string(fullLog))) - e.mu.Unlock() - continue MsgLoop - } - - logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error())) - if parsedErr, ok := err.(JsonError); ok { - if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" { - msg.Status = types.Complete - logger.Error(fmt.Sprintf("This account nonce has already been used: %d", nonce)) - e.mu.Unlock() - continue MsgLoop - } - - match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error()) - if match { - numberRegex := regexp.MustCompile("[0-9]+") - nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0) - if err != nil { - nextNonce, err = GetEthereumAccountNonce(e.rpcURL, e.minterAddress) - if err != nil { - logger.Error("unable to retrieve account number") - } - } - sequenceMap.Put(e.domain, uint64(nextNonce)) - } - } - e.mu.Unlock() - - // if it's not the last attempt, retry - // TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints. - if attempt != e.maxRetries { - logger.Info(fmt.Sprintf("Retrying in %d seconds", e.retryIntervalSeconds)) - time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second) - } - } - // retried max times with failure - msg.Status = types.Failed - broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts")) - } - return broadcastErrors -} diff --git a/ethereum/listener.go b/ethereum/listener.go new file mode 100644 index 0000000..d604d37 --- /dev/null +++ b/ethereum/listener.go @@ -0,0 +1,131 @@ +package ethereum + +import ( + "bytes" + "context" + "fmt" + "math/big" + "os" + + "cosmossdk.io/log" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/pascaldekloe/etherstream" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +func (e *Ethereum) StartListener( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, +) { + logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) + + // set up client + messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json") + if err != nil { + logger.Error("unable to read MessageTransmitter abi", "err", err) + os.Exit(1) + } + messageTransmitterABI, err := abi.JSON(bytes.NewReader(messageTransmitter)) + if err != nil { + logger.Error("unable to parse MessageTransmitter abi", "err", err) + } + + messageSent := messageTransmitterABI.Events["MessageSent"] + + ethClient, err := ethclient.DialContext(ctx, e.wsURL) + if err != nil { + logger.Error("unable to initialize ethereum client", "err", err) + os.Exit(1) + } + + // defer ethClient.Close() + + messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress) + etherReader := etherstream.Reader{Backend: ethClient} + + if e.startBlock == 0 { + header, err := ethClient.HeaderByNumber(ctx, nil) + if err != nil { + logger.Error("unable to retrieve latest eth block header", "err", err) + os.Exit(1) + } + + e.startBlock = header.Number.Uint64() + } + + query := ethereum.FilterQuery{ + Addresses: []common.Address{messageTransmitterAddress}, + Topics: [][]common.Hash{{messageSent.ID}}, + FromBlock: big.NewInt(int64(e.startBlock - e.lookbackPeriod)), + } + + logger.Info(fmt.Sprintf( + "Starting Ethereum listener at block %d looking back %d blocks", + e.startBlock, + e.lookbackPeriod)) + + // websockets do not query history + // https://github.com/ethereum/go-ethereum/issues/15063 + stream, sub, history, err := etherReader.QueryWithHistory(ctx, &query) + if err != nil { + logger.Error("unable to subscribe to logs", "err", err) + os.Exit(1) + } + + // process history + for _, historicalLog := range history { + parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog) + if err != nil { + logger.Error("Unable to parse history log into MessageState, skipping", "err", err) + continue + } + logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + + processingQueue <- &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + + // It might help to wait a small amount of time between sending messages into the processing queue + // so that account sequences / nonces are set correctly + // time.Sleep(10 * time.Millisecond) + } + + // consume stream + go func() { + var txState *types.TxState + for { + select { + case <-ctx.Done(): + ethClient.Close() + return + case err := <-sub.Err(): + logger.Error("connection closed", "err", err) + ethClient.Close() + os.Exit(1) + case streamLog := <-stream: + parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) + if err != nil { + logger.Error("Unable to parse ws log into MessageState, skipping") + continue + } + logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + if txState == nil { + txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } else if parsedMsg.SourceTxHash != txState.TxHash { + processingQueue <- txState + txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } else { + txState.Msgs = append(txState.Msgs, parsedMsg) + + } + default: + if txState != nil { + processingQueue <- txState + txState = nil + } + } + } + }() +} diff --git a/noble/broadcast.go b/noble/broadcast.go new file mode 100644 index 0000000..0f216cc --- /dev/null +++ b/noble/broadcast.go @@ -0,0 +1,223 @@ +package noble + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "regexp" + "strconv" + "time" + + "cosmossdk.io/log" + nobletypes "github.com/circlefin/noble-cctp/x/cctp/types" + sdkclient "github.com/cosmos/cosmos-sdk/client" + clientTx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/tx/signing" + xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" + xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +var ( + regexAccountSequenceMismatchErr = regexp.MustCompile(`expected (\d+), got (\d+)`) +) + +func (n *Noble) InitializeBroadcaster( + ctx context.Context, + logger log.Logger, + sequenceMap *types.SequenceMap, +) error { + accountNumber, accountSequence, err := n.AccountInfo(ctx) + if err != nil { + return fmt.Errorf("unable to get account info for noble: %w", err) + } + + n.accountNumber = accountNumber + sequenceMap.Put(n.Domain(), accountSequence) + + return nil +} + +func (n *Noble) Broadcast( + ctx context.Context, + logger log.Logger, + msgs []*types.MessageState, + sequenceMap *types.SequenceMap, +) error { + // set up sdk context + interfaceRegistry := codectypes.NewInterfaceRegistry() + nobletypes.RegisterInterfaces(interfaceRegistry) + cdc := codec.NewProtoCodec(interfaceRegistry) + sdkContext := sdkclient.Context{ + TxConfig: xauthtx.NewTxConfig(cdc, xauthtx.DefaultSignModes), + } + + // build txn + txBuilder := sdkContext.TxConfig.NewTxBuilder() + + // sign and broadcast txn + for attempt := 0; attempt <= n.maxRetries; attempt++ { + if err := n.attemptBroadcast(ctx, logger, msgs, sequenceMap, sdkContext, txBuilder); err == nil { + return nil + } + + // Log retry information + logger.Info(fmt.Sprintf("Retrying in %d seconds", n.retryIntervalSeconds)) + time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second) + } + + for _, msg := range msgs { + if msg.Status != types.Complete { + msg.Status = types.Failed + } + } + + return errors.New("reached max number of broadcast attempts") +} + +func (n *Noble) attemptBroadcast( + ctx context.Context, + logger log.Logger, + msgs []*types.MessageState, + sequenceMap *types.SequenceMap, + sdkContext sdkclient.Context, + txBuilder sdkclient.TxBuilder, +) error { + + var receiveMsgs []sdk.Msg + for _, msg := range msgs { + + used, err := n.cc.QueryUsedNonce(ctx, types.Domain(msg.SourceDomain), msg.Nonce) + if err != nil { + return fmt.Errorf("unable to query used nonce: %w", err) + } + + if used { + msg.Status = types.Complete + logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used", msg.Nonce)) + continue + } + + attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) + if err != nil { + return fmt.Errorf("unable to decode message attestation") + } + + receiveMsgs = append(receiveMsgs, nobletypes.NewMsgReceiveMessage( + n.minterAddress, + msg.MsgSentBytes, + attestationBytes, + )) + + logger.Info(fmt.Sprintf( + "Broadcasting message from %d to %d: with source tx hash %s", + msg.SourceDomain, + msg.DestDomain, + msg.SourceTxHash)) + } + + if err := txBuilder.SetMsgs(receiveMsgs...); err != nil { + return fmt.Errorf("failed to set messages on tx: %w", err) + } + + txBuilder.SetGasLimit(n.gasLimit) + + txBuilder.SetMemo(n.txMemo) + + n.mu.Lock() + defer n.mu.Unlock() + + accountSequence := sequenceMap.Next(n.Domain()) + + sigV2 := signing.SignatureV2{ + PubKey: n.privateKey.PubKey(), + Data: &signing.SingleSignatureData{ + SignMode: sdkContext.TxConfig.SignModeHandler().DefaultMode(), + Signature: nil, + }, + Sequence: uint64(accountSequence), + } + + signerData := xauthsigning.SignerData{ + ChainID: n.chainID, + AccountNumber: uint64(n.accountNumber), + Sequence: uint64(accountSequence), + } + + txBuilder.SetSignatures(sigV2) + + sigV2, err := clientTx.SignWithPrivKey( + sdkContext.TxConfig.SignModeHandler().DefaultMode(), + signerData, + txBuilder, + n.privateKey, + sdkContext.TxConfig, + uint64(accountSequence), + ) + if err != nil { + + return fmt.Errorf("failed to sign tx: %w", err) + } + + if err := txBuilder.SetSignatures(sigV2); err != nil { + + return fmt.Errorf("failed to set signatures: %w", err) + } + + // Generated Protobuf-encoded bytes. + txBytes, err := sdkContext.TxConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + + return fmt.Errorf("failed to proto encode tx: %w", err) + } + + rpcResponse, err := n.cc.RPCClient.BroadcastTxSync(ctx, txBytes) + if err != nil { + return err + } + + if rpcResponse.Code == 32 { + newAccountSequence := n.extractAccountSequence(ctx, logger, rpcResponse.Log) + logger.Debug(fmt.Sprintf("retrying with new account sequence: %d", newAccountSequence)) + sequenceMap.Put(n.Domain(), newAccountSequence) + } + + if rpcResponse.Code != 0 { + return fmt.Errorf("received non-zero: %d - %s", rpcResponse.Code, rpcResponse.Log) + } + + // Tx was successfully broadcast + for _, msg := range msgs { + msg.DestTxHash = rpcResponse.Hash.String() + msg.Status = types.Complete + } + + logger.Info(fmt.Sprintf("Successfully broadcast %s to Noble. Tx hash: %s", msgs[0].SourceTxHash, msgs[0].DestTxHash)) + + return nil +} + +// extractAccountSequence attempts to extract the account sequence number from the RPC response logs when +// account sequence mismatch errors are encountered. If the account sequence number cannot be extracted from the logs, +// it is retrieved by making a request to the API endpoint. +func (n *Noble) extractAccountSequence(ctx context.Context, logger log.Logger, rpcResponseLog string) uint64 { + match := regexAccountSequenceMismatchErr.FindStringSubmatch(rpcResponseLog) + + if len(match) == 3 { + // Extract the numbers from the match. + newAccountSequence, _ := strconv.ParseUint(match[1], 10, 64) + return newAccountSequence + } + + // Otherwise, just request the account sequence + _, newAccountSequence, err := n.AccountInfo(ctx) + if err != nil { + logger.Error("unable to retrieve account sequence") + } + + return newAccountSequence +} diff --git a/noble/chain.go b/noble/chain.go index 6fcf883..fd99374 100644 --- a/noble/chain.go +++ b/noble/chain.go @@ -6,25 +6,11 @@ import ( "encoding/hex" "errors" "fmt" - "math/rand" - "regexp" - "strconv" "sync" - "time" - "cosmossdk.io/log" - nobletypes "github.com/circlefin/noble-cctp/x/cctp/types" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - sdkClient "github.com/cosmos/cosmos-sdk/client" - clientTx "github.com/cosmos/cosmos-sdk/client/tx" - "github.com/cosmos/cosmos-sdk/codec" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/bech32" - "github.com/cosmos/cosmos-sdk/types/tx/signing" - xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" - xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/strangelove-ventures/noble-cctp-relayer/cosmos" "github.com/strangelove-ventures/noble-cctp-relayer/types" @@ -144,333 +130,3 @@ func decodeDestinationCaller(input []byte) (string, error) { } return output, nil } - -func (n *Noble) InitializeBroadcaster( - ctx context.Context, - logger log.Logger, - sequenceMap *types.SequenceMap, -) error { - accountNumber, accountSequence, err := n.AccountInfo(ctx) - if err != nil { - return fmt.Errorf("unable to get account info for noble: %w", err) - } - - n.accountNumber = accountNumber - sequenceMap.Put(n.Domain(), accountSequence) - - return nil -} - -func (n *Noble) StartListener( - ctx context.Context, - logger log.Logger, - processingQueue chan *types.TxState, -) { - logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain()) - - if n.startBlock == 0 { - // get the latest block - chainTip, err := n.chainTip(ctx) - if err != nil { - panic(fmt.Errorf("unable to get chain tip for noble: %w", err)) - } - n.startBlock = chainTip - } - - logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks", - n.startBlock, - n.lookbackPeriod)) - - accountNumber, _, err := n.AccountInfo(ctx) - if err != nil { - panic(fmt.Errorf("unable to get account info for noble: %w", err)) - } - - n.accountNumber = accountNumber - - // enqueue block heights - currentBlock := n.startBlock - lookback := n.lookbackPeriod - chainTip, err := n.chainTip(ctx) - blockQueue := make(chan uint64, 1000000) - - // history - currentBlock = currentBlock - lookback - for currentBlock <= chainTip { - blockQueue <- currentBlock - currentBlock++ - } - - // listen for new blocks - go func() { - first := make(chan struct{}, 1) - first <- struct{}{} - for { - timer := time.NewTimer(6 * time.Second) - select { - case <-first: - timer.Stop() - chainTip, err = n.chainTip(ctx) - if err == nil { - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i - } - currentBlock = chainTip + 1 - } - } - case <-timer.C: - chainTip, err = n.chainTip(ctx) - if err == nil { - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i - } - currentBlock = chainTip + 1 - } - } - case <-ctx.Done(): - timer.Stop() - return - } - } - }() - - // constantly query for blocks - for i := 0; i < int(n.workers); i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - default: - block := <-blockQueue - res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "") - if err != nil { - logger.Debug(fmt.Sprintf("unable to query Noble block %d", block)) - blockQueue <- block - } - - for _, tx := range res.Txs { - parsedMsgs, err := txToMessageState(tx) - if err != nil { - logger.Error("unable to parse Noble log to message state", "err", err.Error()) - continue - } - for _, parsedMsg := range parsedMsgs { - logger.Info(fmt.Sprintf("New stream msg with nonce %d from %d with tx hash %s", parsedMsg.Nonce, parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - } - processingQueue <- &types.TxState{TxHash: tx.Hash.String(), Msgs: parsedMsgs} - } - } - } - }() - } - - <-ctx.Done() -} - -func (n *Noble) chainTip(ctx context.Context) (uint64, error) { - res, err := n.cc.RPCClient.Status(ctx) - if err != nil { - return 0, fmt.Errorf("unable to query status for noble: %w", err) - } - return uint64(res.SyncInfo.LatestBlockHeight), nil -} - -func (n *Noble) Broadcast( - ctx context.Context, - logger log.Logger, - msgs []*types.MessageState, - sequenceMap *types.SequenceMap, -) error { - // set up sdk context - interfaceRegistry := codectypes.NewInterfaceRegistry() - nobletypes.RegisterInterfaces(interfaceRegistry) - cdc := codec.NewProtoCodec(interfaceRegistry) - sdkContext := sdkClient.Context{ - TxConfig: xauthtx.NewTxConfig(cdc, xauthtx.DefaultSignModes), - } - - // build txn - txBuilder := sdkContext.TxConfig.NewTxBuilder() - - // sign and broadcast txn - for attempt := 0; attempt <= n.maxRetries; attempt++ { - - //TODO: MOVE EVERYTHING IN FOR LOOP TO FUNCTION. Same for ETH. - // see todo below. - - var receiveMsgs []sdk.Msg - for _, msg := range msgs { - - used, err := n.cc.QueryUsedNonce(ctx, types.Domain(msg.SourceDomain), msg.Nonce) - if err != nil { - return fmt.Errorf("unable to query used nonce: %w", err) - } - - if used { - msg.Status = types.Complete - logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used", msg.Nonce)) - continue - } - - attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) - if err != nil { - return fmt.Errorf("unable to decode message attestation") - } - - receiveMsgs = append(receiveMsgs, nobletypes.NewMsgReceiveMessage( - n.minterAddress, - msg.MsgSentBytes, - attestationBytes, - )) - - logger.Info(fmt.Sprintf( - "Broadcasting message from %d to %d: with source tx hash %s", - msg.SourceDomain, - msg.DestDomain, - msg.SourceTxHash)) - } - - if err := txBuilder.SetMsgs(receiveMsgs...); err != nil { - return fmt.Errorf("failed to set messages on tx: %w", err) - } - - txBuilder.SetGasLimit(n.gasLimit) - - txBuilder.SetMemo(n.txMemo) - - n.mu.Lock() - // TODO: uncomment this & remove all remainin n.mu.Unlock() 's after moving loop body to its own function - // defer n.mu.Unlock() - - accountSequence := sequenceMap.Next(n.Domain()) - - sigV2 := signing.SignatureV2{ - PubKey: n.privateKey.PubKey(), - Data: &signing.SingleSignatureData{ - SignMode: sdkContext.TxConfig.SignModeHandler().DefaultMode(), - Signature: nil, - }, - Sequence: uint64(accountSequence), - } - - signerData := xauthsigning.SignerData{ - ChainID: n.chainID, - AccountNumber: uint64(n.accountNumber), - Sequence: uint64(accountSequence), - } - - txBuilder.SetSignatures(sigV2) - - sigV2, err := clientTx.SignWithPrivKey( - sdkContext.TxConfig.SignModeHandler().DefaultMode(), - signerData, - txBuilder, - n.privateKey, - sdkContext.TxConfig, - uint64(accountSequence), - ) - if err != nil { - n.mu.Unlock() - return fmt.Errorf("failed to sign tx: %w", err) - } - - if err := txBuilder.SetSignatures(sigV2); err != nil { - n.mu.Unlock() - return fmt.Errorf("failed to set signatures: %w", err) - } - - // Generated Protobuf-encoded bytes. - txBytes, err := sdkContext.TxConfig.TxEncoder()(txBuilder.GetTx()) - if err != nil { - n.mu.Unlock() - return fmt.Errorf("failed to proto encode tx: %w", err) - } - - rpcResponse, err := n.cc.RPCClient.BroadcastTxSync(ctx, txBytes) - if err != nil || (rpcResponse != nil && rpcResponse.Code != 0) { - // Log the error - logger.Error(fmt.Sprintf("error during broadcast: %s", getErrorString(err, rpcResponse))) - - if err != nil || rpcResponse == nil { - // Log retry information - logger.Info(fmt.Sprintf("Retrying in %d seconds", n.retryIntervalSeconds)) - time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second) - // wait a random amount of time to lower probability of concurrent message nonce collision - time.Sleep(time.Duration(rand.Intn(5)) * time.Second) - n.mu.Unlock() - continue - } - - // Log details for non-zero response code - logger.Error(fmt.Sprintf("received non-zero: %d - %s", rpcResponse.Code, rpcResponse.Log)) - - // Handle specific error code (32) - if rpcResponse.Code == 32 { - newAccountSequence := n.extractAccountSequence(ctx, logger, rpcResponse.Log) - logger.Debug(fmt.Sprintf("retrying with new account sequence: %d", newAccountSequence)) - sequenceMap.Put(n.Domain(), newAccountSequence) - } - - // Log retry information - logger.Info(fmt.Sprintf("Retrying in %d seconds", n.retryIntervalSeconds)) - time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second) - // wait a random amount of time to lower probability of concurrent message nonce collision - time.Sleep(time.Duration(rand.Intn(5)) * time.Second) - n.mu.Unlock() - continue - } - - n.mu.Unlock() - - // Tx was successfully broadcast - for _, msg := range msgs { - msg.DestTxHash = rpcResponse.Hash.String() - msg.Status = types.Complete - } - logger.Info(fmt.Sprintf("Successfully broadcast %s to Noble. Tx hash: %s", msgs[0].SourceTxHash, msgs[0].DestTxHash)) - - return nil - } - - for _, msg := range msgs { - if msg.Status != types.Complete { - msg.Status = types.Failed - } - } - - return errors.New("reached max number of broadcast attempts") -} - -// getErrorString returns the appropriate value to log when tx broadcast errors are encountered. -func getErrorString(err error, rpcResponse *ctypes.ResultBroadcastTx) string { - if rpcResponse != nil { - return rpcResponse.Log - } - return err.Error() -} - -// extractAccountSequence attempts to extract the account sequence number from the RPC response logs when -// account sequence mismatch errors are encountered. If the account sequence number cannot be extracted from the logs, -// it is retrieved by making a request to the API endpoint. -func (n *Noble) extractAccountSequence(ctx context.Context, logger log.Logger, rpcResponseLog string) uint64 { - pattern := `expected (\d+), got (\d+)` - re := regexp.MustCompile(pattern) - match := re.FindStringSubmatch(rpcResponseLog) - - if len(match) == 3 { - // Extract the numbers from the match. - newAccountSequence, _ := strconv.ParseUint(match[1], 10, 64) - return newAccountSequence - } - - // Otherwise, just request the account sequence - _, newAccountSequence, err := n.AccountInfo(ctx) - if err != nil { - logger.Error("unable to retrieve account sequence") - } - - return newAccountSequence -} diff --git a/noble/listener.go b/noble/listener.go new file mode 100644 index 0000000..645e42d --- /dev/null +++ b/noble/listener.go @@ -0,0 +1,127 @@ +package noble + +import ( + "context" + "fmt" + "time" + + "cosmossdk.io/log" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +func (n *Noble) StartListener( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, +) { + logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain()) + + if n.startBlock == 0 { + // get the latest block + chainTip, err := n.chainTip(ctx) + if err != nil { + panic(fmt.Errorf("unable to get chain tip for noble: %w", err)) + } + n.startBlock = chainTip + } + + logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks", + n.startBlock, + n.lookbackPeriod)) + + accountNumber, _, err := n.AccountInfo(ctx) + if err != nil { + panic(fmt.Errorf("unable to get account info for noble: %w", err)) + } + + n.accountNumber = accountNumber + + // enqueue block heights + currentBlock := n.startBlock + lookback := n.lookbackPeriod + chainTip, err := n.chainTip(ctx) + blockQueue := make(chan uint64, 1000000) + + // history + currentBlock = currentBlock - lookback + for currentBlock <= chainTip { + blockQueue <- currentBlock + currentBlock++ + } + + // listen for new blocks + go func() { + first := make(chan struct{}, 1) + first <- struct{}{} + for { + timer := time.NewTimer(6 * time.Second) + select { + case <-first: + timer.Stop() + chainTip, err = n.chainTip(ctx) + if err == nil { + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i + } + currentBlock = chainTip + 1 + } + } + case <-timer.C: + chainTip, err = n.chainTip(ctx) + if err == nil { + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i + } + currentBlock = chainTip + 1 + } + } + case <-ctx.Done(): + timer.Stop() + return + } + } + }() + + // constantly query for blocks + for i := 0; i < int(n.workers); i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + default: + block := <-blockQueue + res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "") + if err != nil { + logger.Debug(fmt.Sprintf("unable to query Noble block %d", block)) + blockQueue <- block + } + + for _, tx := range res.Txs { + parsedMsgs, err := txToMessageState(tx) + if err != nil { + logger.Error("unable to parse Noble log to message state", "err", err.Error()) + continue + } + for _, parsedMsg := range parsedMsgs { + logger.Info(fmt.Sprintf("New stream msg with nonce %d from %d with tx hash %s", parsedMsg.Nonce, parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + } + processingQueue <- &types.TxState{TxHash: tx.Hash.String(), Msgs: parsedMsgs} + } + } + } + }() + } + + <-ctx.Done() +} + +func (n *Noble) chainTip(ctx context.Context) (uint64, error) { + res, err := n.cc.RPCClient.Status(ctx) + if err != nil { + return 0, fmt.Errorf("unable to query status for noble: %w", err) + } + return uint64(res.SyncInfo.LatestBlockHeight), nil +}