Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/agents/memory/sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(
self.messages_table = messages_table
self._local = threading.local()
self._lock = threading.Lock()
self._file_lock = threading.Lock()

# For in-memory databases, we need a shared connection to avoid thread isolation
# For file databases, we use thread-local connections for better concurrency
Expand Down Expand Up @@ -128,7 +129,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:

def _get_items_sync():
conn = self._get_connection()
with self._lock if self._is_memory_db else threading.Lock():
with self._lock if self._is_memory_db else self._file_lock:
if session_limit is None:
# Fetch all items in chronological order
cursor = conn.execute(
Expand Down Expand Up @@ -182,7 +183,7 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
def _add_items_sync():
conn = self._get_connection()

with self._lock if self._is_memory_db else threading.Lock():
with self._lock if self._is_memory_db else self._file_lock:
# Ensure session exists
conn.execute(
f"""
Expand Down Expand Up @@ -223,7 +224,7 @@ async def pop_item(self) -> TResponseInputItem | None:

def _pop_item_sync():
conn = self._get_connection()
with self._lock if self._is_memory_db else threading.Lock():
with self._lock if self._is_memory_db else self._file_lock:
# Use DELETE with RETURNING to atomically delete and return the most recent item
cursor = conn.execute(
f"""
Expand Down Expand Up @@ -260,7 +261,7 @@ async def clear_session(self) -> None:

def _clear_session_sync():
conn = self._get_connection()
with self._lock if self._is_memory_db else threading.Lock():
with self._lock if self._is_memory_db else self._file_lock:
conn.execute(
f"DELETE FROM {self.messages_table} WHERE session_id = ?",
(self.session_id,),
Expand Down
6 changes: 6 additions & 0 deletions src/agents/run_internal/run_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
InputGuardrailTripwireTriggered,
MaxTurnsExceeded,
ModelBehaviorError,
OutputGuardrailTripwireTriggered,
RunErrorDetails,
UserError,
)
Expand Down Expand Up @@ -344,7 +345,12 @@ async def _run_output_guardrails_for_stream(

try:
return cast(list[Any], await streamed_result._output_guardrails_task)
except OutputGuardrailTripwireTriggered:
raise
except asyncio.CancelledError:
raise
except Exception:
logger.error("Unexpected error in output guardrails", exc_info=True)
return []


Expand Down