AsyncPipeline.stream POC #11258
@mpangrazzi @sjrl I'd appreciate your thoughts
@anakin87 it's looking good! I took a look at your demo repo and it looks reasonable to me. I think @mpangrazzi can speak better to the implications of integrating this with Hayhooks. Perhaps requiring the use of an AsyncIterable complicates integration with FastAPI? (I'm not really sure.) My main question is what this would look like for the sync pipeline. You mention that it could be problematic to set up, and I was wondering if you could explain in more detail what you mean by that.
Iterate the handle to consume chunks; after iteration ends, `handle.result` holds the final pipeline output dict
(same as `run_async`).

For every async-capable component whose `run_async` accepts `streaming_callback`, a forwarder is injected
Let's be a little more explicit here and say that we only check whether the input parameter is called streaming_callback in run_async. I don't think we do any type checking beyond that, right?
if is_callable_async_compatible(user_callback):
    await cast(AsyncStreamingCallbackT, user_callback)(chunk)
else:
    cast(SyncStreamingCallbackT, user_callback)(chunk)
Oh interesting, so we want to support both async and sync streaming callbacks?
Maybe in the future, but here it was wrong. I am now switching to the same select_streaming_callback utility we use everywhere.
if not isinstance(comp_inputs, dict):
    continue
Could you explain what this is catching? Do we allow non-dict entries in our data?
Relic of the initial vibe-coded solution. Removing it now.
Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>
if not isinstance(comp_inputs, dict):
    continue
runtime_callback: StreamingCallbackT | None = comp_inputs.get("streaming_callback")
init_callback: StreamingCallbackT | None = getattr(instance, "streaming_callback", None)
This feels a bit fragile. E.g. the callback could be stored under a different attribute name. At the very least, could we update the docstring sentence "The forwarder composes with any user-supplied streaming_callback." to mention how users can or should supply a streaming callback?
Also, should we allow users to provide a streaming callback directly in the stream method?
- I have now tried to explain this better in the docstring (b72c878). LMK if it is still unclear/fragile
- Yes, it can be passed via data (now explained)
I think that this new feature provides value to async users in APIs (#8742, #9347). For sync users, the Why? Stream does: a Pipeline produces chunks in the background; the consumer pulls them on demand. In Hayhooks, motivated by the API context and by the fact that most old pipelines are sync, we bridge sync pipelines into the async streaming endpoint, but, talking with Michele, this creates similar problems.
mpangrazzi left a comment
I think this is going in the right direction! In Hayhooks we've already learned some of the "hard parts": cleanup on cancellation, wrapper/socket detection, and selection of which components stream. I would look at how to port those ideas here.
def __aiter__(self) -> "PipelineStreamHandle":
    return self

async def __anext__(self) -> StreamingChunk:
I think this needs lifecycle handling for abandoned iteration. If a caller breaks out of async for, is cancelled, or an HTTP client disconnects (and this does happen), __anext__ is no longer awaited and the underlying pipeline task keeps running unless the caller remembers to call handle.aclose() manually.
We may want to handle this, perhaps with an opt-out? (In some cases you want the pipeline to run to completion regardless of, e.g., client disconnections.)
In Hayhooks this is solved by exposing an async generator with a finally block that cancels/awaits the pipeline task during generator close. Could we either make stream() return an async generator with cleanup semantics, or make PipelineStreamHandle an async context manager and document/enforce async with usage?
A test that consumes one chunk and exits early would be useful here.
    for name in self.graph.nodes
    if getattr(self.graph.nodes[name]["instance"], "__haystack_supports_async__", False)
    and "streaming_callback" in inspect.signature(self.graph.nodes[name]["instance"].run_async).parameters
}
This streaming-capable detection may be too narrow. It only checks inspect.signature(instance.run_async).parameters, but some Haystack wrappers/components can expose streaming_callback through Haystack input sockets without reflecting it directly in the Python method signature.
In Hayhooks we had to add a fallback through __haystack_input__._sockets_dict for wrapper components (e.g. code components). Should this use the component socket metadata as the source of truth, or at least fall back to it?
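A rough sketch of what such a fallback could look like, assuming that `__haystack_input__._sockets_dict` behaves like a mapping keyed by input name (that shape is an assumption here, as are the helper name `accepts_streaming_callback` and the toy component classes, which are not Haystack classes):

```python
import inspect
from typing import Any


def accepts_streaming_callback(instance: Any, method_name: str = "run_async") -> bool:
    """Return True if the component appears to accept a `streaming_callback` input."""
    method = getattr(instance, method_name, None)
    # First check: the Python signature of the run method.
    if callable(method) and "streaming_callback" in inspect.signature(method).parameters:
        return True
    # Fallback: socket metadata (assumed to be a dict keyed by input name).
    sockets = getattr(getattr(instance, "__haystack_input__", None), "_sockets_dict", None)
    return isinstance(sockets, dict) and "streaming_callback" in sockets


class _ToySockets:
    """Hypothetical stand-in for the socket container."""

    def __init__(self, names):
        self._sockets_dict = {name: object() for name in names}


class DirectComponent:
    """Declares streaming_callback directly in the method signature."""

    async def run_async(self, prompt, streaming_callback=None):
        return {}


class WrapperComponent:
    """Hides its inputs behind **kwargs and only exposes them through sockets."""

    def __init__(self):
        self.__haystack_input__ = _ToySockets(["prompt", "streaming_callback"])

    async def run_async(self, **kwargs):
        return {}


class PlainComponent:
    """Does not accept a streaming callback at all."""

    async def run_async(self, text):
        return {}
```

With these toy classes, the signature check alone would miss `WrapperComponent`; the socket fallback is what picks it up.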
| "input1": 1, "input2": 2, | ||
| } | ||
| ``` | ||
| :param streaming_components: Names of components to stream from. If `None` (default), every streaming-capable |
In Hayhooks we stream only the last capable component by default. Do you feel that streaming from every capable component is still a safe default?
async def forwarder(chunk: StreamingChunk) -> None:
    await queue.put(chunk)
    if user_callback is not None:
        await user_callback(chunk)
I agree that calling user callback after putting the chunk on the queue is ok, because the returned iterator is the primary consumer and a side callback failure should not cause the current chunk to be lost.
BTW, it would be good to add an explicit test: callback raises after one chunk, the stream yields that chunk, then iteration surfaces the pipeline/callback failure.
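The scenario could be sketched roughly as follows. This is a self-contained simulation, not the PR's test: `run_demo`, `failing_callback`, `fake_component` and the sentinel-based queue protocol are all assumptions made for illustration.

```python
import asyncio


async def run_demo() -> tuple:
    """Simulate a forwarder whose user callback raises on the first chunk."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    async def failing_callback(chunk: str) -> None:
        # Hypothetical user callback that always fails.
        raise RuntimeError("callback failed")

    async def forwarder(chunk: str) -> None:
        await queue.put(chunk)         # the chunk reaches the stream first
        await failing_callback(chunk)  # the side callback runs afterwards

    async def fake_component() -> None:
        # Hypothetical stand-in for a streaming component emitting two chunks.
        for chunk in ("a", "b"):
            await forwarder(chunk)

    async def producer() -> None:
        try:
            await fake_component()
        finally:
            await queue.put(done)  # always unblock the consumer

    task = asyncio.create_task(producer())
    received = []
    while (item := await queue.get()) is not done:
        received.append(item)

    error = None
    try:
        await task  # the callback failure surfaces once the stream has ended
    except RuntimeError as exc:
        error = exc
    return received, error
```

Because the chunk is queued before the callback runs, the consumer still sees the chunk that triggered the failure; the second chunk is never produced.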
Related Issues
Proposed Changes:
- AsyncPipeline.stream method that returns a PipelineStreamHandle
- async for chunk in handle
- result field to get the final result
Choices and limitations:
- PipelineStreamHandle). This currently makes the integration with Hayhooks not exactly ergonomic, but we can work to improve it
Does not contain breaking changes, but it is intended for Haystack 3.
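For readers unfamiliar with the pattern, the consumption flow listed under Proposed Changes (iterate the handle, then read `result`) can be illustrated with a toy handle. `ToyStreamHandle` is a hypothetical stand-in written for this sketch; it is not the PR's `PipelineStreamHandle`, and the chunk and result values are made up.

```python
import asyncio


class ToyStreamHandle:
    """Hypothetical async-iterable handle: yields chunks, then exposes a final result."""

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self, chunks, final_result):
        # Must be constructed inside a running event loop (create_task needs one).
        self._queue: asyncio.Queue = asyncio.Queue()
        self.result = None  # populated once iteration has finished
        self._task = asyncio.create_task(self._run(chunks, final_result))

    async def _run(self, chunks, final_result):
        # Stand-in for the pipeline run: emit chunks in the background.
        for chunk in chunks:
            await self._queue.put(chunk)
        await self._queue.put(self._DONE)
        return final_result

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is self._DONE:
            self.result = await self._task
            raise StopAsyncIteration
        return item


async def main() -> tuple:
    handle = ToyStreamHandle(["Hel", "lo"], {"llm": {"replies": ["Hello"]}})
    text = ""
    async for chunk in handle:  # pull chunks on demand
        text += chunk
    return text, handle.result  # the final output is available after iteration
```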
How did you test it?
Checklist
fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.