Fix async streaming hangs on dead connections #212
Add httpx timeout to AsyncOpenAI client and wrap stream event iteration with an idle timeout to detect and recover from dead connections stuck in CLOSE_WAIT state.
Pull request overview
Adds guardrails to prevent async streaming runs from hanging indefinitely on dead HTTP connections by applying client-side HTTP timeouts and an application-level idle timeout around stream event iteration.
Changes:
- Add `httpx.Timeout(...)` to the `AsyncOpenAI` client used by `TaskAgent`.
- Introduce `STREAM_IDLE_TIMEOUT` and wrap `stream_events()` iteration in `asyncio.wait_for(...)` to detect stalled streams and raise `APITimeoutError`.
- Log a specific error when the stream is idle beyond the configured threshold.
Show a summary per file
| File | Description |
|---|---|
| `src/seclab_taskflow_agent/runner.py` | Adds a streaming idle-timeout loop around `result.stream_events()` to fail fast and retry when events stop arriving. |
| `src/seclab_taskflow_agent/agent.py` | Configures `AsyncOpenAI` with explicit httpx connect/read/write/pool timeouts to avoid hanging requests. |
Copilot's findings
- Files reviewed: 2/2 changed files
- Comments generated: 3
src/seclab_taskflow_agent/runner.py
```python
stream = result.stream_events()
async_iter = stream.__aiter__()
while True:
    try:
        event = await asyncio.wait_for(
            async_iter.__anext__(),
            timeout=STREAM_IDLE_TIMEOUT,
        )
    except StopAsyncIteration:
        break
    except asyncio.TimeoutError:
        logging.error(
            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
            "connection likely dead, raising APITimeoutError"
        )
        raise APITimeoutError("Stream idle timeout exceeded")
    if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
        await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
```
The manual __aiter__/__anext__ loop no longer guarantees the stream iterator is closed when exiting early (e.g., on asyncio.TimeoutError). Unlike async for, this can leave the async generator / underlying streaming run unclosed, potentially leaking tasks/sockets and undermining the goal of recovering from dead connections. Ensure the stream is explicitly closed in a finally (e.g., via contextlib.aclosing(...) or calling aclose()/a provided result cleanup API) before raising/retrying.
Suggested change:

```python
stream = None
try:
    stream = result.stream_events()
    async_iter = stream.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(
                async_iter.__anext__(),
                timeout=STREAM_IDLE_TIMEOUT,
            )
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            logging.error(
                f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
                "connection likely dead, raising APITimeoutError"
            )
            raise APITimeoutError("Stream idle timeout exceeded")
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
finally:
    if stream is not None:
        aclose = getattr(stream, "aclose", None)
        if aclose is not None:
            await aclose()
```
Good catch — addressed in abb4ba3. Added finally block with aclose() to ensure the async generator is cleaned up on timeout/early exit.
src/seclab_taskflow_agent/runner.py
```diff
 result = agent0.run_streamed(prompt, max_turns=max_turns)
-async for event in result.stream_events():
+stream = result.stream_events()
+async_iter = stream.__aiter__()
+while True:
+    try:
+        event = await asyncio.wait_for(
+            async_iter.__anext__(),
+            timeout=STREAM_IDLE_TIMEOUT,
+        )
+    except StopAsyncIteration:
+        break
+    except asyncio.TimeoutError:
+        logging.error(
+            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
+            "connection likely dead, raising APITimeoutError"
+        )
+        raise APITimeoutError("Stream idle timeout exceeded")
```
This change introduces a stream-idle timeout behavior but there are no unit tests covering the new retry path (timeout -> APITimeoutError -> retry) or verifying that iteration stops/cleans up correctly. Consider adding a focused test that stubs run_streamed().stream_events() to yield a couple events and then hang, asserting the idle timeout triggers and cleanup/retry behavior occurs as intended.
This is a targeted hotfix for a production hang we hit during a live experiment — the timeout/retry path was validated manually against Python 3.13 (normal iteration, StopAsyncIteration propagation, TimeoutError, and cancellation safety). Happy to add a proper test suite in a follow-up but didn't want to block the fix on that.
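A focused test along the lines requested could look roughly like the sketch below. It uses only the standard library and makes several assumptions: `FakeResult` is a hypothetical stub for whatever `run_streamed()` returns, `IdleTimeoutError` stands in for `APITimeoutError`, and `drain` mirrors the loop from this PR rather than importing the real runner code. The retry path is not covered because the retry mechanism is not shown in this thread.

```python
import asyncio
import unittest


class IdleTimeoutError(Exception):
    """Hypothetical stand-in for the APITimeoutError raised in the PR."""


class FakeResult:
    """Hypothetical stub for the object returned by run_streamed()."""

    def __init__(self, events, hang):
        self.events = events
        self.hang = hang
        self.closed = False

    async def stream_events(self):
        try:
            for event in self.events:
                yield event
            if self.hang:
                await asyncio.Event().wait()  # never set: simulates a dead connection
        finally:
            self.closed = True  # records that the generator was finalized


async def drain(stream, idle_timeout):
    """Mirror of the PR's loop: collect events, fail if the stream goes idle."""
    events = []
    iterator = stream.__aiter__()
    try:
        while True:
            try:
                event = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
            except StopAsyncIteration:
                return events
            except asyncio.TimeoutError:
                raise IdleTimeoutError("Stream idle timeout exceeded") from None
            events.append(event)
    finally:
        aclose = getattr(stream, "aclose", None)
        if aclose is not None:
            await aclose()


class IdleTimeoutTest(unittest.IsolatedAsyncioTestCase):
    async def test_completes_normally(self):
        result = FakeResult(["a", "b"], hang=False)
        self.assertEqual(await drain(result.stream_events(), 0.05), ["a", "b"])
        self.assertTrue(result.closed)

    async def test_idle_stream_raises_and_closes(self):
        result = FakeResult(["a", "b"], hang=True)
        with self.assertRaises(IdleTimeoutError):
            await drain(result.stream_events(), 0.05)
        self.assertTrue(result.closed)
```

Saved as a test module, this can be run with `python -m unittest`. The short 0.05 s timeout keeps the hanging case fast; a real test would patch `STREAM_IDLE_TIMEOUT` in the runner instead of passing the value in.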
src/seclab_taskflow_agent/agent.py

```python
client = AsyncOpenAI(
    base_url=resolved_endpoint,
    api_key=resolved_token,
    default_headers=provider.extra_headers or None,
    timeout=httpx.Timeout(connect=10.0, read=300.0, write=300.0, pool=60.0),
)
```
The client read timeout is set to 300s while the runner’s STREAM_IDLE_TIMEOUT is 1800s. Since these are two independent timeouts affecting streaming behavior, the effective idle-kill behavior can become unclear to maintainers and may not match the intended 30-minute threshold. Consider centralizing/documenting the relationship between these timeouts (or aligning them) so the configured behavior is unambiguous.
These are intentionally different — they guard different failure modes at different layers:
- `httpx.Timeout(read=300s)`: TCP-level. Catches dead connections where the socket itself stops delivering bytes (CLOSE_WAIT). This is the first line of defense.
- `STREAM_IDLE_TIMEOUT` (1800s): Application-level. Catches hangs where the connection is technically alive but no events arrive (e.g. the async generator is stuck, or the server stops sending SSE frames while keeping the connection open).
The read timeout fires per individual socket read; the idle timeout fires when no complete event has been yielded for 30 minutes. In practice the httpx timeout catches most dead-connection cases and the idle timeout is a backstop for subtler hangs. I'll add a comment in the code clarifying the relationship.
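One possible way to document the relationship in code, as a sketch only: keep both layers' values in a single place and assert the intended ordering. `StreamTimeouts`, `httpx_kwargs`, and `TIMEOUTS` are hypothetical names, and the rule that the read timeout must be shorter than the idle timeout is an assumption drawn from the "first line of defense" versus "backstop" description above. The numbers are the ones quoted in this thread.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamTimeouts:
    """Hypothetical container for both timeout layers, in seconds."""

    connect: float = 10.0
    read: float = 300.0          # transport level: per socket read
    write: float = 300.0
    pool: float = 60.0
    stream_idle: float = 1800.0  # application level: no event yielded

    def __post_init__(self):
        # Assumed invariant: the transport timeout should fire before the backstop.
        if self.read >= self.stream_idle:
            raise ValueError(
                "read timeout must be shorter than the stream idle timeout"
            )

    def httpx_kwargs(self):
        """Keyword arguments intended for httpx.Timeout(**kwargs)."""
        return {
            "connect": self.connect,
            "read": self.read,
            "write": self.write,
            "pool": self.pool,
        }


TIMEOUTS = StreamTimeouts()
```

Under this layout the client would be built with `httpx.Timeout(**TIMEOUTS.httpx_kwargs())` and the runner would pass `TIMEOUTS.stream_idle` to `asyncio.wait_for`, so a maintainer changing one value sees the other next to it.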