Skip to content

Commit 20308a3

Browse files
m1lestonesclaude
andcommitted
feat(sessions): add offset parameter to get_items() for pagination
Adds an optional offset: int = 0 parameter to get_items() across all session backends, enabling callers to paginate backwards through history. session.get_items(limit=10) # 10 most recent (unchanged) session.get_items(limit=10, offset=10) # next 10 older items session.get_items(offset=5) # all except 5 most recent Closes #2810 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 86739b1 commit 20308a3

13 files changed

Lines changed: 186 additions & 44 deletions

src/agents/extensions/memory/advanced_sqlite_session.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,12 +160,15 @@ async def get_items(
160160
self,
161161
limit: int | None = None,
162162
branch_id: str | None = None,
163+
offset: int = 0,
163164
) -> list[TResponseInputItem]:
164165
"""Get items from current or specified branch.
165166
166167
Args:
167168
limit: Maximum number of items to return. If None, uses session_settings.limit.
168169
branch_id: Branch to get items from. If None, uses current branch.
170+
offset: Number of most-recent items to skip before applying the limit.
171+
Defaults to 0. Use with limit to paginate backwards through history.
169172
170173
Returns:
171174
List of conversation items from the specified branch.
@@ -180,7 +183,7 @@ def _get_all_items_sync():
180183
"""Synchronous helper to get all items for a branch."""
181184
with self._locked_connection() as conn:
182185
with closing(conn.cursor()) as cursor:
183-
if session_limit is None:
186+
if session_limit is None and offset == 0:
184187
cursor.execute(
185188
f"""
186189
SELECT m.message_data
@@ -192,20 +195,21 @@ def _get_all_items_sync():
192195
(self.session_id, branch_id),
193196
)
194197
else:
198+
sql_limit = session_limit if session_limit is not None else -1
195199
cursor.execute(
196200
f"""
197201
SELECT m.message_data
198202
FROM {self.messages_table} m
199203
JOIN message_structure s ON m.id = s.message_id
200204
WHERE m.session_id = ? AND s.branch_id = ?
201205
ORDER BY s.sequence_number DESC
202-
LIMIT ?
206+
LIMIT ? OFFSET ?
203207
""",
204-
(self.session_id, branch_id, session_limit),
208+
(self.session_id, branch_id, sql_limit, offset),
205209
)
206210

207211
rows = cursor.fetchall()
208-
if session_limit is not None:
212+
if session_limit is not None or offset > 0:
209213
rows = list(reversed(rows))
210214

211215
items = []
@@ -224,7 +228,7 @@ def _get_items_sync():
224228
with self._locked_connection() as conn:
225229
with closing(conn.cursor()) as cursor:
226230
# Get message IDs in correct order for this branch
227-
if session_limit is None:
231+
if session_limit is None and offset == 0:
228232
cursor.execute(
229233
f"""
230234
SELECT m.message_data
@@ -236,20 +240,21 @@ def _get_items_sync():
236240
(self.session_id, branch_id),
237241
)
238242
else:
243+
sql_limit = session_limit if session_limit is not None else -1
239244
cursor.execute(
240245
f"""
241246
SELECT m.message_data
242247
FROM {self.messages_table} m
243248
JOIN message_structure s ON m.id = s.message_id
244249
WHERE m.session_id = ? AND s.branch_id = ?
245250
ORDER BY s.sequence_number DESC
246-
LIMIT ?
251+
LIMIT ? OFFSET ?
247252
""",
248-
(self.session_id, branch_id, session_limit),
253+
(self.session_id, branch_id, sql_limit, offset),
249254
)
250255

251256
rows = cursor.fetchall()
252-
if session_limit is not None:
257+
if session_limit is not None or offset > 0:
253258
rows = list(reversed(rows))
254259

255260
items = []

src/agents/extensions/memory/async_sqlite_session.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,23 @@ async def _locked_connection(self) -> AsyncIterator[aiosqlite.Connection]:
102102
conn = await self._get_connection()
103103
yield conn
104104

105-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
105+
async def get_items(
106+
self, limit: int | None = None, offset: int = 0
107+
) -> list[TResponseInputItem]:
106108
"""Retrieve the conversation history for this session.
107109
108110
Args:
109111
limit: Maximum number of items to retrieve. If None, retrieves all items.
110112
When specified, returns the latest N items in chronological order.
113+
offset: Number of most-recent items to skip before applying the limit.
114+
Defaults to 0. Use with limit to paginate backwards through history.
111115
112116
Returns:
113117
List of input items representing the conversation history
114118
"""
115119

116120
async with self._locked_connection() as conn:
117-
if limit is None:
121+
if limit is None and offset == 0:
118122
cursor = await conn.execute(
119123
f"""
120124
SELECT message_data FROM {self.messages_table}
@@ -124,20 +128,22 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
124128
(self.session_id,),
125129
)
126130
else:
131+
# LIMIT -1 means no limit in SQLite.
132+
sql_limit = limit if limit is not None else -1
127133
cursor = await conn.execute(
128134
f"""
129135
SELECT message_data FROM {self.messages_table}
130136
WHERE session_id = ?
131137
ORDER BY id DESC
132-
LIMIT ?
138+
LIMIT ? OFFSET ?
133139
""",
134-
(self.session_id, limit),
140+
(self.session_id, sql_limit, offset),
135141
)
136142

137143
rows = list(await cursor.fetchall())
138144
await cursor.close()
139145

140-
if limit is not None:
146+
if limit is not None or offset > 0:
141147
rows = rows[::-1]
142148

143149
items: list[TResponseInputItem] = []

src/agents/extensions/memory/dapr_session.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,16 @@ async def _handle_concurrency_conflict(self, error: Exception, attempt: int) ->
232232
# Session protocol implementation
233233
# ------------------------------------------------------------------
234234

235-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
235+
async def get_items(
236+
self, limit: int | None = None, offset: int = 0
237+
) -> list[TResponseInputItem]:
236238
"""Retrieve the conversation history for this session.
237239
238240
Args:
239241
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
240242
When specified, returns the latest N items in chronological order.
243+
offset: Number of most-recent items to skip before applying the limit.
244+
Defaults to 0. Use with limit to paginate backwards through history.
241245
242246
Returns:
243247
List of input items representing the conversation history
@@ -255,6 +259,11 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
255259
messages = self._decode_messages(response.data)
256260
if not messages:
257261
return []
262+
# Apply offset from the newest end, then limit.
263+
end = len(messages) - offset if offset > 0 else len(messages)
264+
if end <= 0:
265+
return []
266+
messages = messages[:end]
258267
if session_limit is not None:
259268
if session_limit <= 0:
260269
return []

src/agents/extensions/memory/encrypt_session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,10 @@ def _unwrap(self, item: TResponseInputItem | EncryptedEnvelope) -> TResponseInpu
170170
except (InvalidToken, KeyError):
171171
return None
172172

173-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
174-
encrypted_items = await self.underlying_session.get_items(limit)
173+
async def get_items(
174+
self, limit: int | None = None, offset: int = 0
175+
) -> list[TResponseInputItem]:
176+
encrypted_items = await self.underlying_session.get_items(limit, offset)
175177
valid_items: list[TResponseInputItem] = []
176178
for enc in encrypted_items:
177179
item = self._unwrap(enc)

src/agents/extensions/memory/redis_session.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,28 +140,38 @@ async def _set_ttl_if_configured(self, *keys: str) -> None:
140140
# Session protocol implementation
141141
# ------------------------------------------------------------------
142142

143-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
143+
async def get_items(
144+
self, limit: int | None = None, offset: int = 0
145+
) -> list[TResponseInputItem]:
144146
"""Retrieve the conversation history for this session.
145147
146148
Args:
147149
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
148150
When specified, returns the latest N items in chronological order.
151+
offset: Number of most-recent items to skip before applying the limit.
152+
Defaults to 0. Use with limit to paginate backwards through history.
149153
150154
Returns:
151155
List of input items representing the conversation history
152156
"""
153157
session_limit = resolve_session_limit(limit, self.session_settings)
154158

155159
async with self._lock:
156-
if session_limit is None:
160+
if session_limit is None and offset == 0:
157161
# Get all messages in chronological order
158162
raw_messages = await self._redis.lrange(self._messages_key, 0, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
159163
else:
160-
if session_limit <= 0:
164+
if session_limit is not None and session_limit <= 0:
161165
return []
162-
# Get the latest N messages (Redis list is ordered chronologically)
163-
# Use negative indices to get from the end - Redis uses -N to -1 for last N items
164-
raw_messages = await self._redis.lrange(self._messages_key, -session_limit, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
166+
# Redis list is ordered chronologically (index 0 = oldest).
167+
# Use negative indices to address from the newest end.
168+
# -1 = newest, -(offset+1) = newest after skipping offset items.
169+
end = -(offset + 1) if offset > 0 else -1
170+
if session_limit is None:
171+
start = 0
172+
else:
173+
start = -(session_limit + offset)
174+
raw_messages = await self._redis.lrange(self._messages_key, start, end) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
165175

166176
items: list[TResponseInputItem] = []
167177
for raw_msg in raw_messages:

src/agents/extensions/memory/sqlalchemy_session.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,12 +274,16 @@ async def _ensure_tables(self) -> None:
274274
finally:
275275
self._init_lock.release()
276276

277-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
277+
async def get_items(
278+
self, limit: int | None = None, offset: int = 0
279+
) -> list[TResponseInputItem]:
278280
"""Retrieve the conversation history for this session.
279281
280282
Args:
281283
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
282284
When specified, returns the latest N items in chronological order.
285+
offset: Number of most-recent items to skip before applying the limit.
286+
Defaults to 0. Use with limit to paginate backwards through history.
283287
284288
Returns:
285289
List of input items representing the conversation history
@@ -289,7 +293,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
289293
session_limit = resolve_session_limit(limit, self.session_settings)
290294

291295
async with self._session_factory() as sess:
292-
if session_limit is None:
296+
if session_limit is None and offset == 0:
293297
stmt = (
294298
select(self._messages.c.message_data)
295299
.where(self._messages.c.session_id == self.session_id)
@@ -302,19 +306,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
302306
stmt = (
303307
select(self._messages.c.message_data)
304308
.where(self._messages.c.session_id == self.session_id)
305-
# Use DESC + LIMIT to get the latest N
309+
# Use DESC + LIMIT/OFFSET to get the latest N after skipping offset items,
306310
# then reverse later for chronological order.
307311
.order_by(
308312
self._messages.c.created_at.desc(),
309313
self._messages.c.id.desc(),
310314
)
311-
.limit(session_limit)
315+
.offset(offset)
312316
)
317+
if session_limit is not None:
318+
stmt = stmt.limit(session_limit)
313319

314320
result = await sess.execute(stmt)
315321
rows: list[str] = [row[0] for row in result.all()]
316322

317-
if session_limit is not None:
323+
if session_limit is not None or offset > 0:
318324
rows.reverse()
319325

320326
items: list[TResponseInputItem] = []

src/agents/memory/openai_conversations_session.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,29 +70,36 @@ async def _get_session_id(self) -> str:
7070
async def _clear_session_id(self) -> None:
7171
self._session_id = None
7272

73-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
73+
async def get_items(
74+
self, limit: int | None = None, offset: int = 0
75+
) -> list[TResponseInputItem]:
7476
session_id = await self._get_session_id()
7577

7678
session_limit = resolve_session_limit(limit, self.session_settings)
7779

7880
all_items = []
79-
if session_limit is None:
81+
if session_limit is None and offset == 0:
8082
async for item in self._openai_client.conversations.items.list(
8183
conversation_id=session_id,
8284
order="asc",
8385
):
8486
# calling model_dump() to make this serializable
8587
all_items.append(item.model_dump(exclude_unset=True))
8688
else:
89+
# Fetch limit+offset items in DESC order (newest first), skip the first
90+
# `offset` (most-recent) items, then reverse for chronological order.
91+
fetch_limit = (session_limit + offset) if session_limit is not None else None
8792
async for item in self._openai_client.conversations.items.list(
8893
conversation_id=session_id,
89-
limit=session_limit,
94+
**({"limit": fetch_limit} if fetch_limit is not None else {}),
9095
order="desc",
9196
):
9297
# calling model_dump() to make this serializable
9398
all_items.append(item.model_dump(exclude_unset=True))
94-
if session_limit is not None and len(all_items) >= session_limit:
99+
if fetch_limit is not None and len(all_items) >= fetch_limit:
95100
break
101+
# Drop the `offset` newest items (head of the DESC list) then restore order.
102+
all_items = all_items[offset:]
96103
all_items.reverse()
97104

98105
return all_items # type: ignore

src/agents/memory/openai_responses_compaction_session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,10 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
239239
f"candidates={len(self._compaction_candidate_items)})"
240240
)
241241

242-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
243-
return await self.underlying_session.get_items(limit)
242+
async def get_items(
243+
self, limit: int | None = None, offset: int = 0
244+
) -> list[TResponseInputItem]:
245+
return await self.underlying_session.get_items(limit, offset)
244246

245247
async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None:
246248
if self._deferred_response_id is not None:

src/agents/memory/session.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@ class Session(Protocol):
2121
session_id: str
2222
session_settings: SessionSettings | None = None
2323

24-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
24+
async def get_items(
25+
self, limit: int | None = None, offset: int = 0
26+
) -> list[TResponseInputItem]:
2527
"""Retrieve the conversation history for this session.
2628
2729
Args:
2830
limit: Maximum number of items to retrieve. If None, retrieves all items.
2931
When specified, returns the latest N items in chronological order.
32+
offset: Number of most-recent items to skip before applying the limit.
33+
Defaults to 0. Use with limit to paginate backwards through history.
3034
3135
Returns:
3236
List of input items representing the conversation history
@@ -68,12 +72,16 @@ class SessionABC(ABC):
6872
session_settings: SessionSettings | None = None
6973

7074
@abstractmethod
71-
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
75+
async def get_items(
76+
self, limit: int | None = None, offset: int = 0
77+
) -> list[TResponseInputItem]:
7278
"""Retrieve the conversation history for this session.
7379
7480
Args:
7581
limit: Maximum number of items to retrieve. If None, retrieves all items.
7682
When specified, returns the latest N items in chronological order.
83+
offset: Number of most-recent items to skip before applying the limit.
84+
Defaults to 0. Use with limit to paginate backwards through history.
7785
7886
Returns:
7987
List of input items representing the conversation history

0 commit comments

Comments
 (0)