-
Notifications
You must be signed in to change notification settings - Fork 20
Fix async streaming hangs on dead connections #212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e0a587a
abb4ba3
816f53c
5dd6ec2
e628977
6d3d2f7
461116b
af263c3
1550484
238e00f
d7714a8
84ecf1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,9 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import sys | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import uuid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -51,6 +54,49 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| MAX_API_RETRY = 5 # Maximum number of consecutive API error retries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TASK_RETRY_LIMIT = 3 # Maximum retry attempts for a failed task | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TASK_RETRY_BACKOFF = 10 # Initial backoff in seconds between task retries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Application-level backstop: kill a streaming run if no events yielded for 30 min. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Complements the TCP-level httpx.Timeout(read=300s) in agent.py which catches | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # dead sockets; this catches subtler hangs where the connection stays open but | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # the server (or async generator) stops producing events. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| STREAM_IDLE_TIMEOUT = 1800 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Watchdog: a non-asyncio thread that force-kills the process if the event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # loop stops making progress. Covers every hang variant (dead connections, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # asyncio cleanup spin, MCP cleanup, etc.) because it runs outside asyncio. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| WATCHDOG_IDLE_TIMEOUT = int(os.environ.get("WATCHDOG_IDLE_TIMEOUT", "2100")) # 35 min default | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _watchdog_last_activity = time.monotonic() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _watchdog_lock = threading.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def watchdog_ping() -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Call from any coroutine/callback to signal the process is alive.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| global _watchdog_last_activity | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with _watchdog_lock: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _watchdog_last_activity = time.monotonic() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _watchdog_thread(timeout: int) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Background thread: force-exit if no activity for *timeout* seconds.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| check_interval = min(60, max(1, timeout // 5)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time.sleep(check_interval) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with _watchdog_lock: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| idle = time.monotonic() - _watchdog_last_activity | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if idle > timeout: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logging.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Watchdog: no activity for {idle:.0f}s (limit {timeout}s) — " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "force-exiting to prevent hang" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sys.stderr.flush() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sys.stdout.flush() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| os._exit(2) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def start_watchdog(timeout: int = WATCHDOG_IDLE_TIMEOUT) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Start the watchdog thread (idempotent, daemon thread).""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| t = threading.Thread(target=_watchdog_thread, args=(timeout,), daemon=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| t.start() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _resolve_model_config( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -367,6 +413,7 @@ async def deploy_task_agents( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| server_prompts=server_prompts, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| important_guidelines=important_guidelines, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| agent0 = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| agent0 = TaskAgent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name=primary_name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| instructions=prompt_with_handoff_instructions(system_prompt) if handoffs else system_prompt, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -391,9 +438,32 @@ async def _run_streamed() -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while rate_limit_backoff: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result = agent0.run_streamed(prompt, max_turns=max_turns) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async for event in result.stream_events(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await render_model_output(event.data.delta, async_task=async_task, task_id=task_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream = result.stream_events() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async_iter = stream.__aiter__() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event = await asyncio.wait_for( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async_iter.__anext__(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeout=STREAM_IDLE_TIMEOUT, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except StopAsyncIteration: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except asyncio.TimeoutError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logging.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Stream idle for {STREAM_IDLE_TIMEOUT}s — " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "connection likely dead, raising APITimeoutError" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise APITimeoutError("Stream idle timeout exceeded") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| watchdog_ping() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await render_model_output(event.data.delta, async_task=async_task, task_id=task_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if stream is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| aclose = getattr(stream, "aclose", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if aclose is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await aclose() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await aclose() | |
| try: | |
| await aclose() | |
| except Exception: | |
| logging.exception("Failed to close streamed response") |
Copilot
AI
Apr 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This finally only closes the primary TaskAgent’s AsyncOpenAI client. Handoff agents are currently created via TaskAgent(...).agent, which keeps the underlying AsyncOpenAI client alive but makes it impossible to close() later, so their httpx connection pools can still leak and reintroduce the same CLOSE_WAIT/kqueue-spin behavior. Consider retaining the handoff TaskAgent wrappers (or their clients) and closing all of them here; also wrap agent0.close() in a try/except so a close failure doesn’t mask an earlier exception or prevent MCP cleanup.
Copilot
AI
Apr 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only the primary TaskAgent’s AsyncOpenAI client is being closed here. Any additional handoff agents created earlier via TaskAgent(...).agent will still retain their own AsyncOpenAI/httpx pools (the wrapper instance is discarded, but the Agent keeps a reference to the client through its model). To fully prevent lingering CLOSE_WAIT sockets / event-loop spin, track and close all TaskAgent-owned clients (or share a single client across agents) in this finally block.
| # Close the AsyncOpenAI client to release httpx connection pool. | |
| # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open. | |
| if agent0 is not None: | |
| await agent0.close() | |
| # Close all AsyncOpenAI clients reachable from the task agent graph to | |
| # release every httpx connection pool created for handoff agents too. | |
| # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open. | |
| if agent0 is not None: | |
| root_agent = getattr(agent0, "agent", None) | |
| agents_to_visit: list[Agent[Any]] = [] | |
| if root_agent is not None: | |
| agents_to_visit.append(root_agent) | |
| seen_agent_ids: set[int] = set() | |
| extra_clients: list[Any] = [] | |
| seen_client_ids: set[int] = set() | |
| primary_client = getattr(getattr(root_agent, "model", None), "client", None) | |
| if primary_client is not None: | |
| seen_client_ids.add(id(primary_client)) | |
| while agents_to_visit: | |
| current_agent = agents_to_visit.pop() | |
| current_agent_id = id(current_agent) | |
| if current_agent_id in seen_agent_ids: | |
| continue | |
| seen_agent_ids.add(current_agent_id) | |
| current_client = getattr(getattr(current_agent, "model", None), "client", None) | |
| if current_client is not None and id(current_client) not in seen_client_ids: | |
| seen_client_ids.add(id(current_client)) | |
| extra_clients.append(current_client) | |
| for handoff in getattr(current_agent, "handoffs", ()) or (): | |
| handoff_agent = getattr(handoff, "agent", handoff) | |
| if handoff_agent is not None: | |
| agents_to_visit.append(handoff_agent) | |
| await agent0.close() | |
| for client in extra_clients: | |
| close_client = getattr(client, "close", None) | |
| if close_client is None: | |
| continue | |
| try: | |
| await close_client() | |
| except Exception: | |
| logging.exception("Exception while closing handoff agent client") |
Copilot
AI
Apr 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After mcp_sessions.cancel(), the code does await mcp_sessions with no timeout. This can hang indefinitely if the session task suppresses cancellation during cleanup (it catches CancelledError inside its cleanup loop). Use a bounded wait (e.g., asyncio.wait_for(..., timeout=MCP_CLEANUP_TIMEOUT)) and log if cancellation doesn’t complete, so shutdown can’t re-hang in the cleanup path.
| await mcp_sessions | |
| except (asyncio.CancelledError, Exception): | |
| pass | |
| await asyncio.wait_for(mcp_sessions, timeout=MCP_CLEANUP_TIMEOUT) | |
| except asyncio.TimeoutError: | |
| logging.warning( | |
| "Timed out waiting for MCP session task cancellation after %s seconds", | |
| MCP_CLEANUP_TIMEOUT, | |
| ) | |
| except asyncio.CancelledError: | |
| pass | |
| except Exception: | |
| logging.exception("Exception while waiting for mcp session task cancellation") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start_watchdogis documented as idempotent, but the current implementation always spawns a new daemon thread on every call. Also,_watchdog_last_activityis initialized at module import time andstart_watchdog()does not reset it, so if this module is imported andstart_watchdog()is invoked later thantimeoutseconds, the watchdog can force-exit almost immediately. Consider (1) tracking a module-level started flag/thread to make this truly idempotent, and (2) callingwatchdog_ping()(or otherwise resetting the timestamp) insidestart_watchdog()before starting the thread.