Skip to content

Commit a3364fc

Browse files
committed
review: track most-recent result; fail-fast control requests on suppressed exit
- _got_error_result now tracks the most recent result (bool(is_error)) rather than latching True forever, so [error result -> success result -> ProcessError] still propagates in long-lived ClaudeSDKClient sessions. - Hoist the pending_control_responses fail-fast loop above the if/else so in-flight control requests (interrupt, set_model, ...) fail immediately even when the ProcessError is suppressed for the message stream. - Add tests for both.
1 parent a2c9516 commit a3364fc

2 files changed

Lines changed: 89 additions & 7 deletions

File tree

src/claude_agent_sdk/_internal/query.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,7 @@ async def _read_messages(self) -> None:
296296
if self._transcript_mirror_batcher is not None:
297297
await self._transcript_mirror_batcher.flush()
298298
self._first_result_event.set()
299-
if message.get("is_error"):
300-
self._got_error_result = True
299+
self._got_error_result = bool(message.get("is_error"))
301300

302301
# Regular SDK messages go to the stream
303302
await self._message_send.send(message)
@@ -307,6 +306,11 @@ async def _read_messages(self) -> None:
307306
logger.debug("Read task cancelled")
308307
raise # Re-raise to properly handle cancellation
309308
except Exception as e:
309+
# Signal all pending control requests so they fail fast instead of timing out
310+
for request_id, event in list(self.pending_control_responses.items()):
311+
if request_id not in self.pending_control_results:
312+
self.pending_control_results[request_id] = e
313+
event.set()
310314
if isinstance(e, ProcessError) and self._got_error_result:
311315
# CLI exits non-zero after emitting an error result
312316
# (error_max_turns, error_during_execution, ...). The consumer
@@ -319,11 +323,6 @@ async def _read_messages(self) -> None:
319323
)
320324
else:
321325
logger.error(f"Fatal error in message reader: {e}")
322-
# Signal all pending control requests so they fail fast instead of timing out
323-
for request_id, event in list(self.pending_control_responses.items()):
324-
if request_id not in self.pending_control_results:
325-
self.pending_control_results[request_id] = e
326-
event.set()
327326
# Put error in stream so iterators can handle it
328327
await self._message_send.send({"type": "error", "error": str(e)})
329328
finally:

tests/test_query.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,3 +1057,86 @@ async def _test():
10571057
assert received[0]["subtype"] == "success"
10581058

10591059
anyio.run(_test)
1060+
1061+
def test_process_error_after_error_then_success_result_still_raises(self):
1062+
"""The flag tracks the *most recent* result, not a sticky latch."""
1063+
1064+
async def _test():
1065+
transport = self._make_transport_then_raise(
1066+
messages=[
1067+
{
1068+
"type": "result",
1069+
"subtype": "error_during_execution",
1070+
"is_error": True,
1071+
"num_turns": 1,
1072+
"session_id": "s",
1073+
"duration_ms": 1,
1074+
"duration_api_ms": 1,
1075+
"total_cost_usd": 0.0,
1076+
},
1077+
{
1078+
"type": "result",
1079+
"subtype": "success",
1080+
"is_error": False,
1081+
"num_turns": 2,
1082+
"session_id": "s",
1083+
"duration_ms": 1,
1084+
"duration_api_ms": 1,
1085+
"total_cost_usd": 0.0,
1086+
},
1087+
],
1088+
exc=ProcessError(
1089+
"Command failed with exit code 1", exit_code=1, stderr=""
1090+
),
1091+
)
1092+
q = Query(transport=transport, is_streaming_mode=True)
1093+
await q.start()
1094+
1095+
received = []
1096+
with pytest.raises(Exception, match="Command failed"):
1097+
async for msg in q.receive_messages():
1098+
received.append(msg)
1099+
await q.close()
1100+
1101+
assert len(received) == 2
1102+
1103+
anyio.run(_test)
1104+
1105+
def test_pending_control_requests_fail_fast_on_suppressed_exit(self):
1106+
"""Even when the ProcessError is suppressed for the message stream,
1107+
in-flight control requests must still fail fast (process is dead;
1108+
no control_response will ever arrive)."""
1109+
1110+
async def _test():
1111+
transport = self._make_transport_then_raise(
1112+
messages=[
1113+
{
1114+
"type": "result",
1115+
"subtype": "error_max_turns",
1116+
"is_error": True,
1117+
"num_turns": 1,
1118+
"session_id": "s",
1119+
"duration_ms": 1,
1120+
"duration_api_ms": 1,
1121+
"total_cost_usd": 0.0,
1122+
}
1123+
],
1124+
exc=ProcessError(
1125+
"Command failed with exit code 1", exit_code=1, stderr=""
1126+
),
1127+
)
1128+
q = Query(transport=transport, is_streaming_mode=True)
1129+
1130+
# Register a pending control request before the read loop runs.
1131+
event = anyio.Event()
1132+
q.pending_control_responses["req_1"] = event
1133+
1134+
await q.start()
1135+
async for _ in q.receive_messages():
1136+
pass
1137+
await q.close()
1138+
1139+
assert event.is_set()
1140+
assert isinstance(q.pending_control_results["req_1"], ProcessError)
1141+
1142+
anyio.run(_test)

0 commit comments

Comments
 (0)