Skip to content

Commit 7b118b5

Browse files
committed
fix: cancel RunResultStreaming on retry to prevent leaked asyncio tasks
When stream_events() is interrupted by a timeout and we retry, the old RunResultStreaming object's _run_impl_task continues running as a dangling asyncio.Task. Calling aclose() on the stream_events() async generator throws GeneratorExit which skips the SDK's _cleanup_tasks() call, so the background task is never cancelled. Each timeout retry leaked one task still streaming from the API and holding MCP server references. On process exit, asyncio.run() blocks trying to clean up these orphaned tasks — a likely contributor to the post-completion hangs requiring watchdog intervention. Fix: call result.cancel() in the finally block after aclose(). This cancels _run_impl_task, input/output guardrail tasks, and clears the event queue before retrying with a fresh RunResultStreaming.
1 parent 84ecf1d commit 7b118b5

1 file changed

Lines changed: 7 additions & 0 deletions

File tree

src/seclab_taskflow_agent/runner.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ async def _run_streamed() -> None:
436436
max_retry = MAX_API_RETRY
437437
rate_limit_backoff = RATE_LIMIT_BACKOFF
438438
while rate_limit_backoff:
439+
result = None
439440
try:
440441
result = agent0.run_streamed(prompt, max_turns=max_turns)
441442
stream = None
@@ -464,6 +465,12 @@ async def _run_streamed() -> None:
464465
aclose = getattr(stream, "aclose", None)
465466
if aclose is not None:
466467
await aclose()
468+
# Cancel the RunResultStreaming background tasks.
469+
# aclose() on the stream_events() async generator throws
470+
# GeneratorExit which skips _cleanup_tasks(), so we must
471+
# cancel explicitly to avoid leaking _run_impl_task.
472+
if result is not None:
473+
result.cancel()
467474
await render_model_output("\n\n", async_task=async_task, task_id=task_id)
468475
return
469476
except APITimeoutError:

0 commit comments

Comments
 (0)