Skip to content

(feat): Improve bidi event loop#10

Closed
mehtarac wants to merge 6 commits into
mainfrom
bar_raise_eventloop
Closed

(feat): Improve bidi event loop#10
mehtarac wants to merge 6 commits into
mainfrom
bar_raise_eventloop

Conversation

@mehtarac
Copy link
Copy Markdown
Owner

@mehtarac mehtarac commented Oct 21, 2025

Bidirectional Agent Loop Implementation

This implementation represents a significant architectural simplification of the bidirectional streaming event loop, reducing complexity from 3 background tasks to 1 while maintaining all original functionality and performance benefits.

The refactoring eliminates complex supervisor pattern in favor of standard Python error handling, removes coordinator function layers, and provides tool execution concurrent operations.

What has changed?

1. Simplified Background Task Architecture

Before:

# 3 separate background tasks with complex coordination
session = BidirectionalConnection(model_session=model_session, agent=agent)
session.background_tasks = [
    asyncio.create_task(_process_model_events(session)),
    asyncio.create_task(_process_tool_execution(session)),
]
session.main_cycle_task = asyncio.create_task(bidirectional_event_loop_cycle(session))

# Queue-based tool execution with polling delays
async def _process_tool_execution(session: BidirectionalConnection) -> None:
    while session.active:
        tool_use = await asyncio.wait_for(session.tool_queue.get(), timeout=0.5)
        # 0-500ms polling delay before tool execution

After:

# Single background task with integrated error handling
agent_loop = BidirectionalAgentLoop(model_session=model_session, agent=agent)
self.background_tasks = [
    asyncio.create_task(self._process_model_events())
]

# Tool execution with concurrent task creation
def schedule_tool_execution(self, tool_use: ToolUse) -> None:
    task = asyncio.create_task(self._execute_tool(tool_use))  # 0ms delay
    self.pending_tool_tasks[task_id] = task

2. Eliminated Complex Supervision Pattern

Before:

# Event-driven supervision with asyncio.wait monitoring
async def _supervise_session(self) -> None:
    while self.active and tasks_to_supervise:
        done, pending = await asyncio.wait(
            tasks_to_supervise,
            return_when=asyncio.FIRST_COMPLETED,
            timeout=1.0
        )
        # Complex task failure detection logic

After:

# Built-in error handling in main processor
async def _process_model_events(self) -> None:
    try:
        async for provider_event in self.model_session.receive_events():
            # Event processing logic...
    except Exception as e:
        logger.error("Agent loop processor failed: %s", e)
        self.active = False  # Trigger shutdown

3. Removed Coordinator Function Layer

Before:

# Agent uses coordinator functions as intermediary layer
from ..event_loop.bidirectional_event_loop import start_bidirectional_connection, stop_bidirectional_connection

async def start(self):
    self._session = await start_bidirectional_connection(self)

async def end(self):
    await stop_bidirectional_connection(self._session)

After:

# Agent creates and manages BidirectionalAgentLoop directly
from ..event_loop.bidirectional_event_loop import BidirectionalAgentLoop

async def start(self):
    model_session = await self.model.create_bidirectional_connection(...)
    self._session = BidirectionalAgentLoop(model_session=model_session, agent=self)
    await self._session.start()

async def end(self):
    await self._session.stop()

4. Enhanced Tool Execution

Before:

# Tools could be cancelled during interruptions
async def _handle_interruption(session: BidirectionalConnection) -> None:
    # Cancel all pending tool tasks regardless of execution state
    for task_id, task in list(session.pending_tool_tasks.items()):
        if not task.done():
            task.cancel()

After:

# Tools are protected from interruption once started
async def handle_interruption(self) -> None:
    active_tool_tasks = [task for task in self.pending_tool_tasks.values() if not task.done()]
    if active_tool_tasks:
        logger.debug("Tools are protected - %d tools currently executing", len(active_tool_tasks))
        # Clear audio but don't cancel running tools
    else:
        logger.debug("No active tools - full interruption handling")
    # Always clear audio queues for responsive interruption

Performance Improvements

  • Tool execution latency: Reduced from 0-500ms (queue polling) to 0ms (immediate scheduling)
  • Background tasks: Reduced from 3 tasks to 1 task (67% reduction)
  • Architecture: Simplified from multi-function coordination to single class

Breaking Changes

None. All changes maintain full backward compatibility:

  • Same public interface for bidirectional agent methods
  • Same functionality and behavior

Related Issues

None

# Synchronization primitives
self.interrupted = False
self.interruption_lock = asyncio.Lock()
self.conversation_lock = asyncio.Lock() # Race condition prevention
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock is used when adding to conversation history. Using a lock here to ensure the conversation history is not corrupted or overwritten. For example if multiple tools complete at the same time, then a lock will help ensure that all the results are written to the conversation history

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does order of writing tool results matter? I guess tools will execute as they appear, and the model will continue to stream its response.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically speaking we shouldn't need a lock for the following reasons:

  1. async tasks run one at a time but in a cooperative manner. But because they run one at a time, their is no concern of resource conflicts when operating in memory.
  2. The messages array is first updated in memory, which can only be altered by one task at a time.
  3. The session managers operate in hooks, which run synchronously. Assuming hooks could run asynchronously, I would say it is up to them to place in locks.

# Thread-safe counter increment
current_tool_number = self.tool_count + 1
self.tool_count = current_tool_number
print(f"\nTool #{current_tool_number}: {tool_name}")
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to PrintingCallbackHandler in sdk-python/src/strands/handlers/callback_handler.py

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we are getting closer to release, we should not have any prints in the code

break

# Remove completed tasks from supervision list
tasks_to_supervise = [task for task in tasks_to_supervise if not task.done()]
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case the only task being supervised will be the _process_model_events task in normal connection/session and thus tasks_to_supervise list remains unchanged - no tasks removed. However, The removal logic exists as a safety net in case the model events task unexpectedly completes (network failure) and we need graceful shutdown.

return event_loop


async def stop_bidirectional_connection(event_loop: "BidirectionalEventLoop") -> None:
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a wrapper that calls event_loop.stop(). Mainly to preserve existing api of agent.start_bidirectional_connection and agent.stop_bidirectional_connection in the agent class.

@mehtarac mehtarac self-assigned this Oct 21, 2025
# Synchronization primitives
self.interrupted = False
self.interruption_lock = asyncio.Lock()
self.conversation_lock = asyncio.Lock() # Race condition prevention
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does order of writing tool results matter? I guess tools will execute as they appear, and the model will continue to stream its response.

while maintaining a simple interface for agent interaction.

Features:
- Concurrent task management for model events and tool execution
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_process_tool_execution I saw this function is removed but mentioned in doc.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was just to compare the old architecture

tool_use_id = tool_result.get("toolUseId")
await self.model_session.send_tool_result(tool_use_id, tool_result)
logger.debug("Tool result sent: %s", tool_use_id)
elif isinstance(tool_event, ToolStreamEvent):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious how we deal with ToolStreamEvent

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also add these to the output queue? We want our customers to develop nice UI around this

"start_bidirectional_connection",
"stop_bidirectional_connection",
"bidirectional_event_loop_cycle",
"BidirectionalAgentLoop",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we actually want to export this? Is there any use case where customers would like to access agentic loop directly?

logger.debug("Session cleanup starting")
session.active = False
# Cancel pending tool tasks
for task in self.pending_tool_tasks.values():
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: instead of duplicating cancellation, you can move all_tasks = list(self.pending_tool_tasks.values()) + self.background_tasks to top, and just do task loop cancellation once.

self.tool_queue = asyncio.Queue()
self.audio_output_queue = asyncio.Queue()
# Task tracking
self.background_tasks: List[asyncio.Task] = []
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that there is a single background task, do we need it to be a list?

# Thread-safe counter increment
current_tool_number = self.tool_count + 1
self.tool_count = current_tool_number
print(f"\nTool #{current_tool_number}: {tool_name}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we are getting closer to release, we should not have any prints in the code

async def bidirectional_event_loop_cycle(session: BidirectionalConnection) -> None:
"""Main event loop coordinator that runs continuously during the session.
# Create task with UUID tracking
task_id = str(uuid.uuid4())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use tool use id?

self.audio_output_queue = asyncio.Queue()
# Task tracking
self.background_tasks: List[asyncio.Task] = []
self.pending_tool_tasks: Dict[str, asyncio.Task] = {}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: pending tool tasks to me sounds like tool uses that we still haven't started executing. From the code, it looks like currently running tool tasks right?

logger.debug("Interruption already in progress")
return
if active_tool_tasks:
logger.debug("Tools are protected - %d tools currently executing", len(active_tool_tasks))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: warn or info?

logger.debug("Tool tasks cancelled: %d", cancelled_tools)
# Clear output queue
cleared_count = 0
while True:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use queue.empty()

if not isinstance(provider_event, dict):
continue

strands_event = provider_event
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

tool_use_id = tool_result.get("toolUseId")
await self.model_session.send_tool_result(tool_use_id, tool_result)
logger.debug("Tool result sent: %s", tool_use_id)
elif isinstance(tool_event, ToolStreamEvent):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also add these to the output queue? We want our customers to develop nice UI around this

@github-actions github-actions Bot added the size/l label Nov 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants