(feat): Improve bidi event loop #10
| # Synchronization primitives | ||
| self.interrupted = False | ||
| self.interruption_lock = asyncio.Lock() | ||
| self.conversation_lock = asyncio.Lock() # Race condition prevention |
This lock is used when adding to the conversation history. It ensures the history is not corrupted or overwritten. For example, if multiple tools complete at the same time, the lock helps ensure that all of their results are written to the conversation history.
Does the order in which tool results are written matter? I guess tools will execute as they appear, and the model will continue to stream its response.
Technically speaking, we shouldn't need a lock, for the following reasons:
- Async tasks run one at a time, cooperatively. Because only one task runs at any moment, there is no risk of resource conflicts when operating on in-memory state.
- The messages array is first updated in memory, where it can only be altered by one task at a time.
- The session managers operate in hooks, which run synchronously. If hooks could run asynchronously, I would say it is up to them to add their own locks.
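The reasoning above can be illustrated with a minimal sketch. It assumes that each tool result is appended to the shared list in a single step with no `await` in the middle of the update; the message shape and the names `record_tool_result` and `run_tools` are hypothetical and not taken from the PR.

```python
import asyncio


async def record_tool_result(messages: list, tool_use_id: str) -> None:
    # Simulate the tool doing asynchronous work before it reports back.
    await asyncio.sleep(0)
    # There is no await between building the entry and appending it, so the
    # event loop cannot switch to another task in the middle of this update.
    messages.append({"toolUseId": tool_use_id})  # hypothetical message shape


async def run_tools(count: int) -> list:
    messages: list = []
    # All tools "complete" concurrently and write to the same list, unlocked.
    await asyncio.gather(
        *(record_tool_result(messages, f"tool-{i}") for i in range(count))
    )
    return messages


results = asyncio.run(run_tools(100))
```

A lock would only become necessary if an update were split across an `await`, for example reading the list, awaiting something, and then writing back a value derived from the earlier read.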
| # Thread-safe counter increment | ||
| current_tool_number = self.tool_count + 1 | ||
| self.tool_count = current_tool_number | ||
| print(f"\nTool #{current_tool_number}: {tool_name}") |
Similar to PrintingCallbackHandler in sdk-python/src/strands/handlers/callback_handler.py.
Given that we are getting closer to release, we should not have any prints in the code.
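One way to act on the comment above is to route the message through the module logger instead of `print`. This is a sketch only: the `ToolCounter` class and its `record` method are hypothetical stand-ins for the counter logic in the diff, not the PR's actual code.

```python
import logging

logger = logging.getLogger(__name__)


class ToolCounter:
    """Hypothetical stand-in for the object that owns tool_count."""

    def __init__(self) -> None:
        self.tool_count = 0

    def record(self, tool_name: str) -> int:
        # A plain increment is enough inside a single event loop, because no
        # await separates the read from the write.
        self.tool_count += 1
        # Lazy %-style arguments match the logger.debug calls in the diff.
        logger.debug("Tool #%d: %s", self.tool_count, tool_name)
        return self.tool_count
```

Callers that still want console output can attach their own handler or callback rather than relying on a hard-coded `print`.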
| break | ||
| # Remove completed tasks from supervision list | ||
| tasks_to_supervise = [task for task in tasks_to_supervise if not task.done()] |
In a normal connection/session, the only task being supervised is the _process_model_events task, so the tasks_to_supervise list remains unchanged and no tasks are removed. However, the removal logic exists as a safety net in case the model events task completes unexpectedly (for example, on network failure) and we need a graceful shutdown.
| return event_loop | ||
| async def stop_bidirectional_connection(event_loop: "BidirectionalEventLoop") -> None: |
This is just a wrapper that calls event_loop.stop(), mainly to preserve the existing API of agent.start_bidirectional_connection and agent.stop_bidirectional_connection in the agent class.
| while maintaining a simple interface for agent interaction. | ||
| Features: | ||
| - Concurrent task management for model events and tool execution |
I see that the _process_tool_execution function has been removed, but it is still mentioned in the doc.
That mention was only there to compare against the old architecture.
| tool_use_id = tool_result.get("toolUseId") | ||
| await self.model_session.send_tool_result(tool_use_id, tool_result) | ||
| logger.debug("Tool result sent: %s", tool_use_id) | ||
| elif isinstance(tool_event, ToolStreamEvent): |
Just curious how we deal with ToolStreamEvent.
Should we also add these to the output queue? We want our customers to be able to build a nice UI around this.
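A rough sketch of what the suggestion above could look like follows. Everything here is an assumption made for illustration: the `ToolStreamEvent` dataclass is a stand-in for the SDK's own type (whose real fields are not shown in this review), and `forward_tool_event` and the queued dictionary shape are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class ToolStreamEvent:
    """Stand-in for the SDK type; the real class may have different fields."""

    tool_use_id: str
    data: Any


async def forward_tool_event(tool_event: object, output_queue: asyncio.Queue) -> bool:
    # Forward intermediate tool output so a UI can render progress.
    if isinstance(tool_event, ToolStreamEvent):
        await output_queue.put(
            {"toolUseId": tool_event.tool_use_id, "data": tool_event.data}
        )
        return True
    return False


async def demo() -> list:
    output_queue: asyncio.Queue = asyncio.Queue()
    await forward_tool_event(ToolStreamEvent("tool-1", "50% done"), output_queue)
    await forward_tool_event({"not": "a stream event"}, output_queue)
    # Drain whatever was queued so the result can be inspected.
    return [output_queue.get_nowait() for _ in range(output_queue.qsize())]


queued = asyncio.run(demo())
```

Whether stream events belong on the same queue as audio output or on a separate one is a design decision this sketch does not settle.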
| "start_bidirectional_connection", | ||
| "stop_bidirectional_connection", | ||
| "bidirectional_event_loop_cycle", | ||
| "BidirectionalAgentLoop", |
Do we actually want to export this? Is there any use case where customers would want to access the agentic loop directly?
| logger.debug("Session cleanup starting") | ||
| session.active = False | ||
| # Cancel pending tool tasks | ||
| for task in self.pending_tool_tasks.values(): |
nit: instead of duplicating the cancellation logic, you can move all_tasks = list(self.pending_tool_tasks.values()) + self.background_tasks to the top and run the task cancellation loop just once.
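The consolidation suggested above might look like the following sketch. The free function `cancel_all` and the `demo` harness are hypothetical; in the PR the two collections are attributes on the event loop object, and the surrounding cleanup code is not shown here.

```python
import asyncio
from typing import Dict, List


async def cancel_all(
    pending_tool_tasks: Dict[str, asyncio.Task],
    background_tasks: List[asyncio.Task],
) -> int:
    # Build one combined list up front so there is a single cancellation loop.
    all_tasks = list(pending_tool_tasks.values()) + background_tasks
    for task in all_tasks:
        if not task.done():
            task.cancel()
    # Wait for every task to finish unwinding; return_exceptions=True collects
    # each CancelledError as a result instead of raising it here.
    await asyncio.gather(*all_tasks, return_exceptions=True)
    return sum(1 for task in all_tasks if task.cancelled())


async def demo() -> int:
    tools = {
        "a": asyncio.create_task(asyncio.sleep(60)),
        "b": asyncio.create_task(asyncio.sleep(60)),
    }
    background = [asyncio.create_task(asyncio.sleep(60))]
    return await cancel_all(tools, background)


cancelled = asyncio.run(demo())
```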
| self.tool_queue = asyncio.Queue() | ||
| self.audio_output_queue = asyncio.Queue() | ||
| # Task tracking | ||
| self.background_tasks: List[asyncio.Task] = [] |
Given that there is a single background task, does it need to be a list?
| async def bidirectional_event_loop_cycle(session: BidirectionalConnection) -> None: | ||
| """Main event loop coordinator that runs continuously during the session. | ||
| # Create task with UUID tracking | ||
| task_id = str(uuid.uuid4()) |
| self.audio_output_queue = asyncio.Queue() | ||
| # Task tracking | ||
| self.background_tasks: List[asyncio.Task] = [] | ||
| self.pending_tool_tasks: Dict[str, asyncio.Task] = {} |
nit: to me, "pending tool tasks" sounds like tool uses that we haven't started executing yet. From the code, it looks like these are the currently running tool tasks, right?
| logger.debug("Interruption already in progress") | ||
| return | ||
| if active_tool_tasks: | ||
| logger.debug("Tools are protected - %d tools currently executing", len(active_tool_tasks)) |
| logger.debug("Tool tasks cancelled: %d", cancelled_tools) | ||
| # Clear output queue | ||
| cleared_count = 0 | ||
| while True: |
| if not isinstance(provider_event, dict): | ||
| continue | ||
| strands_event = provider_event |
Bidirectional Agent Loop Implementation
This implementation is a significant architectural simplification of the bidirectional streaming event loop. It reduces the number of background tasks from 3 to 1 while maintaining all original functionality and performance benefits.
The refactoring eliminates the complex supervisor pattern in favor of standard Python error handling, removes the coordinator function layer, and runs tool executions concurrently.
What has changed?
1. Simplified Background Task Architecture
Before:
After:
2. Eliminated Complex Supervision Pattern
Before:
After:
3. Removed Coordinator Function Layer
Before:
After:
4. Enhanced Tool Execution
Before:
After:
Performance Improvements
Breaking Changes
None. All changes maintain full backward compatibility:
Related Issues
None