Skip to content

Allow streaming events while running a pipeline#4804

Merged
schustmi merged 4 commits into
developfrom
feature/streaming
May 27, 2026
Merged

Allow streaming events while running a pipeline#4804
schustmi merged 4 commits into
developfrom
feature/streaming

Conversation

@schustmi
Copy link
Copy Markdown
Contributor

@schustmi schustmi commented May 7, 2026

Describe changes

Live event streaming on pipeline runs over SSE. Can be enabled by setting stream_broker_implementation_source.

Producer

Call zenml.streaming.publish({...}, kind=..., correlation_id=..., index=...) from inside any step or dynamic pipeline.

Server

Add StreamBroker interface and RedisStreamsBroker implementation. StreamBroadcaster runs one broker reader per stream, fans out to N SSE consumers, handles catch-up, gap signalling, idle cleanup, per-stream consumer cap.

Endpoint

GET /api/v1/runs/{id}/events/stream

Multi-value filters: ?kinds=, ?step_names=, ?correlation_ids=. Resume via Last-Event-ID header or ?since=.

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions github-actions Bot added internal To filter out internal PRs and issues enhancement New feature or request labels May 7, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 7, 2026

✅ No broken links found!

@schustmi schustmi force-pushed the feature/streaming branch 6 times, most recently from 043fd2c to ccf4e83 Compare May 11, 2026 08:27
@schustmi schustmi added the release-notes Release notes will be attached and used publicly for this PR. label May 11, 2026
@schustmi schustmi force-pushed the feature/streaming branch 9 times, most recently from dd1101a to 5f0da36 Compare May 18, 2026 08:39
@schustmi schustmi force-pushed the feature/streaming branch 10 times, most recently from 524403d to 2e1ea8d Compare May 21, 2026 15:18
@schustmi schustmi marked this pull request as ready for review May 21, 2026 15:18
@schustmi schustmi force-pushed the feature/streaming branch 2 times, most recently from 81cb5f6 to 2781e2c Compare May 21, 2026 16:01
@schustmi schustmi force-pushed the feature/streaming branch 9 times, most recently from bdc2f04 to 5036479 Compare May 26, 2026 08:05
@schustmi schustmi force-pushed the feature/streaming branch from 5036479 to 15a4318 Compare May 26, 2026 08:58
Comment thread src/zenml/zen_server/streaming/brokers/redis_streams.py
@schustmi schustmi force-pushed the feature/streaming branch from 15a4318 to 964b92d Compare May 26, 2026 09:46
@schustmi schustmi force-pushed the feature/streaming branch from 964b92d to 8537156 Compare May 26, 2026 10:17
Comment thread src/zenml/constants.py Outdated
Comment thread src/zenml/zen_server/routers/runs_endpoints.py Outdated
return_when=asyncio.FIRST_COMPLETED,
)
if not done:
# TODO: consider checking the run status here and synthesizing
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting I like it. We can reach such conditions, I think it would be good to have it implemented as part of this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason I didn't implement this was that I'm concerned about overloading our server.

  • there might be multiple consumers of the same stream on different replicas
  • for each consumer, this would send a request for the run status every 30 seconds or whatever the configured heartbeat interval
  • the current implementation scales pretty well with multiple consumers, as each replica only subscribes to each stream only once. but this would now add DB load relative to the consumer count

From my understanding (but I might be wrong), idling consumers waiting on an empty stream shouldn't consume any noticeable resources.

On the other hand, it usually won't really matter for the consumer if the connection isn't closed as soon as the run ends. The user will usually be on a website that displays the stream, and once they change the URL the browser will close the connection.

So essentially, we'd add these potentially many run status checks for little benefit, and the happy case should already publish the end frame just fine, so all of this would be just to cover some unlikely edge cases.

What do you think?

Copy link
Copy Markdown
Contributor

@Json-Andriopoulos Json-Andriopoulos May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I was thinking about a more infrequent HB (5, 10 minutes would make sense) to avoid the mid/long-term effect of dangling sessions but even that is probably a stretch. An alternative here is: We don't care as long as we can inspect it. For instance, there is a server performance issue how can I rule out SSE, queues etc?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Json-Andriopoulos What do you mean by not caring here? Not caring about idle connections? Or not caring about DB calls?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idle connections :)

Comment thread src/zenml/streaming/publishing.py
Comment thread src/zenml/streaming/publishing.py Outdated
@schustmi schustmi force-pushed the feature/streaming branch from 058d02a to fca9cf2 Compare May 27, 2026 13:20
@socket-security
Copy link
Copy Markdown

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Addedredis@​7.4.099100100100100

View full report

@schustmi schustmi merged commit 28cf3a2 into develop May 27, 2026
54 of 59 checks passed
@schustmi schustmi deleted the feature/streaming branch May 27, 2026 15:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request internal To filter out internal PRs and issues release-notes Release notes will be attached and used publicly for this PR.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants