Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sweep: fix error in _invoke_callback(). In broadcast_service/core.py, _invoice_callback() function uses the result method of the future object, which can cause thread blocking and thus fail to achieve asynchronous effects. We should sovle it, so that when multiple callback functions listen to a topic at the same time, they will be triggered simultaneously instead of serially. #18

Open
2 tasks done
Undertone0809 opened this issue Dec 11, 2023 · 1 comment · May be fixed by #20 or #21
Labels
bug Something isn't working sweep Sweep your software chores

Comments

@Undertone0809
Copy link
Owner

Undertone0809 commented Dec 11, 2023

In broadcast_service/core.py, _invoke_callback() function uses the result method of the future object, which can cause thread blocking and thus fail to achieve asynchronous effects. We should sovle it, so that when multiple callback functions listen to a topic at the same time, they will be triggered simultaneously instead of serially.

Checklist
  • Modify broadcast_service/_core.py8282112 Edit
  • Running GitHub Actions for broadcast_service/_core.pyEdit

Flowchart

@sweep-ai sweep-ai bot added the sweep Sweep your software chores label Dec 11, 2023
@Undertone0809 Undertone0809 added bug Something isn't working sweep Sweep your software chores and removed sweep Sweep your software chores labels Dec 11, 2023
Copy link
Contributor

sweep-ai bot commented Dec 11, 2023

Here's the PR! #21. See Sweep's process at dashboard.

Sweep Basic Tier: I'm using GPT-4. You have 4 GPT-4 tickets left for the month and 2 for the day. (tracking ID: c7fdbe6b6a)

For more GPT-4 tickets, visit our payment portal. For a one week free trial, try Sweep Pro (unlimited GPT-4 tickets).

Actions (click)

  • ↻ Restart Sweep

Sandbox Execution ✓

Here are the sandbox execution logs prior to making any changes:

Sandbox logs for 7893281
Checking broadcast_service/_core.py for syntax errors... ✅ broadcast_service/_core.py has no syntax errors! 1/1 ✓
Checking broadcast_service/_core.py for syntax errors...
✅ broadcast_service/_core.py has no syntax errors!

Sandbox passed on the latest main, so sandbox checks will be enabled for this issue.

Install Sweep Configs: Pull Request

Step 1: 🔎 Searching

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I think are relevant in decreasing order of relevance (click to expand). If some file is missing from here, you can mention the path in the ticket description.

broadcast_service.config(
callback=handle_publisher_callback,
).publish("topic")
if __name__ == '__main__':
main()
```
It should be noted that the order of the three elements `args[0]`, `args[1]`, and `args[2]` in the above example is not uniquely determined, which depends on the execution time of the subscriber's callback function. However, in most cases, we cannot judge which subscriber callback function ends first, so `broadcast-service` development specifications recommend that when using this function, let the return value types of the subscriber callback functions be consistent as much as possible to reduce the cost of additional data judgment.
## Passing different parameters when publishing a topic multiple times
If you want to pass different parameters when publishing a topic multiple times. The following example show how to do.
```python

if __name__ == '__main__':
main()
```
**output:**
```text
handle_subscriber_callback 1
handle_subscriber_callback 2
handle_publisher_callback
```
## Publisher Multiple Executions
`broadcast-service` also supports publishing multiple topics at the same time. The following example shows how to publish multiple topics at the same time.

`broadcast-service` also supports publishing multiple topics at the same time. The following example shows how to publish multiple topics at the same time.
```python
from broadcast_service import broadcast_service
@broadcast_service.on_listen("topic")
def handle_subscriber_callback():
print("handle_subscriber_callback")
def main():
broadcast_service.config(
num_of_executions=5,
).publish("topic")
if __name__ == '__main__':
main()

```text
handle_subscriber_callback
handle_publisher_callback
handle_subscriber_callback
handle_publisher_callback
handle_subscriber_callback
handle_publisher_callback
```
It can be seen that the topic was published three times, and the publisher callback function was executed three times. If you want the publisher callback function to be executed only once after all topics are published and all subscriber callback functions are executed, you can use the parameter `enable_final_return=False` to achieve this goal.
```python
from broadcast_service import broadcast_service

) -> 'BroadcastService':
"""Provide more complex topic publish mode
Args:
num_of_executions: default is 1, indicating the number of times the same topic is published at once
callback: default is None. You can get callback and the parameters of subscriber
after all subscribers' callback functions have been completed.
enable_final_return: default is False, it means you can get callback after you publish
n times topic. In this case, finish_callback params is store in *args rather than **kwargs.
interval: publish interval. Unit seconds.
split_parameters: If you initiate multiple calls and want to pass different parameters to the subscriber
in each call, you can use this parameter for parameter passing. Additionally, when you use this
parameter, you do not need to pass any parameters in the broadcast() function.
Returns:
Returns current object, which is used to call broadcast with configuration.
"""
self.enable_config = True
self.publish_dispatch_config_manager.create_publisher_callback(
num_of_executions=num_of_executions,
callback=callback,
enable_final_return=enable_final_return,
interval=interval,
status=PUBLISHER_CALLBACK_STATUS['RUNNING'],
split_parameters=split_parameters
)

Note that if the publisher's callback function needs to receive parameters, you must use `*args` to receive parameters. Therefore, if multiple subscriber callback functions return information, the publisher's callback cannot be set. Therefore, `*args` is used as a parameter pool to receive data returned from the subscriber's callback function. `*args` is a tuple, which can store any type of data, as long as you can successfully obtain the parameter information of `args`.
The following example shows a complex scenario where multiple subscriber callback functions return information.
```python
from broadcast_service import broadcast_service
@broadcast_service.on_listen("topic")
def handle_subscriber_callback1():
return "handle_subscriber_callback 1"
@broadcast_service.on_listen("topic")
def handle_subscriber_callback2():
return [1, 2, 3, 4, 5]

def listen_all(self, callback: Callable):
"""
'__all__' is a special topic. It can receive any topic message.
"""
self._invoke_listen_topic('__all__', callback)
def broadcast(self, topics: str or List[str], *args, **kwargs):
"""
Launch broadcast on the specify topic. If all subscribe callback finish, it will call finish_callback.
"""
self.logger.debug(f"[broadcast-service] broadcast topic <{topics}>")
if type(topics) == str:
self._invoke_broadcast_topic(topics, *args, **kwargs)
elif type(topics) == list:
for topic in topics:
self._invoke_broadcast_topic(topic, *args, **kwargs)
else:
raise ValueError("Unknown broadcast-service error, please submit "
"issue to https://github.com/Undertone0809/broadcast-service/issues")
def broadcast_all(self, *args, **kwargs):
"""
All topics listened on will be called back.
Attention: Not all callback function will be called. If your publisher callback
and your subscriber callback takes different arguments, your callback function
will not be executed.
"""
for topic in self.pubsub_channels.keys():
self._invoke_broadcast_topic(topic, *args, **kwargs)


Step 2: ⌨️ Coding

Modify broadcast_service/_core.py with contents:
• Locate the `_invoke_callback()` function in the `broadcast_service/_core.py` file.
• Currently, this function uses the `result()` method of the `Future` object, which can cause thread blocking. We need to change this to prevent blocking and allow for asynchronous execution.
• Instead of using `result()`, use the `add_done_callback()` method of the `Future` object. This method adds a function to be called at some point in the future when the `Future` is completed. The callback function will be called with the `Future` as its only argument.
• The callback function should handle the result of the `Future` and any exceptions that may have occurred during its execution. This can be done by using the `result()` method within the callback function, which will not block because it is called after the `Future` is completed.
• Here is an example of how to use `add_done_callback()`: ```python def handle_future(future): try: result = future.result() # Handle result except Exception as e: # Handle exception

future.add_done_callback(handle_future)
```
• Replace the current usage of result() in `_invoke_callback()` with a similar structure to the above example.

--- 
+++ 
@@ -39,9 +39,15 @@
 ) -> Any:
     if enable_async:
         future_result = thread_pool.submit(callback, *args, **kwargs)
-        if future_result.result() is not None:
-            logger.debug(f"[broadcast-service invoke_callback result] {future_result.result()}")
-            return future_result.result()
+        def handle_future(future):
+            try:
+                result = future.result()
+                if result is not None:
+                    logger.debug(f"[broadcast-service invoke_callback result] {result}")
+                    return result
+            except Exception as e:
+                logger.error(f"[broadcast-service invoke_callback error] {str(e)}")
+        future_result.add_done_callback(handle_future)
     else:
         return callback(*args, **kwargs)
 
  • Running GitHub Actions for broadcast_service/_core.pyEdit
Check broadcast_service/_core.py with contents:

Ran GitHub Actions for 8282112afe43a774129985809a79429751877737:


Step 3: 🔁 Code Review

I have finished reviewing the code for completeness. I did not find errors for sweep/fix-thread-blocking.


🎉 Latest improvements to Sweep:

  • We just released a dashboard to track Sweep's progress on your issue in real-time, showing every stage of the process – from search to planning and coding.
  • Sweep uses OpenAI's latest Assistant API to plan code changes and modify code! This is 3x faster and significantly more reliable as it allows Sweep to edit code and validate the changes in tight iterations, the same way as a human would.

💡 To recreate the pull request edit the issue title or description. To tweak the pull request, leave a comment on the pull request.
Join Our Discord

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working sweep Sweep your software chores
Projects
None yet
1 participant