Skip to content

Commit a2c9516

Browse files
committed
fix(query): suppress redundant ProcessError after error result
When the CLI emits a result message with is_error=True (e.g. subtype=error_max_turns, error_during_execution) it then exits with a non-zero code. The transport raised ProcessError on that exit code, which _read_messages converted to a {type: error} stream message and receive_messages re-raised as a bare Exception — after the consumer had already received the structured ResultMessage. Track whether an error result was delivered; if so, treat the subsequent ProcessError as expected termination and let the stream end cleanly. ProcessError before any result, or after a success result, still propagates. Fixes #913.
1 parent 99479bc commit a2c9516

2 files changed

Lines changed: 131 additions & 8 deletions

File tree

src/claude_agent_sdk/_internal/query.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
ListToolsRequest,
1616
)
1717

18+
from .._errors import ProcessError
1819
from ..types import (
1920
PermissionMode,
2021
PermissionResultAllow,
@@ -128,6 +129,7 @@ def __init__(
128129

129130
# Track first result for proper stream closure with SDK MCP servers
130131
self._first_result_event = anyio.Event()
132+
self._got_error_result = False
131133

132134
# SessionStore mirroring (set via set_transcript_mirror_batcher)
133135
self._transcript_mirror_batcher: TranscriptMirrorBatcher | None = None
@@ -294,6 +296,8 @@ async def _read_messages(self) -> None:
294296
if self._transcript_mirror_batcher is not None:
295297
await self._transcript_mirror_batcher.flush()
296298
self._first_result_event.set()
299+
if message.get("is_error"):
300+
self._got_error_result = True
297301

298302
# Regular SDK messages go to the stream
299303
await self._message_send.send(message)
@@ -303,14 +307,25 @@ async def _read_messages(self) -> None:
303307
logger.debug("Read task cancelled")
304308
raise # Re-raise to properly handle cancellation
305309
except Exception as e:
306-
logger.error(f"Fatal error in message reader: {e}")
307-
# Signal all pending control requests so they fail fast instead of timing out
308-
for request_id, event in list(self.pending_control_responses.items()):
309-
if request_id not in self.pending_control_results:
310-
self.pending_control_results[request_id] = e
311-
event.set()
312-
# Put error in stream so iterators can handle it
313-
await self._message_send.send({"type": "error", "error": str(e)})
310+
if isinstance(e, ProcessError) and self._got_error_result:
311+
# CLI exits non-zero after emitting an error result
312+
# (error_max_turns, error_during_execution, ...). The consumer
313+
# already received the structured ResultMessage; don't follow
314+
# it with a redundant bare Exception.
315+
logger.debug(
316+
"CLI exited with code %s after error result; "
317+
"treating as clean termination",
318+
e.exit_code,
319+
)
320+
else:
321+
logger.error(f"Fatal error in message reader: {e}")
322+
# Signal all pending control requests so they fail fast instead of timing out
323+
for request_id, event in list(self.pending_control_responses.items()):
324+
if request_id not in self.pending_control_results:
325+
self.pending_control_results[request_id] = e
326+
event.set()
327+
# Put error in stream so iterators can handle it
328+
await self._message_send.send({"type": "error", "error": str(e)})
314329
finally:
315330
# Flush any remaining transcript mirror entries before closing so
316331
# an early stdout EOF or transport error doesn't drop entries

tests/test_query.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
query,
2323
tool,
2424
)
25+
from claude_agent_sdk._errors import ProcessError
2526
from claude_agent_sdk._internal.query import Query
2627
from claude_agent_sdk.types import HookMatcher
2728

@@ -949,3 +950,110 @@ async def _test():
949950
assert "fast_1" not in q._inflight_requests
950951

951952
asyncio.run(_test())
953+
954+
955+
class TestProcessExitAfterErrorResult:
956+
"""Regression tests for #913: when the CLI emits a result message with
957+
is_error=True (e.g. subtype=error_max_turns) and then exits non-zero,
958+
the SDK should treat that as clean termination — the consumer already
959+
received the structured ResultMessage and shouldn't see a redundant
960+
bare Exception."""
961+
962+
def _make_transport_then_raise(self, messages, exc):
963+
mock_transport = AsyncMock()
964+
965+
async def mock_receive():
966+
for msg in messages:
967+
yield msg
968+
raise exc
969+
970+
mock_transport.read_messages = mock_receive
971+
mock_transport.connect = AsyncMock()
972+
mock_transport.close = AsyncMock()
973+
mock_transport.end_input = AsyncMock()
974+
mock_transport.write = AsyncMock()
975+
mock_transport.is_ready = Mock(return_value=True)
976+
return mock_transport
977+
978+
def test_process_error_after_error_result_is_suppressed(self):
979+
async def _test():
980+
transport = self._make_transport_then_raise(
981+
messages=[
982+
{
983+
"type": "result",
984+
"subtype": "error_max_turns",
985+
"is_error": True,
986+
"num_turns": 60,
987+
"session_id": "s",
988+
"duration_ms": 1,
989+
"duration_api_ms": 1,
990+
"total_cost_usd": 0.0,
991+
}
992+
],
993+
exc=ProcessError(
994+
"Command failed with exit code 1", exit_code=1, stderr=""
995+
),
996+
)
997+
q = Query(transport=transport, is_streaming_mode=True)
998+
await q.start()
999+
1000+
received = []
1001+
async for msg in q.receive_messages():
1002+
received.append(msg)
1003+
await q.close()
1004+
1005+
assert len(received) == 1
1006+
assert received[0]["subtype"] == "error_max_turns"
1007+
1008+
anyio.run(_test)
1009+
1010+
def test_process_error_without_result_still_raises(self):
1011+
async def _test():
1012+
transport = self._make_transport_then_raise(
1013+
messages=[],
1014+
exc=ProcessError(
1015+
"Command failed with exit code 1", exit_code=1, stderr=""
1016+
),
1017+
)
1018+
q = Query(transport=transport, is_streaming_mode=True)
1019+
await q.start()
1020+
1021+
with pytest.raises(Exception, match="Command failed"):
1022+
async for _ in q.receive_messages():
1023+
pass
1024+
await q.close()
1025+
1026+
anyio.run(_test)
1027+
1028+
def test_process_error_after_success_result_still_raises(self):
1029+
async def _test():
1030+
transport = self._make_transport_then_raise(
1031+
messages=[
1032+
{
1033+
"type": "result",
1034+
"subtype": "success",
1035+
"is_error": False,
1036+
"num_turns": 1,
1037+
"session_id": "s",
1038+
"duration_ms": 1,
1039+
"duration_api_ms": 1,
1040+
"total_cost_usd": 0.0,
1041+
}
1042+
],
1043+
exc=ProcessError(
1044+
"Command failed with exit code 1", exit_code=1, stderr=""
1045+
),
1046+
)
1047+
q = Query(transport=transport, is_streaming_mode=True)
1048+
await q.start()
1049+
1050+
received = []
1051+
with pytest.raises(Exception, match="Command failed"):
1052+
async for msg in q.receive_messages():
1053+
received.append(msg)
1054+
await q.close()
1055+
1056+
assert len(received) == 1
1057+
assert received[0]["subtype"] == "success"
1058+
1059+
anyio.run(_test)

0 commit comments

Comments
 (0)