Context
The Agent.stream() method currently streams token-by-token responses but provides no hooks for applications to react to streaming events (e.g., token received, stream started, stream completed, error occurred). This forces developers to either:
- Parse streamed text manually post-completion for state updates
- Wrap the Agent class to inject custom streaming logic
- Sacrifice streaming benefits to use synchronous .run() calls with post-processing
Problem
In the current implementation in strands/agent.py, the stream() method yields tokens but doesn't expose lifecycle events. Real-world applications need to:
- Update UI/progress indicators as tokens arrive
- Track token counts for cost monitoring
- Implement cancellation handlers
- Log or cache streaming completions
- Handle provider-specific streaming errors gracefully
Without event hooks, these become bolted-on wrappers that duplicate stream iteration logic.
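To illustrate the duplication, here is a minimal sketch of the kind of wrapper applications end up writing today. It is not taken from the strands codebase: `fake_stream` is a hypothetical stand-in for Agent.stream() (assumed here to yield plain string tokens), and `stream_with_token_count` is an illustrative name.

```python
from typing import Iterable, Iterator


def fake_stream(prompt: str) -> Iterator[str]:
    """Hypothetical stand-in for Agent.stream(); assumed to yield string tokens."""
    yield from ["Hello", ", ", "world"]


def stream_with_token_count(tokens: Iterable[str], stats: dict) -> Iterator[str]:
    """Wrapper that re-implements the stream loop just to count tokens."""
    stats["tokens"] = 0
    for token in tokens:
        stats["tokens"] += 1
        yield token


stats: dict = {}
text = "".join(stream_with_token_count(fake_stream("query"), stats))
```

Each additional concern (logging, progress updates, error handling) would need another wrapper of this shape, or more logic crammed into the same loop.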
What Good Looks Like
LangChain's BaseCallbackHandler and LiteLLM's streaming callbacks demonstrate this pattern:
class StreamingCallback:
    def on_token_received(self, token: str, **kwargs): pass
    def on_stream_start(self, **kwargs): pass
    def on_stream_end(self, **kwargs): pass
    def on_stream_error(self, error: Exception, **kwargs): pass

# Usage
agent = Agent(tools=[...], streaming_callbacks=[MyCallback()])
for chunk in agent.stream("query"):
    # Callbacks fire automatically
    pass
Proposed Solution
- Add a streaming_callbacks parameter to Agent.__init__() accepting a list of callback objects
- Define a StreamingCallback base class with optional methods:
  - on_token_received(token: str, full_response: str, **kwargs)
  - on_stream_start(**kwargs)
  - on_stream_end(full_response: str, **kwargs)
  - on_stream_error(error: Exception, **kwargs)
- Update stream() to invoke callbacks at appropriate points
- Document with examples: a token counter for billing, a UI progress bar, and error recovery
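The steps above could be sketched roughly as follows. This is an illustration under stated assumptions, not the actual strands implementation: `stream_with_callbacks` is a hypothetical helper standing in for the dispatch logic inside stream(), the token source is a plain iterable of strings, and `full_response` is assumed to mean the text accumulated so far, including the current token.

```python
from typing import Any, Iterable, Iterator, List, Optional


class StreamingCallback:
    """Base class from the proposal; every hook is an optional no-op."""

    def on_stream_start(self, **kwargs: Any) -> None:
        pass

    def on_token_received(self, token: str, full_response: str, **kwargs: Any) -> None:
        pass

    def on_stream_end(self, full_response: str, **kwargs: Any) -> None:
        pass

    def on_stream_error(self, error: Exception, **kwargs: Any) -> None:
        pass


def stream_with_callbacks(
    tokens: Iterable[str], callbacks: List[StreamingCallback]
) -> Iterator[str]:
    """Hypothetical helper showing where stream() might fire each hook."""
    full_response = ""
    # Generators are lazy, so this fires when the consumer requests the first token.
    for cb in callbacks:
        cb.on_stream_start()
    try:
        for token in tokens:
            full_response += token
            for cb in callbacks:
                cb.on_token_received(token, full_response)
            yield token
    except Exception as error:
        # Notify callbacks, then re-raise so the caller still sees the failure.
        for cb in callbacks:
            cb.on_stream_error(error)
        raise
    for cb in callbacks:
        cb.on_stream_end(full_response)


class TokenCounter(StreamingCallback):
    """Example callback: counts tokens, e.g. for cost monitoring."""

    def __init__(self) -> None:
        self.count = 0
        self.final: Optional[str] = None

    def on_token_received(self, token: str, full_response: str, **kwargs: Any) -> None:
        self.count += 1

    def on_stream_end(self, full_response: str, **kwargs: Any) -> None:
        self.final = full_response


counter = TokenCounter()
chunks = list(stream_with_callbacks(["Hi", " ", "there"], [counter]))
```

In this sketch on_stream_end only fires when the stream is fully consumed; if the consumer stops iterating early, neither on_stream_end nor on_stream_error is called. Whether cancellation should trigger a hook is a design question the proposal leaves open.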
Why This Matters
Streaming is a core feature for responsive agent UX. Without events, developers can't build production-grade applications that rely on streaming (e.g., real-time dashboards, cost tracking, cancellation). This addresses issue #1819 and unblocks downstream tooling.
Contributed by Klement Gunndu