Skip to content

Commit

Permalink
[CT-1326] send price updates after block is finalized (#2611)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 authored Nov 26, 2024
1 parent 6682638 commit 115acf7
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 3 deletions.
1 change: 1 addition & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ func New(
},
app.RevShareKeeper,
&app.MarketMapKeeper,
app.FullNodeStreamingManager,
)
pricesModule := pricesmodule.NewAppModule(
appCodec,
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count"
GrpcSendPriceUpdateCount = "grpc_send_price_update_count"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
Expand Down
99 changes: 96 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
err error,
) {
// Perform some basic validation on the request.
if len(clobPairIds) == 0 && len(subaccountIds) == 0 {
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
return types.ErrInvalidStreamingRequest
}

Expand Down Expand Up @@ -493,6 +493,33 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
)
}

// SendPriceUpdates sends price updates to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendPriceUpdate(
ctx sdk.Context,
priceUpdate pricestypes.StreamPriceUpdate,
) {
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}

metrics.IncrCounter(
metrics.GrpcSendPriceUpdateCount,
1,
)

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_PriceUpdate{
PriceUpdate: &priceUpdate,
},
}
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
&stagedEvent,
)
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
Expand Down Expand Up @@ -545,6 +572,14 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
return exists
}

// TracksMarketId checks if a market id is being tracked by the streaming manager.
func (sm *FullNodeStreamingManagerImpl) TracksMarketId(marketId uint32) bool {
sm.Lock()
defer sm.Unlock()
_, exists := sm.marketIdToSubscriptionIdMapping[marketId]
return exists
}

func getStreamUpdatesFromOffchainUpdates(
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
Expand Down Expand Up @@ -773,6 +808,31 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

func getStreamUpdatesForPriceUpdates(
priceUpdates []pricestypes.StreamPriceUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) (
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
// Group subaccount updates by subaccount id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
marketIds = make([]uint32, 0)
for _, priceUpdate := range priceUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_PriceUpdate{
PriceUpdate: &priceUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
marketIds = append(marketIds, priceUpdate.MarketId)
}
return streamUpdates, marketIds
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand Down Expand Up @@ -976,6 +1036,27 @@ func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesBySubaccountWithLock(
}
}

// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
if len(streamUpdates) != len(marketIds) {
sm.logger.Error("Mismatch between stream updates and market IDs lengths")
return
}
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...)
for _, marketId := range marketIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
Expand All @@ -989,7 +1070,8 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(

finalizedFills,
finalizedSubaccountUpdates,
finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)
finalizedOrderbookUpdates,
finalizedPriceUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

sm.Lock()
defer sm.Unlock()
Expand Down Expand Up @@ -1032,6 +1114,14 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
)
sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds)

// Finally, cache updates for finalized subaccount updates
priceStreamUpdates, marketIds := getStreamUpdatesForPriceUpdates(
finalizedPriceUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByMarketIdWithLock(priceStreamUpdates, marketIds)

// Emit all stream updates in a single batch.
// Note we still have the lock, which is released right before function returns.
sm.FlushStreamUpdatesWithLock()
Expand All @@ -1045,6 +1135,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
finalizedFills []clobtypes.StreamOrderbookFill,
finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate,
finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate,
finalizedPriceUpdates []pricestypes.StreamPriceUpdate,
) {
// Get onchain stream events stored in transient store.
stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx)
Expand All @@ -1062,6 +1153,8 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate)
case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate:
finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate)
case *clobtypes.StagedFinalizeBlockEvent_PriceUpdate:
finalizedPriceUpdates = append(finalizedPriceUpdates, *event.PriceUpdate)
default:
panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event))
}
Expand All @@ -1076,7 +1169,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
float32(len(finalizedFills)),
)

return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates
return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates, finalizedPriceUpdates
}

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
Expand Down
10 changes: 10 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId)
return false
}

func (sm *NoopGrpcStreamingManager) TracksMarketId(id uint32) bool {
return false
}

func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate {
Expand Down Expand Up @@ -90,6 +94,12 @@ func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate(
) {
}

func (sm *NoopGrpcStreamingManager) SendPriceUpdate(
ctx sdk.Context,
priceUpdate pricestypes.StreamPriceUpdate,
) {
}

func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@ type FullNodeStreamingManager interface {
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
)
SendPriceUpdate(
ctx sdk.Context,
priceUpdate pricestypes.StreamPriceUpdate,
)
GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent
TracksSubaccountId(id satypes.SubaccountId) bool
TracksMarketId(marketId uint32) bool
StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
Expand Down
2 changes: 2 additions & 0 deletions protocol/testutil/keeper/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

streaming "github.com/dydxprotocol/v4-chain/protocol/streaming"
revsharetypes "github.com/dydxprotocol/v4-chain/protocol/x/revshare/types"

"github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -124,6 +125,7 @@ func createPricesKeeper(
},
revShareKeeper,
marketMapKeeper,
streaming.NewNoopGrpcStreamingManager(),
)

return k, storeKey, indexPriceCache, mockTimeProvider
Expand Down
9 changes: 9 additions & 0 deletions protocol/x/prices/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types"
"github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
)

Expand All @@ -24,6 +25,8 @@ type (
authorities map[string]struct{}
RevShareKeeper types.RevShareKeeper
MarketMapKeeper types.MarketMapKeeper

streamingManager streamingtypes.FullNodeStreamingManager
}
)

Expand All @@ -38,6 +41,7 @@ func NewKeeper(
authorities []string,
revShareKeeper types.RevShareKeeper,
marketMapKeeper types.MarketMapKeeper,
streamingManager streamingtypes.FullNodeStreamingManager,
) *Keeper {
return &Keeper{
cdc: cdc,
Expand All @@ -48,6 +52,7 @@ func NewKeeper(
authorities: lib.UniqueSliceToSet(authorities),
RevShareKeeper: revShareKeeper,
MarketMapKeeper: marketMapKeeper,
streamingManager: streamingManager,
}
}

Expand All @@ -66,3 +71,7 @@ func (k Keeper) HasAuthority(authority string) bool {
_, ok := k.authorities[authority]
return ok
}

func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}
14 changes: 14 additions & 0 deletions protocol/x/prices/keeper/market_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ func (k Keeper) UpdateMarketPrices(
pricefeedmetrics.GetLabelForMarketId(marketPrice.Id),
},
)

// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
)
}
}
}

// Generate indexer events.
Expand Down

0 comments on commit 115acf7

Please sign in to comment.