Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/claude_agent_sdk/_internal/session_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ async def load(self, key: SessionKey) -> list[SessionStoreEntry] | None:
entries = self._store.get(_key_to_string(key))
return None if entries is None else list(entries)

async def load_range(
    self, key: SessionKey, *, head: int = 0, tail: int = 0
) -> tuple[list[SessionStoreEntry], list[SessionStoreEntry]] | None:
    """Return the first ``head`` and last ``tail`` entries for ``key``.

    Mirrors :meth:`load`: a key that was never written yields ``None``.
    A ``head``/``tail`` of ``0`` produces an empty list for that side;
    the two slices may overlap when the session is short.
    """
    stored = self._store.get(_key_to_string(key))
    if stored is None:
        return None
    leading: list[SessionStoreEntry] = stored[:head] if head else []
    trailing: list[SessionStoreEntry] = stored[-tail:] if tail else []
    return leading, trailing

async def list_sessions(self, project_key: str) -> list[SessionStoreListEntry]:
results: list[SessionStoreListEntry] = []
prefix = project_key + "/"
Expand Down
69 changes: 52 additions & 17 deletions src/claude_agent_sdk/_internal/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@
# adapter connection pools or tripping backend rate limits.
_STORE_LIST_LOAD_CONCURRENCY = 16

# Head/tail entry counts requested via ``store.load_range()`` when deriving
# session summaries. Tail must cover the CLI's metadata re-append threshold
# (custom-title/tag/last-prompt are re-appended every ~32 KB of transcript,
# i.e. up to ~160 small entries between re-appends — 200 leaves margin).
# Head must cover the preamble before the first user message (mode/progress/
# attachment/system/snapshot entries — observed up to index 14 in real
# transcripts). These are upper bounds — short sessions return fewer.
_LITE_LOAD_HEAD_ENTRIES = 20
_LITE_LOAD_TAIL_ENTRIES = 200

# Maximum length for a single filesystem path component. Most filesystems
# limit individual components to 255 bytes. We use 200 to leave room for
# the hash suffix and separator.
Expand Down Expand Up @@ -339,7 +349,7 @@ class _LiteSessionFile:

__slots__ = ("mtime", "size", "head", "tail")

def __init__(self, mtime: int, size: int | None, head: str, tail: str) -> None:
    """Lightweight session snapshot for lite-parse summary derivation.

    ``size`` is ``None`` when only a head/tail slice of the transcript
    was fetched (the ``load_range`` path), so the full byte size is
    unknown. ``head`` and ``tail`` are JSONL strings of the first/last
    entries respectively.
    """
    self.mtime = mtime
    self.size = size
    self.head = head
    self.tail = tail
Expand Down Expand Up @@ -1499,19 +1509,45 @@ def _filter_transcript_entries(entries: list[Any]) -> list[_TranscriptEntry]:
return result


async def _load_store_entries_as_jsonl(
async def _load_store_session_lite(
    store: SessionStore, session_id: str, directory: str | None
) -> _LiteSessionFile | None:
    """Load a session from a SessionStore as a ``_LiteSessionFile``.

    Only the head/tail slice needed for lite-parse summary derivation is
    fetched when the store implements :meth:`SessionStore.load_range`;
    otherwise falls back to a full :meth:`SessionStore.load` and slices via
    ``_jsonl_to_lite``. Head and tail are serialized separately on the
    ``load_range`` path so head-only scans (e.g. ``first_prompt``) never see
    tail entries; ``size`` is ``None`` on that path since only a slice was
    fetched.

    Returns ``None`` if the session has no entries.
    """
    project_key = project_key_for_directory(directory)
    key: SessionKey = {"project_key": project_key, "session_id": session_id}
    if _store_implements(store, "load_range"):
        # ``or ([], [])`` guards adapters that return None for unknown keys.
        head, tail = await store.load_range(
            key, head=_LITE_LOAD_HEAD_ENTRIES, tail=_LITE_LOAD_TAIL_ENTRIES
        ) or ([], [])
        if not head and not tail:
            return None
        head_jsonl = _entries_to_jsonl(list(head)) if head else ""
        # Mirror the disk path's "tail = head when small" semantics so
        # adapters that return an empty tail for short sessions still expose
        # the head entries to tail-scanning extractors.
        tail_jsonl = _entries_to_jsonl(list(tail)) if tail else head_jsonl
        return _LiteSessionFile(
            mtime=_mtime_from_jsonl_tail(tail_jsonl or head_jsonl),
            size=None,
            head=head_jsonl,
            tail=tail_jsonl,
        )
    entries = await store.load(key)
    if not entries:
        return None
    jsonl = _entries_to_jsonl(entries)
    return _jsonl_to_lite(jsonl, _mtime_from_jsonl_tail(jsonl))


async def list_sessions_from_store(
Expand Down Expand Up @@ -1548,11 +1584,13 @@ async def list_sessions_from_store(
the store path — the store operates on a single ``project_key``.

.. note::
This performs one full ``store.load()`` per session in the listing
to derive summaries. On remote backends with many or large sessions
This performs one ``store.load()`` per session in the listing to
derive summaries. On remote backends with many or large sessions
this can be expensive (e.g., S3 egress, Postgres large-row reads).
Consider denormalizing summary metadata into your adapter's
``list_sessions()`` index.
If your adapter implements ``load_range``, this uses it instead of
full ``load()`` so only a small head/tail slice is fetched per
session. Otherwise consider denormalizing summary metadata into
your adapter's ``list_sessions()`` index.
"""
if not _store_implements(session_store, "list_sessions"):
raise ValueError(
Expand All @@ -1574,9 +1612,9 @@ async def list_sessions_from_store(
# disk path.
sem = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY)

async def _bounded_load(sid: str) -> str | None:
async def _bounded_load(sid: str) -> _LiteSessionFile | None:
async with sem:
return await _load_store_entries_as_jsonl(session_store, sid, directory)
return await _load_store_session_lite(session_store, sid, directory)

settled = await asyncio.gather(
*(_bounded_load(e["session_id"]) for e in listing),
Expand All @@ -1593,9 +1631,7 @@ async def _bounded_load(sid: str) -> str | None:
continue
if outcome is None:
continue
parsed = _parse_session_info_from_lite(
sid, _jsonl_to_lite(outcome, mtime), project_path
)
parsed = _parse_session_info_from_lite(sid, outcome, project_path)
if parsed is None:
# Sidechain or no extractable summary — drop, matching the
# filesystem path.
Expand Down Expand Up @@ -1627,10 +1663,9 @@ async def get_session_info_from_store(
"""
if not _validate_uuid(session_id):
return None
jsonl = await _load_store_entries_as_jsonl(session_store, session_id, directory)
if jsonl is None:
lite = await _load_store_session_lite(session_store, session_id, directory)
if lite is None:
return None
lite = _jsonl_to_lite(jsonl, _mtime_from_jsonl_tail(jsonl))
project_path = _canonicalize_path(str(directory) if directory is not None else ".")
return _parse_session_info_from_lite(session_id, lite, project_path)

Expand Down
38 changes: 30 additions & 8 deletions src/claude_agent_sdk/testing/session_store_conformance.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Shared conformance test suite for :class:`SessionStore` adapters.

Call :func:`run_session_store_conformance` from an async test to assert the
13 behavioral contracts every adapter must satisfy. Tests for optional
methods (``list_sessions``, ``delete``, ``list_subkeys``) are skipped when
named in ``skip_optional`` or when the store does not override that method.
14 behavioral contracts every adapter must satisfy. Tests for optional
methods (``list_sessions``, ``delete``, ``list_subkeys``, ``load_range``) are
skipped when named in ``skip_optional`` or when the store does not override
that method.

Example::

Expand All @@ -24,9 +25,9 @@ async def test_my_store_conformance():

from ..types import SessionKey, SessionStore

OptionalMethod = str # "list_sessions" | "delete" | "list_subkeys"
OptionalMethod = str # "list_sessions" | "delete" | "list_subkeys" | "load_range"
_OPTIONAL_METHODS: frozenset[str] = frozenset(
{"list_sessions", "delete", "list_subkeys"}
{"list_sessions", "delete", "list_subkeys", "load_range"}
)

_KEY: SessionKey = {"project_key": "proj", "session_id": "sess"}
Expand All @@ -53,12 +54,12 @@ async def run_session_store_conformance(
*,
skip_optional: frozenset[str] = frozenset(),
) -> None:
"""Assert the 13 :class:`SessionStore` behavioral contracts.
"""Assert the 14 :class:`SessionStore` behavioral contracts.

``make_store`` is invoked once per contract to provide isolation. It may be
sync or async. Contracts for optional methods (``list_sessions``,
``delete``, ``list_subkeys``) are skipped when named in ``skip_optional``
or when the store does not override that method.
``delete``, ``list_subkeys``, ``load_range``) are skipped when named in
``skip_optional`` or when the store does not override that method.
"""
invalid = skip_optional - _OPTIONAL_METHODS
assert not invalid, f"unknown optional methods in skip_optional: {invalid}"
Expand All @@ -73,6 +74,7 @@ async def fresh() -> SessionStore:
has_list_sessions = _has_optional(probe, "list_sessions", skip_optional)
has_delete = _has_optional(probe, "delete", skip_optional)
has_list_subkeys = _has_optional(probe, "list_subkeys", skip_optional)
has_load_range = _has_optional(probe, "load_range", skip_optional)

# --- Required: append + load -------------------------------------------

Expand Down Expand Up @@ -247,6 +249,26 @@ async def fresh() -> SessionStore:
== []
)

# --- Optional: load_range ----------------------------------------------

if has_load_range:
# 14. load_range returns first head / last tail entries
Comment thread
qing-ant marked this conversation as resolved.
store = await fresh()
seq = [_e({"n": i}) for i in range(6)]
await store.append(_KEY, seq)
ranged = await store.load_range(_KEY, head=2, tail=2)
assert ranged is not None
head, tail = ranged
assert head == seq[:2]
assert tail == seq[-2:]
# Unknown key returns None (matches load()).
assert (
await store.load_range(
{"project_key": "proj", "session_id": "never-written"}, head=2, tail=2
)
is None
)


def _e(d: dict[str, Any]) -> Any:
"""Build a test entry satisfying ``SessionStoreEntry`` (``type`` is required).
Expand Down
17 changes: 17 additions & 0 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,23 @@ async def load(self, key: SessionKey) -> list[SessionStoreEntry] | None:
"""
...

async def load_range(
    self, key: SessionKey, *, head: int = 0, tail: int = 0
) -> tuple[list[SessionStoreEntry], list[SessionStoreEntry]] | None:
    """Load only the first ``head`` and last ``tail`` entries.

    Optional optimization. Used by :func:`list_sessions_from_store` and
    :func:`get_session_info_from_store` to derive summaries without
    fetching full transcripts. Adapters that can range-read (S3 first/last
    part-file, Postgres ``LIMIT``, Redis ``LRANGE``) should implement this
    to avoid loading multi-MB sessions just to extract a title.

    A ``head`` or ``tail`` of ``0`` (the default) requests no entries
    from that end — return an empty list for that side.

    Return ``None`` for a key that was never written. The two lists may
    overlap or be short if the session has fewer than ``head + tail``
    entries — callers handle that.

    The base implementation raises :class:`NotImplementedError`; the SDK
    only calls this when the adapter overrides it.
    """
    raise NotImplementedError

async def list_sessions(self, project_key: str) -> list[SessionStoreListEntry]:
"""List sessions for a ``project_key``. Returns IDs + modification times.

Expand Down
25 changes: 15 additions & 10 deletions tests/test_session_helpers_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,13 @@ async def list_sessions(self, project_key): # type: ignore[override]
assert store.internal[1]["session_id"] == "b"

async def test_adapter_load_error_degrades_row(self) -> None:
"""One failing load() degrades that row instead of failing the list."""
"""One failing read degrades that row instead of failing the list."""

class FlakeyStore(InMemorySessionStore):
async def load(self, key):
async def load_range(self, key, *, head=0, tail=0):
if key["session_id"] == bad_sid:
raise RuntimeError("backend down")
return await super().load(key)
return await super().load_range(key, head=head, tail=tail)
Comment thread
qing-ant marked this conversation as resolved.

store = FlakeyStore()
good_sid = str(uuid_mod.uuid4())
Expand All @@ -229,7 +229,7 @@ async def load(self, key):

async def test_load_concurrency_is_bounded(self) -> None:
"""list_sessions_from_store must not issue unbounded concurrent
store.load() calls — large listings would otherwise exhaust adapter
store reads — large listings would otherwise exhaust adapter
connection pools. Regression for the paginate-after-filter refactor."""
from claude_agent_sdk._internal import sessions as _sessions

Expand All @@ -238,13 +238,13 @@ async def test_load_concurrency_is_bounded(self) -> None:
gate = asyncio.Event()

class SlowStore(InMemorySessionStore):
async def load(self, key):
async def load_range(self, key, *, head=0, tail=0):
nonlocal in_flight, peak
in_flight += 1
peak = max(peak, in_flight)
await gate.wait()
in_flight -= 1
return await super().load(key)
return await super().load_range(key, head=head, tail=tail)

store = SlowStore()
n = _sessions._STORE_LIST_LOAD_CONCURRENCY * 3
Expand Down Expand Up @@ -553,13 +553,18 @@ async def test_tag_survives_adapter_key_reordering(self) -> None:
object keys (Postgres JSONB does this). The tag extractor must not
depend on ``type`` being the first key in the serialized line."""

def _reorder(entries):
# Sort keys alphabetically — deep-equal but different order.
return [dict(sorted(e.items())) for e in entries]

class ReorderingStore(InMemorySessionStore):
async def load(self, key): # type: ignore[override]
entries = await super().load(key)
if entries is None:
return None
# Sort keys alphabetically — deep-equal but different order.
return [dict(sorted(e.items())) for e in entries]
return None if entries is None else _reorder(entries)

async def load_range(self, key, *, head=0, tail=0): # type: ignore[override]
r = await super().load_range(key, head=head, tail=tail)
return None if r is None else (_reorder(r[0]), _reorder(r[1]))

store = ReorderingStore()
sid = str(uuid_mod.uuid4())
Expand Down
Loading
Loading