fix(query): restore trio compatibility via sniffio dispatch#870
fix(query): restore trio compatibility via sniffio dispatch#870
Conversation
E2E Test ResultsReal API roundtrips on both async backends. PR branch (
|
| Backend | query() |
ClaudeSDKClient |
|---|---|---|
| asyncio | OK — assistant said 'pong' |
— (covered by existing suite) |
| trio | OK — assistant said 'pong' |
OK — assistant said 'pong' |
=== E2E: trio compat for query() ===
[asyncio] OK — assistant said: 'pong'
[trio] OK — assistant said: 'pong'
RESULT: PASS
[trio ClaudeSDKClient] OK — assistant said: 'pong'
main (regression baseline, c1eb34e)
[trio on main] FAILED as expected: RuntimeError: no running event loop
Summary: trio is broken on main (RuntimeError: no running event loop from asyncio.get_running_loop()); on this branch both query() and ClaudeSDKClient complete a real API roundtrip under asyncio and trio. PASS.
a34f84e to
1faa93c
Compare
PR #746 replaced anyio.TaskGroup with asyncio.create_task() in Query to fix #378 (100% CPU spin) and #454 (cross-task cancel-scope RuntimeError), but asyncio.get_running_loop() raises 'RuntimeError: no running event loop' under trio, breaking ClaudeSDKClient.connect() for trio users. This adds _internal/_task_compat.py with a TaskHandle abstraction that dispatches via sniffio: asyncio uses loop.create_task() (unchanged from PR #746, so #378/#454 stay fixed); trio uses trio.lowlevel.spawn_system_task with a per-task CancelScope. Query.start/spawn_task/close now use TaskHandle and no longer import asyncio. Adds trio-backend tests for Query and ClaudeSDKClient.connect.
1faa93c to
194474c
Compare
trio's level-triggered cancellation re-raises Cancelled at every checkpoint inside a cancelled scope, so 'await send(...)' in the finally block dropped the sentinel after close(), leaving a consumer blocked in receive_messages() hung. send_nowait is checkpoint-free. Also shields the transcript_mirror_batcher.flush() await in the same finally block so it runs to completion when reached via cancellation.
- spawn_detached(trio): pass context=contextvars.copy_context() to trio.lowlevel.spawn_system_task so spawned tasks inherit the caller's contextvars, matching asyncio's loop.create_task() semantics. Adds TestContextVarPropagation parity tests. - _read_messages finally: close _message_send after the send_nowait so receivers exit on EndOfStream even if the sentinel was dropped due to WouldBlock (buffer-full natural-EOF path). - Query.close(): close _message_send/_message_receive (sync, idempotent) to drop the 'ResourceWarning: Unclosed <MemoryObject*Stream>' warnings surfaced by the new trio-backend tests.
Closing the receive stream from close() makes a non-parked consumer hit
ClosedResourceError on its next iteration and drop buffered messages.
_message_send.close() alone provides correct EndOfStream termination
after the buffer drains.
Adds test_buffered_messages_drain_after_close_{asyncio,trio} which fail
with ClosedResourceError when _message_receive.close() is present.
ResourceWarning for the unclosed receive stream is filtered in the new
trio test classes (cosmetic; pre-existing on main, consumer-owned).
There was a problem hiding this comment.
No further issues found after b611d71 — all five earlier review points are addressed with regression tests. That said, this reworks the core Query task lifecycle around a new detached-task abstraction (including trio.lowlevel.spawn_system_task and altered end-of-stream/cancellation semantics in _read_messages' finally), so it's worth a human pass before merging.
Extended reasoning...
Overview
Adds _internal/_task_compat.py (new ~176-line module: TaskHandle ABC, _AsyncioTaskHandle, _TrioTaskHandle, spawn_detached() with sniffio dispatch) and rewires Query.start/spawn_task/_spawn_control_request_handler/close plus _read_messages' finally block to use it. query.py drops its asyncio import in favour of anyio.get_cancelled_exc_class(), switches the end sentinel from await send() to send_nowait() + _message_send.close(), and shields the batcher flush. Adds sniffio as an explicit runtime dep (already transitive via anyio). ~440 lines of new tests across three files covering both backends.
Security risks
None identified. No auth, crypto, permissions, subprocess-argument, or deserialization changes; the diff is confined to internal async task scheduling and test scaffolding. The new dep (sniffio) is already a transitive dependency of anyio.
Level of scrutiny
High. Query is the SDK's core bidirectional message pump — every query() and ClaudeSDKClient call flows through it. The change deliberately steps outside structured concurrency via trio.lowlevel.spawn_system_task (a documented but low-level escape hatch) and alters cancellation/stream-close semantics that interact with level- vs edge-triggered cancellation, anyio memory-stream buffer/close ordering, and cross-task finalization. The review history bears this out: four distinct concurrency edge cases (level-triggered cancel dropping the sentinel, contextvar non-propagation, WouldBlock on full buffer at EOF, ClosedResourceError from closing the receive side) were found and fixed iteratively. That's a strong signal that this is exactly the kind of change a human maintainer should sign off on, both for the design choice (spawn_system_task vs an anyio-TaskGroup restructure) and to confirm the asyncio path truly remains behaviorally identical to PR #746.
Other factors
On the positive side: each fix landed with a targeted regression test (unblock-on-close, buffered-drain-after-close, contextvar parity, unhandled-exception logging), the asyncio path is a thin pass-through to loop.create_task(), existing #454/#751 guard tests are untouched, e2e roundtrips on both backends were posted, and the bug-hunting pass on b611d71 is clean. The remaining open inline thread (logging parity) is addressed by b611d71. I'm deferring rather than approving because the change is neither simple nor mechanical and touches a critical code path.
E2E Test Results — trio compatibilityBranch:
Test script"""E2E proof: query() and ClaudeSDKClient work under both asyncio and trio."""
import sys
import asyncio
import trio
from claude_agent_sdk import query, ClaudeSDKClient, ClaudeAgentOptions, AssistantMessage, TextBlock
OPTS = ClaudeAgentOptions(max_turns=1)
PROMPT = "say PONG and nothing else"
def extract_text(msgs):
out = []
for m in msgs:
if isinstance(m, AssistantMessage):
for b in m.content:
if isinstance(b, TextBlock):
out.append(b.text)
return " ".join(out).strip()
async def run_query():
msgs = []
async for m in query(prompt=PROMPT, options=OPTS):
msgs.append(m)
return extract_text(msgs)
async def run_client():
msgs = []
async with ClaudeSDKClient(options=OPTS) as client:
await client.query(PROMPT)
async for m in client.receive_response():
msgs.append(m)
return extract_text(msgs)
def check(label, fn):
try:
result = fn()
print(f"[PASS] {label}: {result!r}")
return True
except Exception as e:
print(f"[FAIL] {label}: {type(e).__name__}: {e}")
return False
if __name__ == "__main__":
mode = sys.argv[1] if len(sys.argv) > 1 else "all"
ok = True
if mode in ("all", "asyncio-query"):
ok &= check("asyncio query()", lambda: asyncio.run(run_query()))
if mode in ("all", "trio-query"):
ok &= check("trio query()", lambda: trio.run(run_query))
if mode in ("all", "trio-client"):
ok &= check("trio ClaudeSDKClient", lambda: trio.run(run_client))
sys.exit(0 if ok else 1)Output on PR branch (
|
The filter was added in #870 to suppress the receive-stream leak that this PR fixes; with disconnect() now closing the stream, the trio test no longer emits the warning. Verified with -W error::ResourceWarning.
Problem
PR #746 (v0.1.51+) replaced
anyio.TaskGroupwithasyncio.create_task()inQueryto fix #378 (100% CPU spin in_deliver_cancellation) and #454 (cross-task cancel-scopeRuntimeError). However,asyncio.get_running_loop()raisesRuntimeError: no running event loopunder trio, breakingClaudeSDKClient.connect()for trio users since v0.1.51:Approach
sniffio dispatch. Adds
_internal/_task_compat.pywith aTaskHandleabstraction andspawn_detached(coro):loop.create_task()wrapped in_AsyncioTaskHandle. Behaviorally identical to PR fix: resolve cross-task cancel scope RuntimeError on async generator cleanup (#454) #746 —cancel(),done(),add_done_callback(), andwait()are thin pass-throughs toasyncio.Task. Query.close() can hang indefinitely causing 100% CPU usage due to missing timeout on task group cleanup #378/RuntimeError on async generator cleanup due to task group context mismatch #454 stay fixed.trio.lowlevel.spawn_system_taskwith a per-taskCancelScopewrapped in_TrioTaskHandle.CancelScope.cancel()is sync and has no task affinity, soclose()from any task is safe (the RuntimeError on async generator cleanup due to task group context mismatch #454 invariant holds for trio too).Query.start/spawn_task/closeand_spawn_control_request_handlernow useTaskHandle.query.pyno longer importsasyncio; the two cancellation-exception sites useanyio.get_cancelled_exc_class().The full anyio-TaskGroup restructure was previously attempted in #364 and proved tricky; this change keeps the asyncio path untouched to minimize regression risk.
Why
trio.lowlevel.spawn_system_task?trio has no
create_task()equivalent by design (structured concurrency).spawn_system_taskis the documented escape hatch for detached tasks. Each spawned coro is wrapped intry/except BaseExceptionso a failure can never propagate asTrioInternalError; the exception is stored on the handle and re-raised bywait().Out of scope (follow-ups)
_internal/session_resume.py,_internal/transcript_mirror_batcher.py, and_internal/sessions.pyalso have directasynciousage. These are opt-in features gated behindoptions.session_storeand were never trio-compatible — not regressions from #746. Tracked separately.Testing
tests/test_task_compat.py— 9 unit tests, both backends (spawn/wait, cancel, done-callback, exception propagation, cross-task cancel)TestQueryTrioBackend(3 tests) —start/close/spawn_taskunderanyio.run(..., backend="trio")TestClaudeSDKClientTrioBackend::test_client_connect_under_trio— the repro above as a unit testTestQueryCrossTaskCleanup(RuntimeError on async generator cleanup due to task group context mismatch #454 guard) andTestControlCancelRequest(fix: implement control_cancel_request handling #751 guard) still passtrio.run()against live CLI returnsAssistantMessage+ResultMessage(success)Deps
Adds
sniffio>=1.0.0to runtime deps (already a transitive dep ofanyio>=4.0.0; just made explicit).