Skip to content

Commit 2daf967

Browse files
committed
fix(_task_compat,query): contextvar parity + close message streams
- spawn_detached(trio): pass context=contextvars.copy_context() to trio.lowlevel.spawn_system_task so spawned tasks inherit the caller's contextvars, matching asyncio's loop.create_task() semantics. Adds TestContextVarPropagation parity tests. - _read_messages finally: close _message_send after the send_nowait so receivers exit on EndOfStream even if the sentinel was dropped due to WouldBlock (buffer-full natural-EOF path). - Query.close(): close _message_send/_message_receive (sync, idempotent) to drop the 'ResourceWarning: Unclosed <MemoryObject*Stream>' warnings surfaced by the new trio-backend tests.
1 parent 23868b5 commit 2daf967

3 files changed

Lines changed: 52 additions & 1 deletion

File tree

src/claude_agent_sdk/_internal/_task_compat.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import contextvars
2021
from collections.abc import Callable, Coroutine
2122
from contextlib import suppress
2223
from typing import Any
@@ -151,7 +152,10 @@ async def _runner() -> None:
151152
finally:
152153
handle._mark_done(exc)
153154

154-
trio.lowlevel.spawn_system_task(_runner)
155+
# Pass context= so trio system tasks inherit the caller's
156+
# contextvars (asyncio's loop.create_task() does this implicitly;
157+
# spawn_system_task does not).
158+
trio.lowlevel.spawn_system_task(_runner, context=contextvars.copy_context())
155159
return handle
156160
# Unsupported backend: close the coroutine so we don't leak a "coroutine
157161
# was never awaited" RuntimeWarning on top of the RuntimeError.

src/claude_agent_sdk/_internal/query.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,12 @@ async def _read_messages(self) -> None:
325325
# Always signal end of stream. send_nowait: trio's level-triggered
326326
# cancellation would re-raise Cancelled at an await checkpoint
327327
# here, dropping the sentinel and leaving receive_messages() hung.
328+
# close() is the fallback for the buffer-full case where
329+
# send_nowait raises WouldBlock — receivers then exit on
330+
# EndOfStream after draining.
328331
with suppress(anyio.WouldBlock):
329332
self._message_send.send_nowait({"type": "end"})
333+
self._message_send.close()
330334

331335
async def _handle_control_request(self, request: SDKControlRequest) -> None:
332336
"""Handle incoming control request from CLI."""
@@ -813,6 +817,14 @@ async def close(self) -> None:
813817
self._read_task.cancel()
814818
await self._read_task.wait()
815819
self._read_task = None
820+
# The read task's finally closed the send side; close the receive
821+
# side here so callers get EndOfStream and anyio doesn't emit
822+
# ResourceWarning: Unclosed <MemoryObject*Stream> at GC time.
823+
# _message_send.close() is repeated for the case where start() was
824+
# never called (so _read_messages' finally never ran). Both are
825+
# sync, idempotent, and checkpoint-free.
826+
self._message_send.close()
827+
self._message_receive.close()
816828
await self.transport.close()
817829

818830
# Make Query an async iterator

tests/test_task_compat.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,41 @@ async def coro():
119119
anyio.run(_test, backend="trio")
120120

121121

122+
class TestContextVarPropagation:
123+
"""Spawned tasks must see the caller's contextvars on both backends.
124+
125+
asyncio's ``loop.create_task()`` copies the current context implicitly;
126+
trio's ``spawn_system_task`` does not unless ``context=`` is passed.
127+
"""
128+
129+
@staticmethod
130+
def _run(backend: str) -> str:
131+
import contextvars
132+
133+
cv: contextvars.ContextVar[str] = contextvars.ContextVar(
134+
"cv", default="DEFAULT"
135+
)
136+
seen: list[str] = []
137+
138+
async def _test():
139+
cv.set("PARENT")
140+
141+
async def coro():
142+
seen.append(cv.get())
143+
144+
handle = spawn_detached(coro())
145+
await handle.wait()
146+
147+
anyio.run(_test, backend=backend)
148+
return seen[0]
149+
150+
def test_contextvar_propagates_asyncio(self):
151+
assert self._run("asyncio") == "PARENT"
152+
153+
def test_contextvar_propagates_trio(self):
154+
assert self._run("trio") == "PARENT"
155+
156+
122157
class TestCrossTaskCancel:
123158
def test_cancel_from_different_task_trio(self):
124159
"""Cancelling from a different task than the spawner must not raise.

0 commit comments

Comments
 (0)