
Commit 38e62e3

fix: preserve responses terminal stream events
1 parent 54f853f · commit 38e62e3

9 files changed: 309 additions & 29 deletions


src/agents/exceptions.py

Lines changed: 10 additions & 0 deletions
@@ -16,6 +16,16 @@

 from .util._pretty_print import pretty_print_run_error_details

+_DRAIN_STREAM_EVENTS_ATTR = "_agents_drain_queued_stream_events"
+
+
+def _mark_error_to_drain_stream_events(error: Exception) -> None:
+    setattr(error, _DRAIN_STREAM_EVENTS_ATTR, True)
+
+
+def _should_drain_stream_events_before_raising(error: Exception) -> bool:
+    return bool(getattr(error, _DRAIN_STREAM_EVENTS_ATTR, False))
+

 @dataclass
 class RunErrorDetails:

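These two helpers are the heart of the fix: any error carrying the marker attribute gets its queued stream events flushed before it surfaces. A minimal sketch of the round trip (the helpers are private, so this is illustrative, not a supported API):

```python
from agents.exceptions import (
    _mark_error_to_drain_stream_events,
    _should_drain_stream_events_before_raising,
)

error = RuntimeError("responses stream ended with `response.failed`")
assert not _should_drain_stream_events_before_raising(error)  # unmarked: raise immediately

_mark_error_to_drain_stream_events(error)  # sets the marker attribute on the instance
assert _should_drain_stream_events_before_raising(error)  # marked: drain the queue first
```
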
src/agents/extensions/models/any_llm_model.py

Lines changed: 19 additions & 3 deletions
@@ -29,6 +29,10 @@
 from ...logger import logger
 from ...model_settings import ModelSettings
 from ...models._openai_retry import get_openai_retry_advice
+from ...models._response_terminal import (
+    response_error_event_failure_error,
+    response_terminal_failure_error,
+)
 from ...models._retry_runtime import should_disable_provider_managed_retries
 from ...models.chatcmpl_converter import Converter
 from ...models.chatcmpl_helpers import HEADERS, HEADERS_OVERRIDE, ChatCmplHelpers
@@ -421,18 +425,30 @@ async def _stream_response_via_responses(
         )

         final_response: Response | None = None
+        terminal_failure_error: ModelBehaviorError | None = None
         try:
             async for chunk in stream:
+                chunk_type = getattr(chunk, "type", None)
                 if isinstance(chunk, ResponseCompletedEvent):
                     final_response = chunk.response
-                elif getattr(chunk, "type", None) in {"response.failed", "response.incomplete"}:
+                elif chunk_type in {"response.failed", "response.incomplete"}:
                     terminal_response = getattr(chunk, "response", None)
-                    if isinstance(terminal_response, Response):
-                        final_response = terminal_response
+                    terminal_failure_error = response_terminal_failure_error(
+                        cast(str, chunk_type),
+                        terminal_response if isinstance(terminal_response, Response) else None,
+                    )
+                elif chunk_type in {"error", "response.error"}:
+                    terminal_failure_error = response_error_event_failure_error(
+                        cast(str, chunk_type),
+                        chunk,
+                    )
                 yield chunk
         finally:
             await self._maybe_aclose(stream)

+        if terminal_failure_error is not None:
+            raise terminal_failure_error
+
         if tracing.include_data() and final_response:
             span_response.span_data.response = final_response
             span_response.span_data.input = input

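Both streaming paths in this commit follow the same yield-then-raise shape: record the failure, keep forwarding chunks, and raise only once the stream is exhausted. A self-contained sketch of the pattern (names here are illustrative, not the SDK's):

```python
from typing import Any, AsyncIterator


async def stream_then_raise(stream: AsyncIterator[Any]) -> AsyncIterator[Any]:
    failure: Exception | None = None
    async for chunk in stream:
        if getattr(chunk, "type", None) in {
            "response.failed",
            "response.incomplete",
            "error",
            "response.error",
        }:
            failure = RuntimeError(f"terminal event: {chunk.type}")
        yield chunk  # the terminal event itself still reaches the consumer
    if failure is not None:
        raise failure  # deferred until every chunk has been yielded
```
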
src/agents/models/_response_terminal.py

Lines changed: 37 additions & 0 deletions
@@ -1,7 +1,11 @@
 from __future__ import annotations

+from typing import Any
+
 from openai.types.responses import Response

+from ..exceptions import ModelBehaviorError, _mark_error_to_drain_stream_events
+

 def format_response_terminal_failure(
     event_type: str,
@@ -25,3 +29,36 @@ def format_response_terminal_failure(
     if details:
         message = f"{message} {'; '.join(details)}."
     return message
+
+
+def format_response_error_event(event_type: str, event: Any) -> str:
+    message = f"Responses stream ended with terminal event `{event_type}`."
+    details: list[str] = []
+    code = getattr(event, "code", None)
+    if code:
+        details.append(f"code={code}")
+    error_message = getattr(event, "message", None)
+    if error_message:
+        details.append(f"message={error_message}")
+    param = getattr(event, "param", None)
+    if param:
+        details.append(f"param={param}")
+
+    if details:
+        message = f"{message} {'; '.join(details)}."
+    return message
+
+
+def response_terminal_failure_error(
+    event_type: str,
+    response: Response | None,
+) -> ModelBehaviorError:
+    error = ModelBehaviorError(format_response_terminal_failure(event_type, response))
+    _mark_error_to_drain_stream_events(error)
+    return error
+
+
+def response_error_event_failure_error(event_type: str, event: Any) -> ModelBehaviorError:
+    error = ModelBehaviorError(format_response_error_event(event_type, event))
+    _mark_error_to_drain_stream_events(error)
+    return error

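To make the new message format concrete, here is what `format_response_error_event` should produce for the error event the tests below construct (output derived by hand from the string-building above, so treat it as an expectation, not a captured log):

```python
from agents.models._response_terminal import format_response_error_event


class _ErrorEvent:
    # Duck-typed stand-in for openai's ResponseErrorEvent; only the
    # attributes the formatter inspects are needed.
    code = "invalid_request_error"
    message = "bad request"
    param = None


print(format_response_error_event("error", _ErrorEvent()))
# Responses stream ended with terminal event `error`. code=invalid_request_error; message=bad request.
```
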
src/agents/models/openai_responses.py

Lines changed: 15 additions & 13 deletions
@@ -68,7 +68,7 @@
 from ..util._json import _to_dump_compatible
 from ..version import __version__
 from ._openai_retry import get_openai_retry_advice
-from ._response_terminal import format_response_terminal_failure
+from ._response_terminal import response_error_event_failure_error, response_terminal_failure_error
 from ._retry_runtime import (
     should_disable_provider_managed_retries,
     should_disable_websocket_pre_event_retries,
@@ -569,18 +569,22 @@ async def stream_response(
                     "response.incomplete",
                 }:
                     terminal_response = getattr(chunk, "response", None)
-                    terminal_failure_error = ModelBehaviorError(
-                        format_response_terminal_failure(
-                            cast(str, chunk_type),
-                            terminal_response
-                            if isinstance(terminal_response, Response)
-                            else None,
-                        )
+                    terminal_failure_error = response_terminal_failure_error(
+                        cast(str, chunk_type),
+                        terminal_response
+                        if isinstance(terminal_response, Response)
+                        else None,
+                    )
+                elif chunk_type in {"error", "response.error"}:
+                    terminal_failure_error = response_error_event_failure_error(
+                        cast(str, chunk_type),
+                        chunk,
                     )
                 if chunk_type in {
                     "response.completed",
                     "response.failed",
                     "response.incomplete",
+                    "error",
                     "response.error",
                 }:
                     yielded_terminal_event = True
@@ -1099,11 +1103,9 @@ async def _fetch_response(
             elif event_type in {"response.incomplete", "response.failed"}:
                 terminal_event_type = cast(str, event_type)
                 terminal_response = getattr(event, "response", None)
-                raise ModelBehaviorError(
-                    format_response_terminal_failure(
-                        terminal_event_type,
-                        terminal_response if isinstance(terminal_response, Response) else None,
-                    )
+                raise response_terminal_failure_error(
+                    terminal_event_type,
+                    terminal_response if isinstance(terminal_response, Response) else None,
                 )

         if final_response is None:

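For reference, the branches above now recognize two families of failure events on top of normal completion, and every one of them counts toward `yielded_terminal_event`. A condensed restatement (the set names are mine, not the module's):

```python
TERMINAL_FAILURE_TYPES = {"response.failed", "response.incomplete"}  # -> response_terminal_failure_error
ERROR_EVENT_TYPES = {"error", "response.error"}  # -> response_error_event_failure_error

# Any of these marks yielded_terminal_event = True in stream_response:
TERMINAL_EVENT_TYPES = {"response.completed"} | TERMINAL_FAILURE_TYPES | ERROR_EVENT_TYPES
```
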
src/agents/result.py

Lines changed: 7 additions & 1 deletion
@@ -18,6 +18,7 @@
     InputGuardrailTripwireTriggered,
     MaxTurnsExceeded,
     RunErrorDetails,
+    _should_drain_stream_events_before_raising,
 )
 from .guardrail import InputGuardrailResult, OutputGuardrailResult
 from .items import (
@@ -705,7 +706,12 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
         try:
             while True:
                 self._check_errors()
-                should_drain_queued_events = isinstance(self._stored_exception, MaxTurnsExceeded)
+                should_drain_queued_events = isinstance(
+                    self._stored_exception, MaxTurnsExceeded
+                ) or (
+                    self._stored_exception is not None
+                    and _should_drain_stream_events_before_raising(self._stored_exception)
+                )
                 if self._stored_exception and (
                     not should_drain_queued_events or self._event_queue.empty()
                 ):

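From the caller's side, the effect is that `stream_events()` now flushes queued events, including the terminal failure event itself, before re-raising the stored error. A hedged sketch of consuming code (`Runner.run_streamed` is the SDK's public entry point; the rest of the names are mine):

```python
from agents import ModelBehaviorError, Runner


async def consume(agent, user_input: str) -> None:
    result = Runner.run_streamed(agent, input=user_input)
    try:
        async for event in result.stream_events():
            # With this fix, the response.failed / error event arrives here
            # before the exception below is raised.
            print(event.type)
    except ModelBehaviorError as exc:
        print(f"run failed after draining stream events: {exc}")
```
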
src/agents/run_internal/run_loop.py

Lines changed: 9 additions & 6 deletions
@@ -58,7 +58,10 @@
 from ..lifecycle import RunHooks
 from ..logger import logger
 from ..memory import Session
-from ..models._response_terminal import format_response_terminal_failure
+from ..models._response_terminal import (
+    response_error_event_failure_error,
+    response_terminal_failure_error,
+)
 from ..result import RunResultStreaming
 from ..run_config import ReasoningItemIdPolicy, RunConfig
 from ..run_context import AgentHookContext, RunContextWrapper, TContext
@@ -1487,12 +1490,12 @@ async def rewind_model_request() -> None:
         elif getattr(event, "type", None) in {"response.incomplete", "response.failed"}:
             event_type = cast(str, event.type)
             maybe_response = getattr(event, "response", None)
-            raise ModelBehaviorError(
-                format_response_terminal_failure(
-                    event_type,
-                    maybe_response if isinstance(maybe_response, Response) else None,
-                )
+            raise response_terminal_failure_error(
+                event_type,
+                maybe_response if isinstance(maybe_response, Response) else None,
             )
+        elif getattr(event, "type", None) in {"error", "response.error"}:
+            raise response_error_event_failure_error(cast(str, event.type), event)

     if terminal_response is not None:
         if is_completed_event and not terminal_response.output and streamed_response_output:

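Note that the run loop raises immediately rather than deferring, but because both constructors mark the error, `stream_events()` in result.py still drains the queue before surfacing it. A quick check of that glue (private names, shown for illustration only):

```python
from agents.exceptions import _should_drain_stream_events_before_raising
from agents.models._response_terminal import response_terminal_failure_error

error = response_terminal_failure_error("response.failed", None)
assert _should_drain_stream_events_before_raising(error)  # marked at construction
```
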
tests/models/test_any_llm_model.py

Lines changed: 89 additions & 0 deletions
@@ -17,6 +17,9 @@
 from openai.types.chat.chat_completion_chunk import ChoiceDelta
 from openai.types.completion_usage import CompletionUsage, PromptTokensDetails
 from openai.types.responses import Response, ResponseCompletedEvent, ResponseOutputMessage
+from openai.types.responses.response_error_event import ResponseErrorEvent
+from openai.types.responses.response_failed_event import ResponseFailedEvent
+from openai.types.responses.response_incomplete_event import ResponseIncompleteEvent
 from openai.types.responses.response_output_text import ResponseOutputText
 from openai.types.responses.response_usage import (
     InputTokensDetails,
@@ -28,6 +31,7 @@
 from agents import (
     Agent,
     Handoff,
+    ModelBehaviorError,
     ModelSettings,
     ModelTracing,
     Tool,
@@ -534,6 +538,91 @@ async def response_stream() -> AsyncIterator[ResponseCompletedEvent]:
     assert provider.private_responses_calls[0]["params"].conversation == "conv_123"


+@pytest.mark.allow_call_model_methods
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("terminal_event_type", "terminal_event_cls"),
+    [
+        ("response.incomplete", ResponseIncompleteEvent),
+        ("response.failed", ResponseFailedEvent),
+    ],
+)
+async def test_any_llm_responses_stream_rejects_failed_terminal_events(
+    monkeypatch,
+    terminal_event_type: str,
+    terminal_event_cls: type[Any],
+) -> None:
+    async def response_stream() -> AsyncIterator[Any]:
+        yield terminal_event_cls(
+            type=terminal_event_type,
+            response=_response("partial", response_id="resp-terminal"),
+            sequence_number=1,
+        )
+
+    provider = FakeAnyLLMProvider(supports_responses=True, responses_response=response_stream())
+    module, _create_calls = _import_any_llm_module(monkeypatch, provider)
+    AnyLLMModel = module.AnyLLMModel
+
+    model = AnyLLMModel(model="openai/gpt-5.4-mini")
+    events = []
+    with pytest.raises(ModelBehaviorError, match=terminal_event_type):
+        async for event in model.stream_response(
+            system_instructions=None,
+            input="hi",
+            model_settings=ModelSettings(),
+            tools=[],
+            output_schema=None,
+            handoffs=[],
+            tracing=ModelTracing.DISABLED,
+            previous_response_id=None,
+            conversation_id=None,
+            prompt=None,
+        ):
+            events.append(event)
+
+    assert len(events) == 1
+    assert events[0].type == terminal_event_type
+    assert events[0].response.id == "resp-terminal"
+
+
+@pytest.mark.allow_call_model_methods
+@pytest.mark.asyncio
+async def test_any_llm_responses_stream_rejects_error_event(monkeypatch) -> None:
+    async def response_stream() -> AsyncIterator[ResponseErrorEvent]:
+        yield ResponseErrorEvent(
+            type="error",
+            code="invalid_request_error",
+            message="bad request",
+            param=None,
+            sequence_number=1,
+        )
+
+    provider = FakeAnyLLMProvider(supports_responses=True, responses_response=response_stream())
+    module, _create_calls = _import_any_llm_module(monkeypatch, provider)
+    AnyLLMModel = module.AnyLLMModel
+
+    model = AnyLLMModel(model="openai/gpt-5.4-mini")
+    events = []
+    with pytest.raises(ModelBehaviorError, match="invalid_request_error"):
+        async for event in model.stream_response(
+            system_instructions=None,
+            input="hi",
+            model_settings=ModelSettings(),
+            tools=[],
+            output_schema=None,
+            handoffs=[],
+            tracing=ModelTracing.DISABLED,
+            previous_response_id=None,
+            conversation_id=None,
+            prompt=None,
+        ):
+            events.append(event)
+
+    assert len(events) == 1
+    assert events[0].type == "error"
+    assert events[0].code == "invalid_request_error"
+
+
 @pytest.mark.allow_call_model_methods
 @pytest.mark.asyncio
 async def test_any_llm_responses_path_passes_transport_kwargs_via_private_provider_api(

tests/models/test_openai_responses.py

Lines changed: 51 additions & 1 deletion
@@ -8,7 +8,7 @@
 import httpx
 import pytest
 from openai import NOT_GIVEN, APIConnectionError, RateLimitError, omit
-from openai.types.responses import ResponseCompletedEvent
+from openai.types.responses import ResponseCompletedEvent, ResponseErrorEvent
 from openai.types.shared.reasoning import Reasoning

 from agents import (
@@ -1770,6 +1770,56 @@ async def fake_open(
     assert cast(Any, events[0]).response.id == "resp-terminal"


+@pytest.mark.allow_call_model_methods
+@pytest.mark.asyncio
+async def test_stream_response_rejects_response_error_terminal_event(monkeypatch):
+    model = OpenAIResponsesModel(model="gpt-4", openai_client=object())  # type: ignore[arg-type]
+
+    async def dummy_fetch_response(
+        system_instructions,
+        input,
+        model_settings,
+        tools,
+        output_schema,
+        handoffs,
+        previous_response_id,
+        conversation_id,
+        stream,
+        prompt,
+    ):
+        class DummyStream:
+            async def __aiter__(self):
+                yield ResponseErrorEvent(
+                    type="error",
+                    code="invalid_request_error",
+                    message="bad request",
+                    param=None,
+                    sequence_number=0,
+                )
+
+        return DummyStream()
+
+    monkeypatch.setattr(model, "_fetch_response", dummy_fetch_response)
+
+    events = []
+    with pytest.raises(ModelBehaviorError, match="invalid_request_error"):
+        async for event in model.stream_response(
+            system_instructions=None,
+            input="hi",
+            model_settings=ModelSettings(),
+            tools=[],
+            output_schema=None,
+            handoffs=[],
+            tracing=ModelTracing.DISABLED,
+        ):
+            events.append(event)
+
+    assert len(events) == 1
+    assert events[0].type == "error"
+    assert events[0].code == "invalid_request_error"
+    assert events[0].message == "bad request"
+
+
 @pytest.mark.allow_call_model_methods
 @pytest.mark.asyncio
 async def test_websocket_model_get_response_surfaces_response_error_event(monkeypatch):
