Merged
10 changes: 10 additions & 0 deletions src/agents/exceptions.py
@@ -16,6 +16,16 @@

from .util._pretty_print import pretty_print_run_error_details

_DRAIN_STREAM_EVENTS_ATTR = "_agents_drain_queued_stream_events"


def _mark_error_to_drain_stream_events(error: Exception) -> None:
    setattr(error, _DRAIN_STREAM_EVENTS_ATTR, True)


def _should_drain_stream_events_before_raising(error: Exception) -> bool:
    return bool(getattr(error, _DRAIN_STREAM_EVENTS_ATTR, False))


@dataclass
class RunErrorDetails:
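The marker is an attribute set on the exception instance itself, so any exception type can opt in without a dedicated subclass. A minimal sketch of the round trip (the `ValueError` is an arbitrary stand-in):

    error = ValueError("terminal stream failure")
    assert not _should_drain_stream_events_before_raising(error)
    _mark_error_to_drain_stream_events(error)
    assert _should_drain_stream_events_before_raising(error)  # marker now present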
22 changes: 19 additions & 3 deletions src/agents/extensions/models/any_llm_model.py
@@ -29,6 +29,10 @@
from ...logger import logger
from ...model_settings import ModelSettings
from ...models._openai_retry import get_openai_retry_advice
from ...models._response_terminal import (
    response_error_event_failure_error,
    response_terminal_failure_error,
)
from ...models._retry_runtime import should_disable_provider_managed_retries
from ...models.chatcmpl_converter import Converter
from ...models.chatcmpl_helpers import HEADERS, HEADERS_OVERRIDE, ChatCmplHelpers
@@ -421,18 +425,30 @@ async def _stream_response_via_responses(
        )

        final_response: Response | None = None
        terminal_failure_error: ModelBehaviorError | None = None
        try:
            async for chunk in stream:
                chunk_type = getattr(chunk, "type", None)
                if isinstance(chunk, ResponseCompletedEvent):
                    final_response = chunk.response
-                elif getattr(chunk, "type", None) in {"response.failed", "response.incomplete"}:
+                elif chunk_type in {"response.failed", "response.incomplete"}:
                    terminal_response = getattr(chunk, "response", None)
                    if isinstance(terminal_response, Response):
                        final_response = terminal_response
                    terminal_failure_error = response_terminal_failure_error(
                        cast(str, chunk_type),
                        terminal_response if isinstance(terminal_response, Response) else None,
                    )
                elif chunk_type in {"error", "response.error"}:
                    terminal_failure_error = response_error_event_failure_error(
                        cast(str, chunk_type),
                        chunk,
                    )
                yield chunk
        finally:
            await self._maybe_aclose(stream)

        if terminal_failure_error is not None:
            raise terminal_failure_error

        if tracing.include_data() and final_response:
            span_response.span_data.response = final_response
            span_response.span_data.input = input
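Both Responses streaming paths in this PR share the same record-then-raise shape: the failure is remembered, every chunk (including the terminal one) is still yielded, and the error surfaces only after the stream is closed. A standalone sketch of the pattern with generic names (not the SDK's actual API):

    from collections.abc import AsyncIterator

    async def relay(source: AsyncIterator[dict]) -> AsyncIterator[dict]:
        failure: Exception | None = None
        async for chunk in source:
            if chunk["type"] in {"response.failed", "response.incomplete"}:
                # Record the failure but keep going so the terminal
                # chunk still reaches the consumer.
                failure = RuntimeError(f"terminal event: {chunk['type']}")
            yield chunk
        if failure is not None:
            raise failure  # raised only after the stream is fully relayed

Raising inside the loop would end the generator before the terminal chunk was delivered downstream.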
64 changes: 64 additions & 0 deletions src/agents/models/_response_terminal.py
@@ -0,0 +1,64 @@
from __future__ import annotations

from typing import Any

from openai.types.responses import Response

from ..exceptions import ModelBehaviorError, _mark_error_to_drain_stream_events


def format_response_terminal_failure(
    event_type: str,
    response: Response | None,
) -> str:
    message = f"Responses stream ended with terminal event `{event_type}`."
    if response is None:
        return message

    details: list[str] = []
    status = getattr(response, "status", None)
    if status:
        details.append(f"status={status}")
    error = getattr(response, "error", None)
    if error:
        details.append(f"error={error}")
    incomplete_details = getattr(response, "incomplete_details", None)
    if incomplete_details:
        details.append(f"incomplete_details={incomplete_details}")

    if details:
        message = f"{message} {'; '.join(details)}."
    return message


def format_response_error_event(event_type: str, event: Any) -> str:
    message = f"Responses stream ended with terminal event `{event_type}`."
    details: list[str] = []
    code = getattr(event, "code", None)
    if code:
        details.append(f"code={code}")
    error_message = getattr(event, "message", None)
    if error_message:
        details.append(f"message={error_message}")
    param = getattr(event, "param", None)
    if param:
        details.append(f"param={param}")

    if details:
        message = f"{message} {'; '.join(details)}."
    return message


def response_terminal_failure_error(
    event_type: str,
    response: Response | None,
) -> ModelBehaviorError:
    error = ModelBehaviorError(format_response_terminal_failure(event_type, response))
    _mark_error_to_drain_stream_events(error)
    return error


def response_error_event_failure_error(event_type: str, event: Any) -> ModelBehaviorError:
    error = ModelBehaviorError(format_response_error_event(event_type, event))
    _mark_error_to_drain_stream_events(error)
    return error
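For reference, the kind of message these helpers build; the `SimpleNamespace` stands in for a real `Response` object and every field value here is invented:

    from types import SimpleNamespace

    fake = SimpleNamespace(
        status="incomplete",
        error=None,
        incomplete_details=SimpleNamespace(reason="max_output_tokens"),
    )
    print(format_response_terminal_failure("response.incomplete", fake))
    # Responses stream ended with terminal event `response.incomplete`. status=incomplete; incomplete_details=namespace(reason='max_output_tokens').

Both `*_error` constructors additionally call `_mark_error_to_drain_stream_events`, which is what lets `stream_events` in `result.py` (below) flush queued events before re-raising.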
26 changes: 21 additions & 5 deletions src/agents/models/openai_responses.py
@@ -39,7 +39,7 @@
)
from ..agent_output import AgentOutputSchemaBase
from ..computer import AsyncComputer, Computer
-from ..exceptions import UserError
+from ..exceptions import ModelBehaviorError, UserError
from ..handoffs import Handoff
from ..items import ItemHelpers, ModelResponse, TResponseInputItem
from ..logger import logger
@@ -68,6 +68,7 @@
from ..util._json import _to_dump_compatible
from ..version import __version__
from ._openai_retry import get_openai_retry_advice
from ._response_terminal import response_error_event_failure_error, response_terminal_failure_error
from ._retry_runtime import (
    should_disable_provider_managed_retries,
    should_disable_websocket_pre_event_retries,
@@ -555,6 +556,7 @@ async def stream_response(
        )

        final_response: Response | None = None
        terminal_failure_error: ModelBehaviorError | None = None
        yielded_terminal_event = False
        close_stream_in_background = False
        try:
@@ -567,12 +569,22 @@
                    "response.incomplete",
                }:
                    terminal_response = getattr(chunk, "response", None)
                    if isinstance(terminal_response, Response):
                        final_response = terminal_response
                    terminal_failure_error = response_terminal_failure_error(
                        cast(str, chunk_type),
                        terminal_response
                        if isinstance(terminal_response, Response)
                        else None,
                    )
                elif chunk_type in {"error", "response.error"}:
                    terminal_failure_error = response_error_event_failure_error(
                        cast(str, chunk_type),
                        chunk,
                    )
                if chunk_type in {
                    "response.completed",
                    "response.failed",
                    "response.incomplete",
                    "error",
                    "response.error",
                }:
                    yielded_terminal_event = True
@@ -592,6 +604,8 @@
                )
            else:
                raise
        if terminal_failure_error is not None:
            raise terminal_failure_error

        if final_response and tracing.include_data():
            span_response.span_data.response = final_response
@@ -1089,8 +1103,10 @@ async def _fetch_response(
            elif event_type in {"response.incomplete", "response.failed"}:
                terminal_event_type = cast(str, event_type)
                terminal_response = getattr(event, "response", None)
-                if isinstance(terminal_response, Response):
-                    final_response = terminal_response
+                raise response_terminal_failure_error(
+                    terminal_event_type,
+                    terminal_response if isinstance(terminal_response, Response) else None,
+                )

        if final_response is None:
            terminal_event_hint = (
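The `_fetch_response` change means a `response.failed` or `response.incomplete` terminal event now raises immediately rather than letting the failed `Response` pass through as the final result. At a hypothetical call site, roughly:

    try:
        response = await model._fetch_response(...)  # hypothetical call, real signature elided
    except ModelBehaviorError:
        # Raised for response.failed / response.incomplete terminal events,
        # where previously the incomplete Response could be returned.
        ...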
8 changes: 7 additions & 1 deletion src/agents/result.py
@@ -18,6 +18,7 @@
    InputGuardrailTripwireTriggered,
    MaxTurnsExceeded,
    RunErrorDetails,
    _should_drain_stream_events_before_raising,
)
from .guardrail import InputGuardrailResult, OutputGuardrailResult
from .items import (
@@ -705,7 +706,12 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
        try:
            while True:
                self._check_errors()
-                should_drain_queued_events = isinstance(self._stored_exception, MaxTurnsExceeded)
+                should_drain_queued_events = isinstance(
+                    self._stored_exception, MaxTurnsExceeded
+                ) or (
+                    self._stored_exception is not None
+                    and _should_drain_stream_events_before_raising(self._stored_exception)
+                )
                if self._stored_exception and (
                    not should_drain_queued_events or self._event_queue.empty()
                ):
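Compressed to its essentials, the loop now treats any marked exception the way it already treated `MaxTurnsExceeded`: queued events are flushed first, and the exception is raised once the queue is empty. A toy synchronous sketch, assuming all events are already queued and an exception is stored (the attribute name comes from `exceptions.py` above):

    from queue import SimpleQueue

    def drain_then_raise(events: SimpleQueue, stored_exception: Exception | None):
        while True:
            should_drain = stored_exception is not None and getattr(
                stored_exception, "_agents_drain_queued_stream_events", False
            )
            if stored_exception and (not should_drain or events.empty()):
                # Unmarked errors raise before any queued event is yielded;
                # marked errors raise only once the queue has drained.
                raise stored_exception
            yield events.get_nowait()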
13 changes: 11 additions & 2 deletions src/agents/run_internal/run_loop.py
@@ -58,6 +58,10 @@
from ..lifecycle import RunHooks
from ..logger import logger
from ..memory import Session
from ..models._response_terminal import (
    response_error_event_failure_error,
    response_terminal_failure_error,
)
from ..result import RunResultStreaming
from ..run_config import ReasoningItemIdPolicy, RunConfig
from ..run_context import AgentHookContext, RunContextWrapper, TContext
@@ -1484,9 +1488,14 @@ async def rewind_model_request() -> None:
                is_completed_event = True
                terminal_response = event.response
            elif getattr(event, "type", None) in {"response.incomplete", "response.failed"}:
                event_type = cast(str, event.type)
                maybe_response = getattr(event, "response", None)
-                if isinstance(maybe_response, Response):
-                    terminal_response = maybe_response
+                raise response_terminal_failure_error(
+                    event_type,
+                    maybe_response if isinstance(maybe_response, Response) else None,
+                )
            elif getattr(event, "type", None) in {"error", "response.error"}:
                raise response_error_event_failure_error(cast(str, event.type), event)

        if terminal_response is not None:
            if is_completed_event and not terminal_response.output and streamed_response_output:
89 changes: 89 additions & 0 deletions tests/models/test_any_llm_model.py
@@ -17,6 +17,9 @@
from openai.types.chat.chat_completion_chunk import ChoiceDelta
from openai.types.completion_usage import CompletionUsage, PromptTokensDetails
from openai.types.responses import Response, ResponseCompletedEvent, ResponseOutputMessage
from openai.types.responses.response_error_event import ResponseErrorEvent
from openai.types.responses.response_failed_event import ResponseFailedEvent
from openai.types.responses.response_incomplete_event import ResponseIncompleteEvent
from openai.types.responses.response_output_text import ResponseOutputText
from openai.types.responses.response_usage import (
    InputTokensDetails,
@@ -28,6 +31,7 @@
from agents import (
    Agent,
    Handoff,
    ModelBehaviorError,
    ModelSettings,
    ModelTracing,
    Tool,
@@ -534,6 +538,91 @@ async def response_stream() -> AsyncIterator[ResponseCompletedEvent]:
    assert provider.private_responses_calls[0]["params"].conversation == "conv_123"


@pytest.mark.allow_call_model_methods
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("terminal_event_type", "terminal_event_cls"),
    [
        ("response.incomplete", ResponseIncompleteEvent),
        ("response.failed", ResponseFailedEvent),
    ],
)
async def test_any_llm_responses_stream_rejects_failed_terminal_events(
    monkeypatch,
    terminal_event_type: str,
    terminal_event_cls: type[Any],
) -> None:
    async def response_stream() -> AsyncIterator[Any]:
        yield terminal_event_cls(
            type=terminal_event_type,
            response=_response("partial", response_id="resp-terminal"),
            sequence_number=1,
        )

    provider = FakeAnyLLMProvider(supports_responses=True, responses_response=response_stream())
    module, _create_calls = _import_any_llm_module(monkeypatch, provider)
    AnyLLMModel = module.AnyLLMModel

    model = AnyLLMModel(model="openai/gpt-5.4-mini")
    events = []
    with pytest.raises(ModelBehaviorError, match=terminal_event_type):
        async for event in model.stream_response(
            system_instructions=None,
            input="hi",
            model_settings=ModelSettings(),
            tools=[],
            output_schema=None,
            handoffs=[],
            tracing=ModelTracing.DISABLED,
            previous_response_id=None,
            conversation_id=None,
            prompt=None,
        ):
            events.append(event)

    assert len(events) == 1
    assert events[0].type == terminal_event_type
    assert events[0].response.id == "resp-terminal"


@pytest.mark.allow_call_model_methods
@pytest.mark.asyncio
async def test_any_llm_responses_stream_rejects_error_event(monkeypatch) -> None:
    async def response_stream() -> AsyncIterator[ResponseErrorEvent]:
        yield ResponseErrorEvent(
            type="error",
            code="invalid_request_error",
            message="bad request",
            param=None,
            sequence_number=1,
        )

    provider = FakeAnyLLMProvider(supports_responses=True, responses_response=response_stream())
    module, _create_calls = _import_any_llm_module(monkeypatch, provider)
    AnyLLMModel = module.AnyLLMModel

    model = AnyLLMModel(model="openai/gpt-5.4-mini")
    events = []
    with pytest.raises(ModelBehaviorError, match="invalid_request_error"):
        async for event in model.stream_response(
            system_instructions=None,
            input="hi",
            model_settings=ModelSettings(),
            tools=[],
            output_schema=None,
            handoffs=[],
            tracing=ModelTracing.DISABLED,
            previous_response_id=None,
            conversation_id=None,
            prompt=None,
        ):
            events.append(event)

    assert len(events) == 1
    assert events[0].type == "error"
    assert events[0].code == "invalid_request_error"


@pytest.mark.allow_call_model_methods
@pytest.mark.asyncio
async def test_any_llm_responses_path_passes_transport_kwargs_via_private_provider_api(