Skip to content

Commit 29b57fb

Browse files
authored
Merge pull request #18 from mkmeral/bidi-run
feat(bidirectional): Add agent.run
2 parents 11d57c9 + 73e85dc commit 29b57fb

1 file changed

Lines changed: 70 additions & 0 deletions

File tree

  • src/strands/experimental/bidirectional_streaming/agent

src/strands/experimental/bidirectional_streaming/agent/agent.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,76 @@ async def end(self) -> None:
431431
await stop_bidirectional_connection(self._session)
432432
self._session = None
433433

434+
async def run(
435+
self,
436+
*,
437+
sender: Callable[[Any], Any],
438+
receiver: Callable[[], Any],
439+
) -> None:
440+
"""Run the agent with send/receive loop management.
441+
442+
Starts the session, pipes events between the agent and transport layer,
443+
and handles cleanup on disconnection.
444+
445+
Args:
446+
sender: Async callable that sends events to the client (e.g., websocket.send_json).
447+
receiver: Async callable that receives events from the client (e.g., websocket.receive_json).
448+
449+
Example:
450+
```python
451+
# With WebSocket
452+
agent = BidirectionalAgent(model=model, tools=[calculator])
453+
await agent.run(sender=websocket.send_json, receiver=websocket.receive_json)
454+
455+
# With custom transport
456+
async def custom_send(event):
457+
# Custom send logic
458+
pass
459+
460+
async def custom_receive():
461+
# Custom receive logic
462+
return event
463+
464+
await agent.run(sender=custom_send, receiver=custom_receive)
465+
```
466+
467+
Raises:
468+
Exception: Any exception from the transport layer (e.g., WebSocketDisconnect).
469+
"""
470+
await self.start()
471+
472+
async def receive_from_agent():
473+
"""Receive events from agent and send to client."""
474+
try:
475+
async for event in self.receive():
476+
await sender(event)
477+
except Exception as e:
478+
logger.debug(f"Receive from agent stopped: {e}")
479+
raise
480+
481+
async def send_to_agent():
482+
"""Receive events from client and send to agent."""
483+
try:
484+
while self._session and self._session.active:
485+
event = await receiver()
486+
await self.send(event)
487+
except Exception as e:
488+
logger.debug(f"Send to agent stopped: {e}")
489+
raise
490+
491+
try:
492+
# Run both loops concurrently
493+
await asyncio.gather(
494+
receive_from_agent(),
495+
send_to_agent(),
496+
return_exceptions=True
497+
)
498+
finally:
499+
try:
500+
await self.end()
501+
except Exception as e:
502+
logger.debug(f"Error during cleanup: {e}")
503+
434504
def _validate_active_session(self) -> None:
435505
"""Validate that an active session exists.
436506

0 commit comments

Comments
 (0)