-
Notifications
You must be signed in to change notification settings - Fork 928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: adds support for gRPC streaming and removes dependency on RPC #3915
base: main
Are you sure you want to change the base?
Conversation
PR still not fully ready, some tests are still failing. but would be good to get initial opinion + if someone can see if it's providing you with any performance increase |
case <-f.doneCh: | ||
return nil | ||
case <-ctx.Done(): | ||
return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question for reviewers]
not sure if this is the right way to stop the resources in here. I guess we can delete this part. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's worth keeping. It might look unnecessary, but it will actually prevent flakes for tests, so lets keep it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about closing the underlying connection? What's responsible for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. I can support closing the connection if we make the Client
a struct. would that be something we want?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nodebuilder/core/module.go
Outdated
fx.OnStart(func(_ context.Context, client core.Client) error { | ||
return client.Start() | ||
}), | ||
fx.OnStop(func(_ context.Context, client core.Client) error { | ||
return client.Stop() | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[note for reviewers]
with the new implementation, the gRPC connection is started when creating the client instead of waiting until we call Start()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes but we also need to close conn when node stops.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
go.mod
Outdated
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 | ||
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 | ||
// broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app | ||
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 | ||
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 | ||
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[note for reviewers]
will be updated once we merge the PRs and make release
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same here
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3915 +/- ##
==========================================
+ Coverage 44.83% 45.24% +0.40%
==========================================
Files 265 308 +43
Lines 14620 21997 +7377
==========================================
+ Hits 6555 9952 +3397
- Misses 7313 10953 +3640
- Partials 752 1092 +340 ☔ View full report in Codecov by Sentry. |
Tested locally. Both grpc clients work |
state/core_access.go
Outdated
@@ -123,7 +123,7 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { | |||
ca.ctx, ca.cancel = context.WithCancel(context.Background()) | |||
|
|||
// dial given celestia-core endpoint | |||
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) | |||
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.port) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/fetcher.go
Outdated
|
||
// resolveHeight takes a height pointer and returns its value if it's not nil. | ||
// otherwise, returns the latest height. | ||
func (f *BlockFetcher) resolveHeight(ctx context.Context, height *int64) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[non-blocking]
Passing a pointer for an int is really an anti-pattern, which existed in the previous client. I guess you kept it for the case where the nil is passed which means get the latest, but we don't need to stick to that pattern, and can use something else, like a separate method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I tried to keep the same API. I don't like passing pointers to numbers too. Do you think this should be updated in this PR? or we create an issue and leave it later on for someone to pick it up at some point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case <-f.doneCh: | ||
return nil | ||
case <-ctx.Done(): | ||
return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's worth keeping. It might look unnecessary, but it will actually prevent flakes for tests, so lets keep it.
@@ -173,12 +173,12 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 | |||
} | |||
log.Debugw("fetched signed block from core", "height", b.Header.Height) | |||
|
|||
eds, err := extendBlock(b.Data, b.Header.Version.App) | |||
eds, err := extendBlock(*b.Data, b.Header.Version.App) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we already know, Data is quite big, let's avoid copying it here. It should be easy to change extendBlock
to take ptr instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will be GC at some point. The thing is extendBlock
also makes a couple other calls by value instead of reference. So multiple copies will be done regardless, this will be just one less. What do you think of creating an issue that checks wherever Data
is used, it passes it by reference, even in app and maybe other repos so that we have everything fixed?
if !f.client.IsRunning() { | ||
return nil, errors.New("client not running") | ||
} | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
f.cancel = cancel | ||
f.doneCh = make(chan struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend fool-proving this a bit. Basically, the current logic supports only a single subscription, while the method makes it look like multiple are and calling it multiple times is a bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept the same existing pattern. What do you mean by fool-proving?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant making erroring or panicking if subsvription is called again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
d4243fa like this?
core/fetcher.go
Outdated
if height != nil { | ||
return *height, nil | ||
} | ||
status, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't there a way to request the latest height through existing methods? Let's avoid additional roundtrip for Head request if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like 0
on BlockByHeightRequest
can represent latest height.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why I recommended to just use 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im fine with changing and removing the pointers. Is this something you want to do in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes pls, we would like to leave as minimal tech debt behind as possiblr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/client.go
Outdated
"/websocket", | ||
httpClient.StandardClient(), | ||
) | ||
return coregrpc.StartBlockAPIGRPCClient(fmt.Sprintf("tcp://%s:%s", ip, port)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse the client across state
and core
modules?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I'm planning to do it in a subsequent PR
func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( | ||
*types.Block, | ||
*types.BlockMeta, | ||
*types.Commit, | ||
*types.ValidatorSet, | ||
error, | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return of 5 values caught my eye. I can't find if BlockMeta returned from this method is used anywhere. If not, perhaps we can return just SignedBlock struct, which contains everything except Blockmeta?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am returning everything we receive in this. If you don't want block meta to be returned, I guess we could even stop sending it from core. What do you think?
core/fetcher.go
Outdated
signedBlock, err := f.GetSignedBlock(ctx, &resp.Height) | ||
if err != nil { | ||
log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets add per-call timeout. Otherwise we might get stuck on one of the calls indefinitely
Also would need to handle canceled context here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/fetcher.go
Outdated
|
||
// resolveHeight takes a height pointer and returns its value if it's not nil. | ||
// otherwise, returns the latest height. | ||
func (f *BlockFetcher) resolveHeight(ctx context.Context, height *int64) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit]: Wouldn't this be a bit more simple if we just used 0
height to indicate Head instead of nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -173,12 +173,12 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 | |||
} | |||
log.Debugw("fetched signed block from core", "height", b.Header.Height) | |||
|
|||
eds, err := extendBlock(b.Data, b.Header.Version.App) | |||
eds, err := extendBlock(*b.Data, b.Header.Version.App) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we passing by value here? Just curious
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because extendBlock expects the parameter to be passed by value. I didn't want to change that behavior since Data is already being copied in a lot of places, one more copy is fine. Related: #3915 (comment)
if err != nil && ctx.Err() == nil { | ||
require.NoError(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just require.NoError? we do not expect one here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a funny behavior happening there that I guess will be fixed if we support closing the connection manually. Related: #3915 (comment)
case <-f.doneCh: | ||
return nil | ||
case <-ctx.Done(): | ||
return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about closing the underlying connection? What's responsible for that?
nodebuilder/core/module.go
Outdated
fx.OnStart(func(_ context.Context, client core.Client) error { | ||
return client.Start() | ||
}), | ||
fx.OnStop(func(_ context.Context, client core.Client) error { | ||
return client.Stop() | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes but we also need to close conn when node stops.
# Conflicts: # core/exchange.go # core/exchange_test.go
The Celestia-node part of celestiaorg/celestia-core#1513