-
-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add EventStream class based on 'pulse' prototype #1990
base: main
Are you sure you want to change the base?
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1990 +/- ##
==========================================
- Coverage 99.56% 99.51% -0.06%
==========================================
Files 114 114
Lines 14618 14722 +104
Branches 1117 1127 +10
==========================================
+ Hits 14554 14650 +96
- Misses 43 49 +6
- Partials 21 23 +2
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks pretty clean overall. Thanks for picking this up! I think it'll get used a lot.
The addition of close
is an interesting idea. Do you have a use case in mind? So far I've always used this for cases where there's some persistent data and subscribers are waiting for it to change, so "closing" isn't very intuitive -- the data might stop changing, but it's still there! But it's also a natural addition in some ways...
I'm uncertain about the value of coalesce=False
– can you make the case for it here so we can consider? Ditto for from_start=False
, and particularly for making it the default.
skeptical about coalesce=False
Naming: naming is always hard, and especially for something like this, where there's no prior art (that I know of). But, it's important, since I think this is one of those cases where at first look the semantics are counterintuitive and hard to explain, even though they feel very natural once you wrap your head around them. So if the name can point people in the right direction, it'll help a lot.
"Stream" is maybe not ideal, because we use that as a term of art for byte streams. But the big thing I want is a name that hints at the coalescing, and the suitability for tracking persistent state rather than transient events.
VersionTracker
? UpdateNotifier
? ChangeNotifier
? Refresher
? StateTracker
?
except: | ||
import traceback | ||
traceback.print_exc() | ||
raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was pytest not printing the traceback properly somehow while you were developing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was reporting a ResourceWarning about an async generator being disposed:
E Traceback (most recent call last):
E File "/home/mauve/dev/trio/trio/_core/_asyncgens.py", line 79, in finalizer
E warnings.warn(
E ResourceWarning: Async generator 'trio._sync.EventStream.subscribe' was garbage collected before it had been exhausted. Surround its use in 'async with a
closing(...):' to ensure that it gets cleaned up as soon as you're done using it.
I didn't dig into what you're doing here, and it sounds a bit odd to me, but I rolled with it. This is one reason I added the .close()
method (but I think it is a good idea anyway).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, ok. A lot going on here :-)
I don't see how the except
block could interact with that warning. The warning is generated in "GC context", where raising exceptions is impossible. Well, it's really a fake GC context, because we re-enter Trio and run the finalizer in async context, so Trio tries to emulate the interpreter's regular GC context... it's possible we don't do it perfectly and some kind of exception escapes somewhere. But I don't think so?
Also, that warning ought to be harmless in this case. It's pointing out that if the async generator is holding any resources like file descriptors, then they are being cleaned up by the GC rather than explicitly. Is this a useful thing to point out? tbh I'm not sure. ResourceWarning
s are hidden by default so it's not necessarily a big deal to have some false positives. But in this case it's pure noise, since subscribe
doesn't hold any resources. And CPython doesn't bother pointing this out when it GC's regular sync generators -- the only reason we can do it is because the GC delegates async generator cleanup to Trio. Maybe it would be better to just drop the warning entirely? It's kind of a mess.
@oremanj, any thoughts you want to add?
""" | ||
if self._wakeup is None: | ||
self._wakeup = trio.Event() | ||
await self._wakeup.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you're worried about efficiency, then also consider implementing this with a set
and wait_task_rescheduled
– basically inlining Event
.
.. autoclass:: EventStream | ||
:members: | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd put the docs for this right after the docs for Event
-- "if you clicked here because you were looking for multiple events, then maybe you want...". (Maybe add a cross-ref at the top of the Event
docs too.)
I find it's generally useful to have a way to signal receivers to stop waiting. If you were using this to notify writes to a log file, say, then you could close when you rotate log files, causing them to exit the loop and pick up the next log file. At least, there are two outcomes for anext(), yield a value or stop iteration. By adding close() an event sender can cause its receiver to take either path, and that seems like a generally useful capability. There's also a parallel with memory channels, when you view an EventStream as a transaction log.
I've mentally generalised EventStream as a transaction log, where each reader is independent and consumes all transactions in the transaction log. The EventStream class exposes integer read cursors and write cursors which indicate the position in a stream of transactions, but these counters were built into Pulse already; this is an interpretation of them. The EventStream class doesn't hold the transactions, which I think is fine for a concurrency primitive. But what history you hold and where is up to you. You could hold a full list of transactions in memory (this doesn't scale, but it is helpful for learners), in which case the read_cursor yielded by subscribe() is an index into a list. You could hold bounded history, like 100 events in a deque. A use case here is, say, broadcasting terminal output to many connected subscribers, and a change on the terminal is a small delta of a few bytes. If a user is behind by <=100 deltas it's best to stream them the deltas, but if they are further behind then maybe you cancel the subscription, send them a full-screen snapshot and then start streaming deltas again. And then there's the case where there's zero history kept. A new event causes a wakeup but woken tasks only need to act on the status quo at the time they are woken. I think this is the aspect you have in mind. So those are three use cases that only really vary by the amount of transaction history you expect to hold - infinite, finite, or zero. In the infinite/finite cases, it might be most convenient to subscribe without coalescing because you want to process each transaction in turn (but maybe it's cheaper to coalesce in order to act in bulk). In the zero history case you certainly want coalescing. If you're holding bounded history then you will certainly start each subscriber at the current write cursor. The start of the entire transaction stream isn't data you hold any more. For infinite history, the same is probably true; you don't want to send an unbounded number of transactions to a new subscriber, so makes sense to send a slice of recent data and then subscribe at the end. For zero history, we're going to want to coalesce anyway, so |
This is a really interesting example, thanks! I feel like it might be even easier to implement with coalescing, though? Something like: prev_event = 0
async for latest_event in obj.subscribe():
gap = latest_event - prev_event
if gap <= 100:
await send_deltas(history_deque[-gap:])
else:
await send_snapshot()
prev_event = latest_event OTOH if you don't coalesce events, I haven't actually implemented it, by it doesn't feel easy to me -- I think you have to separately track the latest-event counter to compare to the index you get from Oh hmm though, I just realized that the above code relies on a very subtle invariant: there are no checkpoints between when Of course the other advantage of coalescing is that it nudges you away from designs that require infinite history. This is another version of the backpressure problem: in every real system like this, you need some strategy to catch up a consumer that's arbitrarily far behind. Or put another way, if you really want to send every item exactly once and you don't care about the costs, you should be using something like a memory channel, not this thing :-). Re:
So in this approach, initially ...OTOH, unconditionally yielding immediately on first iteration is probably more convenient for all the places where I've been using this, where you're waiting for a condition to become true. Of course you could trigger the counter immediately when you set it up, but it feels error-prone. Maybe yielding 0 is better, and the example above should be tweaked to handle that appropriately. It also has the nice property that if your code isn't expecting an immediate yield, you'll probably notice quickly and fix that, because it will happen on every test case. Do you have any examples where you specifically don't want to yield immediately on first subscribe, especially where there have been previous events? I'm still struggling to visualize those. |
Here's some other inventions of the same fundamental idea. None of this affects the PR really, but I think it's nice to have cross-references in the tracker history, and it's nice to see multiple independent efforts converge around the same abstraction:
|
Add the EventStream class, which allows multiple subscribers to a stream of events without exhibiting a missing wakeups problem.
I think the documentation is far from fully fleshed out here - I want to draw up a diagram of the read/write cursor interpretation for EventStream - but this is the bare bones of the direction I took this so I'm submitting it for sokme early feedbac.