Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Metrics: track latest height + broadcast errors #72

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ After that, it will flush from the last stored height - lookback period up until

By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.

| **Exported Metric** | **Description** | **Type** |
|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.<br><br>Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge |
| **Exported Metric** | **Description** | **Type** |
|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.<br><br>Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge |
| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge |
| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER is retires `broadcast-retries` (config setting) number of times. | Counter |
boojamya marked this conversation as resolved.
Show resolved Hide resolved

### Noble Key

The noble private key you input into the config must be hex encoded. The easiest way to get this is via a chain binary:

`nobled keys export <KEY_NAME> --unarmored-hex --unsafe`


### API
Expand All @@ -42,20 +50,18 @@ Simple API to query message state cache
localhost:8000/tx/<hash, including the 0x prefix>
# All messages for a tx hash and domain 0 (Ethereum)
localhost:8000/tx/<hash>?domain=0
# All messages for a tx hash and a given type ('mint' or 'forward')
localhost:8000/tx/<hash>?type=forward
```

### State

| IrisLookupId | Type | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated |
|:-------------|:--------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------|
| 0x123 | Mint | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Forward | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Mint | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Forward | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Mint | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Mint | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated |
|:-------------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------|
| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |
| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date |

### Generating Go ABI bindings

Expand All @@ -67,4 +73,6 @@ abigen --abi ethereum/abi/MessageTransmitter.json --pkg contracts- --type Messag
```

### Useful links
[Goerli USDC faucet](https://usdcfaucet.com/)
[USDC faucet](https://usdcfaucet.com/)

[Circle Docs/Contract Addresses](https://developers.circle.com/stablecoins/docs/evm-smart-contracts)
2 changes: 1 addition & 1 deletion cmd/appstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewAppState() *AppState {
return &AppState{}
}

// InitAppState checks if a logger and config are presant. If not, it adds them to the Appstate
// InitAppState checks if a logger and config are present. If not, it adds them to the AppState
func (a *AppState) InitAppState() {
if a.Logger == nil {
a.InitLogger()
Expand Down
23 changes: 12 additions & 11 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func Start(a *AppState) *cobra.Command {

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("invalid flush interval", "error", err)
logger.Error("Invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}

metrics := relayer.InitPromMetrics(port)
Expand All @@ -74,11 +74,11 @@ func Start(a *AppState) *cobra.Command {
logger = logger.With("name", c.Name(), "domain", c.Domain())

if err := c.InitializeClients(cmd.Context(), logger); err != nil {
logger.Error("error initializing client", "err", err)
logger.Error("Error initializing client", "err", err)
os.Exit(1)
}

go c.TrackLatestBlockHeight(cmd.Context(), logger)
go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics)

// wait until height is available
maxRetries := 45
Expand Down Expand Up @@ -112,7 +112,7 @@ func Start(a *AppState) *cobra.Command {

// spin up Processor worker pool
for i := 0; i < int(cfg.ProcessorWorkerCount); i++ {
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap)
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap, metrics)
}

defer func() {
Expand All @@ -136,6 +136,7 @@ func StartProcessor(
registeredDomains map[types.Domain]types.Chain,
processingQueue chan *types.TxState,
sequenceMap *types.SequenceMap,
metrics *relayer.PromMetrics,
) {
logger := a.Logger
cfg := a.Config
Expand Down Expand Up @@ -211,8 +212,8 @@ func StartProcessor(
continue
}

if err := chain.Broadcast(ctx, logger, msgs, sequenceMap); err != nil {
logger.Error("unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain)
if err := chain.Broadcast(ctx, logger, msgs, sequenceMap, metrics); err != nil {
logger.Error("Unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain)
requeue = true
continue
}
Expand Down Expand Up @@ -269,20 +270,20 @@ func filterInvalidDestinationCallers(registeredDomains map[types.Domain]types.Ch
return true
}

// filterLowTransfers returns true if the amount being transfered to the destination chain is lower than the min-mint-amount configured
// filterLowTransfers returns true if the amount being transferred to the destination chain is lower than the min-mint-amount configured
func filterLowTransfers(cfg *types.Config, logger log.Logger, msg *types.MessageState) bool {
bm, err := new(cctptypes.BurnMessage).Parse(msg.MsgBody)
if err != nil {
logger.Info("This is not a burn message", "err", err)
return true
}

// TODO: not assume that "noble" is domain 4, add "domain" to the noble chain conifg
// TODO: not assume that "noble" is domain 4, add "domain" to the noble chain config
var minBurnAmount uint64
if msg.DestDomain == types.Domain(4) {
nobleCfg, ok := cfg.Chains["noble"].(*noble.ChainConfig)
if !ok {
logger.Info("chain named 'noble' not found in config, filtering transaction")
logger.Info("Chain named 'noble' not found in config, filtering transaction")
return true
}
minBurnAmount = nobleCfg.MinMintAmount
Expand Down Expand Up @@ -322,7 +323,7 @@ func startApi(a *AppState) {

err := router.SetTrustedProxies(cfg.Api.TrustedProxies) // vpn.primary.strange.love
if err != nil {
logger.Error("unable to set trusted proxies on API server: " + err.Error())
logger.Error("Unable to set trusted proxies on API server: " + err.Error())
os.Exit(1)
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestProcessNewLog(t *testing.T) {
sequenceMap := types.NewSequenceMap()
processingQueue = make(chan *types.TxState, 10)

go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil)
Reecepbcups marked this conversation as resolved.
Show resolved Hide resolved

emptyBz := make([]byte, 32)
expectedState := &types.TxState{
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestProcessDisabledCctpRoute(t *testing.T) {
sequenceMap := types.NewSequenceMap()
processingQueue = make(chan *types.TxState, 10)

go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil)

emptyBz := make([]byte, 32)
expectedState := &types.TxState{
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestProcessInvalidDestinationCaller(t *testing.T) {
sequenceMap := types.NewSequenceMap()
processingQueue = make(chan *types.TxState, 10)

go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(context.TODO(), a, registeredDomains, processingQueue, sequenceMap, nil)

nonEmptyBytes := make([]byte, 31)
nonEmptyBytes = append(nonEmptyBytes, 0x1)
Expand Down
2 changes: 1 addition & 1 deletion config/sample-integration-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is for integration testing on deployed relayers
# use this file inconjunction with: integration/deployed_relayer_test.go
# use this file in conjunction with: integration/deployed_relayer_test.go

# "destination-caller": address of the relayer you are testing. Only a relayer with this
# address can pick up the transaction.
Expand Down
7 changes: 6 additions & 1 deletion ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts"
"github.com/strangelove-ventures/noble-cctp-relayer/relayer"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

Expand All @@ -37,6 +38,7 @@ func (e *Ethereum) Broadcast(
logger log.Logger,
msgs []*types.MessageState,
sequenceMap *types.SequenceMap,
m *relayer.PromMetrics,
) error {

logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)
Expand Down Expand Up @@ -85,8 +87,11 @@ MsgLoop:
time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second)
}
}

// retried max times with failure
msg.Status = types.Failed
if m != nil {
m.IncBroadcastErrors(e.name, fmt.Sprint(e.domain))
}
broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts"))
}
return broadcastErrors
Expand Down
25 changes: 17 additions & 8 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,37 +305,42 @@ func (e *Ethereum) flushMechanism(
}
}

func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) {
func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) {
logger.With("routine", "TrackLatestBlockHeight", "chain", e.name, "domain", e.domain)

d := fmt.Sprint(e.domain)

headers := make(chan *ethtypes.Header)

sub, err := e.wsClient.SubscribeNewHead(ctx, headers)
if err != nil {
logger.Error("Failed to connect to websocket to track height. Will retry...", "err", err)
time.Sleep(1 * time.Second)
e.TrackLatestBlockHeight(ctx, logger)
e.TrackLatestBlockHeight(ctx, logger, m)
return
}

logger.Info("Height tracking websocket subscritpiton connected")
logger.Info("Height tracking websocket subscription connected")

for {
select {
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("Height tracker websocket subscritpiton error. Attempting to reconnect...", "err", err)
e.TrackLatestBlockHeight(ctx, logger)
logger.Error("Height tracker websocket subscription error. Attempting to reconnect...", "err", err)
e.TrackLatestBlockHeight(ctx, logger, m)
return
case header := <-headers:
e.SetLatestBlock(header.Number.Uint64())
if m != nil {
m.SetLatestHeight(e.name, d, header.Number.Int64())
}
}
}
}

func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) {
logger = logger.With("metric", "wallet blannce", "chain", e.name, "domain", e.domain)
logger = logger.With("metric", "wallet balance", "chain", e.name, "domain", e.domain)
queryRate := 5 * time.Minute

account := common.HexToAddress(e.minterAddress)
Expand All @@ -360,7 +365,9 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
balanceBigFloat := new(big.Float).SetInt(balance)
balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64()

m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
if m != nil {
m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
}
case <-timer.C:
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
Expand All @@ -371,7 +378,9 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
balanceBigFloat := new(big.Float).SetInt(balance)
balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64()

m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
if m != nil {
m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)
}

case <-ctx.Done():
timer.Stop()
Expand Down
2 changes: 1 addition & 1 deletion integration/eth_burn_to_noble_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) {
processingQueue := make(chan *types.TxState, 10)

go ethChain.StartListener(ctx, a.Logger, processingQueue, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

_, _, generatedWallet := testdata.KeyTestPubAddr()
destAddress, _ := bech32.ConvertAndEncode("noble", generatedWallet)
Expand Down
2 changes: 1 addition & 1 deletion integration/noble_burn_to_eth_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestNobleBurnToEthMint(t *testing.T) {
processingQueue := make(chan *types.TxState, 10)

go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

ethDestinationAddress, _, err := generateEthWallet()
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion noble/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cosmos/cosmos-sdk/types/tx/signing"
xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"
xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
"github.com/strangelove-ventures/noble-cctp-relayer/relayer"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

Expand Down Expand Up @@ -47,6 +48,7 @@ func (n *Noble) Broadcast(
logger log.Logger,
msgs []*types.MessageState,
sequenceMap *types.SequenceMap,
m *relayer.PromMetrics,
) error {
// set up sdk context
interfaceRegistry := codectypes.NewInterfaceRegistry()
Expand Down Expand Up @@ -76,7 +78,9 @@ func (n *Noble) Broadcast(
msg.Status = types.Failed
}
}

if m != nil {
m.IncBroadcastErrors(n.Name(), fmt.Sprint(n.Domain()))
}
return errors.New("reached max number of broadcast attempts")
}

Expand Down
10 changes: 9 additions & 1 deletion noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,20 @@ func (n *Noble) flushMechanism(
}
}

func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) {
func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) {
logger.With("routine", "TrackLatestBlockHeight", "chain", n.Name(), "domain", n.Domain())

d := fmt.Sprint(n.Domain())

// first time
res, err := n.cc.RPCClient.Status(ctx)
if err != nil {
logger.Error("Unable to query Nobles latest height", "err", err)
}
n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight))
if m != nil {
m.SetLatestHeight(n.Name(), d, res.SyncInfo.LatestBlockHeight)
}

// then start loop on a timer
for {
Expand All @@ -193,6 +198,9 @@ func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) {
continue
}
n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight))
if m != nil {
m.SetLatestHeight(n.Name(), d, res.SyncInfo.LatestBlockHeight)
}
case <-ctx.Done():
timer.Stop()
return
Expand Down
Loading
Loading