Skip to content

Commit

Permalink
Merge branch 'main' into reece/chore-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya authored Mar 26, 2024
2 parents 868cb24 + e54ab0b commit 9d32d09
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 55 deletions.
36 changes: 22 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ After that, it will flush from the last stored height - lookback period up until

By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.

| **Exported Metric** | **Description** | **Type** |
|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.<br><br>Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge |
| **Exported Metric** | **Description** | **Type** |
|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.<br><br>Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge |
| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge |
| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER it retries `broadcast-retries` (config setting) number of times. | Counter |

### Noble Key

The noble private key you input into the config must be hex encoded. The easiest way to get this is via a chain binary:

`nobled keys export <KEY_NAME> --unarmored-hex --unsafe`


### API
Expand All @@ -42,20 +50,18 @@ Simple API to query message state cache
localhost:8000/tx/<hash, including the 0x prefix>
# All messages for a tx hash and domain 0 (Ethereum)
localhost:8000/tx/<hash>?domain=0
# All messages for a tx hash and a given type ('mint' or 'forward')
localhost:8000/tx/<hash>?type=forward
```

### State

| IrisLookupId | Type | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated |
|:-------------|:--------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------|
| 0x123 | Mint | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Forward | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Mint | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Forward | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Mint | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Mint | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated |
|:-------------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------|
| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |

### Generating Go ABI bindings

Expand All @@ -67,4 +73,6 @@ abigen --abi ethereum/abi/MessageTransmitter.json --pkg contracts- --type Messag
```

### Useful links
[Goerli USDC faucet](https://usdcfaucet.com/)
[USDC faucet](https://usdcfaucet.com/)

[Circle Docs/Contract Addresses](https://developers.circle.com/stablecoins/docs/evm-smart-contracts)
2 changes: 1 addition & 1 deletion cmd/appstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewAppState() *AppState {
return &AppState{}
}

// InitAppState checks if a logger and config are presant. If not, it adds them to the Appstate
// InitAppState checks if a logger and config are present. If not, it adds them to the AppState
func (a *AppState) InitAppState() {
if a.Logger == nil {
a.InitLogger()
Expand Down
19 changes: 10 additions & 9 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func Start(a *AppState) *cobra.Command {

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("invalid flush interval", "error", err)
logger.Error("Invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}

metrics := relayer.InitPromMetrics(port)
Expand All @@ -74,11 +74,11 @@ func Start(a *AppState) *cobra.Command {
logger = logger.With("name", c.Name(), "domain", c.Domain())

if err := c.InitializeClients(cmd.Context(), logger); err != nil {
logger.Error("error initializing client", "err", err)
logger.Error("Error initializing client", "err", err)
os.Exit(1)
}

go c.TrackLatestBlockHeight(cmd.Context(), logger)
go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics)

// wait until height is available
maxRetries := 45
Expand Down Expand Up @@ -112,7 +112,7 @@ func Start(a *AppState) *cobra.Command {

// spin up Processor worker pool
for i := 0; i < int(cfg.ProcessorWorkerCount); i++ {
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap)
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap, metrics)
}

defer func() {
Expand All @@ -136,6 +136,7 @@ func StartProcessor(
registeredDomains map[types.Domain]types.Chain,
processingQueue chan *types.TxState,
sequenceMap *types.SequenceMap,
metrics *relayer.PromMetrics,
) {
logger := a.Logger
cfg := a.Config
Expand Down Expand Up @@ -211,8 +212,8 @@ func StartProcessor(
continue
}

if err := chain.Broadcast(ctx, logger, msgs, sequenceMap); err != nil {
logger.Error("unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain)
if err := chain.Broadcast(ctx, logger, msgs, sequenceMap, metrics); err != nil {
logger.Error("Unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain)
requeue = true
continue
}
Expand Down Expand Up @@ -282,7 +283,7 @@ func filterLowTransfers(cfg *types.Config, logger log.Logger, msg *types.Message
if msg.DestDomain == types.Domain(4) {
nobleCfg, ok := cfg.Chains["noble"].(*noble.ChainConfig)
if !ok {
logger.Info("chain named 'noble' not found in config, filtering transaction")
logger.Info("Chain named 'noble' not found in config, filtering transaction")
return true
}
minBurnAmount = nobleCfg.MinMintAmount
Expand Down Expand Up @@ -322,7 +323,7 @@ func startApi(a *AppState) {

err := router.SetTrustedProxies(cfg.Api.TrustedProxies) // vpn.primary.strange.love
if err != nil {
logger.Error("unable to set trusted proxies on API server: " + err.Error())
logger.Error("Unable to set trusted proxies on API server: " + err.Error())
os.Exit(1)
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestProcessNewLog(t *testing.T) {
sequenceMap := types.NewSequenceMap()
processingQueue = make(chan *types.TxState, 10)

go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil)

emptyBz := make([]byte, 32)
expectedState := &types.TxState{
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestProcessDisabledCctpRoute(t *testing.T) {
sequenceMap := types.NewSequenceMap()
processingQueue = make(chan *types.TxState, 10)

go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil)

emptyBz := make([]byte, 32)
expectedState := &types.TxState{
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestProcessInvalidDestinationCaller(t *testing.T) {
sequenceMap := types.NewSequenceMap()
processingQueue = make(chan *types.TxState, 10)

go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil)

nonEmptyBytes := make([]byte, 31)
nonEmptyBytes = append(nonEmptyBytes, 0x1)
Expand Down
2 changes: 1 addition & 1 deletion config/sample-integration-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is for integration testing on deployed relayers
# use this file inconjunction with: integration/deployed_relayer_test.go
# use this file in conjunction with: integration/deployed_relayer_test.go

# "destination-caller": address of the relayer you are testing. Only a relayer with this
# address can pick up the transaction.
Expand Down
17 changes: 11 additions & 6 deletions ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts"
"github.com/strangelove-ventures/noble-cctp-relayer/relayer"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

Expand All @@ -37,6 +38,7 @@ func (e *Ethereum) Broadcast(
logger log.Logger,
msgs []*types.MessageState,
sequenceMap *types.SequenceMap,
m *relayer.PromMetrics,
) error {

logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)
Expand All @@ -56,16 +58,17 @@ func (e *Ethereum) Broadcast(
var broadcastErrors error
MsgLoop:
for _, msg := range msgs {
if msg.Status == types.Complete {
continue MsgLoop
}

attestationBytes, err := hex.DecodeString(msg.Attestation[2:])
if err != nil {
return errors.New("unable to decode message attestation")
}

for attempt := 0; attempt <= e.maxRetries; attempt++ {
// check if another worker already broadcasted tx due to flush
if msg.Status == types.Complete {
continue MsgLoop
}

if err := e.attemptBroadcast(
ctx,
logger,
Expand All @@ -85,8 +88,11 @@ MsgLoop:
time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second)
}
}

// retried max times with failure
msg.Status = types.Failed
if m != nil {
m.IncBroadcastErrors(e.name, fmt.Sprint(e.domain))
}
broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts"))
}
return broadcastErrors
Expand Down Expand Up @@ -139,7 +145,6 @@ func (e *Ethereum) attemptBroadcast(
if nonceErr != nil {
logger.Debug("Error querying whether nonce was used. Continuing...", "error:", nonceErr)
} else {
fmt.Printf("received used nonce response: %d\n", response)
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
Expand Down
6 changes: 0 additions & 6 deletions ethereum/contract_backend_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ethereum

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
Expand All @@ -19,10 +18,5 @@ func NewContractBackendWrapper(client *ethclient.Client) *ContractBackendWrapper
}

func (c *ContractBackendWrapper) SendTransaction(ctx context.Context, tx *types.Transaction) error {
json, err := tx.MarshalJSON()
if err != nil {
return err
}
fmt.Printf("SendTransaction: %+v\n\nRAW: %s\n", tx, json)
return c.Client.SendTransaction(ctx, tx)
}
21 changes: 15 additions & 6 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,18 @@ func (e *Ethereum) flushMechanism(
}
}

func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) {
func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) {
logger.With("routine", "TrackLatestBlockHeight", "chain", e.name, "domain", e.domain)

d := fmt.Sprint(e.domain)

headers := make(chan *ethtypes.Header)

sub, err := e.wsClient.SubscribeNewHead(ctx, headers)
if err != nil {
logger.Error("Failed to connect to websocket to track height. Will retry...", "err", err)
time.Sleep(1 * time.Second)
e.TrackLatestBlockHeight(ctx, logger)
e.TrackLatestBlockHeight(ctx, logger, m)
return
}

Expand All @@ -326,16 +328,19 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
return
case err := <-sub.Err():
logger.Error("Height tracker websocket subscription error. Attempting to reconnect...", "err", err)
e.TrackLatestBlockHeight(ctx, logger)
e.TrackLatestBlockHeight(ctx, logger, m)
return
case header := <-headers:
e.SetLatestBlock(header.Number.Uint64())
if m != nil {
m.SetLatestHeight(e.name, d, header.Number.Int64())
}
}
}
}

func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) {
logger = logger.With("metric", "wallet blannce", "chain", e.name, "domain", e.domain)
logger = logger.With("metric", "wallet balance", "chain", e.name, "domain", e.domain)
queryRate := 5 * time.Minute

account := common.HexToAddress(e.minterAddress)
Expand All @@ -360,7 +365,9 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
balanceBigFloat := new(big.Float).SetInt(balance)
balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64()

m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
if m != nil {
m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
}
case <-timer.C:
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
Expand All @@ -371,7 +378,9 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
balanceBigFloat := new(big.Float).SetInt(balance)
balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64()

m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
if m != nil {
m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
}

case <-ctx.Done():
timer.Stop()
Expand Down
2 changes: 1 addition & 1 deletion integration/eth_burn_to_noble_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) {
processingQueue := make(chan *types.TxState, 10)

go ethChain.StartListener(ctx, a.Logger, processingQueue, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

_, _, generatedWallet := testdata.KeyTestPubAddr()
destAddress, _ := bech32.ConvertAndEncode("noble", generatedWallet)
Expand Down
2 changes: 1 addition & 1 deletion integration/noble_burn_to_eth_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestNobleBurnToEthMint(t *testing.T) {
processingQueue := make(chan *types.TxState, 10)

go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

ethDestinationAddress, _, err := generateEthWallet()
require.NoError(t, err)
Expand Down
11 changes: 10 additions & 1 deletion noble/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cosmos/cosmos-sdk/types/tx/signing"
xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"
xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
"github.com/strangelove-ventures/noble-cctp-relayer/relayer"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

Expand Down Expand Up @@ -47,6 +48,7 @@ func (n *Noble) Broadcast(
logger log.Logger,
msgs []*types.MessageState,
sequenceMap *types.SequenceMap,
m *relayer.PromMetrics,
) error {
// set up sdk context
interfaceRegistry := codectypes.NewInterfaceRegistry()
Expand Down Expand Up @@ -76,7 +78,9 @@ func (n *Noble) Broadcast(
msg.Status = types.Failed
}
}

if m != nil {
m.IncBroadcastErrors(n.Name(), fmt.Sprint(n.Domain()))
}
return errors.New("reached max number of broadcast attempts")
}

Expand All @@ -103,6 +107,11 @@ func (n *Noble) attemptBroadcast(
continue
}

// check if another worker already broadcasted tx due to flush
if msg.Status == types.Complete {
continue
}

attestationBytes, err := hex.DecodeString(msg.Attestation[2:])
if err != nil {
return fmt.Errorf("unable to decode message attestation")
Expand Down
Loading

0 comments on commit 9d32d09

Please sign in to comment.