Fix async streaming hangs on dead connections #212
anticomputer wants to merge 13 commits into main
Conversation
Add httpx timeout to AsyncOpenAI client and wrap stream event iteration with an idle timeout to detect and recover from dead connections stuck in CLOSE_WAIT state.
Pull request overview
Adds guardrails to prevent async streaming runs from hanging indefinitely on dead HTTP connections by applying client-side HTTP timeouts and an application-level idle timeout around stream event iteration.
Changes:
- Add `httpx.Timeout(...)` to the `AsyncOpenAI` client used by `TaskAgent`.
- Introduce `STREAM_IDLE_TIMEOUT` and wrap `stream_events()` iteration in `asyncio.wait_for(...)` to detect stalled streams and raise `APITimeoutError`.
- Log a specific error when the stream is idle beyond the configured threshold.
Show a summary per file
| File | Description |
|---|---|
| src/seclab_taskflow_agent/runner.py | Adds a streaming idle-timeout loop around result.stream_events() to fail fast and retry when events stop arriving. |
| src/seclab_taskflow_agent/agent.py | Configures AsyncOpenAI with explicit httpx connect/read/write/pool timeouts to avoid hanging requests. |
Copilot's findings
- Files reviewed: 2/2 changed files
- Comments generated: 3
src/seclab_taskflow_agent/runner.py
Outdated
```python
stream = result.stream_events()
async_iter = stream.__aiter__()
while True:
    try:
        event = await asyncio.wait_for(
            async_iter.__anext__(),
            timeout=STREAM_IDLE_TIMEOUT,
        )
    except StopAsyncIteration:
        break
    except asyncio.TimeoutError:
        logging.error(
            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
            "connection likely dead, raising APITimeoutError"
        )
        raise APITimeoutError("Stream idle timeout exceeded")
    if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
        await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
```
The manual __aiter__/__anext__ loop no longer guarantees the stream iterator is closed when exiting early (e.g., on asyncio.TimeoutError). Unlike async for, this can leave the async generator / underlying streaming run unclosed, potentially leaking tasks/sockets and undermining the goal of recovering from dead connections. Ensure the stream is explicitly closed in a finally (e.g., via contextlib.aclosing(...) or calling aclose()/a provided result cleanup API) before raising/retrying.
Suggested change:

```python
stream = None
try:
    stream = result.stream_events()
    async_iter = stream.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(
                async_iter.__anext__(),
                timeout=STREAM_IDLE_TIMEOUT,
            )
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            logging.error(
                f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
                "connection likely dead, raising APITimeoutError"
            )
            raise APITimeoutError("Stream idle timeout exceeded")
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
finally:
    if stream is not None:
        aclose = getattr(stream, "aclose", None)
        if aclose is not None:
            await aclose()
```
Good catch — addressed in abb4ba3. Added finally block with aclose() to ensure the async generator is cleaned up on timeout/early exit.
src/seclab_taskflow_agent/runner.py
Outdated
```diff
 result = agent0.run_streamed(prompt, max_turns=max_turns)
-async for event in result.stream_events():
+stream = result.stream_events()
+async_iter = stream.__aiter__()
+while True:
+    try:
+        event = await asyncio.wait_for(
+            async_iter.__anext__(),
+            timeout=STREAM_IDLE_TIMEOUT,
+        )
+    except StopAsyncIteration:
+        break
+    except asyncio.TimeoutError:
+        logging.error(
+            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
+            "connection likely dead, raising APITimeoutError"
+        )
+        raise APITimeoutError("Stream idle timeout exceeded")
```
This change introduces a stream-idle timeout behavior but there are no unit tests covering the new retry path (timeout -> APITimeoutError -> retry) or verifying that iteration stops/cleans up correctly. Consider adding a focused test that stubs run_streamed().stream_events() to yield a couple events and then hang, asserting the idle timeout triggers and cleanup/retry behavior occurs as intended.
This is a targeted hotfix for a production hang we hit during a live experiment — the timeout/retry path was validated manually against Python 3.13 (normal iteration, StopAsyncIteration propagation, TimeoutError, and cancellation safety). Happy to add a proper test suite in a follow-up but didn't want to block the fix on that.
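For the follow-up, a focused test along the lines the review suggests could look like this. The stub names, the shrunken timeout, and the `consume` helper are illustrative stand-ins, not the real runner API:

```python
import asyncio

STREAM_IDLE_TIMEOUT = 0.05  # shrunk from the real 1800s so the test runs fast


class APITimeoutError(Exception):
    """Stand-in for openai.APITimeoutError."""


async def stalled_stream():
    """Stub for run_streamed().stream_events(): two events, then a silent hang."""
    yield "event-1"
    yield "event-2"
    await asyncio.Event().wait()  # never set: simulates a dead connection


async def consume(stream, sink):
    """Simplified mirror of the runner's idle-timeout loop."""
    async_iter = stream.__aiter__()
    try:
        while True:
            try:
                event = await asyncio.wait_for(
                    async_iter.__anext__(), timeout=STREAM_IDLE_TIMEOUT
                )
            except StopAsyncIteration:
                break
            except asyncio.TimeoutError:
                raise APITimeoutError("Stream idle timeout exceeded")
            sink.append(event)
    finally:
        await async_iter.aclose()  # close the generator even on early exit


def run_test():
    sink = []

    async def scenario():
        try:
            await consume(stalled_stream(), sink)
        except APITimeoutError:
            return True
        return False

    return asyncio.run(scenario()), sink
```

Asserting on the returned pair verifies both that the idle timeout fired and that the events yielded before the stall were delivered.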
```python
client = AsyncOpenAI(
    base_url=resolved_endpoint,
    api_key=resolved_token,
    default_headers=provider.extra_headers or None,
    timeout=httpx.Timeout(connect=10.0, read=300.0, write=300.0, pool=60.0),
)
```
The client read timeout is set to 300s while the runner’s STREAM_IDLE_TIMEOUT is 1800s. Since these are two independent timeouts affecting streaming behavior, the effective idle-kill behavior can become unclear to maintainers and may not match the intended 30-minute threshold. Consider centralizing/documenting the relationship between these timeouts (or aligning them) so the configured behavior is unambiguous.
These are intentionally different — they guard different failure modes at different layers:
- `httpx.Timeout(read=300s)` — TCP-level. Catches dead connections where the socket itself stops delivering bytes (CLOSE_WAIT). This is the first line of defense.
- `STREAM_IDLE_TIMEOUT` (1800s) — application-level. Catches hangs where the connection is technically alive but no events arrive (e.g. the async generator is stuck, or the server stops sending SSE frames while keeping the connection open).
The read timeout fires per individual socket read; the idle timeout fires when no complete event has been yielded for 30 minutes. In practice the httpx timeout catches most dead-connection cases and the idle timeout is a backstop for subtler hangs. I'll add a comment in the code clarifying the relationship.
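One way such a clarifying comment could make the relationship explicit is a pair of documented constants with a sanity check. The constant names here are illustrative, not from the PR; the values are the ones it configures:

```python
# Two independent idle guards at different layers (values from this PR):
HTTPX_READ_TIMEOUT = 300.0    # transport: max seconds per individual socket read
STREAM_IDLE_TIMEOUT = 1800.0  # application: max seconds between yielded stream events

# The application-level backstop must be the looser bound, or it would fire
# before the transport timeout has had a chance to surface a dead socket.
assert STREAM_IDLE_TIMEOUT > HTTPX_READ_TIMEOUT
```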
Pull request overview
This PR aims to prevent async streaming runs from hanging indefinitely on dead or stalled connections by adding timeouts and ensuring network/session resources are cleaned up reliably.
Changes:
- Add an application-level stream idle timeout around streamed event iteration.
- Configure `AsyncOpenAI` with explicit `httpx.Timeout` values to detect dead sockets sooner.
- Add explicit cleanup steps (close OpenAI client; cancel MCP session task) to avoid dangling sockets/tasks.
Show a summary per file
| File | Description |
|---|---|
| src/seclab_taskflow_agent/runner.py | Adds stream idle timeout loop, closes the agent client in finally, and attempts to cancel MCP session tasks during cleanup. |
| src/seclab_taskflow_agent/agent.py | Adds httpx client timeouts to AsyncOpenAI and introduces an async close() method to release the underlying connection pool. |
Copilot's findings
- Files reviewed: 2/2 changed files
- Comments generated: 3
```python
system_prompt = mcp_system_prompt(
    primary_personality.personality,
    primary_personality.task,
    server_prompts=server_prompts,
    important_guidelines=important_guidelines,
)
agent0 = None
agent0 = TaskAgent(
    name=primary_name,
```
agent0 is referenced in the outer finally block, but it’s only initialized to None here after multiple earlier operations in the try block (e.g., building handoff agents / system prompt). If an exception is raised before this line runs, the finally block will raise UnboundLocalError when checking agent0, preventing cleanup. Initialize agent0 to None at the very start of the outer try (or before it) and then assign the TaskAgent once created; also avoids the current redundant double-assignment.
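A minimal, self-contained illustration of the pitfall, with a hypothetical `FakeAgent` standing in for `TaskAgent`:

```python
import asyncio


class FakeAgent:
    """Hypothetical stand-in for TaskAgent."""
    async def close(self) -> None:
        pass


async def broken() -> None:
    # agent0 is assigned *inside* the try, after code that can raise:
    try:
        raise RuntimeError("setup failed before agent0 was bound")
        agent0 = FakeAgent()  # never reached, but makes agent0 a local name
    finally:
        if agent0 is not None:  # UnboundLocalError here masks the RuntimeError
            await agent0.close()


async def fixed() -> str:
    agent0 = None  # bound before anything that can raise
    try:
        raise RuntimeError("setup failed before agent0 was assigned")
    except RuntimeError:
        pass
    finally:
        if agent0 is not None:
            await agent0.close()
    return "cleanup ran safely"


def run_demo() -> tuple[str, str]:
    try:
        asyncio.run(broken())
        first = "no error"
    except UnboundLocalError:
        first = "UnboundLocalError"
    second = asyncio.run(fixed())
    return first, second
```

Running the demo shows the broken variant dying in its `finally` block with `UnboundLocalError` (hiding the original failure), while the fixed variant cleans up safely.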
```python
if stream is not None:
    aclose = getattr(stream, "aclose", None)
    if aclose is not None:
        await aclose()
```
The finally cleanup calls await aclose() directly. Any exception raised during aclose() will override the original streaming error/timeout and can break the retry path. Consider wrapping the aclose() call in a try/except (logging on failure) so cleanup failures don’t mask the root cause.
Suggested change:

```diff
-await aclose()
+try:
+    await aclose()
+except Exception:
+    logging.exception("Failed to close streamed response")
```
```python
    await mcp_sessions
except (asyncio.CancelledError, Exception):
    pass
```
After mcp_sessions.cancel(), the code does await mcp_sessions with no timeout. This can hang indefinitely if the session task suppresses cancellation during cleanup (it catches CancelledError inside its cleanup loop). Use a bounded wait (e.g., asyncio.wait_for(..., timeout=MCP_CLEANUP_TIMEOUT)) and log if cancellation doesn’t complete, so shutdown can’t re-hang in the cleanup path.
Suggested change:

```diff
-    await mcp_sessions
-except (asyncio.CancelledError, Exception):
-    pass
+    await asyncio.wait_for(mcp_sessions, timeout=MCP_CLEANUP_TIMEOUT)
+except asyncio.TimeoutError:
+    logging.warning(
+        "Timed out waiting for MCP session task cancellation after %s seconds",
+        MCP_CLEANUP_TIMEOUT,
+    )
+except asyncio.CancelledError:
+    pass
+except Exception:
+    logging.exception("Exception while waiting for mcp session task cancellation")
```
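A runnable sketch of why the bounded wait matters, with a stub session task that swallows the first cancellation during its cleanup (timeout shrunk for the demo; names illustrative):

```python
import asyncio

MCP_CLEANUP_TIMEOUT = 0.05  # shrunk from a realistic value for a fast demo


async def stubborn_session():
    """Stub MCP session task whose cleanup swallows the first cancellation."""
    try:
        await asyncio.Event().wait()  # normal operation: blocks until shutdown
    except asyncio.CancelledError:
        await asyncio.Event().wait()  # "cleanup" that also never finishes


async def shutdown() -> str:
    mcp_sessions = asyncio.create_task(stubborn_session())
    await asyncio.sleep(0)  # let the task start
    mcp_sessions.cancel()
    try:
        # A bare `await mcp_sessions` would hang here forever.
        await asyncio.wait_for(mcp_sessions, timeout=MCP_CLEANUP_TIMEOUT)
    except asyncio.TimeoutError:
        return "cleanup timed out, shutting down anyway"
    except asyncio.CancelledError:
        return "cancelled cleanly"
    return "finished cleanly"


result = asyncio.run(shutdown())
```

The bounded wait converts the stuck cleanup into a logged timeout, and `wait_for` issues a second cancellation on timeout, so the task is still torn down.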
Pull request overview
This PR aims to prevent async streaming runs from hanging indefinitely on dead/stalled connections by adding explicit network timeouts and an application-level idle timeout, plus more aggressive cleanup of long-lived async tasks/clients.
Changes:
- Add an application-level stream idle timeout around `stream_events()` iteration to detect "no events yielded" hangs and convert them into `APITimeoutError` retries.
- Configure `AsyncOpenAI` with an explicit `httpx.Timeout` and add `TaskAgent.close()` to release the underlying httpx connection pool.
- Add extra cleanup/cancellation for the MCP session task; CLI currently force-exits on success.
Show a summary per file
| File | Description |
|---|---|
| src/seclab_taskflow_agent/runner.py | Adds stream idle timeout loop + closes the primary OpenAI client + cancels MCP session task. |
| src/seclab_taskflow_agent/agent.py | Adds httpx client timeouts and introduces a close() method to dispose the AsyncOpenAI client. |
| src/seclab_taskflow_agent/cli.py | Force-exits after successful asyncio.run() to avoid lingering spin/hangs. |
Copilot's findings
- Files reviewed: 3/3 changed files
- Comments generated: 2
src/seclab_taskflow_agent/cli.py
Outdated
```python
# Force-exit on success to prevent asyncio event loop spin on
# dangling connections/tasks that survive cleanup.
os._exit(0)
```
Calling os._exit(0) on success will bypass normal interpreter shutdown (no flushing stdio buffers, no finally/atexit handlers, no coverage/profiling hooks), which can truncate output and make the CLI unsafe to embed/invoke from tests or other Python code. Prefer fixing the underlying dangling-task issue (which this PR already addresses) and exiting normally (return from main / let asyncio.run finish, or raise typer.Exit(0) / sys.exit(0) if an explicit exit is needed).
Suggested change:

```diff
-# Force-exit on success to prevent asyncio event loop spin on
-# dangling connections/tasks that survive cleanup.
-os._exit(0)
+return
```
```python
finally:
    # Close the AsyncOpenAI client to release httpx connection pool.
    # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
    if agent0 is not None:
        await agent0.close()
```
This finally only closes the primary TaskAgent’s AsyncOpenAI client. Handoff agents are currently created via TaskAgent(...).agent, which keeps the underlying AsyncOpenAI client alive but makes it impossible to close() later, so their httpx connection pools can still leak and reintroduce the same CLOSE_WAIT/kqueue-spin behavior. Consider retaining the handoff TaskAgent wrappers (or their clients) and closing all of them here; also wrap agent0.close() in a try/except so a close failure doesn’t mask an earlier exception or prevent MCP cleanup.
Pull request overview
This PR aims to prevent async streaming runs from hanging indefinitely on dead/stalled connections by adding both transport-level and application-level timeouts, plus additional cleanup to avoid lingering tasks/sockets.
Changes:
- Add an `httpx.Timeout` to the `AsyncOpenAI` client used by `TaskAgent`.
- Add a 30-minute stream-idle backstop around `stream_events()` iteration (with explicit stream closing).
- Add additional shutdown cleanup (closing the OpenAI client, cancelling MCP session tasks) and a forced process exit on successful completion.
Show a summary per file
| File | Description |
|---|---|
| src/seclab_taskflow_agent/runner.py | Adds stream idle timeout handling, closes the agent client in finally, cancels MCP session task, and introduces a forced os._exit(0) on success. |
| src/seclab_taskflow_agent/agent.py | Configures AsyncOpenAI with an explicit httpx.Timeout and adds a TaskAgent.close() to release the httpx connection pool. |
Copilot's findings
- Files reviewed: 2/2 changed files
- Comments generated: 2
```python
# Close the AsyncOpenAI client to release httpx connection pool.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
    await agent0.close()
```
Only the primary TaskAgent’s AsyncOpenAI client is being closed here. Any additional handoff agents created earlier via TaskAgent(...).agent will still retain their own AsyncOpenAI/httpx pools (the wrapper instance is discarded, but the Agent keeps a reference to the client through its model). To fully prevent lingering CLOSE_WAIT sockets / event-loop spin, track and close all TaskAgent-owned clients (or share a single client across agents) in this finally block.
Suggested change:

```python
# Close all AsyncOpenAI clients reachable from the task agent graph to
# release every httpx connection pool created for handoff agents too.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
    root_agent = getattr(agent0, "agent", None)
    agents_to_visit: list[Agent[Any]] = []
    if root_agent is not None:
        agents_to_visit.append(root_agent)
    seen_agent_ids: set[int] = set()
    extra_clients: list[Any] = []
    seen_client_ids: set[int] = set()
    primary_client = getattr(getattr(root_agent, "model", None), "client", None)
    if primary_client is not None:
        seen_client_ids.add(id(primary_client))
    while agents_to_visit:
        current_agent = agents_to_visit.pop()
        current_agent_id = id(current_agent)
        if current_agent_id in seen_agent_ids:
            continue
        seen_agent_ids.add(current_agent_id)
        current_client = getattr(getattr(current_agent, "model", None), "client", None)
        if current_client is not None and id(current_client) not in seen_client_ids:
            seen_client_ids.add(id(current_client))
            extra_clients.append(current_client)
        for handoff in getattr(current_agent, "handoffs", ()) or ():
            handoff_agent = getattr(handoff, "agent", handoff)
            if handoff_agent is not None:
                agents_to_visit.append(handoff_agent)
    await agent0.close()
    for client in extra_clients:
        close_client = getattr(client, "close", None)
        if close_client is None:
            continue
        try:
            await close_client()
        except Exception:
            logging.exception("Exception while closing handoff agent client")
```
src/seclab_taskflow_agent/runner.py
Outdated
```python
# Force-exit after successful completion only: asyncio.run() cleanup
# spins on dangling tasks/connections from the responses API path.
# Failure paths (must_complete break, personality mode) use normal exit.
if taskflow_path and session is not None and session.finished:
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0)
```
Using os._exit(0) inside run_main force-terminates the entire process and bypasses normal shutdown (pending finally blocks in other code, atexit handlers, log handler flush/close, coverage hooks, etc.). This makes run_main unsafe to call from any embedded/programmatic context and can cause silent data loss. Prefer fixing the underlying dangling task/connection issue (or gating a forced exit behind a CLI-only flag/environment variable, outside the exported API).
Suggested change:

```python
# Successful completion should return normally so embedded/programmatic
# callers retain control of process shutdown and normal cleanup runs.
if taskflow_path and session is not None and session.finished:
    sys.stdout.flush()
    sys.stderr.flush()
    return
```
A daemon thread monitors last-activity timestamps and calls os._exit(2) if no progress for WATCHDOG_IDLE_TIMEOUT (default 30 min, env override). Pings are placed in the streaming loop, tool call hooks, and MCP cleanup. This covers every hang variant: dead connections, asyncio cleanup spin, MCP shutdown, etc. — the thread runs outside asyncio and cannot be blocked by event loop issues.
Pull request overview
This PR aims to prevent async streaming runs from hanging indefinitely on dead/stalled connections by adding layered timeouts and more aggressive cleanup/termination behavior around streaming and MCP session lifecycle management.
Changes:
- Add an `httpx.Timeout` to the `AsyncOpenAI` client and expose a `TaskAgent.close()` to release the underlying httpx connection pool.
- Wrap streaming event iteration with an application-level idle timeout to detect streams that stop yielding events.
- Add a watchdog thread and additional cleanup/cancellation logic intended to prevent event-loop spins and stuck tasks.
Show a summary per file
| File | Description |
|---|---|
| src/seclab_taskflow_agent/runner.py | Adds stream idle timeout, watchdog thread, extra cleanup/cancellation, and a forced process-exit path to avoid hangs/spins. |
| src/seclab_taskflow_agent/agent.py | Configures AsyncOpenAI with explicit httpx timeouts and adds a close method to release the httpx pool. |
Copilot's findings
Comments suppressed due to low confidence (3)
src/seclab_taskflow_agent/runner.py:828
- `run_main` now unconditionally evaluates `session` when computing the `os._exit(...)` status. In personality-only mode (`-p` without taskflow/resume), the `session` variable is never defined, so this will raise `UnboundLocalError` right before exiting. Initialize `session: TaskflowSession | None = None` near the top of `run_main` (outside the conditional), or guard this exit path when no session exists.

```python
# Force-exit to prevent asyncio event loop spin on dangling
# tasks/connections from the responses API path. Flush first.
sys.stdout.flush()
sys.stderr.flush()
os._exit(0 if (session is None or session.finished) else 1)
```
src/seclab_taskflow_agent/runner.py:828
- Calling `os._exit(...)` inside `run_main` is a hard process termination that bypasses normal exception propagation, `asyncio.run` teardown, and any caller cleanup. Since `run_main` is re-exported for backwards compatibility and can be imported as a library API, this is a breaking behavior change; prefer returning normally and letting the CLI layer decide exit codes (e.g., by raising `typer.Exit` in `cli.py` or returning a status).
src/seclab_taskflow_agent/runner.py:510
- Only the primary `TaskAgent` (`agent0`) is closed here, but the handoff agents are created via `TaskAgent(...).agent` earlier, which discards the wrapper and leaves their underlying `AsyncOpenAI`/httpx pools without an explicit close. If the goal is to eliminate stuck sockets/CPU spin, consider retaining the `TaskAgent` wrappers for handoffs too and closing all of them in this `finally` block.

```python
finally:
    # Close the AsyncOpenAI client to release httpx connection pool.
    # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
    watchdog_ping()
    if agent0 is not None:
        await agent0.close()
```
- Files reviewed: 2/2 changed files
- Comments generated: 2
```python
system_prompt = mcp_system_prompt(
    primary_personality.personality,
    primary_personality.task,
    server_prompts=server_prompts,
    important_guidelines=important_guidelines,
)
agent0 = None
agent0 = TaskAgent(
    name=primary_name,
```
agent0 is initialized to None only after several operations that can raise (e.g., building prompts / handoff agent setup). If an exception occurs before line 416 executes, the finally block later will reference an unbound local agent0 when attempting to close it, raising UnboundLocalError and masking the original failure. Define agent0: TaskAgent | None = None before the outer try: (or otherwise ensure it is always bound) before entering code that may throw.
This issue also appears in the following locations of the same file:
- line 505
- line 824
```python
def start_watchdog(timeout: int = WATCHDOG_IDLE_TIMEOUT) -> None:
    """Start the watchdog thread (idempotent, daemon thread)."""
    t = threading.Thread(target=_watchdog_thread, args=(timeout,), daemon=True)
    t.start()
```
start_watchdog is documented as idempotent, but the current implementation always spawns a new daemon thread on every call. Also, _watchdog_last_activity is initialized at module import time and start_watchdog() does not reset it, so if this module is imported and start_watchdog() is invoked later than timeout seconds, the watchdog can force-exit almost immediately. Consider (1) tracking a module-level started flag/thread to make this truly idempotent, and (2) calling watchdog_ping() (or otherwise resetting the timestamp) inside start_watchdog() before starting the thread.
What this PR fixes

When using the responses API, the agent process can hang indefinitely in several ways. This PR adds layered defenses against all observed variants.

Bug 1: Dead TCP connections (CLOSE_WAIT spin)

When the API server drops a streaming connection, the TCP socket enters CLOSE_WAIT state. The asyncio event loop busy-polls the dead fd via kqueue, pinning the CPU at 40-50% per process. The streaming iterator blocks forever on an internal event queue because the feeder task is stuck on the dead socket.

Fix: add `httpx.Timeout(connect=10, read=300, write=300, pool=60)` to the `AsyncOpenAI` client as a first line of defense against dead connections at the TCP level.

Bug 2: Stream idle hang

Even with httpx timeouts, the streaming generator can stall if the connection stays alive but the server stops producing events. The existing retry loop only catches exceptions, not silent stalls.

Fix: wrap each `__anext__()` call in `asyncio.wait_for` with a 30-minute `STREAM_IDLE_TIMEOUT`. On timeout, raise `APITimeoutError` to feed into the existing retry loop (`MAX_API_RETRY=5`). Also call `aclose()` on the stream in a `finally` block to release resources.

Bug 3: Asyncio event loop spin on process exit

After `run_main` completes (success or failure), `asyncio.run()` attempts to cancel remaining tasks and close the event loop. Dangling connections and tasks from the responses API path cause this cleanup phase to spin indefinitely at high CPU.

Fix: call `os._exit()` at the end of `run_main` with the correct exit code, before returning to `asyncio.run()`'s cleanup. All session state, logs, and checkpoints are persisted before this point. stdout/stderr are flushed before exiting.

Bug 4: Hangs inside deploy_task_agents

The `os._exit` at the end of `run_main` only fires after all tasks complete. For multi-task workflows like audit_issue_local_iter, if any single `deploy_task_agents` call hangs (in streaming, MCP cleanup, or anywhere else), `run_main` never reaches `os._exit` and the process spins indefinitely.

Fix: a watchdog daemon thread that monitors last-activity timestamps from outside asyncio. Pings are placed in the streaming loop, tool call hooks, and MCP cleanup entry. If there is no activity for `WATCHDOG_IDLE_TIMEOUT` (default 35 min, configurable via env var), the thread calls `os._exit(2)`. The timeout is set 5 minutes above `STREAM_IDLE_TIMEOUT` so the in-loop timeout gets first shot at a graceful retry.

Design

These are defense-in-depth layers, not redundant fixes. Each layer addresses a failure mode the others cannot. If the upstream SDK fixes its asyncio cleanup, all four can be removed without functional impact.
When stream_events() is interrupted by a timeout and we retry, the old RunResultStreaming object's _run_impl_task continues running as a dangling asyncio.Task. Calling aclose() on the stream_events() async generator throws GeneratorExit which skips the SDK's _cleanup_tasks() call, so the background task is never cancelled. Each timeout retry leaked one task still streaming from the API and holding MCP server references. On process exit, asyncio.run() blocks trying to clean up these orphaned tasks — a likely contributor to the post-completion hangs requiring watchdog intervention. Fix: call result.cancel() in the finally block after aclose(). This cancels _run_impl_task, input/output guardrail tasks, and clears the event queue before retrying with a fresh RunResultStreaming.
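The leak can be reproduced in miniature with a hypothetical `FakeRunResult` standing in for the SDK's `RunResultStreaming` (the queue/feeder structure is assumed from the description above, not the SDK's actual internals):

```python
import asyncio


class FakeRunResult:
    """Stand-in for RunResultStreaming: a background feeder task pushes
    events into a queue that stream_events() drains."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[int] = asyncio.Queue()
        self._run_impl_task = asyncio.create_task(self._feed())

    async def _feed(self) -> None:
        i = 0
        while True:
            await self._queue.put(i)
            i += 1
            await asyncio.sleep(0.01)

    async def stream_events(self):
        while True:
            yield await self._queue.get()

    def cancel(self) -> None:
        self._run_impl_task.cancel()


async def demo() -> tuple[bool, bool]:
    result = FakeRunResult()
    stream = result.stream_events()
    first = await stream.__anext__()
    assert first == 0
    await stream.aclose()                      # closes the generator only...
    leaked = not result._run_impl_task.done()  # ...the feeder is still alive
    result.cancel()                            # the fix: cancel the run task too
    try:
        await result._run_impl_task
    except asyncio.CancelledError:
        pass
    return leaked, result._run_impl_task.cancelled()


leaked_without_cancel, cancelled_after_fix = asyncio.run(demo())
```

After `aclose()` alone the feeder task is still running (the leak); only the explicit `cancel()` tears it down before the next retry.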