Skip to content

Commit bd3b7a6

Browse files
authored
fix: spawn wait_for_result_and_end_input as background task for string prompts (#780)
## Problem `query()` with a string prompt and hooks/MCP servers deadlocks once the internal 100-slot anyio message buffer fills up (~50 tool calls). Each tool call produces ~2 messages, so the buffer fills after about 50 tool calls. ### Root cause For string prompts, `client.py:141` awaited `wait_for_result_and_end_input()` **before** `receive_messages()` started draining the buffer: ```python if isinstance(prompt, str): await chosen_transport.write(json.dumps(user_message) + "\n") await query.wait_for_result_and_end_input() # blocks until "result" arrives async for data in query.receive_messages(): # buffer drain starts here ``` Meanwhile `_read_messages()` keeps reading CLI stdout and pushing into the 100-slot channel. After ~50 tool calls the channel is full and `_message_send.send()` blocks. Now `_read_messages` can't read anything else from stdout, including the "result" message that `wait_for_result_and_end_input` needs — deadlock. ## Fix Spawn `wait_for_result_and_end_input()` as a background task instead of awaiting it inline. This matches the existing `AsyncIterable` path which already uses `spawn_task(stream_input())`, and allows `receive_messages()` to start draining the buffer immediately. ```python # Before (deadlocks) await query.wait_for_result_and_end_input() # After (concurrent) query.spawn_task(query.wait_for_result_and_end_input()) ``` ## Testing - Added regression test verifying `spawn_task` is called instead of direct `await` - Fixed existing test warnings from unawaited mock coroutines - All 425 tests pass - Lint, format, and mypy all clean Fixes #779
1 parent 566e41f commit bd3b7a6

2 files changed

Lines changed: 64 additions & 1 deletion

File tree

src/claude_agent_sdk/_internal/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ async def process_query(
138138
"parent_tool_use_id": None,
139139
}
140140
await chosen_transport.write(json.dumps(user_message) + "\n")
141-
await query.wait_for_result_and_end_input()
141+
query.spawn_task(query.wait_for_result_and_end_input())
142142
elif isinstance(prompt, AsyncIterable):
143143
# Stream input in background for async iterables
144144
query.spawn_task(query.stream_input(prompt))

tests/test_client.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ async def _test():
155155
mock_query.close = AsyncMock()
156156
mock_query._tg = None
157157

158+
def _consume_coro(coro):
159+
coro.close()
160+
return Mock()
161+
162+
mock_query.spawn_task = Mock(side_effect=_consume_coro)
163+
158164
async def mock_receive():
159165
yield {
160166
"type": "result",
@@ -192,3 +198,60 @@ def test_query_uses_default_initialize_timeout(self):
192198
env_patch={},
193199
expected_timeout=60.0,
194200
)
201+
202+
def test_string_prompt_spawns_wait_for_result_as_task(self):
203+
"""Test that string prompts spawn wait_for_result_and_end_input as a background
204+
task instead of awaiting it inline, preventing deadlock when the message
205+
buffer fills up (e.g. >50 tool calls with hooks)."""
206+
207+
async def _test():
208+
with (
209+
patch(
210+
"claude_agent_sdk._internal.client.SubprocessCLITransport"
211+
) as mock_transport_class,
212+
patch("claude_agent_sdk._internal.client.Query") as mock_query_class,
213+
):
214+
mock_transport = AsyncMock()
215+
mock_transport_class.return_value = mock_transport
216+
mock_transport.connect = AsyncMock()
217+
mock_transport.close = AsyncMock()
218+
mock_transport.end_input = AsyncMock()
219+
mock_transport.write = AsyncMock()
220+
mock_transport.is_ready = Mock(return_value=True)
221+
222+
mock_query = AsyncMock()
223+
mock_query_class.return_value = mock_query
224+
mock_query.start = AsyncMock()
225+
mock_query.initialize = AsyncMock()
226+
mock_query.close = AsyncMock()
227+
mock_query._tg = None
228+
229+
def _consume_coro(coro):
230+
coro.close()
231+
return Mock()
232+
233+
mock_query.spawn_task = Mock(side_effect=_consume_coro)
234+
235+
async def mock_receive():
236+
yield {
237+
"type": "result",
238+
"subtype": "success",
239+
"duration_ms": 100,
240+
"duration_api_ms": 80,
241+
"is_error": False,
242+
"num_turns": 1,
243+
"session_id": "test",
244+
}
245+
246+
mock_query.receive_messages = mock_receive
247+
248+
async for _ in query(prompt="test", options=ClaudeAgentOptions()):
249+
pass
250+
251+
mock_query.spawn_task.assert_called_once()
252+
assert not mock_query.wait_for_result_and_end_input.await_args_list, (
253+
"wait_for_result_and_end_input should be spawned as a task, "
254+
"not awaited directly"
255+
)
256+
257+
anyio.run(_test)

0 commit comments

Comments
 (0)