diff --git a/README.md b/README.md index 9bdb3e2..55e5d9c 100644 --- a/README.md +++ b/README.md @@ -30,9 +30,17 @@ After that, it will flush from the last stored height - lookback period up until By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag. -| **Exported Metric** | **Description** | **Type** | -|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------| -| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.

Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge | +| **Exported Metric** | **Description** | **Type** | +|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------| +| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.

Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge | +| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge | +| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER it retries `broadcast-retries` (config setting) number of times. | Counter | + +### Noble Key + +The noble private key you input into the config must be hex encoded. The easiest way to get this is via a chain binary: + +`nobled keys export --unarmored-hex --unsafe` ### API @@ -42,20 +50,18 @@ Simple API to query message state cache localhost:8000/tx/ # All messages for a tx hash and domain 0 (Ethereum) localhost:8000/tx/?domain=0 -# All messages for a tx hash and a given type ('mint' or 'forward') -localhost:8000/tx/?type=forward ``` ### State -| IrisLookupId | Type | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated | -|:-------------|:--------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------| -| 0x123 | Mint | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Forward | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Mint | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Forward | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Mint | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Mint | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated | +|:-------------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------| +| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | ### Generating Go ABI bindings @@ -67,4 +73,6 @@ abigen --abi ethereum/abi/MessageTransmitter.json --pkg contracts- --type Messag ``` ### Useful links -[Goerli USDC faucet](https://usdcfaucet.com/) +[USDC faucet](https://usdcfaucet.com/) + +[Circle Docs/Contract Addresses](https://developers.circle.com/stablecoins/docs/evm-smart-contracts) diff --git a/cmd/appstate.go b/cmd/appstate.go index 222cf12..8336f08 100644 --- a/cmd/appstate.go +++ b/cmd/appstate.go @@ -25,7 +25,7 @@ func NewAppState() *AppState { return &AppState{} } -// InitAppState checks if a logger and config are presant. If not, it adds them to the Appstate +// InitAppState checks if a logger and config are present. If not, it adds them to the AppState func (a *AppState) InitAppState() { if a.Logger == nil { a.InitLogger() diff --git a/cmd/process.go b/cmd/process.go index d62f65e..23f0451 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -56,10 +56,10 @@ func Start(a *AppState) *cobra.Command { flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) if err != nil { - logger.Error("invalid flush interval", "error", err) + logger.Error("Invalid flush interval", "error", err) } if flushInterval == 0 { - logger.Info("flush interval not set. Use the --flush-interval flag to set a reoccurring flush") + logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush") } metrics := relayer.InitPromMetrics(port) @@ -74,11 +74,11 @@ func Start(a *AppState) *cobra.Command { logger = logger.With("name", c.Name(), "domain", c.Domain()) if err := c.InitializeClients(cmd.Context(), logger); err != nil { - logger.Error("error initializing client", "err", err) + logger.Error("Error initializing client", "err", err) os.Exit(1) } - go c.TrackLatestBlockHeight(cmd.Context(), logger) + go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics) // wait until height is available maxRetries := 45 @@ -112,7 +112,7 @@ func Start(a *AppState) *cobra.Command { // spin up Processor worker pool for i := 0; i < int(cfg.ProcessorWorkerCount); i++ { - go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap) + go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap, metrics) } defer func() { @@ -136,6 +136,7 @@ func StartProcessor( registeredDomains map[types.Domain]types.Chain, processingQueue chan *types.TxState, sequenceMap *types.SequenceMap, + metrics *relayer.PromMetrics, ) { logger := a.Logger cfg := a.Config @@ -211,8 +212,8 @@ func StartProcessor( continue } - if err := chain.Broadcast(ctx, logger, msgs, sequenceMap); err != nil { - logger.Error("unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain) + if err := chain.Broadcast(ctx, logger, msgs, sequenceMap, metrics); err != nil { + logger.Error("Unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain) requeue = true continue } @@ -282,7 +283,7 @@ func filterLowTransfers(cfg *types.Config, logger log.Logger, msg *types.Message if msg.DestDomain == types.Domain(4) { nobleCfg, ok := cfg.Chains["noble"].(*noble.ChainConfig) if !ok { - logger.Info("chain named 'noble' not found in config, filtering transaction") + logger.Info("Chain named 'noble' not found in config, filtering transaction") return true } minBurnAmount = nobleCfg.MinMintAmount @@ -322,7 +323,7 @@ func startApi(a *AppState) { err := router.SetTrustedProxies(cfg.Api.TrustedProxies) // vpn.primary.strange.love if err != nil { - logger.Error("unable to set trusted proxies on API server: " + err.Error()) + logger.Error("Unable to set trusted proxies on API server: " + err.Error()) os.Exit(1) } diff --git a/cmd/process_test.go b/cmd/process_test.go index 77521c5..307d16f 100644 --- a/cmd/process_test.go +++ b/cmd/process_test.go @@ -24,7 +24,7 @@ func TestProcessNewLog(t *testing.T) { sequenceMap := types.NewSequenceMap() processingQueue = make(chan *types.TxState, 10) - go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil) emptyBz := make([]byte, 32) expectedState := &types.TxState{ @@ -56,7 +56,7 @@ func TestProcessDisabledCctpRoute(t *testing.T) { sequenceMap := types.NewSequenceMap() processingQueue = make(chan *types.TxState, 10) - go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil) emptyBz := make([]byte, 32) expectedState := &types.TxState{ @@ -89,7 +89,7 @@ func TestProcessInvalidDestinationCaller(t *testing.T) { sequenceMap := types.NewSequenceMap() processingQueue = make(chan *types.TxState, 10) - go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil) nonEmptyBytes := make([]byte, 31) nonEmptyBytes = append(nonEmptyBytes, 0x1) diff --git a/config/sample-integration-config.yaml b/config/sample-integration-config.yaml index 5eef16f..b0512cc 100644 --- a/config/sample-integration-config.yaml +++ b/config/sample-integration-config.yaml @@ -1,5 +1,5 @@ # This file is for integration testing on deployed relayers -# use this file inconjunction with: integration/deployed_relayer_test.go +# use this file in conjunction with: integration/deployed_relayer_test.go # "destination-caller": address of the relayer you are testing. Only a relayer with this # address can pick up the transaction. diff --git a/ethereum/broadcast.go b/ethereum/broadcast.go index c69e83d..052ab66 100644 --- a/ethereum/broadcast.go +++ b/ethereum/broadcast.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts" + "github.com/strangelove-ventures/noble-cctp-relayer/relayer" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) @@ -37,6 +38,7 @@ func (e *Ethereum) Broadcast( logger log.Logger, msgs []*types.MessageState, sequenceMap *types.SequenceMap, + m *relayer.PromMetrics, ) error { logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) @@ -56,16 +58,17 @@ func (e *Ethereum) Broadcast( var broadcastErrors error MsgLoop: for _, msg := range msgs { - if msg.Status == types.Complete { - continue MsgLoop - } - attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) if err != nil { return errors.New("unable to decode message attestation") } for attempt := 0; attempt <= e.maxRetries; attempt++ { + // check if another worker already broadcasted tx due to flush + if msg.Status == types.Complete { + continue MsgLoop + } + if err := e.attemptBroadcast( ctx, logger, @@ -85,8 +88,11 @@ MsgLoop: time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second) } } + // retried max times with failure - msg.Status = types.Failed + if m != nil { + m.IncBroadcastErrors(e.name, fmt.Sprint(e.domain)) + } broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts")) } return broadcastErrors @@ -139,7 +145,6 @@ func (e *Ethereum) attemptBroadcast( if nonceErr != nil { logger.Debug("Error querying whether nonce was used. Continuing...", "error:", nonceErr) } else { - fmt.Printf("received used nonce response: %d\n", response) if response.Uint64() == uint64(1) { // nonce has already been used, mark as complete logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", diff --git a/ethereum/contract_backend_wrapper.go b/ethereum/contract_backend_wrapper.go index 53f7908..7290372 100644 --- a/ethereum/contract_backend_wrapper.go +++ b/ethereum/contract_backend_wrapper.go @@ -2,7 +2,6 @@ package ethereum import ( "context" - "fmt" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" @@ -19,10 +18,5 @@ func NewContractBackendWrapper(client *ethclient.Client) *ContractBackendWrapper } func (c *ContractBackendWrapper) SendTransaction(ctx context.Context, tx *types.Transaction) error { - json, err := tx.MarshalJSON() - if err != nil { - return err - } - fmt.Printf("SendTransaction: %+v\n\nRAW: %s\n", tx, json) return c.Client.SendTransaction(ctx, tx) } diff --git a/ethereum/listener.go b/ethereum/listener.go index 7a6b33e..03f184f 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -305,16 +305,18 @@ func (e *Ethereum) flushMechanism( } } -func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { +func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) { logger.With("routine", "TrackLatestBlockHeight", "chain", e.name, "domain", e.domain) + d := fmt.Sprint(e.domain) + headers := make(chan *ethtypes.Header) sub, err := e.wsClient.SubscribeNewHead(ctx, headers) if err != nil { logger.Error("Failed to connect to websocket to track height. Will retry...", "err", err) time.Sleep(1 * time.Second) - e.TrackLatestBlockHeight(ctx, logger) + e.TrackLatestBlockHeight(ctx, logger, m) return } @@ -326,16 +328,19 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger return case err := <-sub.Err(): logger.Error("Height tracker websocket subscription error. Attempting to reconnect...", "err", err) - e.TrackLatestBlockHeight(ctx, logger) + e.TrackLatestBlockHeight(ctx, logger, m) return case header := <-headers: e.SetLatestBlock(header.Number.Uint64()) + if m != nil { + m.SetLatestHeight(e.name, d, header.Number.Int64()) + } } } } func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) { - logger = logger.With("metric", "wallet blannce", "chain", e.name, "domain", e.domain) + logger = logger.With("metric", "wallet balance", "chain", e.name, "domain", e.domain) queryRate := 5 * time.Minute account := common.HexToAddress(e.minterAddress) @@ -360,7 +365,9 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m balanceBigFloat := new(big.Float).SetInt(balance) balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64() - m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled) + if m != nil { + m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled) + } case <-timer.C: balance, err := e.rpcClient.BalanceAt(ctx, account, nil) if err != nil { @@ -371,7 +378,9 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m balanceBigFloat := new(big.Float).SetInt(balance) balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64() - m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled) + if m != nil { + m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled) + } case <-ctx.Done(): timer.Stop() diff --git a/integration/eth_burn_to_noble_mint_test.go b/integration/eth_burn_to_noble_mint_test.go index 181a58f..f526db8 100644 --- a/integration/eth_burn_to_noble_mint_test.go +++ b/integration/eth_burn_to_noble_mint_test.go @@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) go ethChain.StartListener(ctx, a.Logger, processingQueue, 0) - go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap) + go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil) _, _, generatedWallet := testdata.KeyTestPubAddr() destAddress, _ := bech32.ConvertAndEncode("noble", generatedWallet) diff --git a/integration/noble_burn_to_eth_mint_test.go b/integration/noble_burn_to_eth_mint_test.go index 6f04edc..ddcef78 100644 --- a/integration/noble_burn_to_eth_mint_test.go +++ b/integration/noble_burn_to_eth_mint_test.go @@ -78,7 +78,7 @@ func TestNobleBurnToEthMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0) - go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap) + go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil) ethDestinationAddress, _, err := generateEthWallet() require.NoError(t, err) diff --git a/noble/broadcast.go b/noble/broadcast.go index fb760ab..defdb51 100644 --- a/noble/broadcast.go +++ b/noble/broadcast.go @@ -19,6 +19,7 @@ import ( "github.com/cosmos/cosmos-sdk/types/tx/signing" xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx" + "github.com/strangelove-ventures/noble-cctp-relayer/relayer" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) @@ -47,6 +48,7 @@ func (n *Noble) Broadcast( logger log.Logger, msgs []*types.MessageState, sequenceMap *types.SequenceMap, + m *relayer.PromMetrics, ) error { // set up sdk context interfaceRegistry := codectypes.NewInterfaceRegistry() @@ -76,7 +78,9 @@ func (n *Noble) Broadcast( msg.Status = types.Failed } } - + if m != nil { + m.IncBroadcastErrors(n.Name(), fmt.Sprint(n.Domain())) + } return errors.New("reached max number of broadcast attempts") } @@ -103,6 +107,11 @@ func (n *Noble) attemptBroadcast( continue } + // check if another worker already broadcasted tx due to flush + if msg.Status == types.Complete { + continue + } + attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) if err != nil { return fmt.Errorf("unable to decode message attestation") diff --git a/noble/listener.go b/noble/listener.go index 61099b3..80788eb 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -172,15 +172,20 @@ func (n *Noble) flushMechanism( } } -func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { +func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) { logger.With("routine", "TrackLatestBlockHeight", "chain", n.Name(), "domain", n.Domain()) + d := fmt.Sprint(n.Domain()) + // first time res, err := n.cc.RPCClient.Status(ctx) if err != nil { logger.Error("Unable to query Nobles latest height", "err", err) } n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight)) + if m != nil { + m.SetLatestHeight(n.Name(), d, res.SyncInfo.LatestBlockHeight) + } // then start loop on a timer for { @@ -193,6 +198,9 @@ func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { continue } n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight)) + if m != nil { + m.SetLatestHeight(n.Name(), d, res.SyncInfo.LatestBlockHeight) + } case <-ctx.Done(): timer.Stop() return diff --git a/noble/message_state.go b/noble/message_state.go index 5273f9d..6414b7e 100644 --- a/noble/message_state.go +++ b/noble/message_state.go @@ -73,8 +73,6 @@ func txToMessageState(tx *ctypes.ResultTx) ([]*types.MessageState, error) { } messageStates = append(messageStates, messageState) - - fmt.Printf("Appended transfer from 4 to %d\n", msg.DestinationDomain) } } if !parsed { diff --git a/relayer/metrics.go b/relayer/metrics.go index 3b04cf7..7f1281f 100644 --- a/relayer/metrics.go +++ b/relayer/metrics.go @@ -10,7 +10,9 @@ import ( ) type PromMetrics struct { - WalletBalance *prometheus.GaugeVec + WalletBalance *prometheus.GaugeVec + LatestHeight *prometheus.GaugeVec + BroadcastErrors *prometheus.CounterVec } func InitPromMetrics(port int16) *PromMetrics { @@ -18,7 +20,9 @@ func InitPromMetrics(port int16) *PromMetrics { // labels var ( - walletLabels = []string{"chain", "address", "denom"} + walletLabels = []string{"chain", "address", "denom"} + heightLabels = []string{"chain", "domain"} + broadcastErrorLabels = []string{"chain", "domain"} ) m := &PromMetrics{ @@ -26,9 +30,19 @@ func InitPromMetrics(port int16) *PromMetrics { Name: "cctp_relayer_wallet_balance", Help: "The current balance for a wallet", }, walletLabels), + LatestHeight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cctp_relayer_chain_latest_height", + Help: "The current height of the chain", + }, heightLabels), + BroadcastErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cctp_relayer_broadcast_errors_total", + Help: "The total number of failed broadcasts. Note: this is AFTER is retires `broadcast-retries` number of times (config setting).", + }, broadcastErrorLabels), } reg.MustRegister(m.WalletBalance) + reg.MustRegister(m.LatestHeight) + reg.MustRegister(m.BroadcastErrors) // Expose /metrics HTTP endpoint go func() { @@ -42,3 +56,11 @@ func InitPromMetrics(port int16) *PromMetrics { func (m *PromMetrics) SetWalletBalance(chain, address, denom string, balance float64) { m.WalletBalance.WithLabelValues(chain, address, denom).Set(balance) } + +func (m *PromMetrics) SetLatestHeight(chain, domain string, height int64) { + m.LatestHeight.WithLabelValues(chain, domain).Set(float64(height)) +} + +func (m *PromMetrics) IncBroadcastErrors(chain, domain string) { + m.BroadcastErrors.WithLabelValues(chain, domain).Inc() +} diff --git a/types/chain.go b/types/chain.go index 7340d8a..30851d4 100644 --- a/types/chain.go +++ b/types/chain.go @@ -61,16 +61,18 @@ type Chain interface { logger log.Logger, msgs []*MessageState, sequenceMap *SequenceMap, + metrics *relayer.PromMetrics, ) error TrackLatestBlockHeight( ctx context.Context, logger log.Logger, + metrics *relayer.PromMetrics, ) WalletBalanceMetric( ctx context.Context, logger log.Logger, - metric *relayer.PromMetrics, + metrics *relayer.PromMetrics, ) }