Skip to content

Commit

Permalink
Merge pull request #39 from strangelove-ventures/andrew/mutex_cleanup
Browse files Browse the repository at this point in the history
Cleanup mutexes
  • Loading branch information
agouin authored Jan 26, 2024
2 parents 9ff253a + 35288d5 commit 4a61710
Show file tree
Hide file tree
Showing 6 changed files with 684 additions and 636 deletions.
203 changes: 203 additions & 0 deletions ethereum/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package ethereum

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/big"
"regexp"
"strconv"
"time"

"cosmossdk.io/log"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

func (e *Ethereum) InitializeBroadcaster(
ctx context.Context,
logger log.Logger,
sequenceMap *types.SequenceMap,
) error {
nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress)
if err != nil {
return fmt.Errorf("unable to retrieve evm account nonce: %w", err)
}
sequenceMap.Put(e.Domain(), uint64(nextNonce))

return nil
}

func (e *Ethereum) Broadcast(
ctx context.Context,
logger log.Logger,
msgs []*types.MessageState,
sequenceMap *types.SequenceMap,
) error {

// set up eth client
client, err := ethclient.Dial(e.rpcURL)
if err != nil {
return fmt.Errorf("unable to dial ethereum client: %w", err)
}
defer client.Close()

backend := NewContractBackendWrapper(client)

auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID))
if err != nil {
return fmt.Errorf("unable to create auth: %w", err)
}

messageTransmitter, err := contracts.NewMessageTransmitter(common.HexToAddress(e.messageTransmitterAddress), backend)
if err != nil {
return fmt.Errorf("unable to create message transmitter: %w", err)
}

var broadcastErrors error
MsgLoop:
for _, msg := range msgs {
if msg.Status == types.Complete {
continue MsgLoop
}

attestationBytes, err := hex.DecodeString(msg.Attestation[2:])
if err != nil {
return errors.New("unable to decode message attestation")
}

for attempt := 0; attempt <= e.maxRetries; attempt++ {
if err := e.attemptBroadcast(
ctx,
logger,
msg,
sequenceMap,
auth,
messageTransmitter,
attestationBytes,
); err == nil {
continue MsgLoop
}

// if it's not the last attempt, retry
// TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints.
if attempt != e.maxRetries {
logger.Info(fmt.Sprintf("Retrying in %d seconds", e.retryIntervalSeconds))
time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second)
}
}
// retried max times with failure
msg.Status = types.Failed
broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts"))
}
return broadcastErrors
}

func (e *Ethereum) attemptBroadcast(
ctx context.Context,
logger log.Logger,
msg *types.MessageState,
sequenceMap *types.SequenceMap,
auth *bind.TransactOpts,
messageTransmitter *contracts.MessageTransmitter,
attestationBytes []byte,
) error {
logger.Info(fmt.Sprintf(
"Broadcasting message from %d to %d: with source tx hash %s",
msg.SourceDomain,
msg.DestDomain,
msg.SourceTxHash))

nonce := sequenceMap.Next(e.domain)
auth.Nonce = big.NewInt(int64(nonce))

e.mu.Lock()
defer e.mu.Unlock()

// TODO remove
nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress)
if err != nil {
logger.Error("unable to retrieve account number")
} else {
auth.Nonce = big.NewInt(nextNonce)
}
// TODO end remove

// check if nonce already used
co := &bind.CallOpts{
Pending: true,
Context: ctx,
}

logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce)

key := append(
common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4),
common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)...,
)

response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key)))
if nonceErr != nil {
logger.Debug("Error querying whether nonce was used. Continuing...")
} else {
fmt.Printf("received used nonce response: %d\n", response)
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
msg.SourceDomain, msg.Nonce))
msg.Status = types.Complete
return nil
}
}

// broadcast txn
tx, err := messageTransmitter.ReceiveMessage(
auth,
msg.MsgSentBytes,
attestationBytes,
)
if err == nil {
msg.Status = types.Complete

fullLog, err := tx.MarshalJSON()
if err != nil {
logger.Error("error marshalling eth tx log", err)
}

msg.DestTxHash = tx.Hash().Hex()

logger.Info(fmt.Sprintf("Successfully broadcast %s to Ethereum. Tx hash: %s, FULL LOG: %s", msg.SourceTxHash, msg.DestTxHash, string(fullLog)))

return nil
}

logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error()))
if parsedErr, ok := err.(JsonError); ok {
if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" {
msg.Status = types.Complete
logger.Error(fmt.Sprintf("This account nonce has already been used: %d", nonce))

return nil
}

match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error())
if match {
numberRegex := regexp.MustCompile("[0-9]+")
nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0)
if err != nil {
nextNonce, err = GetEthereumAccountNonce(e.rpcURL, e.minterAddress)
if err != nil {
logger.Error("unable to retrieve account number")
}
}
sequenceMap.Put(e.domain, uint64(nextNonce))
}
}

return err
}
Loading

0 comments on commit 4a61710

Please sign in to comment.