diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py
index 0ff4fb4b..c9d1e897 100644
--- a/src/claude_agent_sdk/__init__.py
+++ b/src/claude_agent_sdk/__init__.py
@@ -37,6 +37,7 @@
tag_session_via_store,
)
from ._internal.session_store import InMemorySessionStore, project_key_for_directory
+from ._internal.session_summary import fold_session_summary
from ._internal.sessions import (
get_session_info,
get_session_info_from_store,
@@ -109,6 +110,7 @@
SessionStore,
SessionStoreEntry,
SessionStoreListEntry,
+ SessionSummaryEntry,
SettingSource,
StopHookInput,
StreamEvent,
@@ -602,8 +604,10 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"SessionStore",
"SessionStoreEntry",
"SessionStoreListEntry",
+ "SessionSummaryEntry",
"SessionListSubkeysKey",
"InMemorySessionStore",
+ "fold_session_summary",
"MirrorErrorMessage",
"project_key_for_directory",
# Session listing (SessionStore-backed async variants)
diff --git a/src/claude_agent_sdk/_internal/session_store.py b/src/claude_agent_sdk/_internal/session_store.py
index 0d5a0e90..bb6a2155 100644
--- a/src/claude_agent_sdk/_internal/session_store.py
+++ b/src/claude_agent_sdk/_internal/session_store.py
@@ -12,7 +12,9 @@
SessionStore,
SessionStoreEntry,
SessionStoreListEntry,
+ SessionSummaryEntry,
)
+from .session_summary import fold_session_summary
from .sessions import project_key_for_directory
__all__ = [
@@ -41,11 +43,42 @@ class InMemorySessionStore(SessionStore):
def __init__(self) -> None:
self._store: dict[str, list[SessionStoreEntry]] = {}
self._mtimes: dict[str, int] = {}
+ self._summaries: dict[tuple[str, str], SessionSummaryEntry] = {}
+ self._last_mtime = 0
+
+ def _next_mtime(self) -> int:
+ """Storage write time for this adapter, in Unix epoch ms.
+
+ Guaranteed strictly monotonically increasing across calls within the
+ process so back-to-back appends always produce distinct mtimes (real
+ storage backends — file mtime on modern filesystems, S3
+ LastModified, Postgres updated_at — get this property for free from
+ their commit ordering).
+ """
+ now_ms = int(time.time() * 1000)
+ if now_ms <= self._last_mtime:
+ now_ms = self._last_mtime + 1
+ self._last_mtime = now_ms
+ return now_ms
async def append(self, key: SessionKey, entries: list[SessionStoreEntry]) -> None:
k = _key_to_string(key)
self._store.setdefault(k, []).extend(entries)
- self._mtimes[k] = int(time.time() * 1000)
+ now_ms = self._next_mtime()
+ # Maintain the per-session summary sidecar incrementally so
+ # list_session_summaries() never re-reads. Subagent subpaths don't
+ # contribute to the main session's summary.
+ if key.get("subpath") is None:
+ sk = (key["project_key"], key["session_id"])
+ folded = fold_session_summary(self._summaries.get(sk), key, entries)
+ # Stamp the sidecar with this adapter's storage write time — the
+ # SAME clock list_sessions() exposes below. SessionSummaryEntry.
+ # mtime is contractually storage write time (not entry time), so
+ # the fast-path staleness check (summary.mtime < list_sessions
+ # mtime) works correctly.
+ folded["mtime"] = now_ms
+ self._summaries[sk] = folded
+ self._mtimes[k] = now_ms
async def load(self, key: SessionKey) -> list[SessionStoreEntry] | None:
entries = self._store.get(_key_to_string(key))
@@ -64,6 +97,11 @@ async def list_sessions(self, project_key: str) -> list[SessionStoreListEntry]:
)
return results
+ async def list_session_summaries(
+ self, project_key: str
+ ) -> list[SessionSummaryEntry]:
+ return [s for (pk, _), s in self._summaries.items() if pk == project_key]
+
async def delete(self, key: SessionKey) -> None:
k = _key_to_string(key)
self._store.pop(k, None)
@@ -72,6 +110,7 @@ async def delete(self, key: SessionKey) -> None:
# transcripts, metadata) so they aren't orphaned. A targeted delete
# with an explicit subpath removes only that one entry.
if key.get("subpath") is None:
+ self._summaries.pop((key["project_key"], key["session_id"]), None)
prefix = f"{key['project_key']}/{key['session_id']}/"
for store_key in [sk for sk in self._store if sk.startswith(prefix)]:
self._store.pop(store_key, None)
@@ -103,6 +142,8 @@ def clear(self) -> None:
"""Test helper — clear all stored data."""
self._store.clear()
self._mtimes.clear()
+ self._summaries.clear()
+ self._last_mtime = 0
def file_path_to_session_key(file_path: str, projects_dir: str) -> SessionKey | None:
diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py
new file mode 100644
index 00000000..3a509572
--- /dev/null
+++ b/src/claude_agent_sdk/_internal/session_summary.py
@@ -0,0 +1,233 @@
+"""Incremental session-summary derivation for :class:`SessionStore` adapters.
+
+:func:`fold_session_summary` lets a store maintain a per-session
+:class:`SessionSummaryEntry` sidecar incrementally inside ``append()`` so
+``list_sessions_from_store()`` can fetch all metadata in a single
+``list_session_summaries()`` call instead of N per-session ``load()`` calls.
+
+Every derived field is append-incremental (set-once or last-wins) so adapters
+never need to re-read previously appended entries.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any, cast
+
+from ..types import (
+ SDKSessionInfo,
+ SessionKey,
+ SessionStoreEntry,
+ SessionSummaryEntry,
+)
+from .sessions import _COMMAND_NAME_RE, _SKIP_FIRST_PROMPT_PATTERN
+
+__all__ = ["fold_session_summary", "summary_entry_to_sdk_info"]
+
+
# Map of JSONL entry keys → SessionSummaryEntry ``data`` keys for last-wins
# string fields: each appended entry that carries the source key overwrites
# the previously folded value (consumed by fold_session_summary; non-string
# values are ignored there).
_LAST_WINS_FIELDS: dict[str, str] = {
    "customTitle": "custom_title",
    "aiTitle": "ai_title",
    "lastPrompt": "last_prompt",
    "summary": "summary_hint",
    "gitBranch": "git_branch",
}
+
+
+def _iso_to_epoch_ms(ts: Any) -> int | None:
+ """Parse an ISO-8601 timestamp string to Unix epoch milliseconds."""
+ if not isinstance(ts, str):
+ return None
+ try:
+ # Python 3.10's fromisoformat doesn't support trailing 'Z'
+ norm = ts.replace("Z", "+00:00") if ts.endswith("Z") else ts
+ return int(datetime.fromisoformat(norm).timestamp() * 1000)
+ except ValueError:
+ return None
+
+
+def _entry_text_blocks(entry: dict[str, Any]) -> list[str]:
+ """Extract text strings from a ``type=="user"`` entry's message content."""
+ message = entry.get("message")
+ if not isinstance(message, dict):
+ return []
+ content = message.get("content")
+ texts: list[str] = []
+ if isinstance(content, str):
+ texts.append(content)
+ elif isinstance(content, list):
+ for block in content:
+ if (
+ isinstance(block, dict)
+ and block.get("type") == "text"
+ and isinstance(block.get("text"), str)
+ ):
+ texts.append(block["text"])
+ return texts
+
+
def _fold_first_prompt(data: dict[str, Any], entry: dict[str, Any]) -> None:
    """Single-entry replication of ``_extract_first_prompt_from_head``.

    Mutates ``data`` in place: a real match sets ``first_prompt`` and
    ``first_prompt_locked``; a slash-command message stashes a
    ``command_fallback``. tool_result carriers, ``isMeta``,
    ``isCompactSummary``, and auto-generated patterns are skipped.
    """
    if data.get("first_prompt_locked"):
        return
    if entry.get("type") != "user":
        return
    if entry.get("isMeta") is True or entry.get("isCompactSummary") is True:
        return
    # A user message carrying tool results is not a prompt.
    message = entry.get("message")
    if isinstance(message, dict):
        content = message.get("content")
        if isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and block.get("type") == "tool_result":
                    return

    for raw_text in _entry_text_blocks(entry):
        candidate = raw_text.replace("\n", " ").strip()
        if not candidate:
            continue
        command = _COMMAND_NAME_RE.search(candidate)
        if command is not None:
            # Remember the first slash-command name as a weak fallback.
            if not data.get("command_fallback"):
                data["command_fallback"] = command.group(1)
            continue
        if _SKIP_FIRST_PROMPT_PATTERN.match(candidate):
            continue
        if len(candidate) > 200:
            candidate = candidate[:200].rstrip() + "\u2026"
        data["first_prompt"] = candidate
        data["first_prompt_locked"] = True
        return
+
+
def fold_session_summary(
    prev: SessionSummaryEntry | None,
    key: SessionKey,
    entries: list[SessionStoreEntry],
) -> SessionSummaryEntry:
    """Fold one appended batch into the running summary for ``key``.

    Called by stores from inside ``append()`` so the
    :class:`SessionSummaryEntry` sidecar stays current without ever
    re-reading the transcript. ``prev`` is the prior summary for the same
    key, or ``None`` on the first append.

    Never call this for keys carrying a ``subpath`` — subagent transcripts
    must not feed the main session's summary. Callers guard with
    ``if key.get("subpath") is None:``.

    All derived state lives in the opaque ``data`` dict; stores persist it
    verbatim and never interpret it.

    The fold leaves ``mtime`` alone — that field is the sidecar's storage
    write time and the adapter stamps it after persisting. It must share a
    clock with the ``mtime`` that :meth:`SessionStore.list_sessions`
    reports for the same session (file mtime, S3 ``LastModified``,
    Postgres ``updated_at``, or whatever native timestamp the adapter
    surfaces); deriving it from entry ISO timestamps would make every
    batched-write sidecar look strictly older than the session's current
    mtime and defeat the fast-path staleness check. For a brand-new
    session (``prev is None``) the fold returns a placeholder ``mtime=0``
    that the adapter is expected to overwrite.

    ``created_at`` latches the first parseable entry timestamp. The disk
    lite-parse only inspects the first line, so a stream whose first entry
    lacks a timestamp (not produced by the CLI) yields a non-``None``
    ``created_at`` here where lite-parse yields ``None``.
    """
    summary: SessionSummaryEntry
    if prev is None:
        summary = {"session_id": key["session_id"], "mtime": 0, "data": {}}
    else:
        summary = {
            "session_id": prev["session_id"],
            "mtime": prev["mtime"],
            "data": dict(prev["data"]),
        }
    data = summary["data"]

    for raw in entries:
        # SessionStoreEntry is a permissive TypedDict; widen to a plain
        # dict so .get() of unknown keys type-checks.
        entry = cast("dict[str, Any]", raw)
        entry_ms = _iso_to_epoch_ms(entry.get("timestamp"))

        # Set-once fields: latched by the first qualifying entry.
        if "is_sidechain" not in data:
            data["is_sidechain"] = entry.get("isSidechain") is True
        if "created_at" not in data and entry_ms is not None:
            data["created_at"] = entry_ms
        if "cwd" not in data:
            cwd_val = entry.get("cwd")
            if isinstance(cwd_val, str) and cwd_val:
                data["cwd"] = cwd_val

        _fold_first_prompt(data, entry)

        # Last-wins string fields: each occurrence overwrites the previous.
        for entry_key, data_key in _LAST_WINS_FIELDS.items():
            candidate = entry.get(entry_key)
            if isinstance(candidate, str):
                data[data_key] = candidate

        if entry.get("type") == "tag":
            tag = entry.get("tag")
            if isinstance(tag, str) and tag:
                data["tag"] = tag
            else:
                # An empty or absent tag value clears the tag.
                data.pop("tag", None)

    return summary
+
+
def summary_entry_to_sdk_info(
    entry: SessionSummaryEntry, project_path: str | None
) -> SDKSessionInfo | None:
    """Build an :class:`SDKSessionInfo` from a summary sidecar.

    Mirrors ``_parse_session_info_from_lite``'s filtering: sidechain
    sessions and sessions with no extractable summary yield ``None``.
    """
    data = entry["data"]
    if data.get("is_sidechain"):
        return None

    if data.get("first_prompt_locked"):
        first_prompt = data.get("first_prompt") or None
    else:
        first_prompt = data.get("command_fallback") or None
    custom_title = data.get("custom_title") or data.get("ai_title") or None
    summary = (
        custom_title
        or data.get("last_prompt")
        or data.get("summary_hint")
        or first_prompt
    )
    if not summary:
        return None

    return SDKSessionInfo(
        session_id=entry["session_id"],
        summary=summary,
        last_modified=entry["mtime"],
        # file_size counts JSONL bytes on the local-disk path only (see
        # SDKSessionInfo.file_size); stores have no equivalent.
        file_size=None,
        custom_title=custom_title,
        first_prompt=first_prompt,
        git_branch=data.get("git_branch") or None,
        cwd=data.get("cwd") or project_path or None,
        tag=data.get("tag") or None,
        created_at=data.get("created_at"),
    )
diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py
index dbd07d37..e495d476 100644
--- a/src/claude_agent_sdk/_internal/sessions.py
+++ b/src/claude_agent_sdk/_internal/sessions.py
@@ -8,6 +8,7 @@
import asyncio
import json
+import logging
import os
import re
import subprocess
@@ -21,6 +22,8 @@
from ..types import SDKSessionInfo, SessionKey, SessionMessage, SessionStore
from .session_store_validation import _store_implements
+logger = logging.getLogger(__name__)
+
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
@@ -1514,6 +1517,53 @@ async def _load_store_entries_as_jsonl(
return _entries_to_jsonl(entries)
async def _derive_infos_via_load(
    session_store: SessionStore,
    listing: list[Any],
    directory: str | None,
    project_path: str,
) -> list[SDKSessionInfo]:
    """Derive ``SDKSessionInfo`` rows from ``listing`` via per-session load.

    Each session's entries are fetched through ``store.load()`` and run
    through the filesystem path's lite-parse. Loads run concurrently under
    a fixed semaphore bound so large listings don't exhaust adapter
    connection pools or trip backend rate limits; a failing load degrades
    that row to an empty summary rather than failing the whole list.
    Sidechain and no-summary sessions are dropped.
    """
    semaphore = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY)

    async def _load_one(session_id: str) -> str | None:
        async with semaphore:
            return await _load_store_entries_as_jsonl(
                session_store, session_id, directory
            )

    outcomes = await asyncio.gather(
        *(_load_one(item["session_id"]) for item in listing),
        return_exceptions=True,
    )

    infos: list[SDKSessionInfo] = []
    for item, outcome in zip(listing, outcomes, strict=True):
        session_id = item["session_id"]
        mtime = item["mtime"]
        if isinstance(outcome, BaseException):
            # Adapter error — degrade this row instead of failing the list.
            infos.append(
                SDKSessionInfo(session_id=session_id, summary="", last_modified=mtime)
            )
        elif outcome is not None:
            parsed = _parse_session_info_from_lite(
                session_id, _jsonl_to_lite(outcome, mtime), project_path
            )
            # None means sidechain or no extractable summary — drop it,
            # matching the filesystem path.
            if parsed is not None:
                parsed.last_modified = mtime
                infos.append(parsed)
    return infos
+
+
async def list_sessions_from_store(
session_store: SessionStore,
directory: str | None = None,
@@ -1529,7 +1579,8 @@ async def list_sessions_from_store(
Args:
session_store: The store to read from. Must implement
- :meth:`SessionStore.list_sessions`.
+ :meth:`SessionStore.list_session_summaries` or
+ :meth:`SessionStore.list_sessions` (or both).
directory: Project directory used to compute the ``project_key``.
Defaults to the current working directory.
limit: Maximum number of sessions to return.
@@ -1540,7 +1591,8 @@ async def list_sessions_from_store(
List of ``SDKSessionInfo`` sorted by ``last_modified`` descending.
Raises:
- ValueError: If ``session_store`` does not implement
+ ValueError: If ``session_store`` implements neither
+ :meth:`SessionStore.list_session_summaries` nor
:meth:`SessionStore.list_sessions`.
Note:
@@ -1548,60 +1600,124 @@ async def list_sessions_from_store(
the store path — the store operates on a single ``project_key``.
.. note::
- This performs one full ``store.load()`` per session in the listing
- to derive summaries. On remote backends with many or large sessions
- this can be expensive (e.g., S3 egress, Postgres large-row reads).
- Consider denormalizing summary metadata into your adapter's
- ``list_sessions()`` index.
+ If the store implements ``list_session_summaries``, this is one batch
+ summary call plus one cheap ``list_sessions()`` enumeration to
+ gap-fill sessions missing a sidecar or whose sidecar is stale
+ (``summary.mtime < list_sessions.mtime``) — zero per-session
+ ``load()`` calls when sidecars are complete and fresh. Otherwise
+ falls back to one ``store.load()`` per session (bounded at 16
+ concurrent), which on remote backends with many or large sessions
+ can be expensive (e.g., S3 egress, Postgres large-row reads).
+
+ Gap-fill requires ``list_sessions``: if the store implements
+ ``list_session_summaries`` but not ``list_sessions``, sessions
+ without a sidecar cannot be discovered and will be absent from the
+ result.
"""
- if not _store_implements(session_store, "list_sessions"):
- raise ValueError(
- "session_store does not implement list_sessions() -- cannot list "
- "sessions. Provide a store with a list_sessions() method."
- )
project_path = _canonicalize_path(str(directory) if directory is not None else ".")
project_key = _sanitize_path(project_path)
- raw = await session_store.list_sessions(project_key)
- # Copy — store.list_sessions() may return a reference to internal state.
- listing = list(raw)
-
- # Derive a real summary per session by loading its entries and reusing
- # the filesystem path's lite-parse. Loads run concurrently with a fixed
- # bound so large listings don't exhaust adapter connection pools or hit
- # backend rate limits; adapter errors degrade that row instead of failing
- # the whole list. Filtering (sidechain/empty drop) happens before
- # pagination so ``limit``/``offset`` index the same filtered set as the
- # disk path.
- sem = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY)
+ has_list_sessions = _store_implements(session_store, "list_sessions")
- async def _bounded_load(sid: str) -> str | None:
- async with sem:
- return await _load_store_entries_as_jsonl(session_store, sid, directory)
+ # Fast path: if the store maintains incremental summaries, fetch them in
+ # one call instead of N per-session load()s.
+ if _store_implements(session_store, "list_session_summaries"):
+ from .session_summary import summary_entry_to_sdk_info
- settled = await asyncio.gather(
- *(_bounded_load(e["session_id"]) for e in listing),
- return_exceptions=True,
- )
- results: list[SDKSessionInfo] = []
- for entry, outcome in zip(listing, settled, strict=True):
- sid = entry["session_id"]
- mtime = entry["mtime"]
- if isinstance(outcome, BaseException):
- results.append(
- SDKSessionInfo(session_id=sid, summary="", last_modified=mtime)
- )
- continue
- if outcome is None:
- continue
- parsed = _parse_session_info_from_lite(
- sid, _jsonl_to_lite(outcome, mtime), project_path
+ try:
+ summaries = await session_store.list_session_summaries(project_key)
+ except NotImplementedError:
+ pass
+ else:
+ # Build a unified slot list. Fresh summaries (mtime >= the
+ # session's current mtime from list_sessions) get their info up
+ # front; sessions present in list_sessions() but missing OR with a
+ # stale sidecar (summary.mtime < known mtime) get a placeholder
+ # slot routed through the same gap-fill path so the fold is
+ # recomputed from source entries.
+ # Summary-backed sidechain/empty sessions are dropped here (free —
+ # already determined) so they don't consume offset/limit positions,
+ # matching the disk and slow-path filter-then-paginate semantics.
+ if has_list_sessions:
+ listing = list(await session_store.list_sessions(project_key))
+ known_mtimes = {e["session_id"]: e["mtime"] for e in listing}
+ else:
+ listing = []
+ known_mtimes = {}
+ logger.debug(
+ "list_session_summaries without list_sessions: gap-fill "
+ "skipped; sessions lacking a sidecar will be omitted"
+ )
+
+ slots: list[dict[str, Any]] = []
+ fresh_summary_ids: set[str] = set()
+ for s in summaries:
+ sid = s["session_id"]
+ if has_list_sessions:
+ known = known_mtimes.get(sid)
+ if known is None:
+ # Summary for a session list_sessions() no longer
+ # reports — drop it.
+ continue
+ if s["mtime"] < known:
+ # Stale sidecar — let gap-fill re-fold from source.
+ continue
+ info = summary_entry_to_sdk_info(s, project_path)
+ if info is None:
+ fresh_summary_ids.add(sid)
+ continue
+ slots.append({"mtime": s["mtime"], "info": info})
+ fresh_summary_ids.add(sid)
+ if has_list_sessions:
+ slots.extend(
+ {"mtime": e["mtime"], "session_id": e["session_id"], "info": None}
+ for e in listing
+ if e["session_id"] not in fresh_summary_ids
+ )
+
+ # Paginate BEFORE per-session load so gap-fill load() count is
+ # bounded by page size, not total missing — 500 sessions lacking
+ # sidecars with limit=10 issues at most 10 load()s, not 500.
+ slots.sort(key=lambda sl: sl["mtime"], reverse=True)
+ # Mirror _apply_sort_limit_offset's guards so negative/zero
+ # offset and non-positive limit behave identically to the slow
+ # and disk paths.
+ page = slots[offset:] if offset > 0 else slots
+ if limit is not None and limit > 0:
+ page = page[:limit]
+
+ to_fill = [sl for sl in page if sl["info"] is None]
+ if to_fill:
+ filled = await _derive_infos_via_load(
+ session_store, to_fill, directory, project_path
+ )
+ by_sid = {f.session_id: f for f in filled}
+ for sl in to_fill:
+ sl["info"] = by_sid.get(sl["session_id"])
+
+ # Gap-fill placeholders that resolved to None (sidechain / no
+ # extractable summary after load) are dropped here, AFTER
+ # pagination — that case alone can short-page. Summary-backed
+ # slots were already pre-filtered above, so a store with complete
+ # and fresh sidecars never short-pages; a present-but-stale
+ # sidecar is routed through gap-fill (same as a missing one) and
+ # can short-page if load() yields no extractable summary.
+ return [sl["info"] for sl in page if sl["info"] is not None]
+
+ if not has_list_sessions:
+ raise ValueError(
+ "session_store implements neither list_session_summaries() nor "
+ "list_sessions() -- cannot list sessions. Provide a store with at "
+ "least one of those methods."
)
- if parsed is None:
- # Sidechain or no extractable summary — drop, matching the
- # filesystem path.
- continue
- parsed.last_modified = mtime
- results.append(parsed)
+ # Copy — store.list_sessions() may return a reference to internal state.
+ listing = list(await session_store.list_sessions(project_key))
+ # Derive a real summary per session by loading its entries and reusing
+ # the filesystem path's lite-parse. Filtering (sidechain/empty drop)
+ # happens before pagination so ``limit``/``offset`` index the same
+ # filtered set as the disk path.
+ results = await _derive_infos_via_load(
+ session_store, listing, directory, project_path
+ )
return _apply_sort_limit_offset(results, limit, offset)
diff --git a/src/claude_agent_sdk/testing/session_store_conformance.py b/src/claude_agent_sdk/testing/session_store_conformance.py
index c5a9657f..2b7f8a7f 100644
--- a/src/claude_agent_sdk/testing/session_store_conformance.py
+++ b/src/claude_agent_sdk/testing/session_store_conformance.py
@@ -1,9 +1,10 @@
"""Shared conformance test suite for :class:`SessionStore` adapters.
Call :func:`run_session_store_conformance` from an async test to assert the
-13 behavioral contracts every adapter must satisfy. Tests for optional
-methods (``list_sessions``, ``delete``, ``list_subkeys``) are skipped when
-named in ``skip_optional`` or when the store does not override that method.
+14 behavioral contracts every adapter must satisfy. Tests for optional
+methods (``list_sessions``, ``list_session_summaries``, ``delete``,
+``list_subkeys``) are skipped when named in ``skip_optional`` or when the
+store does not override that method.
Example::
@@ -24,9 +25,11 @@ async def test_my_store_conformance():
from ..types import SessionKey, SessionStore
-OptionalMethod = str # "list_sessions" | "delete" | "list_subkeys"
+OptionalMethod = (
+ str # "list_sessions" | "list_session_summaries" | "delete" | "list_subkeys"
+)
_OPTIONAL_METHODS: frozenset[str] = frozenset(
- {"list_sessions", "delete", "list_subkeys"}
+ {"list_sessions", "list_session_summaries", "delete", "list_subkeys"}
)
_KEY: SessionKey = {"project_key": "proj", "session_id": "sess"}
@@ -53,12 +56,12 @@ async def run_session_store_conformance(
*,
skip_optional: frozenset[str] = frozenset(),
) -> None:
- """Assert the 13 :class:`SessionStore` behavioral contracts.
+ """Assert the 14 :class:`SessionStore` behavioral contracts.
``make_store`` is invoked once per contract to provide isolation. It may be
sync or async. Contracts for optional methods (``list_sessions``,
- ``delete``, ``list_subkeys``) are skipped when named in ``skip_optional``
- or when the store does not override that method.
+ ``list_session_summaries``, ``delete``, ``list_subkeys``) are skipped when
+ named in ``skip_optional`` or when the store does not override that method.
"""
invalid = skip_optional - _OPTIONAL_METHODS
assert not invalid, f"unknown optional methods in skip_optional: {invalid}"
@@ -71,6 +74,7 @@ async def fresh() -> SessionStore:
probe = await fresh()
has_list_sessions = _has_optional(probe, "list_sessions", skip_optional)
+ has_list_summaries = _has_optional(probe, "list_session_summaries", skip_optional)
has_delete = _has_optional(probe, "delete", skip_optional)
has_list_subkeys = _has_optional(probe, "list_subkeys", skip_optional)
@@ -160,6 +164,72 @@ async def fresh() -> SessionStore:
sessions = await store.list_sessions("proj")
assert [s["session_id"] for s in sessions] == ["main"]
+ # --- Optional: list_session_summaries ----------------------------------
+
+ if has_list_summaries:
+ # 14. list_session_summaries returns persisted fold output that
+ # round-trips through fold_session_summary again. Stores must NOT
+ # interpret ``data`` — only persist it verbatim.
+ from .._internal.session_summary import fold_session_summary
+
+ store = await fresh()
+ key: SessionKey = {"project_key": "proj", "session_id": "summ-sess"}
+ await store.append(
+ key,
+ [
+ _e({"timestamp": "2024-01-01T00:00:00.000Z", "customTitle": "first"}),
+ _e({"timestamp": "2024-01-01T00:00:01.000Z"}),
+ ],
+ )
+ await store.append(
+ key,
+ [_e({"timestamp": "2024-01-01T00:00:02.000Z", "customTitle": "second"})],
+ )
+ await store.append(
+ {"project_key": "other", "session_id": "elsewhere"},
+ [_e({"timestamp": "2024-01-01T00:00:00.000Z"})],
+ )
+ summaries = await store.list_session_summaries("proj")
+ by_id = {s["session_id"]: s for s in summaries}
+ assert set(by_id) == {"summ-sess"}
+ summ = by_id["summ-sess"]
+ # mtime must be epoch-ms; >1e12 rules out epoch-seconds.
+ assert math.isfinite(summ["mtime"]) and summ["mtime"] > 1e12
+ # Clock alignment: sidecar mtime is storage write time (adapter-
+ # stamped at persist), and must share a clock with
+ # list_sessions().mtime for the same session. Adapters that derive
+ # sidecar mtime from entry ISO timestamps would report a strictly
+ # older value than list_sessions()'s storage-time mtime and make
+ # every sidecar look stale to the fast-path freshness check in
+ # list_sessions_from_store(); this assertion catches that.
+ if has_list_sessions:
+ ls_by_id = {
+ e["session_id"]: e["mtime"] for e in await store.list_sessions("proj")
+ }
+ assert summ["mtime"] >= ls_by_id["summ-sess"]
+ # data is opaque; the contract is that it round-trips into the fold.
+ assert isinstance(summ["data"], dict)
+ refolded = fold_session_summary(
+ summ, key, [_e({"timestamp": "2024-01-01T00:00:03.000Z"})]
+ )
+ assert refolded["session_id"] == "summ-sess"
+ # The fold preserves prev["mtime"] verbatim — mtime is stamped by
+ # the adapter after persisting, not by the fold.
+ assert refolded["mtime"] == summ["mtime"]
+ # Subagent appends must NOT affect the main session's summary.
+ await store.append(
+ {**key, "subpath": "subagents/agent-1"},
+ [_e({"timestamp": "2024-01-01T00:00:09.000Z", "customTitle": "subagent"})],
+ )
+ after_sub = {
+ s["session_id"]: s for s in await store.list_session_summaries("proj")
+ }
+ assert after_sub["summ-sess"]["data"] == summ["data"]
+ assert await store.list_session_summaries("never-appended-project") == []
+ if has_delete:
+ await store.delete(key)
+ assert await store.list_session_summaries("proj") == []
+
# --- Optional: delete --------------------------------------------------
if has_delete:
diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py
index 279bd0d2..26e25b89 100644
--- a/src/claude_agent_sdk/types.py
+++ b/src/claude_agent_sdk/types.py
@@ -1159,6 +1159,33 @@ class SessionStoreListEntry(TypedDict):
modification time (e.g. Redis) must maintain their own index."""
class SessionSummaryEntry(TypedDict):
    """Incrementally-maintained per-session summary sidecar.

    Produced by :func:`fold_session_summary` inside
    :meth:`SessionStore.append` and persisted verbatim by the store; the
    full set is returned from :meth:`SessionStore.list_session_summaries`.
    ``data`` is opaque SDK-owned state — stores MUST NOT interpret it.
    """

    session_id: str
    mtime: int
    """Storage write time of the sidecar, in Unix epoch milliseconds.

    Must come from the same clock source as the ``mtime`` that
    :meth:`SessionStore.list_sessions` reports for this session — file
    mtime, S3 ``LastModified``, Postgres ``updated_at``, or whatever
    native timestamp the adapter surfaces. Never derive it from entry ISO
    timestamps: adapters that batch writes with any persist latency (every
    real backend) report storage times strictly later than the last
    entry's timestamp, so a timestamp-derived sidecar would always look
    stale to the fast-path freshness check in
    ``list_sessions_from_store``. :func:`fold_session_summary` never sets
    this field itself — it carries ``prev``'s value through unchanged;
    stamp it after persisting."""
    data: dict[str, Any]
    """Opaque SDK-owned summary state. Persist verbatim; never interpret."""
+
+
class SessionListSubkeysKey(TypedDict):
"""Key argument to :meth:`SessionStore.list_subkeys` (no ``subpath``)."""
@@ -1233,6 +1260,30 @@ async def list_sessions(self, project_key: str) -> list[SessionStoreListEntry]:
"""
raise NotImplementedError
+ async def list_session_summaries(
+ self, project_key: str
+ ) -> list[SessionSummaryEntry]:
+ """Return incrementally-maintained summaries for all sessions in one call.
+
+ Stores should maintain these via :func:`fold_session_summary` inside
+ :meth:`append`. Skip the fold for keys with a ``subpath`` — subagent
+ transcripts must not contribute to the main session's summary.
+
+ Like :meth:`list_sessions`, results are scoped to a single
+ ``project_key`` and exclude ``subpath`` entries.
+
+ Optional — if unimplemented, ``list_sessions_from_store()`` falls back
+ to ``list_sessions()`` + per-session ``load()``.
+
+ .. note::
+ Stores that maintain summaries inside ``append()`` MUST serialize
+ sidecar writes if ``append()`` calls can race for the same session
+ — e.g., wrap the read-fold-write in a transaction/CAS, or hold a
+ per-session lock. The SDK's :func:`fold_session_summary` is pure;
+ concurrency control is the store's responsibility.
+ """
+ raise NotImplementedError
+
async def delete(self, key: SessionKey) -> None:
"""Delete a session.
diff --git a/tests/test_session_helpers_store.py b/tests/test_session_helpers_store.py
index 5854a5f1..0e2859bf 100644
--- a/tests/test_session_helpers_store.py
+++ b/tests/test_session_helpers_store.py
@@ -126,7 +126,7 @@ async def test_limit_and_offset(self) -> None:
async def test_raises_when_store_lacks_list_sessions(self) -> None:
store = _MinimalStore()
- with pytest.raises(ValueError, match="does not implement list_sessions"):
+ with pytest.raises(ValueError, match="list_sessions"):
await list_sessions_from_store(store, directory=DIR)
async def test_drops_sidechain_sessions(self) -> None:
@@ -157,8 +157,20 @@ async def test_limit_offset_applied_after_sidechain_filter(self) -> None:
first and ``_apply_sort_limit_offset`` paginates the filtered set, so
``limit=N`` returns N rows even when sidechains exist. The store path
must do the same — paginating before filtering would return short
- pages and let sidechains consume page slots."""
- store = InMemorySessionStore()
+ pages and let sidechains consume page slots.
+
+        Pinned to the slow path (``list_session_summaries`` suppressed)
+        so this test exercises the slow-path filter-THEN-paginate
+        behavior directly; the fast path pre-filters sidechain summary
+        slots before pagination — covered separately by
+        ``test_sidechain_summary_does_not_consume_page_slot``.
+ """
+
+ class SlowPathStore(InMemorySessionStore):
+ async def list_session_summaries(self, project_key): # type: ignore[override]
+ raise NotImplementedError
+
+ store = SlowPathStore()
valid_sids: list[str] = []
for _ in range(5):
sid = str(uuid_mod.uuid4())
@@ -210,6 +222,10 @@ async def test_adapter_load_error_degrades_row(self) -> None:
"""One failing load() degrades that row instead of failing the list."""
class FlakeyStore(InMemorySessionStore):
+ # Force the per-session load() fallback path under test.
+ async def list_session_summaries(self, project_key):
+ raise NotImplementedError
+
async def load(self, key):
if key["session_id"] == bad_sid:
raise RuntimeError("backend down")
@@ -238,6 +254,10 @@ async def test_load_concurrency_is_bounded(self) -> None:
gate = asyncio.Event()
class SlowStore(InMemorySessionStore):
+ # Force the per-session load() fallback path under test.
+ async def list_session_summaries(self, project_key):
+ raise NotImplementedError
+
async def load(self, key):
nonlocal in_flight, peak
in_flight += 1
diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py
new file mode 100644
index 00000000..603014b8
--- /dev/null
+++ b/tests/test_session_summary.py
@@ -0,0 +1,884 @@
+"""Tests for incremental session-summary derivation.
+
+Covers ``fold_session_summary``, ``summary_entry_to_sdk_info``,
+``InMemorySessionStore.list_session_summaries``, and the
+``list_sessions_from_store`` fast path that consumes them.
+"""
+
+from __future__ import annotations
+
+import uuid as uuid_mod
+from typing import Any
+
+import pytest
+
+from claude_agent_sdk import (
+ InMemorySessionStore,
+ SessionSummaryEntry,
+ fold_session_summary,
+ list_sessions_from_store,
+ project_key_for_directory,
+)
+from claude_agent_sdk._internal.session_summary import summary_entry_to_sdk_info
+from claude_agent_sdk._internal.sessions import (
+ _entries_to_jsonl,
+ _jsonl_to_lite,
+ _parse_session_info_from_lite,
+)
+from claude_agent_sdk.types import SessionKey
+
+DIR = "/workspace/project"
+PROJECT_KEY = project_key_for_directory(DIR)
+KEY: SessionKey = {
+ "project_key": PROJECT_KEY,
+ "session_id": "11111111-1111-4111-8111-111111111111",
+}
+
+
+def _user(
+ text: str | list[dict[str, Any]], ts: str = "2024-01-01T00:00:00.000Z", **extra: Any
+) -> dict[str, Any]:
+ return {
+ "type": "user",
+ "timestamp": ts,
+ "message": {"role": "user", "content": text},
+ **extra,
+ }
+
+
+# ---------------------------------------------------------------------------
+# fold_session_summary unit tests
+# ---------------------------------------------------------------------------
+
+
+class TestFoldSessionSummary:
+ def test_init_from_none(self) -> None:
+ s = fold_session_summary(None, KEY, [])
+ assert s == {"session_id": KEY["session_id"], "mtime": 0, "data": {}}
+
+ def test_set_once_fields_freeze(self) -> None:
+ s = fold_session_summary(
+ None,
+ KEY,
+ [
+ {
+ "type": "x",
+ "timestamp": "2024-01-01T00:00:00.000Z",
+ "cwd": "/a",
+ "isSidechain": False,
+ },
+ {"type": "x", "timestamp": "2024-01-01T00:00:05.000Z", "cwd": "/b"},
+ ],
+ )
+ assert s["data"]["created_at"] == 1704067200000
+ assert s["data"]["cwd"] == "/a"
+ assert s["data"]["is_sidechain"] is False
+ # Second append must not overwrite set-once fields.
+ s2 = fold_session_summary(
+ s,
+ KEY,
+ [
+ {
+ "type": "x",
+ "timestamp": "2024-01-02T00:00:00.000Z",
+ "cwd": "/c",
+ "isSidechain": True,
+ }
+ ],
+ )
+ assert s2["data"]["created_at"] == 1704067200000
+ assert s2["data"]["cwd"] == "/a"
+ assert s2["data"]["is_sidechain"] is False
+
+ def test_last_wins_overwrite(self) -> None:
+ s = fold_session_summary(
+ None,
+ KEY,
+ [
+ {
+ "type": "x",
+ "timestamp": "2024-01-01T00:00:00Z",
+ "customTitle": "t1",
+ "gitBranch": "main",
+ },
+ {"type": "x", "timestamp": "2024-01-01T00:00:01Z", "customTitle": "t2"},
+ ],
+ )
+ assert s["data"]["custom_title"] == "t2"
+ assert s["data"]["git_branch"] == "main"
+ s2 = fold_session_summary(
+ s,
+ KEY,
+ [
+ {
+ "type": "x",
+ "aiTitle": "ai",
+ "lastPrompt": "lp",
+ "summary": "sm",
+ "gitBranch": "dev",
+ }
+ ],
+ )
+ assert s2["data"]["custom_title"] == "t2"
+ assert s2["data"]["ai_title"] == "ai"
+ assert s2["data"]["last_prompt"] == "lp"
+ assert s2["data"]["summary_hint"] == "sm"
+ assert s2["data"]["git_branch"] == "dev"
+
+ def test_mtime_not_derived_from_entries(self) -> None:
+ """The fold must not touch mtime — it is the sidecar's storage write
+ time, stamped by the adapter at persist time, on the same clock as
+ list_sessions().mtime. Deriving it from entry ISO timestamps would
+ make every batched-write sidecar appear strictly older than the
+ session's current mtime, defeating the fast-path staleness check."""
+ s = fold_session_summary(
+ None,
+ KEY,
+ [
+ {"type": "x", "timestamp": "2024-01-01T00:00:05.000Z"},
+ {"type": "x", "timestamp": "2024-01-01T00:00:01.000Z"},
+ ],
+ )
+ # New session: fold returns mtime=0 placeholder, adapter must stamp.
+ assert s["mtime"] == 0
+
+ # Carry-over: prev mtime is preserved verbatim regardless of entry
+ # timestamps in the new batch.
+ prev: SessionSummaryEntry = {
+ "session_id": KEY["session_id"],
+ "mtime": 42,
+ "data": {},
+ }
+ s2 = fold_session_summary(
+ prev, KEY, [{"type": "x", "timestamp": "2024-01-01T00:00:10.000Z"}]
+ )
+ assert s2["mtime"] == 42
+
+ def test_tag_set_and_clear(self) -> None:
+ s = fold_session_summary(None, KEY, [{"type": "tag", "tag": "wip"}])
+ assert s["data"]["tag"] == "wip"
+ s2 = fold_session_summary(s, KEY, [{"type": "tag", "tag": ""}])
+ assert "tag" not in s2["data"]
+ # Non-tag entries with a "tag" key (e.g. tool_use input) are ignored.
+ s3 = fold_session_summary(s, KEY, [{"type": "user", "tag": "ignored"}])
+ assert s3["data"]["tag"] == "wip"
+
+ def test_sidechain_from_first_entry(self) -> None:
+ s = fold_session_summary(
+ None,
+ KEY,
+ [{"type": "x", "timestamp": "2024-01-01T00:00:00Z", "isSidechain": True}],
+ )
+ assert s["data"]["is_sidechain"] is True
+
+ def test_sidechain_latched_when_first_entry_lacks_timestamp(self) -> None:
+ """Regression: is_sidechain must latch on entry 0 even if its timestamp
+ is absent/unparseable, so entry 1 cannot overwrite it to False."""
+ s = fold_session_summary(
+ None,
+ KEY,
+ [
+ {"type": "user", "isSidechain": True},
+ {"type": "x", "timestamp": "2024-01-01T00:00:00Z"},
+ ],
+ )
+ assert s["data"]["is_sidechain"] is True
+ # created_at still picks up the first parseable timestamp.
+ assert s["data"]["created_at"] == 1704067200000
+
+ def test_first_prompt_skips_meta_tool_result_and_compact(self) -> None:
+ s = fold_session_summary(
+ None,
+ KEY,
+ [
+ _user("ignored meta", isMeta=True),
+ _user("ignored compact", isCompactSummary=True),
+ _user([{"type": "tool_result", "tool_use_id": "x", "content": "res"}]),
+ _user("real first"),
+ _user("not me"),
+ ],
+ )
+ assert s["data"]["first_prompt"] == "real first"
+ assert s["data"]["first_prompt_locked"] is True
+
+ def test_first_prompt_command_fallback(self) -> None:
+ s = fold_session_summary(
+ None,
+ KEY,
+ [
+ _user("/init stuff"),
+ _user("/second"),
+ ],
+ )
+ assert s["data"].get("first_prompt_locked") is not True
+ assert s["data"]["command_fallback"] == "/init"
+ # A later real prompt locks it.
+ s2 = fold_session_summary(s, KEY, [_user("now real")])
+ assert s2["data"]["first_prompt"] == "now real"
+ assert s2["data"]["first_prompt_locked"] is True
+
+ def test_first_prompt_skip_pattern(self) -> None:
+ s = fold_session_summary(
+ None,
+ KEY,
+ [_user(" some output"), _user("hello")],
+ )
+ assert s["data"]["first_prompt"] == "hello"
+
+ def test_first_prompt_truncated(self) -> None:
+ s = fold_session_summary(None, KEY, [_user("x" * 300)])
+ assert len(s["data"]["first_prompt"]) <= 201
+ assert s["data"]["first_prompt"].endswith("\u2026")
+
+ def test_prev_is_not_mutated(self) -> None:
+ prev: SessionSummaryEntry = {"session_id": "a", "mtime": 5, "data": {}}
+ fold_session_summary(prev, KEY, [{"type": "x", "customTitle": "t"}])
+ assert prev == {"session_id": "a", "mtime": 5, "data": {}}
+
+
+# ---------------------------------------------------------------------------
+# summary_entry_to_sdk_info
+# ---------------------------------------------------------------------------
+
+
+class TestSummaryEntryToSdkInfo:
+ def test_sidechain_returns_none(self) -> None:
+ assert (
+ summary_entry_to_sdk_info(
+ {
+ "session_id": "s",
+ "mtime": 1,
+ "data": {"is_sidechain": True, "custom_title": "t"},
+ },
+ None,
+ )
+ is None
+ )
+
+ def test_empty_summary_returns_none(self) -> None:
+ assert (
+ summary_entry_to_sdk_info({"session_id": "s", "mtime": 1, "data": {}}, None)
+ is None
+ )
+
+ def test_precedence_chain(self) -> None:
+ data: dict[str, Any] = {
+ "first_prompt": "fp",
+ "first_prompt_locked": True,
+ "command_fallback": "/cmd",
+ "summary_hint": "sh",
+ "last_prompt": "lp",
+ "ai_title": "ai",
+ "custom_title": "ct",
+ }
+ base: SessionSummaryEntry = {"session_id": "s", "mtime": 1, "data": data}
+ info = summary_entry_to_sdk_info(base, None)
+ assert info is not None and info.summary == "ct" and info.custom_title == "ct"
+
+ del data["custom_title"]
+ info = summary_entry_to_sdk_info(base, None)
+ assert info is not None and info.summary == "ai" and info.custom_title == "ai"
+
+ del data["ai_title"]
+ info = summary_entry_to_sdk_info(base, None)
+ assert info is not None and info.summary == "lp" and info.custom_title is None
+
+ del data["last_prompt"]
+ info = summary_entry_to_sdk_info(base, None)
+ assert info is not None and info.summary == "sh"
+
+ del data["summary_hint"]
+ info = summary_entry_to_sdk_info(base, None)
+ assert info is not None and info.summary == "fp" and info.first_prompt == "fp"
+
+ data["first_prompt_locked"] = False
+ info = summary_entry_to_sdk_info(base, None)
+ assert (
+ info is not None and info.summary == "/cmd" and info.first_prompt == "/cmd"
+ )
+
+ def test_cwd_fallback_to_project_path(self) -> None:
+ info = summary_entry_to_sdk_info(
+ {"session_id": "s", "mtime": 1, "data": {"custom_title": "t"}}, "/proj"
+ )
+ assert info is not None and info.cwd == "/proj"
+ info2 = summary_entry_to_sdk_info(
+ {
+ "session_id": "s",
+ "mtime": 1,
+ "data": {"custom_title": "t", "cwd": "/own"},
+ },
+ "/proj",
+ )
+ assert info2 is not None and info2.cwd == "/own"
+
+ def test_field_passthrough(self) -> None:
+ info = summary_entry_to_sdk_info(
+ {
+ "session_id": "s",
+ "mtime": 99,
+ "data": {
+ "custom_title": "t",
+ "git_branch": "main",
+ "tag": "wip",
+ "created_at": 50,
+ },
+ },
+ None,
+ )
+ assert info is not None
+ assert info.session_id == "s"
+ assert info.last_modified == 99
+ assert info.git_branch == "main"
+ assert info.tag == "wip"
+ assert info.created_at == 50
+ # file_size is local-JSONL-only; store-backed summaries always None.
+ assert info.file_size is None
+
+
+# ---------------------------------------------------------------------------
+# InMemorySessionStore.list_session_summaries
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+class TestInMemoryListSessionSummaries:
+ async def test_tracks_appends(self) -> None:
+ store = InMemorySessionStore()
+ a: SessionKey = {"project_key": PROJECT_KEY, "session_id": "a"}
+ b: SessionKey = {"project_key": PROJECT_KEY, "session_id": "b"}
+ await store.append(a, [_user("hello a", ts="2024-01-01T00:00:00Z")])
+ await store.append(a, [{"type": "x", "customTitle": "Title A"}])
+ await store.append(b, [_user("hello b", ts="2024-01-02T00:00:00Z")])
+ summaries = {
+ s["session_id"]: s for s in await store.list_session_summaries(PROJECT_KEY)
+ }
+ assert set(summaries) == {"a", "b"}
+ assert summaries["a"]["data"]["custom_title"] == "Title A"
+ assert summaries["a"]["data"]["first_prompt"] == "hello a"
+ assert summaries["b"]["data"]["first_prompt"] == "hello b"
+
+ async def test_subpath_appends_ignored(self) -> None:
+ store = InMemorySessionStore()
+ main: SessionKey = {"project_key": PROJECT_KEY, "session_id": "m"}
+ sub: SessionKey = {
+ "project_key": PROJECT_KEY,
+ "session_id": "m",
+ "subpath": "subagents/agent-1",
+ }
+ await store.append(main, [_user("main prompt")])
+ await store.append(
+ sub, [_user("sub prompt"), {"type": "x", "customTitle": "sub"}]
+ )
+ summaries = await store.list_session_summaries(PROJECT_KEY)
+ assert len(summaries) == 1
+ assert summaries[0]["data"]["first_prompt"] == "main prompt"
+ assert "custom_title" not in summaries[0]["data"]
+
+ async def test_delete_drops_summary(self) -> None:
+ store = InMemorySessionStore()
+ k: SessionKey = {"project_key": PROJECT_KEY, "session_id": "x"}
+ await store.append(k, [_user("hi")])
+ assert len(await store.list_session_summaries(PROJECT_KEY)) == 1
+ await store.delete(k)
+ assert await store.list_session_summaries(PROJECT_KEY) == []
+
+ async def test_project_isolation(self) -> None:
+ store = InMemorySessionStore()
+ await store.append({"project_key": "A", "session_id": "s"}, [_user("a")])
+ await store.append({"project_key": "B", "session_id": "s"}, [_user("b")])
+ assert len(await store.list_session_summaries("A")) == 1
+ assert len(await store.list_session_summaries("B")) == 1
+ assert await store.list_session_summaries("C") == []
+
+
+# ---------------------------------------------------------------------------
+# list_sessions_from_store integration — fast path
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+class TestListSessionsFromStoreFastPath:
+ async def test_fast_path_skips_load(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ """With list_session_summaries() available, load() must NOT be called."""
+ store = InMemorySessionStore()
+ sid_a = str(uuid_mod.uuid4())
+ sid_b = str(uuid_mod.uuid4())
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_a},
+ [_user("first a", ts="2024-01-01T00:00:00Z", cwd=DIR)],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_b},
+ [_user("first b", ts="2024-01-02T00:00:00Z", cwd=DIR)],
+ )
+
+ async def _boom(self, key): # noqa: ANN001
+ raise AssertionError("load() must not be called on the fast path")
+
+ monkeypatch.setattr(InMemorySessionStore, "load", _boom)
+
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ assert {s.session_id for s in sessions} == {sid_a, sid_b}
+ # Sorted by last_modified descending — sid_b's timestamp is newer.
+ assert sessions[0].session_id == sid_b
+ assert sessions[0].summary == "first b"
+ assert sessions[1].first_prompt == "first a"
+
+ async def test_fast_path_filters_sidechain_and_empty(self) -> None:
+ store = InMemorySessionStore()
+ sid_main = str(uuid_mod.uuid4())
+ sid_side = str(uuid_mod.uuid4())
+ sid_empty = str(uuid_mod.uuid4())
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_main},
+ [_user("hello", ts="2024-01-01T00:00:00Z")],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_side},
+ [
+ {
+ "type": "user",
+ "timestamp": "2024-01-01T00:00:00Z",
+ "isSidechain": True,
+ "message": {"content": "x"},
+ }
+ ],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_empty},
+ [{"type": "x", "timestamp": "2024-01-01T00:00:00Z"}],
+ )
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ assert {s.session_id for s in sessions} == {sid_main}
+
+ async def test_fast_path_limit_offset(self) -> None:
+ store = InMemorySessionStore()
+ sids = [str(uuid_mod.uuid4()) for _ in range(5)]
+ for i, sid in enumerate(sids):
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid},
+ [_user(f"p{i}", ts=f"2024-01-0{i + 1}T00:00:00Z")],
+ )
+ page = await list_sessions_from_store(store, directory=DIR, limit=2, offset=1)
+ assert len(page) == 2
+ assert page[0].session_id == sids[3]
+ assert page[1].session_id == sids[2]
+
+ async def test_not_implemented_falls_back_to_load(self) -> None:
+ """A store that overrides list_session_summaries but raises
+ NotImplementedError must fall back to the per-session load() path."""
+
+ class FallbackStore(InMemorySessionStore):
+ async def list_session_summaries(self, project_key: str): # noqa: ANN201
+ raise NotImplementedError
+
+ store = FallbackStore()
+ sid = str(uuid_mod.uuid4())
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid},
+ [_user("hi", ts="2024-01-01T00:00:00Z")],
+ )
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ assert len(sessions) == 1
+ assert sessions[0].summary == "hi"
+
+ async def test_mixed_sessions_gap_filled(self) -> None:
+ """A store with summaries for only SOME sessions (e.g. adopted the
+ method mid-stream) must have the rest gap-filled via per-session
+ load() so old sessions aren't silently dropped."""
+ sid_with = str(uuid_mod.uuid4())
+ sid_without = str(uuid_mod.uuid4())
+
+ class PartialStore(InMemorySessionStore):
+ load_calls: list[str] = []
+
+ async def list_session_summaries(self, project_key: str): # noqa: ANN201
+ full = await super().list_session_summaries(project_key)
+ return [s for s in full if s["session_id"] == sid_with]
+
+ async def load(self, key): # noqa: ANN001, ANN201
+ self.load_calls.append(key["session_id"])
+ return await super().load(key)
+
+ store = PartialStore()
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_with},
+ [_user("has sidecar", ts="2024-01-02T00:00:00Z")],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_without},
+ [_user("no sidecar", ts="2024-01-01T00:00:00Z")],
+ )
+
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ by_id = {s.session_id: s for s in sessions}
+ assert set(by_id) == {sid_with, sid_without}
+ assert by_id[sid_with].summary == "has sidecar"
+ assert by_id[sid_without].summary == "no sidecar"
+ # Only the missing session should have been load()ed.
+ assert store.load_calls == [sid_without]
+ # Merged result sorts on a single clock (storage write time). In
+ # InMemorySessionStore, that's a strictly monotonic counter, so
+ # sid_without (appended second) sorts newest.
+ assert sessions[0].session_id == sid_without
+
+ async def test_gap_fill_load_bounded_by_limit(self) -> None:
+ """Gap-fill paginates BEFORE per-session load(), so load() count is
+ bounded by page size, not total missing."""
+
+ class CountingStore(InMemorySessionStore):
+ def __init__(self) -> None:
+ super().__init__()
+ self.load_calls: list[str] = []
+
+ async def list_session_summaries(self, project_key: str): # noqa: ANN201
+ full = await super().list_session_summaries(project_key)
+ return [s for s in full if s["session_id"] == sid_with]
+
+ async def load(self, key): # noqa: ANN001, ANN201
+ self.load_calls.append(key["session_id"])
+ return await super().load(key)
+
+ store = CountingStore()
+ sid_with = str(uuid_mod.uuid4())
+ # 5 sessions without sidecars. InMemorySessionStore stamps storage
+ # mtime strictly monotonically per append, so these first 5 appends
+ # are all older than sid_with below.
+ sids_without = [str(uuid_mod.uuid4()) for _ in range(5)]
+ for i, sid in enumerate(sids_without):
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid},
+ [_user(f"without {i}", ts=f"2024-01-0{i + 1}T00:00:00Z")],
+ )
+ # Append sid_with last so storage mtime makes it the newest session.
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_with},
+ [_user("with", ts="2024-01-10T00:00:00Z")],
+ )
+
+ page = await list_sessions_from_store(store, directory=DIR, limit=2)
+ # Page = newest 2: sid_with (sidecar) + newest 1 missing.
+ assert len(page) == 2
+ assert page[0].session_id == sid_with
+ # load() bounded by page size (≤2), not total missing (5).
+ assert len(store.load_calls) <= 2
+ # Specifically, only the one placeholder in the page was loaded.
+ assert len(store.load_calls) == 1
+
+ async def test_sidechain_summary_does_not_consume_page_slot(self) -> None:
+ """Summary-backed sidechain/empty sessions are dropped BEFORE
+ pagination (free — already determined from the sidecar) so they don't
+ consume offset/limit positions. Matches the disk and slow-path
+ filter-then-paginate semantics. Only gap-fill placeholders that
+ resolve to None after load can short-page."""
+ store = InMemorySessionStore()
+ sids = [str(uuid_mod.uuid4()) for _ in range(3)]
+ # Append order determines storage mtime (InMemorySessionStore
+ # monotonic counter). We append the two real sessions first and the
+ # sidechain LAST so the sidechain is the newest-by-storage-mtime.
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sids[2]},
+ [_user("real 2", ts="2024-01-01T00:00:00Z")],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sids[1]},
+ [_user("real 1", ts="2024-01-02T00:00:00Z")],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sids[0]},
+ [
+ {
+ "type": "user",
+ "timestamp": "2024-01-03T00:00:00Z",
+ "isSidechain": True,
+ "message": {"content": "x"},
+ }
+ ],
+ )
+
+ page = await list_sessions_from_store(store, directory=DIR, limit=2)
+ # The sidechain summary is pre-filtered, so limit=2 returns both real
+ # sessions — full page, matching the slow path. sids[1] was appended
+ # after sids[2] and is therefore newer by storage mtime.
+ assert len(page) == 2
+ assert [s.session_id for s in page] == [sids[1], sids[2]]
+
+ async def test_stale_sidecar_triggers_gap_fill(self) -> None:
+ """A sidecar whose mtime lags the session's current mtime from
+ list_sessions must be treated as missing: route it through gap-fill so
+ the SDK re-folds from source entries and the result reflects fresh
+ transcript state, not the stale sidecar values."""
+ sid = str(uuid_mod.uuid4())
+ stale_mtime = 1_704_067_260_000 # 2024-01-01T00:01:00Z
+ fresh_mtime = 1_704_153_660_000 # 2024-01-02T00:01:00Z
+
+ class StaleSidecarStore(InMemorySessionStore):
+ """Reports fresh transcript state but a stale summary sidecar."""
+
+ def __init__(self) -> None:
+ super().__init__()
+ self.load_calls: list[str] = []
+
+ async def list_session_summaries(self, project_key): # noqa: ANN001, ANN201
+ # Serve a stale summary reflecting T1 state.
+ if project_key != PROJECT_KEY:
+ return []
+ return [
+ {
+ "session_id": sid,
+ "mtime": stale_mtime,
+ "data": {
+ "custom_title": "old",
+ "first_prompt": "old prompt",
+ "first_prompt_locked": True,
+ "created_at": stale_mtime,
+ },
+ }
+ ]
+
+ async def load(self, key): # noqa: ANN001, ANN201
+ self.load_calls.append(key["session_id"])
+ return await super().load(key)
+
+ store = StaleSidecarStore()
+ # Populate the real transcript with fresh entries so list_sessions()
+ # reports the fresh mtime and a gap-fill load() yields fresh info.
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid},
+ [
+ _user("fresh prompt", ts="2024-01-02T00:00:00Z"),
+ {
+ "type": "x",
+ "timestamp": "2024-01-02T00:01:00Z",
+ "customTitle": "fresh",
+ },
+ ],
+ )
+ # Sanity-check the setup: list_sessions reports fresh mtime.
+ listed = await InMemorySessionStore.list_sessions(store, PROJECT_KEY)
+ assert listed[0]["mtime"] > stale_mtime
+
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ assert len(sessions) == 1
+ info = sessions[0]
+ assert info.session_id == sid
+ # Fresh transcript state wins — stale customTitle must NOT leak through.
+ assert info.custom_title == "fresh"
+ assert info.summary == "fresh"
+ assert info.last_modified >= fresh_mtime
+ # Stale entry was routed into gap-fill, so load() was called for it.
+ assert store.load_calls == [sid]
+
+ async def test_fresh_sidecar_with_storage_newer_mtime_not_gap_filled(self) -> None:
+ """The sidecar's mtime is storage write time, not entry time. For
+ adapters that use native storage mtime (file mtime, S3 LastModified,
+ Postgres updated_at), every successful batched append records a
+ storage mtime strictly later than the last entry's ISO timestamp
+ (~100ms batch cadence + network latency). The staleness check must
+ NOT flag these fresh sidecars as stale — otherwise every slot routes
+ through gap-fill load() and the fast path's N->1 goal is defeated.
+ """
+ sid = str(uuid_mod.uuid4())
+ # T1: entry ISO timestamp embedded in the transcript.
+ t1 = 1_704_067_200_000 # 2024-01-01T00:00:00Z
+ # T2: storage write time for both list_sessions and sidecar —
+ # strictly later than T1 (batcher + network delay). The point of
+ # this test is that T2 > T1 does NOT by itself mark the sidecar
+ # stale: both list_sessions().mtime and sidecar.mtime come from
+ # the same storage clock (T2), so summary.mtime == known.mtime.
+ t2 = 1_704_067_200_250 # +250ms of persist latency
+
+ class StorageMtimeStore(InMemorySessionStore):
+ """Adapter that stamps both list_sessions and the sidecar with
+ storage-native mtime — strictly later than entry ISO timestamps.
+ """
+
+ def __init__(self) -> None:
+ super().__init__()
+ self.load_calls: list[str] = []
+
+ async def list_sessions(self, project_key: str): # noqa: ANN201
+ # Pretend storage mtime is T2 for every session.
+ full = await super().list_sessions(project_key)
+ return [{"session_id": e["session_id"], "mtime": t2} for e in full]
+
+ async def list_session_summaries(self, project_key: str): # noqa: ANN201
+ # Sidecar mtime is also T2 (same clock).
+ full = await super().list_session_summaries(project_key)
+ return [
+ {"session_id": s["session_id"], "mtime": t2, "data": s["data"]}
+ for s in full
+ ]
+
+ async def load(self, key): # noqa: ANN001, ANN201
+ self.load_calls.append(key["session_id"])
+ return await super().load(key)
+
+ store = StorageMtimeStore()
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid},
+ [
+ _user("fresh prompt", ts="2024-01-01T00:00:00.000Z"),
+ {
+ "type": "x",
+ "timestamp": "2024-01-01T00:00:00.000Z",
+ "customTitle": "fresh",
+ },
+ ],
+ )
+ # Entries carry ISO timestamps at T1; storage records the write at T2.
+ # Sanity-check the adapter reports T2 from both surfaces.
+ listed = await store.list_sessions(PROJECT_KEY)
+ assert listed[0]["mtime"] == t2
+ summ = await store.list_session_summaries(PROJECT_KEY)
+ assert summ[0]["mtime"] == t2
+ # Entry timestamps are at T1, strictly older than T2.
+ assert t2 > t1
+
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ assert len(sessions) == 1
+ info = sessions[0]
+ assert info.session_id == sid
+ assert info.summary == "fresh"
+ assert info.last_modified == t2
+ # Fast path: load() must NOT have been called — summary.mtime ==
+ # known.mtime means not stale, so the slot returns its summary-derived
+ # info directly.
+ assert store.load_calls == []
+
+ async def test_summary_without_listing_is_dropped(self) -> None:
+ """A summary for a session that list_sessions() no longer reports must
+ be dropped from the fast-path result."""
+ sid_real = str(uuid_mod.uuid4())
+ sid_ghost = str(uuid_mod.uuid4())
+
+ class GhostStore(InMemorySessionStore):
+ async def list_sessions(self, project_key: str): # noqa: ANN201
+ full = await super().list_sessions(project_key)
+ return [s for s in full if s["session_id"] != sid_ghost]
+
+ store = GhostStore()
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_real},
+ [_user("real", ts="2024-01-02T00:00:00Z")],
+ )
+ await store.append(
+ {"project_key": PROJECT_KEY, "session_id": sid_ghost},
+ [_user("ghost", ts="2024-01-01T00:00:00Z")],
+ )
+
+ sessions = await list_sessions_from_store(store, directory=DIR)
+ assert {s.session_id for s in sessions} == {sid_real}
+
+ async def test_gap_fill_bounded_concurrency(self) -> None:
+ """Gap-fill reuses the bounded per-session load helper, so
+ ``_STORE_LIST_LOAD_CONCURRENCY`` applies to the missing-session set."""
+ import asyncio
+
+ from claude_agent_sdk._internal import sessions as _sessions
+
+ in_flight = 0
+ peak = 0
+ gate = asyncio.Event()
+
+ class PartialSlowStore(InMemorySessionStore):
+ async def list_session_summaries(self, project_key: str): # noqa: ANN201
+ return [] # everything is "missing"
+
+ async def load(self, key): # noqa: ANN001, ANN201
+ nonlocal in_flight, peak
+ in_flight += 1
+ peak = max(peak, in_flight)
+ await gate.wait()
+ in_flight -= 1
+ return await super().load(key)
+
+ store = PartialSlowStore()
+ n = _sessions._STORE_LIST_LOAD_CONCURRENCY * 2
+ for i in range(n):
+ await InMemorySessionStore.append(
+ store,
+ {"project_key": PROJECT_KEY, "session_id": str(uuid_mod.uuid4())},
+ [_user(f"p{i}")],
+ )
+
+ task = asyncio.create_task(list_sessions_from_store(store, directory=DIR))
+ for _ in range(5):
+ await asyncio.sleep(0)
+ assert 0 < peak <= _sessions._STORE_LIST_LOAD_CONCURRENCY
+ gate.set()
+ await task
+ assert peak == _sessions._STORE_LIST_LOAD_CONCURRENCY
+
+
+# ---------------------------------------------------------------------------
+# Parity: incremental fold == batch lite-parse
+# ---------------------------------------------------------------------------
+
+
+class TestParityWithLiteParse:
+ def test_incremental_equals_batch(self) -> None:
+ """``summary_entry_to_sdk_info(fold(...))`` must equal
+ ``_parse_session_info_from_lite`` on the same entry stream."""
+ sid = "22222222-2222-4222-8222-222222222222"
+ k: SessionKey = {"project_key": PROJECT_KEY, "session_id": sid}
+ entries: list[dict[str, Any]] = [
+ _user(
+ "/clear",
+ ts="2024-01-01T00:00:00.000Z",
+ cwd="/work",
+ gitBranch="main",
+ ),
+ _user("ignored", ts="2024-01-01T00:00:01.000Z", isMeta=True),
+ _user("real prompt here", ts="2024-01-01T00:00:02.000Z"),
+ {
+ "type": "assistant",
+ "timestamp": "2024-01-01T00:00:03.000Z",
+ "message": {"content": [{"type": "text", "text": "ok"}]},
+ },
+ {
+ "type": "x",
+ "timestamp": "2024-01-01T00:00:04.000Z",
+ "aiTitle": "AI Named",
+ },
+ {"type": "tag", "timestamp": "2024-01-01T00:00:05.000Z", "tag": "wip"},
+ {
+ "type": "x",
+ "timestamp": "2024-01-01T00:00:06.000Z",
+ "customTitle": "User Named",
+ "gitBranch": "feature",
+ },
+ ]
+
+ # Incremental — fold across two append batches to exercise carry-over.
+ folded = fold_session_summary(None, k, entries[:3])
+ folded = fold_session_summary(folded, k, entries[3:])
+ incremental = summary_entry_to_sdk_info(folded, "/work")
+
+ # Batch — same path list_sessions_from_store fallback uses.
+ jsonl = _entries_to_jsonl(entries)
+ batch = _parse_session_info_from_lite(
+ sid, _jsonl_to_lite(jsonl, folded["mtime"]), "/work"
+ )
+
+ assert incremental is not None and batch is not None
+ # file_size is a byte count only meaningful for the JSONL path.
+ batch.file_size = None
+ assert incremental == batch
+
+ def test_parity_first_prompt_only(self) -> None:
+ sid = "33333333-3333-4333-8333-333333333333"
+ k: SessionKey = {"project_key": PROJECT_KEY, "session_id": sid}
+ entries: list[dict[str, Any]] = [
+ _user("just a prompt", ts="2024-02-01T00:00:00.000Z", cwd="/w"),
+ ]
+ folded = fold_session_summary(None, k, entries)
+ incremental = summary_entry_to_sdk_info(folded, "/w")
+ jsonl = _entries_to_jsonl(entries)
+ batch = _parse_session_info_from_lite(
+ sid, _jsonl_to_lite(jsonl, folded["mtime"]), "/w"
+ )
+ assert incremental is not None and batch is not None
+ batch.file_size = None
+ assert incremental == batch