Skip to content

Commit 23868b5

Browse files
committed
fix(query): use send_nowait for end sentinel in _read_messages finally
trio's level-triggered cancellation re-raises Cancelled at every checkpoint inside a cancelled scope, so when close() cancelled the read task, the 'await send(...)' in the finally block raised before delivering the end sentinel, and a consumer blocked in receive_messages() hung. send_nowait is checkpoint-free, so cancellation cannot interrupt it. Also shields the transcript_mirror_batcher.flush() await in the same finally block so it runs to completion when reached via cancellation.
1 parent 194474c commit 23868b5

2 files changed

Lines changed: 90 additions & 4 deletions

File tree

src/claude_agent_sdk/_internal/query.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import uuid
77
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
8+
from contextlib import suppress
89
from typing import TYPE_CHECKING, Any, Literal
910

1011
import anyio
@@ -313,14 +314,19 @@ async def _read_messages(self) -> None:
313314
finally:
314315
# Flush any remaining transcript mirror entries before closing so
315316
# an early stdout EOF or transport error doesn't drop entries
316-
# batched this turn. flush() never raises.
317+
# batched this turn. flush() never raises. Shielded so the await
318+
# still runs when this finally is reached via cancellation.
317319
if self._transcript_mirror_batcher is not None:
318-
await self._transcript_mirror_batcher.flush()
320+
with anyio.CancelScope(shield=True):
321+
await self._transcript_mirror_batcher.flush()
319322
# Unblock any waiters (e.g. string-prompt path waiting for first
320323
# result) so they don't stall for the full timeout on early exit.
321324
self._first_result_event.set()
322-
# Always signal end of stream
323-
await self._message_send.send({"type": "end"})
325+
# Always signal end of stream. send_nowait: trio's level-triggered
326+
# cancellation would re-raise Cancelled at an await checkpoint
327+
# here, dropping the sentinel and leaving receive_messages() hung.
328+
with suppress(anyio.WouldBlock):
329+
self._message_send.send_nowait({"type": "end"})
324330

325331
async def _handle_control_request(self, request: SDKControlRequest) -> None:
326332
"""Handle incoming control request from CLI."""

tests/test_query.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,86 @@ async def close_in_other_task():
669669

670670
anyio.run(_test, backend="trio")
671671

672+
@staticmethod
673+
def _make_blocking_transport():
674+
"""Mock transport whose read_messages() blocks forever.
675+
676+
Needed to reproduce the level-triggered-cancellation hang: the
677+
read task must still be running when close() cancels it, so the
678+
finally block executes inside a cancelled scope.
679+
"""
680+
mock_transport = AsyncMock()
681+
682+
async def blocking_read():
683+
await anyio.Event().wait() # never set
684+
yield {} # pragma: no cover - unreachable, makes this a generator
685+
686+
mock_transport.read_messages = blocking_read
687+
mock_transport.connect = AsyncMock()
688+
mock_transport.close = AsyncMock()
689+
mock_transport.end_input = AsyncMock()
690+
mock_transport.write = AsyncMock()
691+
mock_transport.is_ready = Mock(return_value=True)
692+
return mock_transport
693+
694+
def test_receive_messages_unblocks_on_close_under_trio(self):
695+
"""Consumer blocked in receive_messages() must unblock on close().
696+
697+
trio's level-triggered cancellation re-raises Cancelled at every
698+
checkpoint inside a cancelled scope; if the end sentinel is sent
699+
via ``await send()`` in the read task's ``finally``, it is dropped
700+
and the consumer hangs. ``send_nowait`` is checkpoint-free.
701+
"""
702+
703+
async def _test():
704+
with anyio.fail_after(5.0):
705+
mock_transport = self._make_blocking_transport()
706+
q = Query(transport=mock_transport, is_streaming_mode=True)
707+
await q.start()
708+
709+
consumer_done = anyio.Event()
710+
711+
async def consumer():
712+
async for _msg in q.receive_messages():
713+
pass
714+
consumer_done.set()
715+
716+
async with anyio.create_task_group() as tg:
717+
tg.start_soon(consumer)
718+
await anyio.sleep(0.01) # let consumer block on receive
719+
await q.close()
720+
await consumer_done.wait()
721+
722+
assert consumer_done.is_set()
723+
724+
anyio.run(_test, backend="trio")
725+
726+
def test_receive_messages_unblocks_on_close_under_asyncio(self):
727+
"""asyncio parity for the unblock-on-close test above."""
728+
729+
async def _test():
730+
with anyio.fail_after(5.0):
731+
mock_transport = self._make_blocking_transport()
732+
q = Query(transport=mock_transport, is_streaming_mode=True)
733+
await q.start()
734+
735+
consumer_done = anyio.Event()
736+
737+
async def consumer():
738+
async for _msg in q.receive_messages():
739+
pass
740+
consumer_done.set()
741+
742+
async with anyio.create_task_group() as tg:
743+
tg.start_soon(consumer)
744+
await anyio.sleep(0.01)
745+
await q.close()
746+
await consumer_done.wait()
747+
748+
assert consumer_done.is_set()
749+
750+
anyio.run(_test, backend="asyncio")
751+
672752

673753
class TestControlCancelRequest:
674754
"""Tests for control_cancel_request handling (issue #739).

0 commit comments

Comments
 (0)