From 86501787e7a24965d7a03febd77090abe9461220 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Sun, 22 Mar 2026 15:53:20 +0800 Subject: [PATCH] fix: re-raise output guardrail exceptions in streaming path - Fix OutputGuardrailTripwireTriggered silently swallowed by bare except Exception in run_loop.py streaming path - Re-raise asyncio.CancelledError for Python 3.9/3.10 compatibility - Fix SQLiteSession race condition: replace per-call threading.Lock() with shared self._file_lock for file-based databases --- src/agents/memory/sqlite_session.py | 9 +++++---- src/agents/run_internal/run_loop.py | 6 ++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/agents/memory/sqlite_session.py b/src/agents/memory/sqlite_session.py index 92c9630c9b..18186ff4bc 100644 --- a/src/agents/memory/sqlite_session.py +++ b/src/agents/memory/sqlite_session.py @@ -47,6 +47,7 @@ def __init__( self.messages_table = messages_table self._local = threading.local() self._lock = threading.Lock() + self._file_lock = threading.Lock() # For in-memory databases, we need a shared connection to avoid thread isolation # For file databases, we use thread-local connections for better concurrency @@ -128,7 +129,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: def _get_items_sync(): conn = self._get_connection() - with self._lock if self._is_memory_db else threading.Lock(): + with self._lock if self._is_memory_db else self._file_lock: if session_limit is None: # Fetch all items in chronological order cursor = conn.execute( @@ -182,7 +183,7 @@ async def add_items(self, items: list[TResponseInputItem]) -> None: def _add_items_sync(): conn = self._get_connection() - with self._lock if self._is_memory_db else threading.Lock(): + with self._lock if self._is_memory_db else self._file_lock: # Ensure session exists conn.execute( f""" @@ -223,7 +224,7 @@ async def pop_item(self) -> TResponseInputItem | None: def _pop_item_sync(): conn = self._get_connection() - with self._lock if self._is_memory_db else threading.Lock(): + with self._lock if self._is_memory_db else self._file_lock: # Use DELETE with RETURNING to atomically delete and return the most recent item cursor = conn.execute( f""" @@ -260,7 +261,7 @@ async def clear_session(self) -> None: def _clear_session_sync(): conn = self._get_connection() - with self._lock if self._is_memory_db else threading.Lock(): + with self._lock if self._is_memory_db else self._file_lock: conn.execute( f"DELETE FROM {self.messages_table} WHERE session_id = ?", (self.session_id,), diff --git a/src/agents/run_internal/run_loop.py b/src/agents/run_internal/run_loop.py index 3d21d89fda..86c36dada2 100644 --- a/src/agents/run_internal/run_loop.py +++ b/src/agents/run_internal/run_loop.py @@ -30,6 +30,7 @@ InputGuardrailTripwireTriggered, MaxTurnsExceeded, ModelBehaviorError, + OutputGuardrailTripwireTriggered, RunErrorDetails, UserError, ) @@ -344,7 +345,12 @@ async def _run_output_guardrails_for_stream( try: return cast(list[Any], await streamed_result._output_guardrails_task) + except OutputGuardrailTripwireTriggered: + raise + except asyncio.CancelledError: + raise except Exception: + logger.error("Unexpected error in output guardrails", exc_info=True) return []