Allow streaming events while running a pipeline #4804
    return_when=asyncio.FIRST_COMPLETED,
)
if not done:
    # TODO: consider checking the run status here and synthesizing
This is interesting, I like it. We can reach such conditions, so I think it would be good to have it implemented as part of this PR.
The only reason I didn't implement this was that I'm concerned about overloading our server.
- there might be multiple consumers of the same stream on different replicas
- for each consumer, this would send a request for the run status every 30 seconds, or whatever the configured heartbeat interval is
- the current implementation scales pretty well with multiple consumers, as each replica subscribes to each stream only once, but this would now add DB load proportional to the consumer count
From my understanding (but I might be wrong), idling consumers waiting on an empty stream shouldn't consume any noticeable resources.
On the other hand, it usually won't really matter for the consumer if the connection isn't closed as soon as the run ends. The user will usually be on a website that displays the stream, and once they change the URL the browser will close the connection.
So essentially, we'd add these potentially many run status checks for little benefit, and the happy case should already publish the end frame just fine, so all of this would be just to cover some unlikely edge cases.
What do you think?
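To make the trade-off concrete, here is a minimal sketch of the wait loop under discussion, not the PR's actual code. It assumes a per-consumer `asyncio.Queue`, a hypothetical `END` sentinel for the end frame, and a hypothetical `run_is_finished` callback standing in for the run status lookup. The status check only fires every `status_check_every` idle heartbeats, so the DB load per consumer can be kept far below one request per heartbeat.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional

END = object()  # hypothetical sentinel: the producer pushes it when the run ends
END_FRAME = "event: end\ndata: {}\n\n"  # assumed end-frame format


async def sse_frames(
    queue: "asyncio.Queue[object]",
    heartbeat_interval: float,
    run_is_finished: Optional[Callable[[], Awaitable[bool]]] = None,
    status_check_every: int = 10,
) -> AsyncIterator[str]:
    """Yield SSE frames; emit a comment heartbeat whenever the queue idles."""
    idle_beats = 0
    getter: Optional[asyncio.Task] = None
    try:
        while True:
            if getter is None:
                getter = asyncio.create_task(queue.get())
            done, _ = await asyncio.wait(
                {getter},
                timeout=heartbeat_interval,
                return_when=asyncio.FIRST_COMPLETED,
            )
            if not done:
                # Timed out with no event: count the idle beat and only
                # occasionally pay for a run status lookup.
                idle_beats += 1
                if (
                    run_is_finished is not None
                    and idle_beats % status_check_every == 0
                    and await run_is_finished()
                ):
                    yield END_FRAME  # synthesized end frame
                    return
                yield ": heartbeat\n\n"
                continue
            idle_beats = 0
            item = getter.result()
            getter = None
            if item is END:
                yield END_FRAME
                return
            yield f"data: {item}\n\n"
    finally:
        # Do not leave a dangling queue.get() task behind.
        if getter is not None:
            getter.cancel()
```

With a 30 second heartbeat and `status_check_every=10`, each idle consumer would trigger one status lookup every 5 minutes; whether that is acceptable still depends on the expected consumer count.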
Hmm, I was thinking about a more infrequent heartbeat (every 5 or 10 minutes would make sense) to avoid the mid/long-term effect of dangling sessions, but even that is probably a stretch. An alternative here is: we don't care as long as we can inspect it. For instance, if there is a server performance issue, how can I rule out SSE, queues, etc.?
@Json-Andriopoulos What do you mean by not caring here? Not caring about idle connections? Or not caring about DB calls?
Idle connections :)
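One way to make idle connections inspectable, as suggested in this thread, is to have the fan-out layer expose its consumer counts. The sketch below is a hypothetical, in-memory illustration of that idea (one publisher per stream, many local consumer queues, a per-stream cap, and a `stats()` view); the class and method names are invented here and are not taken from the PR.

```python
import asyncio
from typing import Any, Dict, Set


class FanOut:
    """Hypothetical fan-out hub: one upstream reader, many consumer queues."""

    def __init__(self, max_consumers_per_stream: int = 100) -> None:
        self._consumers: Dict[str, Set[asyncio.Queue]] = {}
        self._max = max_consumers_per_stream

    def subscribe(self, stream_id: str) -> asyncio.Queue:
        consumers = self._consumers.setdefault(stream_id, set())
        if len(consumers) >= self._max:
            raise RuntimeError("per-stream consumer cap reached")
        queue: asyncio.Queue = asyncio.Queue()
        consumers.add(queue)
        return queue

    def unsubscribe(self, stream_id: str, queue: asyncio.Queue) -> None:
        consumers = self._consumers.get(stream_id)
        if consumers is None:
            return
        consumers.discard(queue)
        if not consumers:
            # Last consumer left: drop the stream entry (idle cleanup).
            del self._consumers[stream_id]

    def publish(self, stream_id: str, event: Any) -> None:
        """Called by the single upstream reader for this stream."""
        for queue in self._consumers.get(stream_id, ()):
            queue.put_nowait(event)

    def stats(self) -> Dict[str, int]:
        """Consumer count per stream, e.g. for a debug endpoint or metrics."""
        return {sid: len(qs) for sid, qs in self._consumers.items()}
```

Exporting `stats()` through whatever metrics or debug surface the server already has would let an operator see how many consumers are attached to each stream when investigating a performance issue.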
Describe changes
Live event streaming on pipeline runs over SSE. Can be enabled by setting stream_broker_implementation_source.
Producer
Call zenml.streaming.publish({...}, kind=..., correlation_id=..., index=...) from inside any step or dynamic pipeline.
Server
Add StreamBroker interface and RedisStreamsBroker implementation. StreamBroadcaster runs one broker reader per stream, fans out to N SSE consumers, handles catch-up, gap signalling, idle cleanup, per-stream consumer cap.
Endpoint
GET /api/v1/runs/{id}/events/stream
Multi-value filters: ?kinds=, ?step_names=, ?correlation_ids=. Resume via Last-Event-ID header or ?since=.
Pre-requisites
Please ensure you have done the following:
develop and the open PR is targeting develop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.
Types of changes