Fix async streaming hangs on dead connections#212

Draft
anticomputer wants to merge 3 commits into main from anticomputer/async-hang-fixes
Conversation

@anticomputer
Contributor

Add httpx timeout to AsyncOpenAI client and wrap stream event iteration with an idle timeout to detect and recover from dead connections stuck in CLOSE_WAIT state.

Copilot AI review requested due to automatic review settings April 9, 2026 05:37
Contributor

Copilot AI left a comment

Pull request overview

Adds guardrails to prevent async streaming runs from hanging indefinitely on dead HTTP connections by applying client-side HTTP timeouts and an application-level idle timeout around stream event iteration.

Changes:

  • Add httpx.Timeout(...) to the AsyncOpenAI client used by TaskAgent.
  • Introduce STREAM_IDLE_TIMEOUT and wrap stream_events() iteration in asyncio.wait_for(...) to detect stalled streams and raise APITimeoutError.
  • Log a specific error when the stream is idle beyond the configured threshold.
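The idle-timeout pattern in the second bullet can be sketched standalone as follows. This is a minimal illustration, not the repo's actual code: StreamIdleTimeout stands in for APITimeoutError, and the threshold is shortened so the demo finishes quickly.

```python
import asyncio

STREAM_IDLE_TIMEOUT = 0.1  # seconds; illustrative, the PR uses a much larger threshold


class StreamIdleTimeout(Exception):
    """Raised when no event arrives within the idle window."""


async def iter_with_idle_timeout(stream, idle_timeout=STREAM_IDLE_TIMEOUT):
    """Yield events from an async iterator, failing fast if it stalls."""
    it = stream.__aiter__()
    while True:
        try:
            # wait_for cancels the pending __anext__ if the deadline passes
            event = await asyncio.wait_for(it.__anext__(), timeout=idle_timeout)
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            raise StreamIdleTimeout(f"no event for {idle_timeout}s")
        yield event


async def demo():
    async def healthy():
        for i in range(3):
            yield i

    async def stalled():
        yield 0
        await asyncio.sleep(3600)  # simulates a dead connection that never sends data
        yield 1

    got = [e async for e in iter_with_idle_timeout(healthy())]
    try:
        async for _ in iter_with_idle_timeout(stalled()):
            pass
        timed_out = False
    except StreamIdleTimeout:
        timed_out = True
    return got, timed_out
```

A healthy stream passes through unchanged; a stalled one raises within one idle window instead of hanging forever.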
Summary per file:

  • src/seclab_taskflow_agent/runner.py: Adds a streaming idle-timeout loop around result.stream_events() to fail fast and retry when events stop arriving.
  • src/seclab_taskflow_agent/agent.py: Configures AsyncOpenAI with explicit httpx connect/read/write/pool timeouts to avoid hanging requests.

Copilot's findings

  • Files reviewed: 2/2 changed files
  • Comments generated: 3

Comment on lines 395 to 412
stream = result.stream_events()
async_iter = stream.__aiter__()
while True:
    try:
        event = await asyncio.wait_for(
            async_iter.__anext__(),
            timeout=STREAM_IDLE_TIMEOUT,
        )
    except StopAsyncIteration:
        break
    except asyncio.TimeoutError:
        logging.error(
            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
            "connection likely dead, raising APITimeoutError"
        )
        raise APITimeoutError("Stream idle timeout exceeded")
    if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
        await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)

Copilot AI Apr 9, 2026

The manual __aiter__/__anext__ loop no longer guarantees the stream iterator is closed when exiting early (e.g., on asyncio.TimeoutError). Unlike async for, this can leave the async generator / underlying streaming run unclosed, potentially leaking tasks/sockets and undermining the goal of recovering from dead connections. Ensure the stream is explicitly closed in a finally (e.g., via contextlib.aclosing(...) or calling aclose()/a provided result cleanup API) before raising/retrying.

Suggested change (the loop above, wrapped in try/finally so the stream is always closed):

stream = None
try:
    stream = result.stream_events()
    async_iter = stream.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(
                async_iter.__anext__(),
                timeout=STREAM_IDLE_TIMEOUT,
            )
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            logging.error(
                f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
                "connection likely dead, raising APITimeoutError"
            )
            raise APITimeoutError("Stream idle timeout exceeded")
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
finally:
    if stream is not None:
        aclose = getattr(stream, "aclose", None)
        if aclose is not None:
            await aclose()

Contributor Author

Good catch — addressed in abb4ba3. Added finally block with aclose() to ensure the async generator is cleaned up on timeout/early exit.

Comment on lines +394 to +410
  result = agent0.run_streamed(prompt, max_turns=max_turns)
- async for event in result.stream_events():
+ stream = result.stream_events()
+ async_iter = stream.__aiter__()
+ while True:
+     try:
+         event = await asyncio.wait_for(
+             async_iter.__anext__(),
+             timeout=STREAM_IDLE_TIMEOUT,
+         )
+     except StopAsyncIteration:
+         break
+     except asyncio.TimeoutError:
+         logging.error(
+             f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
+             "connection likely dead, raising APITimeoutError"
+         )
+         raise APITimeoutError("Stream idle timeout exceeded")

Copilot AI Apr 9, 2026

This change introduces a stream-idle timeout behavior but there are no unit tests covering the new retry path (timeout -> APITimeoutError -> retry) or verifying that iteration stops/cleans up correctly. Consider adding a focused test that stubs run_streamed().stream_events() to yield a couple events and then hang, asserting the idle timeout triggers and cleanup/retry behavior occurs as intended.
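A focused test along the lines the review suggests could stub the stream to yield a couple of events and then hang. This is a sketch using plain asyncio rather than the repo's actual test harness; the stub and helper names are hypothetical, and the local APITimeoutError stands in for the real exception class.

```python
import asyncio

STREAM_IDLE_TIMEOUT = 0.05  # short threshold so the test runs quickly


class APITimeoutError(Exception):
    """Stand-in for the real APITimeoutError."""


async def stream_events_stub():
    # Yields two events, then hangs forever, simulating a dead connection.
    yield {"type": "raw_response_event", "delta": "a"}
    yield {"type": "raw_response_event", "delta": "b"}
    await asyncio.sleep(3600)


async def consume(stream):
    """Mirror of the runner's idle-timeout loop, with cleanup in finally."""
    seen = []
    it = stream.__aiter__()
    try:
        while True:
            try:
                event = await asyncio.wait_for(
                    it.__anext__(), timeout=STREAM_IDLE_TIMEOUT
                )
            except StopAsyncIteration:
                break
            except asyncio.TimeoutError:
                raise APITimeoutError("Stream idle timeout exceeded")
            seen.append(event["delta"])
    finally:
        await it.aclose()  # no-op if the generator already finished
    return seen


async def test_idle_timeout_triggers():
    try:
        await consume(stream_events_stub())
    except APITimeoutError:
        return True
    return False
```

The same structure drops into pytest-asyncio directly by renaming the final coroutine into a test function.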

Contributor Author

This is a targeted hotfix for a production hang we hit during a live experiment — the timeout/retry path was validated manually against Python 3.13 (normal iteration, StopAsyncIteration propagation, TimeoutError, and cancellation safety). Happy to add a proper test suite in a follow-up but didn't want to block the fix on that.

Comment on lines 182 to 187
client = AsyncOpenAI(
    base_url=resolved_endpoint,
    api_key=resolved_token,
    default_headers=provider.extra_headers or None,
    timeout=httpx.Timeout(connect=10.0, read=300.0, write=300.0, pool=60.0),
)

Copilot AI Apr 9, 2026

The client read timeout is set to 300s while the runner’s STREAM_IDLE_TIMEOUT is 1800s. Since these are two independent timeouts affecting streaming behavior, the effective idle-kill behavior can become unclear to maintainers and may not match the intended 30-minute threshold. Consider centralizing/documenting the relationship between these timeouts (or aligning them) so the configured behavior is unambiguous.

Contributor Author

These are intentionally different — they guard different failure modes at different layers:

  • httpx.Timeout(read=300.0): transport-level. Catches dead connections where the socket itself stops delivering bytes (e.g. stuck in CLOSE_WAIT). This is the first line of defense.
  • STREAM_IDLE_TIMEOUT (1800s): application-level. Catches hangs where the connection is technically alive but no events arrive (e.g. the async generator is stuck, or the server stops sending SSE frames while keeping the connection open).

The read timeout fires per individual socket read; the idle timeout fires when no complete event has been yielded for 30 minutes. In practice the httpx timeout catches most dead-connection cases and the idle timeout is a backstop for subtler hangs. I'll add a comment in the code clarifying the relationship.
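The layering described above could be captured roughly like this. It is a toy model using the values quoted in this thread; which_guard_fires is purely hypothetical and not part of the codebase.

```python
# Illustrative constants from the discussion above (not the repo's source).

# Layer 1 — transport (httpx read timeout): fires when an individual socket
# read delivers no bytes for this long, e.g. a connection stuck in CLOSE_WAIT.
HTTP_READ_TIMEOUT = 300.0  # seconds

# Layer 2 — application (runner idle timeout): fires when no *complete* stream
# event has been yielded for this long, even though the socket stays readable.
STREAM_IDLE_TIMEOUT = 1800.0  # seconds


def which_guard_fires(seconds_since_last_byte, seconds_since_last_event):
    """Toy model of the two-layer defense: the transport timeout is checked
    per socket read; the idle timeout is the application-level backstop."""
    if seconds_since_last_byte >= HTTP_READ_TIMEOUT:
        return "httpx read timeout"
    if seconds_since_last_event >= STREAM_IDLE_TIMEOUT:
        return "stream idle timeout"
    return None
```

The point of the asymmetry: a totally silent socket trips layer 1 after 5 minutes, while a socket that keeps trickling bytes without completing an event only trips layer 2 after 30 minutes.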

Copilot AI review requested due to automatic review settings April 9, 2026 05:46
Contributor

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
