Skip to content

Commit 8650178

Browse files
committed
fix: re-raise output guardrail exceptions in streaming path
- Fix OutputGuardrailTripwireTriggered silently swallowed by bare except Exception in run_loop.py streaming path - Re-raise asyncio.CancelledError for Python 3.9/3.10 compatibility - Fix SQLiteSession race condition: replace per-call threading.Lock() with shared self._file_lock for file-based databases
1 parent 14090a2 commit 8650178

2 files changed

Lines changed: 11 additions & 4 deletions

File tree

src/agents/memory/sqlite_session.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(
4747
self.messages_table = messages_table
4848
self._local = threading.local()
4949
self._lock = threading.Lock()
50+
self._file_lock = threading.Lock()
5051

5152
# For in-memory databases, we need a shared connection to avoid thread isolation
5253
# For file databases, we use thread-local connections for better concurrency
@@ -128,7 +129,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
128129

129130
def _get_items_sync():
130131
conn = self._get_connection()
131-
with self._lock if self._is_memory_db else threading.Lock():
132+
with self._lock if self._is_memory_db else self._file_lock:
132133
if session_limit is None:
133134
# Fetch all items in chronological order
134135
cursor = conn.execute(
@@ -182,7 +183,7 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
182183
def _add_items_sync():
183184
conn = self._get_connection()
184185

185-
with self._lock if self._is_memory_db else threading.Lock():
186+
with self._lock if self._is_memory_db else self._file_lock:
186187
# Ensure session exists
187188
conn.execute(
188189
f"""
@@ -223,7 +224,7 @@ async def pop_item(self) -> TResponseInputItem | None:
223224

224225
def _pop_item_sync():
225226
conn = self._get_connection()
226-
with self._lock if self._is_memory_db else threading.Lock():
227+
with self._lock if self._is_memory_db else self._file_lock:
227228
# Use DELETE with RETURNING to atomically delete and return the most recent item
228229
cursor = conn.execute(
229230
f"""
@@ -260,7 +261,7 @@ async def clear_session(self) -> None:
260261

261262
def _clear_session_sync():
262263
conn = self._get_connection()
263-
with self._lock if self._is_memory_db else threading.Lock():
264+
with self._lock if self._is_memory_db else self._file_lock:
264265
conn.execute(
265266
f"DELETE FROM {self.messages_table} WHERE session_id = ?",
266267
(self.session_id,),

src/agents/run_internal/run_loop.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
InputGuardrailTripwireTriggered,
3131
MaxTurnsExceeded,
3232
ModelBehaviorError,
33+
OutputGuardrailTripwireTriggered,
3334
RunErrorDetails,
3435
UserError,
3536
)
@@ -344,7 +345,12 @@ async def _run_output_guardrails_for_stream(
344345

345346
try:
346347
return cast(list[Any], await streamed_result._output_guardrails_task)
348+
except OutputGuardrailTripwireTriggered:
349+
raise
350+
except asyncio.CancelledError:
351+
raise
347352
except Exception:
353+
logger.error("Unexpected error in output guardrails", exc_info=True)
348354
return []
349355

350356

0 commit comments

Comments
 (0)