Skip to content

Commit 70ce829

Browse files
committed
fix: preserve responses terminal stream events
1 parent 54f853f commit 70ce829

7 files changed

Lines changed: 201 additions & 26 deletions

File tree

src/agents/exceptions.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@
1616

1717
from .util._pretty_print import pretty_print_run_error_details
1818

19+
# Name of the private marker attribute that `_mark_error_to_drain_stream_events` sets on an
# exception and `_should_drain_stream_events_before_raising` reads back.
_DRAIN_STREAM_EVENTS_ATTR = "_agents_drain_queued_stream_events"
20+
21+
22+
def _mark_error_to_drain_stream_events(error: Exception) -> None:
    """Set the drain marker attribute on ``error`` to ``True``.

    The attribute name comes from ``_DRAIN_STREAM_EVENTS_ATTR``; the exception is
    mutated in place and nothing is returned.
    """
    marker_name = _DRAIN_STREAM_EVENTS_ATTR
    setattr(error, marker_name, True)
24+
25+
26+
def _should_drain_stream_events_before_raising(error: Exception) -> bool:
    """Report whether ``error`` carries a truthy drain marker attribute.

    Returns ``False`` when the attribute named by ``_DRAIN_STREAM_EVENTS_ATTR`` is
    absent or falsy, ``True`` otherwise.
    """
    marker = getattr(error, _DRAIN_STREAM_EVENTS_ATTR, False)
    return True if marker else False
28+
1929

2030
@dataclass
2131
class RunErrorDetails:

src/agents/models/_response_terminal.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
from __future__ import annotations
22

3+
from typing import Any
4+
35
from openai.types.responses import Response
46

7+
from ..exceptions import ModelBehaviorError, _mark_error_to_drain_stream_events
8+
59

610
def format_response_terminal_failure(
711
event_type: str,
@@ -25,3 +29,36 @@ def format_response_terminal_failure(
2529
if details:
2630
message = f"{message} {'; '.join(details)}."
2731
return message
32+
33+
34+
def format_response_error_event(event_type: str, event: Any) -> str:
    """Build a human-readable message for a terminal error stream event.

    Args:
        event_type: Type string of the event that ended the stream (e.g. ``"error"``).
        event: The event object. Its ``code``, ``message`` and ``param`` attributes are
            read with ``getattr``; attributes that are missing or falsy are left out.

    Returns:
        A sentence naming the terminal event, followed by a ``"; "``-joined list of
        ``name=value`` details when at least one detail is available.
    """
    # The tuple order fixes the order of details in the rendered message.
    details: list[str] = []
    for field in ("code", "message", "param"):
        value = getattr(event, field, None)
        if value:
            details.append(f"{field}={value}")

    message = f"Responses stream ended with terminal event `{event_type}`."
    if details:
        message = f"{message} {'; '.join(details)}."
    return message
50+
51+
52+
def response_terminal_failure_error(
    event_type: str,
    response: Response | None,
) -> ModelBehaviorError:
    """Create (without raising) the error for a failed terminal response event.

    The message is produced by ``format_response_terminal_failure`` and the resulting
    ``ModelBehaviorError`` is passed through ``_mark_error_to_drain_stream_events``
    before it is returned to the caller.
    """
    failure_message = format_response_terminal_failure(event_type, response)
    failure = ModelBehaviorError(failure_message)
    _mark_error_to_drain_stream_events(failure)
    return failure
59+
60+
61+
def response_error_event_failure_error(event_type: str, event: Any) -> ModelBehaviorError:
    """Create (without raising) the error for a terminal error stream event.

    The message is produced by ``format_response_error_event`` and the resulting
    ``ModelBehaviorError`` is passed through ``_mark_error_to_drain_stream_events``
    before it is returned to the caller.
    """
    failure_message = format_response_error_event(event_type, event)
    failure = ModelBehaviorError(failure_message)
    _mark_error_to_drain_stream_events(failure)
    return failure

src/agents/models/openai_responses.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
from ..util._json import _to_dump_compatible
6969
from ..version import __version__
7070
from ._openai_retry import get_openai_retry_advice
71-
from ._response_terminal import format_response_terminal_failure
71+
from ._response_terminal import response_error_event_failure_error, response_terminal_failure_error
7272
from ._retry_runtime import (
7373
should_disable_provider_managed_retries,
7474
should_disable_websocket_pre_event_retries,
@@ -569,18 +569,22 @@ async def stream_response(
569569
"response.incomplete",
570570
}:
571571
terminal_response = getattr(chunk, "response", None)
572-
terminal_failure_error = ModelBehaviorError(
573-
format_response_terminal_failure(
574-
cast(str, chunk_type),
575-
terminal_response
576-
if isinstance(terminal_response, Response)
577-
else None,
578-
)
572+
terminal_failure_error = response_terminal_failure_error(
573+
cast(str, chunk_type),
574+
terminal_response
575+
if isinstance(terminal_response, Response)
576+
else None,
577+
)
578+
elif chunk_type in {"error", "response.error"}:
579+
terminal_failure_error = response_error_event_failure_error(
580+
cast(str, chunk_type),
581+
chunk,
579582
)
580583
if chunk_type in {
581584
"response.completed",
582585
"response.failed",
583586
"response.incomplete",
587+
"error",
584588
"response.error",
585589
}:
586590
yielded_terminal_event = True
@@ -1099,11 +1103,9 @@ async def _fetch_response(
10991103
elif event_type in {"response.incomplete", "response.failed"}:
11001104
terminal_event_type = cast(str, event_type)
11011105
terminal_response = getattr(event, "response", None)
1102-
raise ModelBehaviorError(
1103-
format_response_terminal_failure(
1104-
terminal_event_type,
1105-
terminal_response if isinstance(terminal_response, Response) else None,
1106-
)
1106+
raise response_terminal_failure_error(
1107+
terminal_event_type,
1108+
terminal_response if isinstance(terminal_response, Response) else None,
11071109
)
11081110

11091111
if final_response is None:

src/agents/result.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
InputGuardrailTripwireTriggered,
1919
MaxTurnsExceeded,
2020
RunErrorDetails,
21+
_should_drain_stream_events_before_raising,
2122
)
2223
from .guardrail import InputGuardrailResult, OutputGuardrailResult
2324
from .items import (
@@ -705,7 +706,12 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
705706
try:
706707
while True:
707708
self._check_errors()
708-
should_drain_queued_events = isinstance(self._stored_exception, MaxTurnsExceeded)
709+
should_drain_queued_events = isinstance(
710+
self._stored_exception, MaxTurnsExceeded
711+
) or (
712+
self._stored_exception is not None
713+
and _should_drain_stream_events_before_raising(self._stored_exception)
714+
)
709715
if self._stored_exception and (
710716
not should_drain_queued_events or self._event_queue.empty()
711717
):

src/agents/run_internal/run_loop.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@
5858
from ..lifecycle import RunHooks
5959
from ..logger import logger
6060
from ..memory import Session
61-
from ..models._response_terminal import format_response_terminal_failure
61+
from ..models._response_terminal import (
62+
response_error_event_failure_error,
63+
response_terminal_failure_error,
64+
)
6265
from ..result import RunResultStreaming
6366
from ..run_config import ReasoningItemIdPolicy, RunConfig
6467
from ..run_context import AgentHookContext, RunContextWrapper, TContext
@@ -1487,12 +1490,12 @@ async def rewind_model_request() -> None:
14871490
elif getattr(event, "type", None) in {"response.incomplete", "response.failed"}:
14881491
event_type = cast(str, event.type)
14891492
maybe_response = getattr(event, "response", None)
1490-
raise ModelBehaviorError(
1491-
format_response_terminal_failure(
1492-
event_type,
1493-
maybe_response if isinstance(maybe_response, Response) else None,
1494-
)
1493+
raise response_terminal_failure_error(
1494+
event_type,
1495+
maybe_response if isinstance(maybe_response, Response) else None,
14951496
)
1497+
elif getattr(event, "type", None) in {"error", "response.error"}:
1498+
raise response_error_event_failure_error(cast(str, event.type), event)
14961499

14971500
if terminal_response is not None:
14981501
if is_completed_event and not terminal_response.output and streamed_response_output:

tests/models/test_openai_responses.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import httpx
99
import pytest
1010
from openai import NOT_GIVEN, APIConnectionError, RateLimitError, omit
11-
from openai.types.responses import ResponseCompletedEvent
11+
from openai.types.responses import ResponseCompletedEvent, ResponseErrorEvent
1212
from openai.types.shared.reasoning import Reasoning
1313

1414
from agents import (
@@ -1770,6 +1770,56 @@ async def fake_open(
17701770
assert cast(Any, events[0]).response.id == "resp-terminal"
17711771

17721772

1773+
@pytest.mark.allow_call_model_methods
@pytest.mark.asyncio
async def test_stream_response_rejects_response_error_terminal_event(monkeypatch):
    """A terminal ``error`` chunk is yielded once, then surfaces as ``ModelBehaviorError``."""
    terminal_error_event = ResponseErrorEvent(
        type="error",
        code="invalid_request_error",
        message="bad request",
        param=None,
        sequence_number=0,
    )

    class DummyStream:
        # Async-generator ``__aiter__`` that produces the single terminal error event.
        async def __aiter__(self):
            yield terminal_error_event

    async def dummy_fetch_response(
        system_instructions,
        input,
        model_settings,
        tools,
        output_schema,
        handoffs,
        previous_response_id,
        conversation_id,
        stream,
        prompt,
    ):
        return DummyStream()

    model = OpenAIResponsesModel(model="gpt-4", openai_client=object())  # type: ignore[arg-type]
    monkeypatch.setattr(model, "_fetch_response", dummy_fetch_response)

    received = []
    with pytest.raises(ModelBehaviorError, match="invalid_request_error"):
        async for chunk in model.stream_response(
            system_instructions=None,
            input="hi",
            model_settings=ModelSettings(),
            tools=[],
            output_schema=None,
            handoffs=[],
            tracing=ModelTracing.DISABLED,
        ):
            received.append(chunk)

    # The error event itself must have reached the consumer before the exception.
    assert len(received) == 1
    only_chunk = received[0]
    assert only_chunk.type == "error"
    assert only_chunk.code == "invalid_request_error"
    assert only_chunk.message == "bad request"
1821+
1822+
17731823
@pytest.mark.allow_call_model_methods
17741824
@pytest.mark.asyncio
17751825
async def test_websocket_model_get_response_surfaces_response_error_event(monkeypatch):

tests/test_agent_runner_streamed.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from openai import APIConnectionError, BadRequestError
1010
from openai.types.responses import (
1111
ResponseCompletedEvent,
12+
ResponseErrorEvent,
1213
ResponseFailedEvent,
1314
ResponseFunctionToolCall,
1415
ResponseIncompleteEvent,
@@ -42,7 +43,7 @@
4243
from agents.run import RunConfig
4344
from agents.run_internal import run_loop
4445
from agents.run_internal.run_loop import QueueCompleteSentinel
45-
from agents.stream_events import AgentUpdatedStreamEvent, StreamEvent
46+
from agents.stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, StreamEvent
4647
from agents.usage import Usage
4748

4849
from .fake_model import FakeModel, get_response_obj
@@ -188,10 +189,71 @@ async def stream_response(
188189
agent = Agent(name="test", model=model)
189190

190191
result = Runner.run_streamed(agent, input="test")
192+
stream_events: list[StreamEvent] = []
191193
with pytest.raises(ModelBehaviorError, match=terminal_event_type):
192-
async for _ in result.stream_events():
193-
pass
194+
async for event in result.stream_events():
195+
stream_events.append(event)
196+
197+
assert len(stream_events) == 2
198+
assert isinstance(stream_events[0], AgentUpdatedStreamEvent)
199+
assert isinstance(stream_events[1], RawResponsesStreamEvent)
200+
assert stream_events[1].data.type == terminal_event_type
201+
assert result.final_output is None
202+
assert result.raw_responses == []
203+
204+
205+
@pytest.mark.asyncio
async def test_streamed_run_rejects_response_error_terminal_event() -> None:
    """A streamed run forwards a terminal ``error`` event and then raises.

    The fake model yields a single ``ResponseErrorEvent``. The consumer is expected to
    receive the agent-updated event plus the raw error event before
    ``ModelBehaviorError`` is raised, and the run must record no output or responses.
    """

    class TerminalErrorFakeModel(FakeModel):
        async def stream_response(
            self,
            system_instructions,
            input,
            model_settings,
            tools,
            output_schema,
            handoffs,
            tracing,
            *,
            previous_response_id=None,
            conversation_id=None,
            prompt=None,
        ):
            self.last_turn_args = {
                "system_instructions": system_instructions,
                "input": input,
                "model_settings": model_settings,
                "tools": tools,
                "output_schema": output_schema,
                "previous_response_id": previous_response_id,
                "conversation_id": conversation_id,
            }
            if self.first_turn_args is None:
                self.first_turn_args = self.last_turn_args.copy()

            yield ResponseErrorEvent(
                type="error",
                code="invalid_request_error",
                message="bad request",
                param=None,
                sequence_number=0,
            )

    model = TerminalErrorFakeModel()
    agent = Agent(name="test", model=model)

    result = Runner.run_streamed(agent, input="test")
    stream_events: list[StreamEvent] = []
    # Match on the specific error code: a bare "error" pattern would be satisfied by
    # nearly any failure message and would not prove the event details were surfaced.
    with pytest.raises(ModelBehaviorError, match="invalid_request_error"):
        async for event in result.stream_events():
            stream_events.append(event)

    assert len(stream_events) == 2
    assert isinstance(stream_events[0], AgentUpdatedStreamEvent)
    assert isinstance(stream_events[1], RawResponsesStreamEvent)
    assert stream_events[1].data.type == "error"
    assert stream_events[1].data.code == "invalid_request_error"
    assert stream_events[1].data.message == "bad request"
    assert result.final_output is None
    assert result.raw_responses == []
197259

@@ -373,10 +435,15 @@ async def fake_open(
373435

374436
agent = Agent(name="test", model=model)
375437
result = Runner.run_streamed(agent, input="test")
438+
stream_events: list[StreamEvent] = []
376439
with pytest.raises(ModelBehaviorError, match=terminal_event_type):
377-
async for _ in result.stream_events():
378-
pass
440+
async for event in result.stream_events():
441+
stream_events.append(event)
379442

443+
assert len(stream_events) == 2
444+
assert isinstance(stream_events[0], AgentUpdatedStreamEvent)
445+
assert isinstance(stream_events[1], RawResponsesStreamEvent)
446+
assert stream_events[1].data.type == terminal_event_type
380447
assert result.final_output is None
381448
assert result.raw_responses == []
382449

0 commit comments

Comments
 (0)