-
Notifications
You must be signed in to change notification settings - Fork 949
fix(query): restore trio compatibility via sniffio dispatch #870
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
194474c
fix(query): restore trio compatibility via sniffio dispatch
qing-ant 23868b5
fix(query): use send_nowait for end sentinel in _read_messages finally
qing-ant 2daf967
fix(_task_compat,query): contextvar parity + close message streams
qing-ant e3d8c51
fix(query): don't close _message_receive in close() — consumer owns it
qing-ant b611d71
feat(_task_compat): log unhandled trio child-task exceptions for pari…
qing-ant File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| """Backend-agnostic detached task spawning. | ||
|
|
||
| ``Query`` manages background tasks (the read loop, ``stream_input``, | ||
| control-request handlers) that must be cancellable from any task context | ||
| — including async-generator finalizers, which Python may run in a | ||
| different task than the one that called ``start()``. anyio's | ||
| ``TaskGroup`` cannot be used for this because its cancel scope has task | ||
| affinity: exiting it from a different task either raises ``RuntimeError: | ||
| Attempted to exit cancel scope in a different task than it was entered | ||
| in`` or busy-spins in ``_deliver_cancellation`` on the asyncio backend. | ||
|
|
||
| Under asyncio this is solved with plain ``loop.create_task()``, but that | ||
| raises ``RuntimeError: no running event loop`` under trio. This module | ||
| provides ``spawn_detached()`` which dispatches via sniffio to the | ||
| appropriate backend primitive, returning a uniform ``TaskHandle``. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import contextvars | ||
| from collections.abc import Callable, Coroutine | ||
| from contextlib import suppress | ||
| from typing import Any | ||
|
|
||
| import sniffio | ||
|
|
||
|
|
||
| class TaskHandle: | ||
| """Backend-agnostic handle to a detached background task. | ||
|
|
||
| Safe to ``.cancel()`` from any task — no anyio cancel-scope task | ||
| affinity. | ||
| """ | ||
|
|
||
| def cancel(self) -> None: | ||
| """Request cancellation of the wrapped task.""" | ||
| raise NotImplementedError | ||
|
|
||
| def done(self) -> bool: | ||
| """Return True if the wrapped task has finished.""" | ||
| raise NotImplementedError | ||
|
|
||
| def add_done_callback(self, callback: Callable[[TaskHandle], None]) -> None: | ||
| """Register ``callback(self)`` to run when the task finishes.""" | ||
| raise NotImplementedError | ||
|
|
||
| async def wait(self) -> None: | ||
| """Wait for the task to finish. | ||
|
|
||
| Suppresses the backend's cancellation exception (the task was | ||
| cancelled by us) but re-raises any other exception the task | ||
| raised. | ||
| """ | ||
| raise NotImplementedError | ||
|
|
||
|
|
||
| class _AsyncioTaskHandle(TaskHandle): | ||
| """Thin wrapper around ``asyncio.Task``.""" | ||
|
|
||
| def __init__(self, task: Any) -> None: | ||
| self._task = task | ||
|
|
||
| def cancel(self) -> None: | ||
| self._task.cancel() | ||
|
|
||
| def done(self) -> bool: | ||
| return bool(self._task.done()) | ||
|
|
||
| def add_done_callback(self, callback: Callable[[TaskHandle], None]) -> None: | ||
| self._task.add_done_callback(lambda _t: callback(self)) | ||
|
|
||
| async def wait(self) -> None: | ||
| import asyncio | ||
|
|
||
| with suppress(asyncio.CancelledError): | ||
| await self._task | ||
|
|
||
|
|
||
| class _TrioTaskHandle(TaskHandle): | ||
| """Wraps a trio system task with its own ``CancelScope``.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| import trio | ||
|
|
||
| self._cancel_scope = trio.CancelScope() | ||
| self._done_event = trio.Event() | ||
| self._exception: BaseException | None = None | ||
| self._callbacks: list[Callable[[TaskHandle], None]] = [] | ||
|
|
||
| def cancel(self) -> None: | ||
| # CancelScope.cancel() is sync and safe to call from any task. | ||
| self._cancel_scope.cancel() | ||
|
|
||
| def done(self) -> bool: | ||
| return self._done_event.is_set() | ||
|
|
||
| def add_done_callback(self, callback: Callable[[TaskHandle], None]) -> None: | ||
| if self.done(): | ||
| callback(self) | ||
| else: | ||
| self._callbacks.append(callback) | ||
|
|
||
| def _mark_done(self, exc: BaseException | None) -> None: | ||
| self._exception = exc | ||
| self._done_event.set() | ||
| for cb in self._callbacks: | ||
| # Suppress BaseException so a misbehaving callback can never | ||
| # propagate out of the system-task _runner (which would crash | ||
| # trio with TrioInternalError). The actual callbacks used here | ||
| # are set.discard / dict.pop, so this is purely defensive. | ||
| with suppress(BaseException): | ||
| cb(self) | ||
| self._callbacks.clear() | ||
|
|
||
| async def wait(self) -> None: | ||
| import trio | ||
|
|
||
| await self._done_event.wait() | ||
| if self._exception is not None and not isinstance( | ||
| self._exception, trio.Cancelled | ||
| ): | ||
| raise self._exception | ||
|
|
||
|
|
||
| def spawn_detached(coro: Coroutine[Any, Any, Any]) -> TaskHandle: | ||
| """Spawn ``coro`` as a detached background task on the current backend. | ||
|
|
||
| - **asyncio**: ``asyncio.get_running_loop().create_task(coro)``. | ||
| - **trio**: ``trio.lowlevel.spawn_system_task`` wrapping ``coro`` in a | ||
| per-task ``CancelScope`` so the handle supports ``.cancel()``. | ||
| """ | ||
| backend = sniffio.current_async_library() | ||
| if backend == "asyncio": | ||
| import asyncio | ||
|
|
||
| loop = asyncio.get_running_loop() | ||
| return _AsyncioTaskHandle(loop.create_task(coro)) | ||
| if backend == "trio": | ||
| import trio | ||
|
|
||
| handle = _TrioTaskHandle() | ||
|
|
||
| async def _runner() -> None: | ||
| exc: BaseException | None = None | ||
| try: | ||
| with handle._cancel_scope: | ||
| await coro | ||
| except BaseException as e: # noqa: BLE001 | ||
| # System tasks must not raise (would crash trio). Store | ||
| # the exception on the handle; ``.wait()`` re-raises it. | ||
| exc = e | ||
| finally: | ||
| handle._mark_done(exc) | ||
|
claude[bot] marked this conversation as resolved.
|
||
|
|
||
| # Pass context= so trio system tasks inherit the caller's | ||
| # contextvars (asyncio's loop.create_task() does this implicitly; | ||
| # spawn_system_task does not). | ||
| trio.lowlevel.spawn_system_task(_runner, context=contextvars.copy_context()) | ||
| return handle | ||
| # Unsupported backend: close the coroutine so we don't leak a "coroutine | ||
| # was never awaited" RuntimeWarning on top of the RuntimeError. | ||
| coro.close() | ||
| raise RuntimeError( | ||
| f"Unsupported async backend: {backend!r}. " | ||
| "claude_agent_sdk requires asyncio or trio." | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.