
fix(query): restore trio compatibility via sniffio dispatch #870

Merged
qing-ant merged 5 commits into main from qing/fix-trio-compat-query
Apr 24, 2026

qing-ant commented Apr 24, 2026

Problem

PR #746 (v0.1.51+) replaced anyio.TaskGroup with asyncio.create_task() in Query to fix #378 (100% CPU spin in _deliver_cancellation) and #454 (cross-task cancel-scope RuntimeError). However, asyncio.get_running_loop() raises RuntimeError: no running event loop under trio, breaking ClaudeSDKClient.connect() for trio users since v0.1.51:

import trio
from claude_agent_sdk import ClaudeSDKClient
async def main():
    async with ClaudeSDKClient() as c:  # RuntimeError: no running event loop
        ...
trio.run(main)
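
The same error can be reproduced without trio installed, because trio.run() never starts an asyncio event loop. The following is a minimal standard-library sketch; the helper name describe_loop is illustrative and not part of the SDK.

```python
import asyncio


def describe_loop() -> str:
    """Report whether an asyncio event loop is running in this thread."""
    try:
        asyncio.get_running_loop()
        return "asyncio loop running"
    except RuntimeError as exc:
        # The error trio users hit: there is no asyncio loop to attach to.
        return f"RuntimeError: {exc}"


async def inside_asyncio() -> str:
    return describe_loop()


# No asyncio loop here, which is also the situation inside trio.run().
outside = describe_loop()
# Inside asyncio.run() a loop exists, so loop.create_task() would work.
inside = asyncio.run(inside_asyncio())
print(outside)
print(inside)
```

Any code path that calls asyncio.get_running_loop() unconditionally therefore fails on every non-asyncio backend, regardless of what the coroutine itself does.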

Approach

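sniffio dispatch. Adds _internal/_task_compat.py with a TaskHandle abstraction and spawn_detached(coro), which picks an implementation for the running backend: under asyncio it calls loop.create_task() (unchanged from PR #746, so #378/#454 stay fixed); under trio it calls trio.lowlevel.spawn_system_task with a per-task CancelScope.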

Query.start/spawn_task/close and _spawn_control_request_handler now use TaskHandle. query.py no longer imports asyncio; the two cancellation-exception sites use anyio.get_cancelled_exc_class().
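
As a rough illustration of the backend-neutral cancellation handling mentioned above, the sketch below catches whatever exception class the running backend uses for cancellation. The function name interruptible is hypothetical, and this is not the code in query.py.

```python
import anyio


async def interruptible() -> str:
    # Resolves to asyncio.CancelledError or trio.Cancelled at runtime.
    cancelled_exc = anyio.get_cancelled_exc_class()
    try:
        await anyio.sleep(0)
    except cancelled_exc:
        # Clean up here, then re-raise so cancellation keeps propagating.
        raise
    return cancelled_exc.__name__


name = anyio.run(interruptible)  # default backend is asyncio
print(name)
```

Running the same function with anyio.run(interruptible, backend="trio") resolves the class to trio.Cancelled instead, which is why no asyncio import is needed at the call site.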

The full anyio-TaskGroup restructure was previously attempted in #364 and proved tricky; this change keeps the asyncio path untouched to minimize regression risk.

Why trio.lowlevel.spawn_system_task?

trio has no create_task() equivalent by design (structured concurrency). spawn_system_task is the documented escape hatch for detached tasks. Each spawned coro is wrapped in try/except BaseException so a failure can never propagate as TrioInternalError; the exception is stored on the handle and re-raised by wait().
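
The dispatch and the spawn_system_task wrapping described above might look roughly like the sketch below. It is an illustration under stated assumptions, not the contents of _task_compat.py: the class names AsyncioHandle and TrioHandle, the method set, and the fallback used when sniffio is not importable are all hypothetical. The demo at the bottom exercises only the asyncio branch.

```python
import asyncio
import contextvars
from abc import ABC, abstractmethod

try:
    import sniffio  # third-party; already pulled in by anyio
except ImportError:  # assumption: fall back so the sketch runs on stdlib alone
    sniffio = None


class TaskHandle(ABC):
    """Backend-neutral view of a detached task (illustrative shape)."""

    @abstractmethod
    def cancel(self) -> None: ...

    @abstractmethod
    def done(self) -> bool: ...

    @abstractmethod
    async def wait(self) -> None: ...


class AsyncioHandle(TaskHandle):
    def __init__(self, coro) -> None:
        self._task = asyncio.get_running_loop().create_task(coro)

    def cancel(self) -> None:
        self._task.cancel()

    def done(self) -> bool:
        return self._task.done()

    async def wait(self) -> None:
        await asyncio.wait({self._task})
        if not self._task.cancelled() and self._task.exception() is not None:
            raise self._task.exception()


class TrioHandle(TaskHandle):
    def __init__(self, coro) -> None:
        import trio  # lazy: only needed when trio is the running backend

        self._scope = trio.CancelScope()  # per-task scope so cancel() is targeted
        self._finished = trio.Event()
        self._exc = None
        # Copy the caller's contextvars, as loop.create_task() does.
        trio.lowlevel.spawn_system_task(
            self._run, coro, context=contextvars.copy_context()
        )

    async def _run(self, coro) -> None:
        try:
            with self._scope:
                await coro
        except BaseException as exc:
            # A system task must not raise, or trio aborts with TrioInternalError.
            self._exc = exc
        finally:
            self._finished.set()

    def cancel(self) -> None:
        self._scope.cancel()

    def done(self) -> bool:
        return self._finished.is_set()

    async def wait(self) -> None:
        await self._finished.wait()
        if self._exc is not None:
            raise self._exc


def spawn_detached(coro) -> TaskHandle:
    backend = sniffio.current_async_library() if sniffio else "asyncio"
    return TrioHandle(coro) if backend == "trio" else AsyncioHandle(coro)


async def demo() -> list:
    events = []

    async def worker() -> None:
        events.append("started")
        await asyncio.sleep(3600)

    handle = spawn_detached(worker())
    await asyncio.sleep(0)  # let the worker reach its first await
    handle.cancel()
    await handle.wait()
    events.append(f"done={handle.done()}")
    return events


print(asyncio.run(demo()))
```

Storing the exception on the handle and re-raising it from wait() keeps failures observable to whoever awaits the handle, while the system task itself always exits cleanly.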

Out of scope (follow-ups)

_internal/session_resume.py, _internal/transcript_mirror_batcher.py, and _internal/sessions.py also have direct asyncio usage. These are opt-in features gated behind options.session_store and were never trio-compatible — not regressions from #746. Tracked separately.

Testing

  • New tests/test_task_compat.py — 9 unit tests, both backends (spawn/wait, cancel, done-callback, exception propagation, cross-task cancel)
  • New TestQueryTrioBackend (3 tests) — start/close/spawn_task under anyio.run(..., backend="trio")
  • New TestClaudeSDKClientTrioBackend::test_client_connect_under_trio — the repro above as a unit test
  • Existing TestQueryCrossTaskCleanup (guard for #454, "RuntimeError on async generator cleanup due to task group context mismatch") and TestControlCancelRequest (guard for #751, "fix: implement control_cancel_request handling") still pass
  • 748 passed, 3 skipped; ruff + mypy clean
  • Manual e2e: real query under trio.run() against live CLI returns AssistantMessage + ResultMessage(success)

Deps

Adds sniffio>=1.0.0 to runtime deps (already a transitive dep of anyio>=4.0.0; just made explicit).

qing-ant commented Apr 24, 2026

E2E Test Results

Real API roundtrips on both async backends. query() exercises Query.start()/close(); ClaudeSDKClient additionally exercises spawn_task() + cancel()/wait() on disconnect(). Ran the trio path against main to confirm the regression.

PR branch (qing/fix-trio-compat-query @ 194474c)

Backend | query()                   | ClaudeSDKClient
asyncio | OK: assistant said 'pong' | n/a (covered by existing suite)
trio    | OK: assistant said 'pong' | OK: assistant said 'pong'

=== E2E: trio compat for query() ===

[asyncio] OK — assistant said: 'pong'
[trio] OK — assistant said: 'pong'

RESULT: PASS
[trio ClaudeSDKClient] OK — assistant said: 'pong'

main (regression baseline, c1eb34e)

[trio on main] FAILED as expected: RuntimeError: no running event loop

Summary: trio is broken on main (RuntimeError: no running event loop from asyncio.get_running_loop()); on this branch both query() and ClaudeSDKClient complete a real API roundtrip under asyncio and trio. PASS.

qing-ant force-pushed the qing/fix-trio-compat-query branch from a34f84e to 1faa93c, April 24, 2026 20:33
qing-ant marked this pull request as ready for review, April 24, 2026 20:50
PR #746 replaced anyio.TaskGroup with asyncio.create_task() in Query to
fix #378 (100% CPU spin) and #454 (cross-task cancel-scope RuntimeError),
but asyncio.get_running_loop() raises 'RuntimeError: no running event
loop' under trio, breaking ClaudeSDKClient.connect() for trio users.

This adds _internal/_task_compat.py with a TaskHandle abstraction that
dispatches via sniffio: asyncio uses loop.create_task() (unchanged from
PR #746, so #378/#454 stay fixed); trio uses
trio.lowlevel.spawn_system_task with a per-task CancelScope.

Query.start/spawn_task/close now use TaskHandle and no longer import
asyncio. Adds trio-backend tests for Query and ClaudeSDKClient.connect.
qing-ant force-pushed the qing/fix-trio-compat-query branch from 1faa93c to 194474c, April 24, 2026 20:53
Comment thread src/claude_agent_sdk/_internal/_task_compat.py
Comment thread src/claude_agent_sdk/_internal/_task_compat.py Outdated
trio's level-triggered cancellation re-raises Cancelled at every
checkpoint inside a cancelled scope, so 'await send(...)' in the
finally block dropped the sentinel after close(), leaving a consumer
of receive_messages() blocked forever. send_nowait is checkpoint-free.

Also shields the transcript_mirror_batcher.flush() await in the same
finally block so it runs to completion when reached via cancellation.
Comment thread src/claude_agent_sdk/_internal/query.py
- spawn_detached(trio): pass context=contextvars.copy_context() to
  trio.lowlevel.spawn_system_task so spawned tasks inherit the caller's
  contextvars, matching asyncio's loop.create_task() semantics. Adds
  TestContextVarPropagation parity tests.
- _read_messages finally: close _message_send after the send_nowait so
  receivers exit on EndOfStream even if the sentinel was dropped due to
  WouldBlock (buffer-full natural-EOF path).
- Query.close(): close _message_send/_message_receive (sync, idempotent)
  to drop the 'ResourceWarning: Unclosed <MemoryObject*Stream>' warnings
  surfaced by the new trio-backend tests.
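
The asyncio semantics that the trio path is matched against can be checked with the standard library alone. In this sketch the variable name request_id and its values are made up for illustration.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default="unset")


async def child(seen: list) -> None:
    seen.append(request_id.get())       # sees the value set by the spawner
    request_id.set("changed-in-child")  # only affects the child's copy


async def main() -> tuple:
    request_id.set("caller-value")
    seen = []
    # create_task() snapshots the current context for the new task.
    await asyncio.get_running_loop().create_task(child(seen))
    return seen, request_id.get()


result = asyncio.run(main())
print(result)
```

Passing context=contextvars.copy_context() to trio.lowlevel.spawn_system_task gives the spawned task the same kind of snapshot; without it a system task would not see the caller's values.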
Comment thread src/claude_agent_sdk/_internal/query.py Outdated
Closing the receive stream from close() makes a non-parked consumer hit
ClosedResourceError on its next iteration and drop buffered messages.
_message_send.close() alone provides correct EndOfStream termination
after the buffer drains.

Adds test_buffered_messages_drain_after_close_{asyncio,trio} which fail
with ClosedResourceError when _message_receive.close() is present.
ResourceWarning for the unclosed receive stream is filtered in the new
trio test classes (cosmetic; pre-existing on main, consumer-owned).
Comment thread src/claude_agent_sdk/_internal/_task_compat.py

claude (bot) left a comment

No further issues found after b611d71 — all five earlier review points are addressed with regression tests. That said, this reworks the core Query task lifecycle around a new detached-task abstraction (including trio.lowlevel.spawn_system_task and altered end-of-stream/cancellation semantics in _read_messages' finally), so it's worth a human pass before merging.

Extended reasoning...

Overview

Adds _internal/_task_compat.py (new ~176-line module: TaskHandle ABC, _AsyncioTaskHandle, _TrioTaskHandle, spawn_detached() with sniffio dispatch) and rewires Query.start/spawn_task/_spawn_control_request_handler/close plus _read_messages' finally block to use it. query.py drops its asyncio import in favour of anyio.get_cancelled_exc_class(), switches the end sentinel from await send() to send_nowait() + _message_send.close(), and shields the batcher flush. Adds sniffio as an explicit runtime dep (already transitive via anyio). ~440 lines of new tests across three files covering both backends.

Security risks

None identified. No auth, crypto, permissions, subprocess-argument, or deserialization changes; the diff is confined to internal async task scheduling and test scaffolding. The new dep (sniffio) is already a transitive dependency of anyio.

Level of scrutiny

High. Query is the SDK's core bidirectional message pump — every query() and ClaudeSDKClient call flows through it. The change deliberately steps outside structured concurrency via trio.lowlevel.spawn_system_task (a documented but low-level escape hatch) and alters cancellation/stream-close semantics that interact with level- vs edge-triggered cancellation, anyio memory-stream buffer/close ordering, and cross-task finalization. The review history bears this out: four distinct concurrency edge cases (level-triggered cancel dropping the sentinel, contextvar non-propagation, WouldBlock on full buffer at EOF, ClosedResourceError from closing the receive side) were found and fixed iteratively. That's a strong signal that this is exactly the kind of change a human maintainer should sign off on, both for the design choice (spawn_system_task vs an anyio-TaskGroup restructure) and to confirm the asyncio path truly remains behaviorally identical to PR #746.

Other factors

On the positive side: each fix landed with a targeted regression test (unblock-on-close, buffered-drain-after-close, contextvar parity, unhandled-exception logging), the asyncio path is a thin pass-through to loop.create_task(), existing #454/#751 guard tests are untouched, e2e roundtrips on both backends were posted, and the bug-hunting pass on b611d71 is clean. The remaining open inline thread (logging parity) is addressed by b611d71. I'm deferring rather than approving because the change is neither simple nor mechanical and touches a critical code path.

qing-ant enabled auto-merge (squash), April 24, 2026 22:04
qing-ant commented

E2E Test Results — trio compatibility

Branch: qing/fix-trio-compat-query @ b611d71
Env: Python 3.13.12, trio 0.31.0+dev, sniffio 1.3.0, anyio 4.9.0

Check                                     | Runtime | Result
query() one-shot, real API                | asyncio | PASS
query() one-shot, real API                | trio    | PASS
ClaudeSDKClient context-manager, real API | trio    | PASS
query() one-shot on main @ c1eb34e        | trio    | FAIL (expected regression)

Test script

"""E2E proof: query() and ClaudeSDKClient work under both asyncio and trio."""
import sys
import asyncio
import trio
from claude_agent_sdk import query, ClaudeSDKClient, ClaudeAgentOptions, AssistantMessage, TextBlock

OPTS = ClaudeAgentOptions(max_turns=1)
PROMPT = "say PONG and nothing else"


def extract_text(msgs):
    out = []
    for m in msgs:
        if isinstance(m, AssistantMessage):
            for b in m.content:
                if isinstance(b, TextBlock):
                    out.append(b.text)
    return " ".join(out).strip()


async def run_query():
    msgs = []
    async for m in query(prompt=PROMPT, options=OPTS):
        msgs.append(m)
    return extract_text(msgs)


async def run_client():
    msgs = []
    async with ClaudeSDKClient(options=OPTS) as client:
        await client.query(PROMPT)
        async for m in client.receive_response():
            msgs.append(m)
    return extract_text(msgs)


def check(label, fn):
    try:
        result = fn()
        print(f"[PASS] {label}: {result!r}")
        return True
    except Exception as e:
        print(f"[FAIL] {label}: {type(e).__name__}: {e}")
        return False


if __name__ == "__main__":
    mode = sys.argv[1] if len(sys.argv) > 1 else "all"
    ok = True
    if mode in ("all", "asyncio-query"):
        ok &= check("asyncio query()", lambda: asyncio.run(run_query()))
    if mode in ("all", "trio-query"):
        ok &= check("trio    query()", lambda: trio.run(run_query))
    if mode in ("all", "trio-client"):
        ok &= check("trio    ClaudeSDKClient", lambda: trio.run(run_client))
    sys.exit(0 if ok else 1)

Output on PR branch (b611d71)

$ timeout 45 python /tmp/e2e_trio_compat.py asyncio-query
[PASS] asyncio query(): 'PONG'

$ timeout 45 python /tmp/e2e_trio_compat.py trio-query
[PASS] trio    query(): 'PONG'

$ timeout 45 python /tmp/e2e_trio_compat.py trio-client
[PASS] trio    ClaudeSDKClient: 'PONG'

Regression baseline on main (c1eb34e)

$ timeout 45 python /tmp/e2e_trio_compat.py trio-query
[FAIL] trio    query(): RuntimeError: no running event loop
---exit=1---

RESULT: PASS — trio compatibility is restored; asyncio path unaffected; main reproduces the RuntimeError: no running event loop regression that this PR fixes.

qing-ant merged commit 62c8bfd into main, Apr 24, 2026
16 checks passed
qing-ant deleted the qing/fix-trio-compat-query branch, April 24, 2026 23:24
qing-ant pushed a commit that referenced this pull request May 4, 2026
The filter was added in #870 to suppress the receive-stream leak that
this PR fixes; with disconnect() now closing the stream, the trio test
no longer emits the warning. Verified with -W error::ResourceWarning.

Development

Successfully merging this pull request may close these issues.

Query.close() can hang indefinitely causing 100% CPU usage due to missing timeout on task group cleanup

2 participants