-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Function to guarantee PUB/SUB channel in SYNC_PUB #151
base: master
Are you sure you want to change the base?
Conversation
Codecov Report
@@ Coverage Diff @@
## master #151 +/- ##
==========================================
- Coverage 98.39% 98.31% -0.08%
==========================================
Files 25 25
Lines 3049 3150 +101
Branches 232 233 +1
==========================================
+ Hits 3000 3097 +97
- Misses 37 41 +4
Partials 12 12
Continue to review full report at Codecov.
|
I added two more commits, refactoring the function a bit. I have also been thinking about whether the connection should be done in the Pending things before merging:
Important: It is interesting that I got an error in test_agent.py::test_get_uuid_used_as_alias_for_sub_in_sync_pub_async. Might be related to the issue with 7b7bd53 Commit message of 7b7bd53 (so it doesn't get lost after rebasing):
|
|
||
Ideally, this should only be called for alias that represent a | ||
SUB socket. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring Parameters
and Returns
sections are missing. 😉
@@ -912,7 +922,7 @@ def _handle_async_requests(self, data): | |||
else: | |||
handler(self, response) | |||
|
|||
def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): | |||
def subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): | |||
""" | |||
Subscribe the agent to another agent. | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great to complete de docstring here as well now that the method is "user accessible".
|
||
Make sure they have stablished the PUB/SUB communication within the | ||
SYNC_PUB/SYNC_SUB channel before returning. This will guarantee that | ||
no PUB messages are lost. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing "Parameters" section from the docstring.
# Restore the original handler, now that the connection is guaranteed | ||
client.subscribe(uuid, original_handler) | ||
|
||
client.del_attr('_tmp_attr') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about this implementation. Worst-case there might messages not received by the client when you change the handler. That is really bad. Also, that requires the .subscribe()
call to be executed again, which I'm not sure is the best thing either.
Would it make sense to ensure synchronization with SYNC-PUB requests? i.e.:
- First connect from the subscriber.
- Then make requests from the subscriber each X seconds until one response is received.
- That is all.
You achieve synchronization while avoiding double .subcribe()
call and avoid risks of the SUB handler receiving messages sent for the "syncrhonization" process.
You may want to specify an on_error=lambda x: pass
when sending the request to avoid warning log messages during synchronization.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems interesting, I will take a look into it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would not be possible to synchronize them through requests. I am thinking about the case in which a request might change the internal state of the SYNC_PUB agent, and for those cases, an extra hack around would have to be made, I think (changing the handler of the SYNC_PUB momentarily for the requests).
Perhaps what we could do is explain that some messages might be lost in the process or that it should only be used for testing purposes. Now I'm thinking that perhaps the function does not even belong there. In the end, I came up with it to make some tests more consistent.
Any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmmmm.
Yeah, one possibility would be to take that away from there and keep it just for the tests.
Another possibility would be to make all SYNC_PUB sockets implement heart-beating. This is something we should do at some point anyway, hopefully Soon™.
Heart-beating would mean that a SYNC_PUB socket would publish a "heart-beat" (like an "I'm alive" message) periodically, to all subscribed agents. Clients can use this to know if they should reconnect (maybe the server went down).
What we could do for now is to implement just a part of the heart-beating: allow clients to request a heart-beat at any time (i.e.: "are you alive?"). This would be a request from the SYNC_SUB, although it would be treated as a special case and all SYNC_PUB channels should handle it the same way, without taking into account the user-defined handler for other requests. This means adding a "special case" in the protocol, but I don't think it is bad, as requests do not need to be very fast and are infrequent.
We could then use those heart-beat requests from the tests to ensure synchronization.
If you feel like that is too much for now, just take it away from there and open a new issue so we try to improve it for version 1.0.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe heart-beat messages could be published with a special topic (so that clients may decide to subscribe or not to this heart-beat).
Maybe we could use that for now for the tests: from the publisher, we add a timer that publishes heart-beats with a special topic, from the subscriber, when we want to make sure we are fully connected, we just subscribe to this topic and wait for the first heart-beat.
What you think?
Maybe we should anyway open an issue and move this discussion there (we're probably going to discuss these things again when we start working on a good heart-beat implementation).
assert message_01 not in a5.get_attr('received') | ||
assert b'fooWorld' in a5.get_attr('received') | ||
assert b'foobarFOO' in a5.get_attr('received') | ||
assert b'foBAR' in a5.get_attr('received') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All changes related to receive, received_list, etc. should go in a separate pull-request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I got them a bit messed up 😅 Will fix that before merging (if it is ever about to be merged).
This function will guarantee the PUB/SUB channel is already up and running after returning, ready for sending data right away.
However, there is a chance that the trace of the exception is printed for the test of ASYNC_REP (line 446 in test_agent). This is because we are doing something wrong, probably the connection has no time to be stablished before calling the function or something.
I managed to get around this issue in osmarkets without this function. I think we can close this PR and implement the heartbeat system in the future. What do you think @Peque ? |
This function aims to guarantee the PUB/SUB channel to be already up and running after returning, ready for sending data right away without it being dropped (client will be guaranteed to be at the other end of the PUB socket).