Redis pool persistent connection #114
channels_redis/core.py
```diff
@@ -208,6 +208,9 @@ def _setup_encryption(self, symmetric_encryption_keys):
                 await self.receive_buffer[channel].put(message)
             else:
                 await self.receive_buffer[real_channel].put(message)
+            # this sleep makes sure that the redis queue is not popped before the
+            # previous message was processed and returned back to the caller
+            await asyncio.sleep(0.0001)
```
I am not happy with this, but I couldn't find any other solution. When I used connection pools, the last test case was creating an issue:

Test case 16: get a new channel -> add the channel to a group -> send four messages -> call receive one by one.

Problem: alternate messages were getting skipped, because the following flow of events was occurring:

receive -> start receive_loop -> wait for pop result -> pop result_1 -> pop result_2 -> process and return result_1 from receive -> CANCEL receive_loop -> call receive again -> pop result_3 -> process and return result_3

result_2 was popped from the queue and the task was then cancelled, so result_2 was lost. Let me know if you come up with a better solution.
This sleep schedules the receive coroutine before awaiting the receive_single coroutine again, which in turn pops the redis queue. Previously the receive_loop was cancelled before the message was even popped from redis.

This results in a dangling coroutine (see the warning below).

Ahh, I guess I somehow skipped the warning:
```python
try:
    await self.receive_loop_task
except CancelledError:
    pass
```
A task does not get cancelled immediately after cancel() is called; a CancelledError needs to be raised inside the task for it to actually cancel.
https://stackoverflow.com/questions/40897428/please-explain-task-was-destroyed-but-it-is-pending
Because of this, when the loop stopped after the tests, the task was still pending. Awaiting it raises the CancelledError and gets the task cancelled before the receive coroutine completes.
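To illustrate the point, here is a minimal, self-contained sketch (with hypothetical names; `receive_loop` below merely stands in for the real channel-layer loop that pops the redis queue):

```python
import asyncio

async def receive_loop():
    # stand-in for the real receive loop that pops the redis queue
    while True:
        await asyncio.sleep(1)

async def main():
    task = asyncio.ensure_future(receive_loop())
    await asyncio.sleep(0)   # let the task start running
    task.cancel()            # only *requests* cancellation
    assert not task.done()   # the task is still pending at this point
    try:
        await task           # drives the task so CancelledError is raised in it
    except asyncio.CancelledError:
        pass
    return task.cancelled()  # True only once the task has been awaited

print(asyncio.run(main()))  # True
```

Without the `await task`, the task object is still pending when the loop shuts down, which is exactly what triggers the "Task was destroyed but it is pending!" warning.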
```diff
@@ -208,6 +215,9 @@ def _setup_encryption(self, symmetric_encryption_keys):
                 await self.receive_buffer[channel].put(message)
             else:
                 await self.receive_buffer[real_channel].put(message)
+            # this sleep makes sure that the redis queue is not popped before the
+            # previous message was processed and returned back to the caller
+            await asyncio.sleep(0.0001)
```
Why does the sleep do this? Sleeping for 0.1 milliseconds sounds like a hack to ensure something else happens when in reality it should be a lock.
Well, the sleep halts this coroutine after pushing the message into the channel's receive buffer, so that control moves to the receive coroutine before the while loop runs again, and it does guarantee this. But anyway, I am trying to come up with a better way to schedule the receive coroutine after a single pop from the redis queue.
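A sketch of the scheduling effect being discussed (hypothetical producer/consumer names, not the channels_redis code; note that even a zero-length sleep is enough to yield control back to the event loop, which is all the 0.0001-second sleep really relies on):

```python
import asyncio

async def producer(queue, log, yield_between):
    for i in range(3):
        await queue.put(i)      # never blocks on an unbounded queue, so no yield
        log.append("put %d" % i)
        if yield_between:
            await asyncio.sleep(0)  # reschedule so the consumer can run first

async def consumer(queue, log):
    for _ in range(3):
        log.append("got %d" % await queue.get())

async def run(yield_between):
    queue, log = asyncio.Queue(), []
    await asyncio.gather(producer(queue, log, yield_between),
                         consumer(queue, log))
    return log

print(asyncio.run(run(False)))  # all puts happen before any get
print(asyncio.run(run(True)))   # puts and gets interleave one by one
```

Without the yield, the producer loop monopolizes the event loop and pushes everything before the consumer ever runs; with it, each message is handed over before the next pop, which mirrors the behaviour the sleep is meant to enforce.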
```python
self.conn = await aioredis.create_redis(**self.kwargs)
return self.conn

# get connection key for the current loop and the given index
connection_key = asyncio.get_event_loop().__hash__() + self.index
```
Why hash rather than ID? I would say ID makes more sense since it's an object instance, unless event loops implemented the hash function when I wasn't looking!
Yup, agreed. Will change.
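For illustration, a sketch of keying per-loop connections by id() (a hypothetical `ConnectionMap` class and `factory` parameter, not the actual channels_redis code). Using a tuple key instead of `hash(loop) + index` also rules out two different (loop, index) pairs colliding on the same integer:

```python
import asyncio

class ConnectionMap:
    """Cache one connection per (event loop, index) pair."""

    def __init__(self):
        self._conns = {}

    def get(self, index, factory):
        # id() of the running loop identifies the loop object uniquely;
        # the tuple keeps loop and index from interfering with each other
        key = (id(asyncio.get_running_loop()), index)
        if key not in self._conns:
            self._conns[key] = factory()
        return self._conns[key]

async def demo():
    conns = ConnectionMap()
    a = conns.get(0, object)  # object() stands in for creating a connection
    b = conns.get(0, object)
    c = conns.get(1, object)
    return a is b, a is c

print(asyncio.run(demo()))  # (True, False): same key reuses, new index doesn't
```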
Regarding #83 and #100.

Hi,

This PR makes use of a redis connection pool instead of native redis connection objects.