Skip to content

Commit 9ee52df

Browse files
fix(transport): use spawn_detached for stderr reader (trio nursery corruption) (#885)
## Problem When `query()` is iterated inside a trio nursery task with `options.stderr` set, cancelling or breaking out of the iteration can raise: RuntimeError: Nursery stack corrupted: Nursery surrounding <Task ...> was closed before the task exited `SubprocessCLITransport.connect()` spawns the stderr reader by manually calling `__aenter__` on an `anyio.TaskGroup`: ```python self._stderr_task_group = anyio.create_task_group() await self._stderr_task_group.__aenter__() self._stderr_task_group.start_soon(self._handle_stderr) ``` On the trio backend this pushes a nursery onto the calling task's nursery stack. The matching `__aexit__` lives in `close()`, which is reached via `process_query`'s `finally` → `inner.aclose()` → `transport.close()`. If the caller cancels the iteration (e.g. wraps the `async for` in a `CancelScope` and breaks early), the awaits along that cleanup path checkpoint while the scope is cancelled and re-raise `Cancelled` before `__aexit__` runs. The nursery is left on the task's stack; when the task exits, trio raises the corruption error. This is the same task-affinity hazard that `_task_compat.spawn_detached()` already solves for the read loop (`Query._read_task`). The stderr reader predates that helper and was not migrated. ## Fix Spawn the stderr reader via `spawn_detached()` and clean it up with `TaskHandle.cancel()` + `await wait()`, matching the read-loop pattern in `_internal/query.py`. This removes the manual `__aenter__`/`__aexit__` and the trio task-affinity requirement. ## Repro ```python import trio from claude_agent_sdk import query, ClaudeAgentOptions async def one(): opts = ClaudeAgentOptions(stderr=lambda line: None, max_turns=1) with trio.CancelScope() as cs: async for msg in query(prompt="hi", options=opts): cs.cancel() # early-break after first message async def main(): async with trio.open_nursery() as n: for _ in range(5): n.start_soon(one) trio.run(main) ``` Before: `RuntimeError: Nursery stack corrupted` from the nursery `__aexit__`. After: clean exit.
1 parent e21b457 commit 9ee52df

1 file changed

Lines changed: 11 additions & 11 deletions

File tree

src/claude_agent_sdk/_internal/transport/subprocess_cli.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313
from typing import Any, cast
1414

1515
import anyio
16-
import anyio.abc
1716
from anyio.abc import Process
1817
from anyio.streams.text import TextReceiveStream, TextSendStream
1918

2019
from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError
2120
from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
2221
from ..._version import __version__
2322
from ...types import ClaudeAgentOptions, SystemPromptFile, SystemPromptPreset
23+
from .._task_compat import TaskHandle, spawn_detached
2424
from . import Transport
2525

2626
logger = logging.getLogger(__name__)
@@ -50,7 +50,7 @@ def __init__(
5050
self._stdout_stream: TextReceiveStream | None = None
5151
self._stdin_stream: TextSendStream | None = None
5252
self._stderr_stream: TextReceiveStream | None = None
53-
self._stderr_task_group: anyio.abc.TaskGroup | None = None
53+
self._stderr_task: TaskHandle | None = None
5454
self._ready = False
5555
self._exit_error: Exception | None = None # Track process exit errors
5656
self._max_buffer_size = (
@@ -463,10 +463,10 @@ async def connect(self) -> None:
463463
# Setup stderr stream if piped
464464
if stderr_dest is PIPE and self._process.stderr:
465465
self._stderr_stream = TextReceiveStream(self._process.stderr)
466-
# Start async task to read stderr
467-
self._stderr_task_group = anyio.create_task_group()
468-
await self._stderr_task_group.__aenter__()
469-
self._stderr_task_group.start_soon(self._handle_stderr)
466+
# Spawn the stderr reader via spawn_detached (not a manually-
467+
# entered TaskGroup) so cleanup has no trio task-affinity —
468+
# same pattern as Query._read_task.
469+
self._stderr_task = spawn_detached(self._handle_stderr())
470470

471471
# Setup stdin for streaming (always used now)
472472
if self._process.stdin:
@@ -515,12 +515,12 @@ async def close(self) -> None:
515515
self._ready = False
516516
return
517517

518-
# Close stderr task group if active
519-
if self._stderr_task_group:
518+
# Cancel stderr reader if active
519+
if self._stderr_task is not None and not self._stderr_task.done():
520+
self._stderr_task.cancel()
520521
with suppress(Exception):
521-
self._stderr_task_group.cancel_scope.cancel()
522-
await self._stderr_task_group.__aexit__(None, None, None)
523-
self._stderr_task_group = None
522+
await self._stderr_task.wait()
523+
self._stderr_task = None
524524

525525
# Close stdin stream (acquire lock to prevent race with concurrent writes)
526526
async with self._write_lock:

0 commit comments

Comments
 (0)