-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send envelopes in batches #253
Conversation
WalkthroughThe changes in this pull request primarily focus on enhancing the handling and processing of envelopes within the messaging service. Key modifications include the refactoring of the Changes
Possibly related PRs
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @richardhuaaa and the rest of your teammates on Graphite |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (6)
pkg/envelopes/unsignedOriginator.go (1)
60-62
: Consider adding method documentation.Adding documentation would help clarify the purpose and return value of the TargetTopic method.
+// TargetTopic returns the topic associated with this envelope by delegating to the underlying PayerEnvelope func (u *UnsignedOriginatorEnvelope) TargetTopic() topic.Topic { return u.PayerEnvelope.TargetTopic() }
pkg/testutils/envelopes/envelopes.go (1)
123-138
: LGTM! Consider adding LastSeen parameter for flexibility.The implementation is clean and follows the existing patterns. However, you might want to consider adding an optional LastSeen parameter to support different test scenarios.
Consider this enhancement for future flexibility:
func CreateOriginatorEnvelopeWithTopic( t *testing.T, originatorNodeID uint32, originatorSequenceID uint64, topic []byte, + lastSeen *envelopes.VectorClock, ) *envelopes.OriginatorEnvelope { payerEnv := CreatePayerEnvelope(t, CreateClientEnvelope( &envelopes.AuthenticatedData{ TargetTopic: topic, TargetOriginator: originatorNodeID, - LastSeen: nil, + LastSeen: lastSeen, }, ))pkg/api/message/subscribe_test.go (2)
21-25
: Consider keeping topics asTopic
type for better debugging.While converting topics to bytes works, keeping them as
Topic
type until needed would preserve their semantic meaning during test execution and debugging. This would make it easier to inspect topic properties in test failures.var ( - topicA = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicA")).Bytes() - topicB = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicB")).Bytes() - topicC = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicC")).Bytes() + topicA = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicA")) + topicB = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicB")) + topicC = topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte("topicC")) )
Line range hint
1-286
: Consider adding tests for batch size limits.While the current test coverage is good, consider adding test cases that verify:
- Maximum batch size handling
- Performance with large numbers of topics
- Behavior when approaching system resource limits
This would ensure the batching implementation remains robust under stress.
Would you like me to help create these additional test cases?
pkg/api/message/service.go (1)
183-198
: Consider adding a maximum batch size limit.While batching improves performance by reducing the number of stream.Send calls, consider adding a maximum batch size limit to prevent potential memory issues with very large sets of envelopes. This is particularly important in streaming scenarios where the number of envelopes could be substantial.
Consider:
- Adding a constant for max batch size
- Splitting into multiple sends when the batch size exceeds the limit
const ( maxRequestedRows int32 = 1000 maxQueriesPerRequest int = 10000 maxTopicLength int = 128 maxVectorClockLength int = 100 pagingInterval time.Duration = 100 * time.Millisecond + maxBatchSize int = 1000 ) func (s *Service) sendEnvelopes( stream message_api.ReplicationApi_SubscribeEnvelopesServer, query *message_api.EnvelopesQuery, envs []*envelopes.OriginatorEnvelope, ) error { cursor := query.GetLastSeen().GetNodeIdToSequenceId() if cursor == nil { cursor = make(map[uint32]uint64) query.LastSeen = &envelopesProto.VectorClock{ NodeIdToSequenceId: cursor, } } envsToSend := make([]*envelopesProto.OriginatorEnvelope, 0, len(envs)) for _, env := range envs { if cursor[uint32(env.OriginatorNodeID())] >= env.OriginatorSequenceID() { continue } envsToSend = append(envsToSend, env.Proto()) cursor[uint32(env.OriginatorNodeID())] = env.OriginatorSequenceID() + // Send batch if it reaches the maximum size + if len(envsToSend) >= maxBatchSize { + if err := stream.Send(&message_api.SubscribeEnvelopesResponse{ + Envelopes: envsToSend, + }); err != nil { + return status.Errorf(codes.Internal, "error sending envelopes: %v", err) + } + envsToSend = make([]*envelopesProto.OriginatorEnvelope, 0, maxBatchSize) + } } + // Send remaining envelopes + if len(envsToSend) > 0 { err := stream.Send(&message_api.SubscribeEnvelopesResponse{ Envelopes: envsToSend, }); err != nil { return status.Errorf(codes.Internal, "error sending envelopes: %v", err) } + } return nil }pkg/api/message/subscribeWorker.go (1)
Line range hint
279-283
: Re-evaluate removing listeners when channels are fullImmediately removing listeners when their channels are full might lead to unintended disconnections under high load. Consider implementing backpressure handling, such as buffering messages, dropping the oldest messages, or notifying the listener before removal.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (9)
- pkg/api/message/service.go (1 hunks)
- pkg/api/message/subscribeWorker.go (8 hunks)
- pkg/api/message/subscribe_test.go (7 hunks)
- pkg/api/query_test.go (6 hunks)
- pkg/db/subscription_test.go (3 hunks)
- pkg/envelopes/originator.go (2 hunks)
- pkg/envelopes/payer.go (2 hunks)
- pkg/envelopes/unsignedOriginator.go (2 hunks)
- pkg/testutils/envelopes/envelopes.go (1 hunks)
🧰 Additional context used
🔇 Additional comments (23)
pkg/envelopes/payer.go (2)
9-9
: LGTM!The topic package import is appropriately placed and necessary for the new functionality.
60-62
: LGTM!The
TargetTopic
method is well-implemented as a clean delegation to the underlyingClientEnvelope
, maintaining consistency with similar implementations across other envelope types. This addition supports the PR's objective of enabling batch processing by providing topic information for envelope routing.pkg/envelopes/originator.go (2)
7-7
: LGTM: Import addition is appropriate.The addition of the topic package import is necessary for the new TargetTopic method's return type.
61-63
: LGTM: Clean implementation of TargetTopic method.The method follows the established pattern of delegating to the UnsignedOriginatorEnvelope, maintaining consistency with other envelope types. This addition supports the PR's objective of batch processing by providing topic information for envelope grouping.
Let's verify the consistency of TargetTopic implementation across envelope types:
✅ Verification successful
Let me try a different approach to verify the TargetTopic implementations.
This should help us see both the interface definitions and actual implementations of TargetTopic across different envelope types. The
-A 3
flag will show some context after each match to help us analyze the implementation patterns.
Verified: TargetTopic implementation follows consistent delegation pattern
The implementation in
OriginatorEnvelope
correctly follows the established delegation chain pattern seen across all envelope types:
OriginatorEnvelope
delegates toUnsignedOriginatorEnvelope
UnsignedOriginatorEnvelope
delegates toPayerEnvelope
PayerEnvelope
delegates toClientEnvelope
ClientEnvelope
provides the actual implementation by returningtargetTopic
This forms a clean delegation chain that maintains consistency across the envelope hierarchy, making the code predictable and maintainable.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify TargetTopic method implementation across envelope types # Expected: All envelope types should have consistent TargetTopic implementations # Search for TargetTopic method implementations ast-grep --pattern 'func $_($_) TargetTopic() topic.Topic { $$$ }'Length of output: 70
Script:
#!/bin/bash # Search for TargetTopic method implementations using ripgrep # This will help us find all occurrences across envelope types rg "TargetTopic\(\)" -A 3 pkg/envelopes/ # Also search for any interfaces defining TargetTopic rg "interface.*\{.*TargetTopic.*\}" pkg/envelopes/Length of output: 1046
pkg/envelopes/unsignedOriginator.go (2)
7-7
: LGTM!The topic package import is correctly added and necessary for the new TargetTopic method.
60-62
: LGTM! Implementation aligns with PR objectives.The TargetTopic method correctly delegates to PayerEnvelope, supporting the envelope batching functionality by providing consistent topic access across different envelope types.
pkg/testutils/envelopes/envelopes.go (1)
123-138
: Verify usage in batch-related tests.Let's verify that this new function is being used appropriately in the batch-related test files.
✅ Verification successful
Let me analyze the results and generate the final response.
Function is properly integrated in test files
The
CreateOriginatorEnvelopeWithTopic
function is being used appropriately in two test files:
pkg/api/query_test.go
pkg/api/message/subscribe_test.go
In both files, the function is used consistently to create test envelopes with different combinations of:
- Originator node IDs (1 and 2)
- Sequence IDs (1, 2, and 3)
- Topics (topicA and topicB)
The usage pattern shows proper integration for testing query and subscription scenarios with varying parameters, which is exactly what this utility function was designed for.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find test files using this new function to verify proper integration # with batch-related tests. echo "Searching for test files using CreateOriginatorEnvelopeWithTopic..." rg -l "CreateOriginatorEnvelopeWithTopic" --type go --type-add 'go:*.go' | grep "_test.go" echo -e "\nChecking the context of usage..." rg -B 2 -A 2 "CreateOriginatorEnvelopeWithTopic" --type go --type-add 'go:*.go'Length of output: 3489
pkg/db/subscription_test.go (3)
14-14
: LGTM: Required import for topic handling.The added import is necessary for the new topic creation functionality.
34-34
: LGTM: Consistent topic usage across test data.The test data consistently uses the properly constructed topic variable, improving maintainability and type safety across all test cases.
Note: This change aligns well with the PR's objective of batch processing envelopes, as it ensures proper topic structuring which is crucial for correct envelope routing.
Also applies to: 40-40, 68-68, 74-74, 80-80
18-19
: LGTM: Improved topic handling with centralized definition.Good refactoring to use a properly constructed topic instead of raw byte slices. This improves type safety and maintainability.
Let's verify the topic kind usage is consistent across the codebase:
✅ Verification successful
Based on the search results, I can now generate the final response as we have comprehensive information about the usage of
TOPIC_KIND_GROUP_MESSAGES_V1
across the codebase.Topic kind usage is consistent and appropriate
The verification shows that
TOPIC_KIND_GROUP_MESSAGES_V1
is used consistently throughout the codebase for group message-related functionality:
- It's properly defined in
pkg/topic/topic.go
- Used consistently in test utilities and test files
- Correctly handled in core functionality like
pkg/indexer/storer/groupMessage.go
andpkg/api/payer/service.go
- The usage in
subscription_test.go
aligns with the established patterns across the codebase🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check if TOPIC_KIND_GROUP_MESSAGES_V1 is used consistently in tests rg -A 2 "TOPIC_KIND_GROUP_MESSAGES_V1" --type goLength of output: 4253
pkg/api/query_test.go (7)
16-22
: LGTM: Well-structured topic handling refactor.The introduction of centralized topic variables using
topic.NewTopic
improves code maintainability and consistency. The consistent use ofTOPIC_KIND_GROUP_MESSAGES_V1
across all topics is appropriate.
30-69
: LGTM: Improved test data setup with explicit topic handling.The refactoring from direct byte slices to using
CreateOriginatorEnvelopeWithTopic
makes the test setup more explicit and maintainable. The consistent use of topic variables across test cases strengthens the test suite's reliability.
137-137
: LGTM: Consistent topic handling in query test.The update to use the new topic variable maintains consistency with the refactored approach.
174-174
: LGTM: Proper topic handling in vector clock test.The update maintains consistency while properly testing the interaction between topic filtering and vector clock-based queries.
195-195
: LGTM: Well-structured test for multi-topic batching scenario.The test case effectively validates querying multiple topics with vector clock, which is particularly relevant for the PR's envelope batching objective.
237-237
: LGTM: Appropriate use of unused topic for empty result test.Using
topicC
effectively tests the empty result case while maintaining consistency with the new topic handling approach.
255-256
: LGTM: Proper validation of invalid query conditions.The test effectively validates error handling when combining topic and originator filters while maintaining consistency with the new topic handling approach.
pkg/api/message/subscribe_test.go (2)
34-74
: LGTM! Test setup properly creates diverse test scenarios.The setup creates a good mix of test data with different originators and topics, which is essential for testing the new batching functionality. The consistent use of topic variables and
CreateOriginatorEnvelopeWithTopic
makes the test setup clear and maintainable.
149-149
: LGTM! Comprehensive test coverage for subscription scenarios.The test cases effectively cover:
- Topic-based filtering
- Multiple simultaneous subscriptions
- Cursor-based pagination
- Invalid request handling
This ensures the robustness of the envelope batching implementation.
Let's verify if we have test coverage for all edge cases:
Also applies to: 201-201, 236-236, 258-258, 277-277
pkg/api/message/service.go (1)
183-191
: LGTM! Efficient envelope batching implementation.The implementation efficiently accumulates envelopes with proper cursor handling and memory pre-allocation.
pkg/api/message/subscribeWorker.go (3)
Line range hint
34-60
: Good use of topic validation and logging innewListener
The addition of
topic.ParseTopic
for topic validation and incorporation of logging enhances the robustness and debuggability of the listener initialization process.
131-135
: Ensure type assertions inrangeKeys
are safeIn the
rangeKeys
method, type assertions are used without checks. Verify that these assertions (key.(K)
andvalue.(*listenerSet)
) are safe and cannot cause panics due to unexpected types.Run the following script to check for potential issues with type assertions:
#!/bin/bash # Description: Ensure that all type assertions in listenersMap are safe. # Test: Search for type assertions in listenersMap methods. rg 'listenersMap\[.*\]' -A 10 | rg '\.\(.*\)'
310-312
: Ensure all callers ofnewListener
are updated with the newlogger
parameterWith the addition of the
logger
parameter tonewListener
, verify that all calls to this function pass the appropriate logger to prevent potential nil pointer dereferences or missing logs.Run the following script to find all calls to
newListener
and check for thelogger
parameter:✅ Verification successful
Let me gather more context about the
newListener
function definition and its usage by generating additional shell scripts.
Based on the gathered information, I can now generate the final response:
The
newListener
function call is correctly updated with the logger parameterThe verification shows that:
- The
newListener
function signature includes thelogger
parameter of type*zap.Logger
- The only call to
newListener
in the codebase is insubscribeWorker.listen()
method, which correctly passess.log
as the logger parameter🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find all calls to `newListener` and ensure they include the `logger` parameter. # Test: Search for `newListener` function calls. Expect: All calls pass `logger` as an argument. rg 'newListener\(' -A 1Length of output: 656
Script:
#!/bin/bash # Get the full definition of newListener function to understand its parameters ast-grep --pattern 'func newListener($_) { $$$ }' # Get more context around the usage of newListener rg 'newListener\(' -B 5 -A 5Length of output: 1483
When streaming envelopes, send them in batches rather than one-by-one. We use a different batching strategy based on whether the subscription is originator/topic/global.
#255
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Tests