Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/agents/extensions/memory/advanced_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ async def get_items(
self,
limit: int | None = None,
branch_id: str | None = None,
offset: int = 0,
) -> list[TResponseInputItem]:
"""Get items from current or specified branch.

Args:
limit: Maximum number of items to return. If None, uses session_settings.limit.
branch_id: Branch to get items from. If None, uses current branch.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of conversation items from the specified branch.
Expand All @@ -180,7 +183,7 @@ def _get_all_items_sync():
"""Synchronous helper to get all items for a branch."""
with self._locked_connection() as conn:
with closing(conn.cursor()) as cursor:
if session_limit is None:
if session_limit is None and offset == 0:
cursor.execute(
f"""
SELECT m.message_data
Expand All @@ -192,20 +195,21 @@ def _get_all_items_sync():
(self.session_id, branch_id),
)
else:
sql_limit = session_limit if session_limit is not None else -1
cursor.execute(
f"""
SELECT m.message_data
FROM {self.messages_table} m
JOIN message_structure s ON m.id = s.message_id
WHERE m.session_id = ? AND s.branch_id = ?
ORDER BY s.sequence_number DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, branch_id, session_limit),
(self.session_id, branch_id, sql_limit, offset),
)

rows = cursor.fetchall()
if session_limit is not None:
if session_limit is not None or offset > 0:
rows = list(reversed(rows))

items = []
Expand All @@ -224,7 +228,7 @@ def _get_items_sync():
with self._locked_connection() as conn:
with closing(conn.cursor()) as cursor:
# Get message IDs in correct order for this branch
if session_limit is None:
if session_limit is None and offset == 0:
cursor.execute(
f"""
SELECT m.message_data
Expand All @@ -236,20 +240,21 @@ def _get_items_sync():
(self.session_id, branch_id),
)
else:
sql_limit = session_limit if session_limit is not None else -1
cursor.execute(
f"""
SELECT m.message_data
FROM {self.messages_table} m
JOIN message_structure s ON m.id = s.message_id
WHERE m.session_id = ? AND s.branch_id = ?
ORDER BY s.sequence_number DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, branch_id, session_limit),
(self.session_id, branch_id, sql_limit, offset),
)

rows = cursor.fetchall()
if session_limit is not None:
if session_limit is not None or offset > 0:
rows = list(reversed(rows))

items = []
Expand Down
16 changes: 11 additions & 5 deletions src/agents/extensions/memory/async_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,23 @@ async def _locked_connection(self) -> AsyncIterator[aiosqlite.Connection]:
conn = await self._get_connection()
yield conn

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of input items representing the conversation history
"""

async with self._locked_connection() as conn:
if limit is None:
if limit is None and offset == 0:
cursor = await conn.execute(
f"""
SELECT message_data FROM {self.messages_table}
Expand All @@ -124,20 +128,22 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
(self.session_id,),
)
else:
# LIMIT -1 means no limit in SQLite.
sql_limit = limit if limit is not None else -1
cursor = await conn.execute(
f"""
SELECT message_data FROM {self.messages_table}
WHERE session_id = ?
ORDER BY id DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, limit),
(self.session_id, sql_limit, offset),
)

rows = list(await cursor.fetchall())
await cursor.close()

if limit is not None:
if limit is not None or offset > 0:
rows = rows[::-1]

items: list[TResponseInputItem] = []
Expand Down
11 changes: 10 additions & 1 deletion src/agents/extensions/memory/dapr_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,16 @@ async def _handle_concurrency_conflict(self, error: Exception, attempt: int) ->
# Session protocol implementation
# ------------------------------------------------------------------

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of input items representing the conversation history
Expand All @@ -255,6 +259,11 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
messages = self._decode_messages(response.data)
if not messages:
return []
# Apply offset from the newest end, then limit.
end = len(messages) - offset if offset > 0 else len(messages)
if end <= 0:
return []
messages = messages[:end]
if session_limit is not None:
if session_limit <= 0:
return []
Expand Down
6 changes: 4 additions & 2 deletions src/agents/extensions/memory/encrypt_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ def _unwrap(self, item: TResponseInputItem | EncryptedEnvelope) -> TResponseInpu
except (InvalidToken, KeyError):
return None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
encrypted_items = await self.underlying_session.get_items(limit)
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
encrypted_items = await self.underlying_session.get_items(limit, offset)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Call wrapped get_items with keyword offset

Forwarding limit and offset positionally here breaks wrapped sessions whose second positional parameter is not offset (notably AdvancedSQLiteSession, where it is branch_id) and can also raise TypeError for custom sessions that still accept only limit or make offset keyword-only. This means EncryptedSession.get_items() can silently fetch the wrong history (often empty) even with default arguments; forward using keywords to preserve compatibility.

Useful? React with 👍 / 👎.

valid_items: list[TResponseInputItem] = []
for enc in encrypted_items:
item = self._unwrap(enc)
Expand Down
22 changes: 16 additions & 6 deletions src/agents/extensions/memory/redis_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,38 @@ async def _set_ttl_if_configured(self, *keys: str) -> None:
# Session protocol implementation
# ------------------------------------------------------------------

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of input items representing the conversation history
"""
session_limit = resolve_session_limit(limit, self.session_settings)

async with self._lock:
if session_limit is None:
if session_limit is None and offset == 0:
# Get all messages in chronological order
raw_messages = await self._redis.lrange(self._messages_key, 0, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
else:
if session_limit <= 0:
if session_limit is not None and session_limit <= 0:
return []
# Get the latest N messages (Redis list is ordered chronologically)
# Use negative indices to get from the end - Redis uses -N to -1 for last N items
raw_messages = await self._redis.lrange(self._messages_key, -session_limit, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
# Redis list is ordered chronologically (index 0 = oldest).
# Use negative indices to address from the newest end.
# -1 = newest, -(offset+1) = newest after skipping offset items.
end = -(offset + 1) if offset > 0 else -1
if session_limit is None:
start = 0
else:
start = -(session_limit + offset)
raw_messages = await self._redis.lrange(self._messages_key, start, end) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context

items: list[TResponseInputItem] = []
for raw_msg in raw_messages:
Expand Down
16 changes: 11 additions & 5 deletions src/agents/extensions/memory/sqlalchemy_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,16 @@ async def _ensure_tables(self) -> None:
finally:
self._init_lock.release()

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of input items representing the conversation history
Expand All @@ -289,7 +293,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
session_limit = resolve_session_limit(limit, self.session_settings)

async with self._session_factory() as sess:
if session_limit is None:
if session_limit is None and offset == 0:
stmt = (
select(self._messages.c.message_data)
.where(self._messages.c.session_id == self.session_id)
Expand All @@ -302,19 +306,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
stmt = (
select(self._messages.c.message_data)
.where(self._messages.c.session_id == self.session_id)
# Use DESC + LIMIT to get the latest N
# Use DESC + LIMIT/OFFSET to get the latest N after skipping offset items,
# then reverse later for chronological order.
.order_by(
self._messages.c.created_at.desc(),
self._messages.c.id.desc(),
)
.limit(session_limit)
.offset(offset)
)
if session_limit is not None:
stmt = stmt.limit(session_limit)

result = await sess.execute(stmt)
rows: list[str] = [row[0] for row in result.all()]

if session_limit is not None:
if session_limit is not None or offset > 0:
rows.reverse()

items: list[TResponseInputItem] = []
Expand Down
15 changes: 11 additions & 4 deletions src/agents/memory/openai_conversations_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,36 @@ async def _get_session_id(self) -> str:
async def _clear_session_id(self) -> None:
self._session_id = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
session_id = await self._get_session_id()

session_limit = resolve_session_limit(limit, self.session_settings)

all_items = []
if session_limit is None:
if session_limit is None and offset == 0:
async for item in self._openai_client.conversations.items.list(
conversation_id=session_id,
order="asc",
):
# calling model_dump() to make this serializable
all_items.append(item.model_dump(exclude_unset=True))
else:
# Fetch limit+offset items in DESC order (newest first), skip the first
# `offset` (most-recent) items, then reverse for chronological order.
fetch_limit = (session_limit + offset) if session_limit is not None else None
async for item in self._openai_client.conversations.items.list(
conversation_id=session_id,
limit=session_limit,
**({"limit": fetch_limit} if fetch_limit is not None else {}),
order="desc",
):
# calling model_dump() to make this serializable
all_items.append(item.model_dump(exclude_unset=True))
if session_limit is not None and len(all_items) >= session_limit:
if fetch_limit is not None and len(all_items) >= fetch_limit:
break
# Drop the `offset` newest items (head of the DESC list) then restore order.
all_items = all_items[offset:]
all_items.reverse()

return all_items # type: ignore
Expand Down
6 changes: 4 additions & 2 deletions src/agents/memory/openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
f"candidates={len(self._compaction_candidate_items)})"
)

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit)
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit, offset)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Forward offset as keyword to underlying session

This positional forward has the same compatibility regression: wrapped sessions with a non-offset second positional argument (e.g., AdvancedSQLiteSession.branch_id) or older/custom get_items(limit) implementations will misbehave or fail when compaction calls through this path. Because this wrapper is intended to decorate arbitrary Session implementations, offset should be passed by keyword to avoid breaking valid underlying sessions.

Useful? React with 👍 / 👎.


async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None:
if self._deferred_response_id is not None:
Expand Down
12 changes: 10 additions & 2 deletions src/agents/memory/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ class Session(Protocol):
session_id: str
session_settings: SessionSettings | None = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of input items representing the conversation history
Expand Down Expand Up @@ -68,12 +72,16 @@ class SessionABC(ABC):
session_settings: SessionSettings | None = None

@abstractmethod
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
offset: Number of most-recent items to skip before applying the limit.
Defaults to 0. Use with limit to paginate backwards through history.

Returns:
List of input items representing the conversation history
Expand Down
Loading