From 2a16b47e3df1f91e8503ba738348203f0d560d16 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Sun, 19 Apr 2026 06:55:16 +0000 Subject: [PATCH 01/17] =?UTF-8?q?feat:=20SessionStore.list=5Fsession=5Fsum?= =?UTF-8?q?maries=20=E2=80=94=20batch=20summary=20primitive=20replacing=20?= =?UTF-8?q?per-session=20load=5Frange?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/claude_agent_sdk/__init__.py | 4 + .../_internal/session_store.py | 18 + .../_internal/session_summary.py | 208 +++++++ src/claude_agent_sdk/_internal/sessions.py | 22 +- .../testing/session_store_conformance.py | 54 +- src/claude_agent_sdk/types.py | 47 ++ tests/test_session_helpers_store.py | 8 + tests/test_session_summary.py | 515 ++++++++++++++++++ 8 files changed, 866 insertions(+), 10 deletions(-) create mode 100644 src/claude_agent_sdk/_internal/session_summary.py create mode 100644 tests/test_session_summary.py diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 0ff4fb4b..c9d1e897 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -37,6 +37,7 @@ tag_session_via_store, ) from ._internal.session_store import InMemorySessionStore, project_key_for_directory +from ._internal.session_summary import fold_session_summary from ._internal.sessions import ( get_session_info, get_session_info_from_store, @@ -109,6 +110,7 @@ SessionStore, SessionStoreEntry, SessionStoreListEntry, + SessionSummaryEntry, SettingSource, StopHookInput, StreamEvent, @@ -602,8 +604,10 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any: "SessionStore", "SessionStoreEntry", "SessionStoreListEntry", + "SessionSummaryEntry", "SessionListSubkeysKey", "InMemorySessionStore", + "fold_session_summary", "MirrorErrorMessage", "project_key_for_directory", # Session listing (SessionStore-backed async variants) diff --git a/src/claude_agent_sdk/_internal/session_store.py 
b/src/claude_agent_sdk/_internal/session_store.py index 0d5a0e90..e26d004c 100644 --- a/src/claude_agent_sdk/_internal/session_store.py +++ b/src/claude_agent_sdk/_internal/session_store.py @@ -12,7 +12,9 @@ SessionStore, SessionStoreEntry, SessionStoreListEntry, + SessionSummaryEntry, ) +from .session_summary import fold_session_summary from .sessions import project_key_for_directory __all__ = [ @@ -41,11 +43,20 @@ class InMemorySessionStore(SessionStore): def __init__(self) -> None: self._store: dict[str, list[SessionStoreEntry]] = {} self._mtimes: dict[str, int] = {} + self._summaries: dict[tuple[str, str], SessionSummaryEntry] = {} async def append(self, key: SessionKey, entries: list[SessionStoreEntry]) -> None: k = _key_to_string(key) self._store.setdefault(k, []).extend(entries) self._mtimes[k] = int(time.time() * 1000) + # Maintain the per-session summary sidecar incrementally so + # list_session_summaries() never re-reads. Subagent subpaths don't + # contribute to the main session's summary. + if key.get("subpath") is None: + sk = (key["project_key"], key["session_id"]) + self._summaries[sk] = fold_session_summary( + self._summaries.get(sk), key, entries + ) async def load(self, key: SessionKey) -> list[SessionStoreEntry] | None: entries = self._store.get(_key_to_string(key)) @@ -64,6 +75,11 @@ async def list_sessions(self, project_key: str) -> list[SessionStoreListEntry]: ) return results + async def list_session_summaries( + self, project_key: str + ) -> list[SessionSummaryEntry]: + return [s for (pk, _), s in self._summaries.items() if pk == project_key] + async def delete(self, key: SessionKey) -> None: k = _key_to_string(key) self._store.pop(k, None) @@ -72,6 +88,7 @@ async def delete(self, key: SessionKey) -> None: # transcripts, metadata) so they aren't orphaned. A targeted delete # with an explicit subpath removes only that one entry. 
if key.get("subpath") is None: + self._summaries.pop((key["project_key"], key["session_id"]), None) prefix = f"{key['project_key']}/{key['session_id']}/" for store_key in [sk for sk in self._store if sk.startswith(prefix)]: self._store.pop(store_key, None) @@ -103,6 +120,7 @@ def clear(self) -> None: """Test helper — clear all stored data.""" self._store.clear() self._mtimes.clear() + self._summaries.clear() def file_path_to_session_key(file_path: str, projects_dir: str) -> SessionKey | None: diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py new file mode 100644 index 00000000..047c1fa3 --- /dev/null +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -0,0 +1,208 @@ +"""Incremental session-summary derivation for :class:`SessionStore` adapters. + +:func:`fold_session_summary` lets a store maintain a per-session +:class:`SessionSummaryEntry` sidecar incrementally inside ``append()`` so +``list_sessions_from_store()`` can fetch all metadata in a single +``list_session_summaries()`` call instead of N per-session ``load()`` calls. + +Every derived field is append-incremental (set-once or last-wins) so adapters +never need to re-read previously appended entries. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, cast + +from ..types import ( + SDKSessionInfo, + SessionKey, + SessionStoreEntry, + SessionSummaryEntry, +) +from .sessions import _COMMAND_NAME_RE, _SKIP_FIRST_PROMPT_PATTERN + +__all__ = ["fold_session_summary", "summary_entry_to_sdk_info"] + + +# Map of JSONL entry keys → SessionSummaryEntry keys for last-wins string +# fields. Each appended entry overwrites the previous value when present. 
+_LAST_WINS_FIELDS: dict[str, str] = { + "customTitle": "custom_title", + "aiTitle": "ai_title", + "lastPrompt": "last_prompt", + "summary": "summary_hint", + "gitBranch": "git_branch", +} + + +def _iso_to_epoch_ms(ts: Any) -> int | None: + """Parse an ISO-8601 timestamp string to Unix epoch milliseconds.""" + if not isinstance(ts, str): + return None + try: + # Python 3.10's fromisoformat doesn't support trailing 'Z' + norm = ts.replace("Z", "+00:00") if ts.endswith("Z") else ts + return int(datetime.fromisoformat(norm).timestamp() * 1000) + except ValueError: + return None + + +def _entry_text_blocks(entry: dict[str, Any]) -> list[str]: + """Extract text strings from a ``type=="user"`` entry's message content.""" + message = entry.get("message") + if not isinstance(message, dict): + return [] + content = message.get("content") + texts: list[str] = [] + if isinstance(content, str): + texts.append(content) + elif isinstance(content, list): + for block in content: + if ( + isinstance(block, dict) + and block.get("type") == "text" + and isinstance(block.get("text"), str) + ): + texts.append(block["text"]) + return texts + + +def _fold_first_prompt(summary: SessionSummaryEntry, entry: dict[str, Any]) -> None: + """Replicate ``_extract_first_prompt_from_head`` for a single parsed entry. + + Mutates ``summary`` in place: sets ``first_prompt`` + ``first_prompt_locked`` + on a real match, or stashes a ``command_fallback`` for slash-command + messages. Skips tool_result, isMeta, isCompactSummary, and auto-generated + patterns. + """ + if summary.get("first_prompt_locked"): + return + if entry.get("type") != "user": + return + if entry.get("isMeta") is True or entry.get("isCompactSummary") is True: + return + # Skip tool_result-carrying user messages. 
+ message = entry.get("message") + if isinstance(message, dict): + content = message.get("content") + if isinstance(content, list) and any( + isinstance(b, dict) and b.get("type") == "tool_result" for b in content + ): + return + + for raw in _entry_text_blocks(entry): + result = raw.replace("\n", " ").strip() + if not result: + continue + cmd_match = _COMMAND_NAME_RE.search(result) + if cmd_match: + if not summary.get("command_fallback"): + summary["command_fallback"] = cmd_match.group(1) + continue + if _SKIP_FIRST_PROMPT_PATTERN.match(result): + continue + if len(result) > 200: + result = result[:200].rstrip() + "\u2026" + summary["first_prompt"] = result + summary["first_prompt_locked"] = True + return + + +def fold_session_summary( + prev: SessionSummaryEntry | None, + key: SessionKey, + entries: list[SessionStoreEntry], +) -> SessionSummaryEntry: + """Fold a batch of appended entries into the running summary for ``key``. + + Stores call this from inside ``append()`` to keep a + :class:`SessionSummaryEntry` sidecar up to date without re-reading the + transcript. ``prev`` is the previous summary for the same key (or ``None`` + for the first append). + + Set-once fields (``is_sidechain``, ``created_at``, ``cwd``, + ``first_prompt``) freeze on first sight; last-wins fields + (``custom_title``, ``ai_title``, ``last_prompt``, ``summary_hint``, + ``git_branch``, ``tag``, ``mtime``) overwrite on every appearance. + """ + if prev is not None: + summary = cast(SessionSummaryEntry, dict(prev)) + else: + summary = {"session_id": key["session_id"], "mtime": 0} + + for raw in entries: + # SessionStoreEntry is a permissive TypedDict; widen to a plain dict + # so .get() of unknown keys type-checks. 
+ entry = cast("dict[str, Any]", raw) + + ms = _iso_to_epoch_ms(entry.get("timestamp")) + if ms is not None and ms > summary["mtime"]: + summary["mtime"] = ms + + if "created_at" not in summary: + if ms is not None: + summary["created_at"] = ms + summary["is_sidechain"] = entry.get("isSidechain") is True + + if "cwd" not in summary: + cwd = entry.get("cwd") + if isinstance(cwd, str) and cwd: + summary["cwd"] = cwd + + _fold_first_prompt(summary, entry) + + for src, dst in _LAST_WINS_FIELDS.items(): + val = entry.get(src) + if isinstance(val, str): + summary[dst] = val # type: ignore[literal-required] + + if entry.get("type") == "tag": + tag_val = entry.get("tag") + if isinstance(tag_val, str) and tag_val: + summary["tag"] = tag_val + else: + # Empty string or absent tag clears the tag. + summary.pop("tag", None) + + return summary + + +def summary_entry_to_sdk_info( + entry: SessionSummaryEntry, project_path: str | None +) -> SDKSessionInfo | None: + """Convert a :class:`SessionSummaryEntry` to :class:`SDKSessionInfo`. + + Returns ``None`` for sidechain sessions or sessions with no extractable + summary, matching ``_parse_session_info_from_lite``'s filtering. 
+ """ + if entry.get("is_sidechain"): + return None + + first_prompt = ( + entry.get("first_prompt") + if entry.get("first_prompt_locked") + else entry.get("command_fallback") + ) or None + custom_title = entry.get("custom_title") or entry.get("ai_title") or None + summary = ( + custom_title + or entry.get("last_prompt") + or entry.get("summary_hint") + or first_prompt + ) + if not summary: + return None + + return SDKSessionInfo( + session_id=entry["session_id"], + summary=summary, + last_modified=entry["mtime"], + file_size=entry.get("file_size"), + custom_title=custom_title, + first_prompt=first_prompt, + git_branch=entry.get("git_branch") or None, + cwd=entry.get("cwd") or project_path or None, + tag=entry.get("tag") or None, + created_at=entry.get("created_at"), + ) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index dbd07d37..df169d98 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1554,13 +1554,31 @@ async def list_sessions_from_store( Consider denormalizing summary metadata into your adapter's ``list_sessions()`` index. """ + project_path = _canonicalize_path(str(directory) if directory is not None else ".") + project_key = _sanitize_path(project_path) + + # Fast path: if the store maintains incremental summaries, fetch them in + # one call instead of N per-session load()s. + if _store_implements(session_store, "list_session_summaries"): + from .session_summary import summary_entry_to_sdk_info + + try: + summaries = await session_store.list_session_summaries(project_key) + except NotImplementedError: + pass + else: + infos = [ + info + for s in summaries + if (info := summary_entry_to_sdk_info(s, project_path)) is not None + ] + return _apply_sort_limit_offset(infos, limit, offset) + if not _store_implements(session_store, "list_sessions"): raise ValueError( "session_store does not implement list_sessions() -- cannot list " "sessions. 
Provide a store with a list_sessions() method." ) - project_path = _canonicalize_path(str(directory) if directory is not None else ".") - project_key = _sanitize_path(project_path) raw = await session_store.list_sessions(project_key) # Copy — store.list_sessions() may return a reference to internal state. listing = list(raw) diff --git a/src/claude_agent_sdk/testing/session_store_conformance.py b/src/claude_agent_sdk/testing/session_store_conformance.py index c5a9657f..c8fcd9cc 100644 --- a/src/claude_agent_sdk/testing/session_store_conformance.py +++ b/src/claude_agent_sdk/testing/session_store_conformance.py @@ -1,9 +1,10 @@ """Shared conformance test suite for :class:`SessionStore` adapters. Call :func:`run_session_store_conformance` from an async test to assert the -13 behavioral contracts every adapter must satisfy. Tests for optional -methods (``list_sessions``, ``delete``, ``list_subkeys``) are skipped when -named in ``skip_optional`` or when the store does not override that method. +14 behavioral contracts every adapter must satisfy. Tests for optional +methods (``list_sessions``, ``list_session_summaries``, ``delete``, +``list_subkeys``) are skipped when named in ``skip_optional`` or when the +store does not override that method. Example:: @@ -24,9 +25,11 @@ async def test_my_store_conformance(): from ..types import SessionKey, SessionStore -OptionalMethod = str # "list_sessions" | "delete" | "list_subkeys" +OptionalMethod = ( + str # "list_sessions" | "list_session_summaries" | "delete" | "list_subkeys" +) _OPTIONAL_METHODS: frozenset[str] = frozenset( - {"list_sessions", "delete", "list_subkeys"} + {"list_sessions", "list_session_summaries", "delete", "list_subkeys"} ) _KEY: SessionKey = {"project_key": "proj", "session_id": "sess"} @@ -53,12 +56,12 @@ async def run_session_store_conformance( *, skip_optional: frozenset[str] = frozenset(), ) -> None: - """Assert the 13 :class:`SessionStore` behavioral contracts. 
+ """Assert the 14 :class:`SessionStore` behavioral contracts. ``make_store`` is invoked once per contract to provide isolation. It may be sync or async. Contracts for optional methods (``list_sessions``, - ``delete``, ``list_subkeys``) are skipped when named in ``skip_optional`` - or when the store does not override that method. + ``list_session_summaries``, ``delete``, ``list_subkeys``) are skipped when + named in ``skip_optional`` or when the store does not override that method. """ invalid = skip_optional - _OPTIONAL_METHODS assert not invalid, f"unknown optional methods in skip_optional: {invalid}" @@ -71,6 +74,7 @@ async def fresh() -> SessionStore: probe = await fresh() has_list_sessions = _has_optional(probe, "list_sessions", skip_optional) + has_list_summaries = _has_optional(probe, "list_session_summaries", skip_optional) has_delete = _has_optional(probe, "delete", skip_optional) has_list_subkeys = _has_optional(probe, "list_subkeys", skip_optional) @@ -160,6 +164,40 @@ async def fresh() -> SessionStore: sessions = await store.list_sessions("proj") assert [s["session_id"] for s in sessions] == ["main"] + # --- Optional: list_session_summaries ---------------------------------- + + if has_list_summaries: + # 14. 
list_session_summaries reflects incremental fold on append + store = await fresh() + key: SessionKey = {"project_key": "proj", "session_id": "summ-sess"} + await store.append( + key, + [ + _e({"timestamp": "2024-01-01T00:00:00.000Z", "customTitle": "first"}), + _e({"timestamp": "2024-01-01T00:00:01.000Z"}), + ], + ) + await store.append( + key, + [_e({"timestamp": "2024-01-01T00:00:02.000Z", "customTitle": "second"})], + ) + await store.append( + {"project_key": "other", "session_id": "elsewhere"}, + [_e({"timestamp": "2024-01-01T00:00:00.000Z"})], + ) + summaries = await store.list_session_summaries("proj") + by_id = {s["session_id"]: s for s in summaries} + assert set(by_id) == {"summ-sess"} + summ = by_id["summ-sess"] + # mtime must be epoch-ms; >1e12 rules out epoch-seconds. + assert math.isfinite(summ["mtime"]) and summ["mtime"] > 1e12 + # custom_title is last-wins across append calls. + assert summ.get("custom_title") == "second" + assert await store.list_session_summaries("never-appended-project") == [] + if has_delete: + await store.delete(key) + assert await store.list_session_summaries("proj") == [] + # --- Optional: delete -------------------------------------------------- if has_delete: diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 279bd0d2..ab1ffcad 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1159,6 +1159,39 @@ class SessionStoreListEntry(TypedDict): modification time (e.g. Redis) must maintain their own index.""" +class SessionSummaryEntry(TypedDict, total=False): + """Incrementally-maintained session summary. + + Stores update this on :meth:`SessionStore.append` via + :func:`fold_session_summary` and return the full set from + :meth:`SessionStore.list_session_summaries`. Every field is + append-incremental (set-once or last-wins) so adapters never re-read. 
+ """ + + session_id: Required[str] + mtime: Required[int] + """Last-modified time in Unix epoch milliseconds (last entry timestamp).""" + is_sidechain: bool + created_at: int + """First entry timestamp in Unix epoch milliseconds.""" + cwd: str + first_prompt: str + """First meaningful user prompt, truncated to 200 chars.""" + first_prompt_locked: bool + """Internal: ``True`` once a non-command prompt has been found.""" + command_fallback: str + """Internal: first ```` seen, used when no real prompt + appears.""" + custom_title: str + ai_title: str + last_prompt: str + summary_hint: str + """Raw ``summary`` key from JSONL.""" + git_branch: str + tag: str + file_size: int + + class SessionListSubkeysKey(TypedDict): """Key argument to :meth:`SessionStore.list_subkeys` (no ``subpath``).""" @@ -1233,6 +1266,20 @@ async def list_sessions(self, project_key: str) -> list[SessionStoreListEntry]: """ raise NotImplementedError + async def list_session_summaries( + self, project_key: str + ) -> list[SessionSummaryEntry]: + """Return incrementally-maintained summaries for all sessions in one call. + + Stores should maintain these via :func:`fold_session_summary` inside + :meth:`append`. If not implemented, ``list_sessions_from_store()`` + falls back to per-session :meth:`load`. + + Optional — if unimplemented, ``list_sessions_from_store()`` falls back + to ``list_sessions()`` + per-session ``load()``. + """ + raise NotImplementedError + async def delete(self, key: SessionKey) -> None: """Delete a session. diff --git a/tests/test_session_helpers_store.py b/tests/test_session_helpers_store.py index 5854a5f1..45454199 100644 --- a/tests/test_session_helpers_store.py +++ b/tests/test_session_helpers_store.py @@ -210,6 +210,10 @@ async def test_adapter_load_error_degrades_row(self) -> None: """One failing load() degrades that row instead of failing the list.""" class FlakeyStore(InMemorySessionStore): + # Force the per-session load() fallback path under test. 
+ async def list_session_summaries(self, project_key): + raise NotImplementedError + async def load(self, key): if key["session_id"] == bad_sid: raise RuntimeError("backend down") @@ -238,6 +242,10 @@ async def test_load_concurrency_is_bounded(self) -> None: gate = asyncio.Event() class SlowStore(InMemorySessionStore): + # Force the per-session load() fallback path under test. + async def list_session_summaries(self, project_key): + raise NotImplementedError + async def load(self, key): nonlocal in_flight, peak in_flight += 1 diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py new file mode 100644 index 00000000..6aaccbaa --- /dev/null +++ b/tests/test_session_summary.py @@ -0,0 +1,515 @@ +"""Tests for incremental session-summary derivation. + +Covers ``fold_session_summary``, ``summary_entry_to_sdk_info``, +``InMemorySessionStore.list_session_summaries``, and the +``list_sessions_from_store`` fast path that consumes them. +""" + +from __future__ import annotations + +import uuid as uuid_mod +from typing import Any + +import pytest + +from claude_agent_sdk import ( + InMemorySessionStore, + SessionSummaryEntry, + fold_session_summary, + list_sessions_from_store, + project_key_for_directory, +) +from claude_agent_sdk._internal.session_summary import summary_entry_to_sdk_info +from claude_agent_sdk._internal.sessions import ( + _entries_to_jsonl, + _jsonl_to_lite, + _parse_session_info_from_lite, +) +from claude_agent_sdk.types import SessionKey + +DIR = "/workspace/project" +PROJECT_KEY = project_key_for_directory(DIR) +KEY: SessionKey = { + "project_key": PROJECT_KEY, + "session_id": "11111111-1111-4111-8111-111111111111", +} + + +def _user( + text: str | list[dict[str, Any]], ts: str = "2024-01-01T00:00:00.000Z", **extra: Any +) -> dict[str, Any]: + return { + "type": "user", + "timestamp": ts, + "message": {"role": "user", "content": text}, + **extra, + } + + +# --------------------------------------------------------------------------- +# 
fold_session_summary unit tests +# --------------------------------------------------------------------------- + + +class TestFoldSessionSummary: + def test_init_from_none(self) -> None: + s = fold_session_summary(None, KEY, []) + assert s == {"session_id": KEY["session_id"], "mtime": 0} + + def test_set_once_fields_freeze(self) -> None: + s = fold_session_summary( + None, + KEY, + [ + { + "type": "x", + "timestamp": "2024-01-01T00:00:00.000Z", + "cwd": "/a", + "isSidechain": False, + }, + {"type": "x", "timestamp": "2024-01-01T00:00:05.000Z", "cwd": "/b"}, + ], + ) + assert s["created_at"] == 1704067200000 + assert s["cwd"] == "/a" + assert s["is_sidechain"] is False + # Second append must not overwrite set-once fields. + s2 = fold_session_summary( + s, + KEY, + [ + { + "type": "x", + "timestamp": "2024-01-02T00:00:00.000Z", + "cwd": "/c", + "isSidechain": True, + } + ], + ) + assert s2["created_at"] == 1704067200000 + assert s2["cwd"] == "/a" + assert s2["is_sidechain"] is False + + def test_last_wins_overwrite(self) -> None: + s = fold_session_summary( + None, + KEY, + [ + { + "type": "x", + "timestamp": "2024-01-01T00:00:00Z", + "customTitle": "t1", + "gitBranch": "main", + }, + {"type": "x", "timestamp": "2024-01-01T00:00:01Z", "customTitle": "t2"}, + ], + ) + assert s["custom_title"] == "t2" + assert s["git_branch"] == "main" + s2 = fold_session_summary( + s, + KEY, + [ + { + "type": "x", + "aiTitle": "ai", + "lastPrompt": "lp", + "summary": "sm", + "gitBranch": "dev", + } + ], + ) + assert s2["custom_title"] == "t2" + assert s2["ai_title"] == "ai" + assert s2["last_prompt"] == "lp" + assert s2["summary_hint"] == "sm" + assert s2["git_branch"] == "dev" + + def test_mtime_takes_max(self) -> None: + s = fold_session_summary( + None, + KEY, + [ + {"type": "x", "timestamp": "2024-01-01T00:00:05.000Z"}, + {"type": "x", "timestamp": "2024-01-01T00:00:01.000Z"}, + ], + ) + assert s["mtime"] == 1704067205000 + + def test_tag_set_and_clear(self) -> None: + s = 
fold_session_summary(None, KEY, [{"type": "tag", "tag": "wip"}]) + assert s["tag"] == "wip" + s2 = fold_session_summary(s, KEY, [{"type": "tag", "tag": ""}]) + assert "tag" not in s2 + # Non-tag entries with a "tag" key (e.g. tool_use input) are ignored. + s3 = fold_session_summary(s, KEY, [{"type": "user", "tag": "ignored"}]) + assert s3["tag"] == "wip" + + def test_sidechain_from_first_entry(self) -> None: + s = fold_session_summary( + None, + KEY, + [{"type": "x", "timestamp": "2024-01-01T00:00:00Z", "isSidechain": True}], + ) + assert s["is_sidechain"] is True + + def test_first_prompt_skips_meta_tool_result_and_compact(self) -> None: + s = fold_session_summary( + None, + KEY, + [ + _user("ignored meta", isMeta=True), + _user("ignored compact", isCompactSummary=True), + _user([{"type": "tool_result", "tool_use_id": "x", "content": "res"}]), + _user("real first"), + _user("not me"), + ], + ) + assert s["first_prompt"] == "real first" + assert s["first_prompt_locked"] is True + + def test_first_prompt_command_fallback(self) -> None: + s = fold_session_summary( + None, + KEY, + [ + _user("/init stuff"), + _user("/second"), + ], + ) + assert s.get("first_prompt_locked") is not True + assert s["command_fallback"] == "/init" + # A later real prompt locks it. 
+ s2 = fold_session_summary(s, KEY, [_user("now real")]) + assert s2["first_prompt"] == "now real" + assert s2["first_prompt_locked"] is True + + def test_first_prompt_skip_pattern(self) -> None: + s = fold_session_summary( + None, + KEY, + [_user(" some output"), _user("hello")], + ) + assert s["first_prompt"] == "hello" + + def test_first_prompt_truncated(self) -> None: + s = fold_session_summary(None, KEY, [_user("x" * 300)]) + assert len(s["first_prompt"]) <= 201 + assert s["first_prompt"].endswith("\u2026") + + def test_prev_is_not_mutated(self) -> None: + prev: SessionSummaryEntry = {"session_id": "a", "mtime": 5} + fold_session_summary(prev, KEY, [{"type": "x", "customTitle": "t"}]) + assert prev == {"session_id": "a", "mtime": 5} + + +# --------------------------------------------------------------------------- +# summary_entry_to_sdk_info +# --------------------------------------------------------------------------- + + +class TestSummaryEntryToSdkInfo: + def test_sidechain_returns_none(self) -> None: + assert ( + summary_entry_to_sdk_info( + { + "session_id": "s", + "mtime": 1, + "is_sidechain": True, + "custom_title": "t", + }, + None, + ) + is None + ) + + def test_empty_summary_returns_none(self) -> None: + assert summary_entry_to_sdk_info({"session_id": "s", "mtime": 1}, None) is None + + def test_precedence_chain(self) -> None: + base: SessionSummaryEntry = { + "session_id": "s", + "mtime": 1, + "first_prompt": "fp", + "first_prompt_locked": True, + "command_fallback": "/cmd", + "summary_hint": "sh", + "last_prompt": "lp", + "ai_title": "ai", + "custom_title": "ct", + } + info = summary_entry_to_sdk_info(base, None) + assert info is not None and info.summary == "ct" and info.custom_title == "ct" + + del base["custom_title"] + info = summary_entry_to_sdk_info(base, None) + assert info is not None and info.summary == "ai" and info.custom_title == "ai" + + del base["ai_title"] + info = summary_entry_to_sdk_info(base, None) + assert info is not None and 
info.summary == "lp" and info.custom_title is None + + del base["last_prompt"] + info = summary_entry_to_sdk_info(base, None) + assert info is not None and info.summary == "sh" + + del base["summary_hint"] + info = summary_entry_to_sdk_info(base, None) + assert info is not None and info.summary == "fp" and info.first_prompt == "fp" + + base["first_prompt_locked"] = False + info = summary_entry_to_sdk_info(base, None) + assert ( + info is not None and info.summary == "/cmd" and info.first_prompt == "/cmd" + ) + + def test_cwd_fallback_to_project_path(self) -> None: + info = summary_entry_to_sdk_info( + {"session_id": "s", "mtime": 1, "custom_title": "t"}, "/proj" + ) + assert info is not None and info.cwd == "/proj" + info2 = summary_entry_to_sdk_info( + {"session_id": "s", "mtime": 1, "custom_title": "t", "cwd": "/own"}, "/proj" + ) + assert info2 is not None and info2.cwd == "/own" + + def test_field_passthrough(self) -> None: + info = summary_entry_to_sdk_info( + { + "session_id": "s", + "mtime": 99, + "custom_title": "t", + "git_branch": "main", + "tag": "wip", + "created_at": 50, + "file_size": 1234, + }, + None, + ) + assert info is not None + assert info.session_id == "s" + assert info.last_modified == 99 + assert info.git_branch == "main" + assert info.tag == "wip" + assert info.created_at == 50 + assert info.file_size == 1234 + + +# --------------------------------------------------------------------------- +# InMemorySessionStore.list_session_summaries +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestInMemoryListSessionSummaries: + async def test_tracks_appends(self) -> None: + store = InMemorySessionStore() + a: SessionKey = {"project_key": PROJECT_KEY, "session_id": "a"} + b: SessionKey = {"project_key": PROJECT_KEY, "session_id": "b"} + await store.append(a, [_user("hello a", ts="2024-01-01T00:00:00Z")]) + await store.append(a, [{"type": "x", "customTitle": "Title A"}]) + await 
store.append(b, [_user("hello b", ts="2024-01-02T00:00:00Z")]) + summaries = { + s["session_id"]: s for s in await store.list_session_summaries(PROJECT_KEY) + } + assert set(summaries) == {"a", "b"} + assert summaries["a"]["custom_title"] == "Title A" + assert summaries["a"]["first_prompt"] == "hello a" + assert summaries["b"]["first_prompt"] == "hello b" + + async def test_subpath_appends_ignored(self) -> None: + store = InMemorySessionStore() + main: SessionKey = {"project_key": PROJECT_KEY, "session_id": "m"} + sub: SessionKey = { + "project_key": PROJECT_KEY, + "session_id": "m", + "subpath": "subagents/agent-1", + } + await store.append(main, [_user("main prompt")]) + await store.append( + sub, [_user("sub prompt"), {"type": "x", "customTitle": "sub"}] + ) + summaries = await store.list_session_summaries(PROJECT_KEY) + assert len(summaries) == 1 + assert summaries[0]["first_prompt"] == "main prompt" + assert "custom_title" not in summaries[0] + + async def test_delete_drops_summary(self) -> None: + store = InMemorySessionStore() + k: SessionKey = {"project_key": PROJECT_KEY, "session_id": "x"} + await store.append(k, [_user("hi")]) + assert len(await store.list_session_summaries(PROJECT_KEY)) == 1 + await store.delete(k) + assert await store.list_session_summaries(PROJECT_KEY) == [] + + async def test_project_isolation(self) -> None: + store = InMemorySessionStore() + await store.append({"project_key": "A", "session_id": "s"}, [_user("a")]) + await store.append({"project_key": "B", "session_id": "s"}, [_user("b")]) + assert len(await store.list_session_summaries("A")) == 1 + assert len(await store.list_session_summaries("B")) == 1 + assert await store.list_session_summaries("C") == [] + + +# --------------------------------------------------------------------------- +# list_sessions_from_store integration — fast path +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class 
TestListSessionsFromStoreFastPath: + async def test_fast_path_skips_load(self, monkeypatch: pytest.MonkeyPatch) -> None: + """With list_session_summaries() available, load() must NOT be called.""" + store = InMemorySessionStore() + sid_a = str(uuid_mod.uuid4()) + sid_b = str(uuid_mod.uuid4()) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_a}, + [_user("first a", ts="2024-01-01T00:00:00Z", cwd=DIR)], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_b}, + [_user("first b", ts="2024-01-02T00:00:00Z", cwd=DIR)], + ) + + async def _boom(self, key): # noqa: ANN001 + raise AssertionError("load() must not be called on the fast path") + + monkeypatch.setattr(InMemorySessionStore, "load", _boom) + + sessions = await list_sessions_from_store(store, directory=DIR) + assert {s.session_id for s in sessions} == {sid_a, sid_b} + # Sorted by last_modified descending — sid_b's timestamp is newer. + assert sessions[0].session_id == sid_b + assert sessions[0].summary == "first b" + assert sessions[1].first_prompt == "first a" + + async def test_fast_path_filters_sidechain_and_empty(self) -> None: + store = InMemorySessionStore() + sid_main = str(uuid_mod.uuid4()) + sid_side = str(uuid_mod.uuid4()) + sid_empty = str(uuid_mod.uuid4()) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_main}, + [_user("hello", ts="2024-01-01T00:00:00Z")], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_side}, + [ + { + "type": "user", + "timestamp": "2024-01-01T00:00:00Z", + "isSidechain": True, + "message": {"content": "x"}, + } + ], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_empty}, + [{"type": "x", "timestamp": "2024-01-01T00:00:00Z"}], + ) + sessions = await list_sessions_from_store(store, directory=DIR) + assert {s.session_id for s in sessions} == {sid_main} + + async def test_fast_path_limit_offset(self) -> None: + store = InMemorySessionStore() + sids = 
[str(uuid_mod.uuid4()) for _ in range(5)] + for i, sid in enumerate(sids): + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid}, + [_user(f"p{i}", ts=f"2024-01-0{i + 1}T00:00:00Z")], + ) + page = await list_sessions_from_store(store, directory=DIR, limit=2, offset=1) + assert len(page) == 2 + assert page[0].session_id == sids[3] + assert page[1].session_id == sids[2] + + async def test_not_implemented_falls_back_to_load(self) -> None: + """A store that overrides list_session_summaries but raises + NotImplementedError must fall back to the per-session load() path.""" + + class FallbackStore(InMemorySessionStore): + async def list_session_summaries(self, project_key: str): # noqa: ANN201 + raise NotImplementedError + + store = FallbackStore() + sid = str(uuid_mod.uuid4()) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid}, + [_user("hi", ts="2024-01-01T00:00:00Z")], + ) + sessions = await list_sessions_from_store(store, directory=DIR) + assert len(sessions) == 1 + assert sessions[0].summary == "hi" + + +# --------------------------------------------------------------------------- +# Parity: incremental fold == batch lite-parse +# --------------------------------------------------------------------------- + + +class TestParityWithLiteParse: + def test_incremental_equals_batch(self) -> None: + """``summary_entry_to_sdk_info(fold(...))`` must equal + ``_parse_session_info_from_lite`` on the same entry stream.""" + sid = "22222222-2222-4222-8222-222222222222" + k: SessionKey = {"project_key": PROJECT_KEY, "session_id": sid} + entries: list[dict[str, Any]] = [ + _user( + "/clear", + ts="2024-01-01T00:00:00.000Z", + cwd="/work", + gitBranch="main", + ), + _user("ignored", ts="2024-01-01T00:00:01.000Z", isMeta=True), + _user("real prompt here", ts="2024-01-01T00:00:02.000Z"), + { + "type": "assistant", + "timestamp": "2024-01-01T00:00:03.000Z", + "message": {"content": [{"type": "text", "text": "ok"}]}, + }, + { + "type": "x", + 
"timestamp": "2024-01-01T00:00:04.000Z", + "aiTitle": "AI Named", + }, + {"type": "tag", "timestamp": "2024-01-01T00:00:05.000Z", "tag": "wip"}, + { + "type": "x", + "timestamp": "2024-01-01T00:00:06.000Z", + "customTitle": "User Named", + "gitBranch": "feature", + }, + ] + + # Incremental — fold across two append batches to exercise carry-over. + folded = fold_session_summary(None, k, entries[:3]) + folded = fold_session_summary(folded, k, entries[3:]) + incremental = summary_entry_to_sdk_info(folded, "/work") + + # Batch — same path list_sessions_from_store fallback uses. + jsonl = _entries_to_jsonl(entries) + batch = _parse_session_info_from_lite( + sid, _jsonl_to_lite(jsonl, folded["mtime"]), "/work" + ) + + assert incremental is not None and batch is not None + # file_size is a byte count only meaningful for the JSONL path. + batch.file_size = None + assert incremental == batch + + def test_parity_first_prompt_only(self) -> None: + sid = "33333333-3333-4333-8333-333333333333" + k: SessionKey = {"project_key": PROJECT_KEY, "session_id": sid} + entries: list[dict[str, Any]] = [ + _user("just a prompt", ts="2024-02-01T00:00:00.000Z", cwd="/w"), + ] + folded = fold_session_summary(None, k, entries) + incremental = summary_entry_to_sdk_info(folded, "/w") + jsonl = _entries_to_jsonl(entries) + batch = _parse_session_info_from_lite( + sid, _jsonl_to_lite(jsonl, folded["mtime"]), "/w" + ) + assert incremental is not None and batch is not None + batch.file_size = None + assert incremental == batch From ea201217e5cc35b5c83eb59dee441c293c0db305 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Sun, 19 Apr 2026 06:59:24 +0000 Subject: [PATCH 02/17] fix: latch is_sidechain independently of created_at; update list_sessions_from_store docstring --- src/claude_agent_sdk/_internal/session_summary.py | 6 +++--- src/claude_agent_sdk/_internal/sessions.py | 10 +++++----- tests/test_session_summary.py | 15 +++++++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git 
a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index 047c1fa3..218d589a 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -140,10 +140,10 @@ def fold_session_summary( if ms is not None and ms > summary["mtime"]: summary["mtime"] = ms - if "created_at" not in summary: - if ms is not None: - summary["created_at"] = ms + if "is_sidechain" not in summary: summary["is_sidechain"] = entry.get("isSidechain") is True + if "created_at" not in summary and ms is not None: + summary["created_at"] = ms if "cwd" not in summary: cwd = entry.get("cwd") diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index df169d98..7d95fdb6 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1548,11 +1548,11 @@ async def list_sessions_from_store( the store path — the store operates on a single ``project_key``. .. note:: - This performs one full ``store.load()`` per session in the listing - to derive summaries. On remote backends with many or large sessions - this can be expensive (e.g., S3 egress, Postgres large-row reads). - Consider denormalizing summary metadata into your adapter's - ``list_sessions()`` index. + If the store implements ``list_session_summaries``, this is a single + store call. Otherwise falls back to one ``store.load()`` per session + (bounded at 16 concurrent), which on remote backends with many or + large sessions can be expensive (e.g., S3 egress, Postgres large-row + reads). 
""" project_path = _canonicalize_path(str(directory) if directory is not None else ".") project_key = _sanitize_path(project_path) diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 6aaccbaa..776220c5 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -153,6 +153,21 @@ def test_sidechain_from_first_entry(self) -> None: ) assert s["is_sidechain"] is True + def test_sidechain_latched_when_first_entry_lacks_timestamp(self) -> None: + """Regression: is_sidechain must latch on entry 0 even if its timestamp + is absent/unparseable, so entry 1 cannot overwrite it to False.""" + s = fold_session_summary( + None, + KEY, + [ + {"type": "user", "isSidechain": True}, + {"type": "x", "timestamp": "2024-01-01T00:00:00Z"}, + ], + ) + assert s["is_sidechain"] is True + # created_at still picks up the first parseable timestamp. + assert s["created_at"] == 1704067200000 + def test_first_prompt_skips_meta_tool_result_and_compact(self) -> None: s = fold_session_summary( None, From 27014925aa3a38d5034b5e8d838bd67f25381639 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Sun, 19 Apr 2026 07:09:36 +0000 Subject: [PATCH 03/17] refactor: underscore-prefix internal fold-state fields in SessionSummaryEntry --- .../_internal/session_summary.py | 20 +++++++++---------- src/claude_agent_sdk/types.py | 14 ++++++++----- tests/test_session_summary.py | 14 ++++++------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index 218d589a..a6786e23 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -71,12 +71,12 @@ def _entry_text_blocks(entry: dict[str, Any]) -> list[str]: def _fold_first_prompt(summary: SessionSummaryEntry, entry: dict[str, Any]) -> None: """Replicate ``_extract_first_prompt_from_head`` for a single parsed entry. 
- Mutates ``summary`` in place: sets ``first_prompt`` + ``first_prompt_locked`` - on a real match, or stashes a ``command_fallback`` for slash-command - messages. Skips tool_result, isMeta, isCompactSummary, and auto-generated - patterns. + Mutates ``summary`` in place: sets ``first_prompt`` + + ``_first_prompt_locked`` on a real match, or stashes a + ``_command_fallback`` for slash-command messages. Skips tool_result, + isMeta, isCompactSummary, and auto-generated patterns. """ - if summary.get("first_prompt_locked"): + if summary.get("_first_prompt_locked"): return if entry.get("type") != "user": return @@ -97,15 +97,15 @@ def _fold_first_prompt(summary: SessionSummaryEntry, entry: dict[str, Any]) -> N continue cmd_match = _COMMAND_NAME_RE.search(result) if cmd_match: - if not summary.get("command_fallback"): - summary["command_fallback"] = cmd_match.group(1) + if not summary.get("_command_fallback"): + summary["_command_fallback"] = cmd_match.group(1) continue if _SKIP_FIRST_PROMPT_PATTERN.match(result): continue if len(result) > 200: result = result[:200].rstrip() + "\u2026" summary["first_prompt"] = result - summary["first_prompt_locked"] = True + summary["_first_prompt_locked"] = True return @@ -181,8 +181,8 @@ def summary_entry_to_sdk_info( first_prompt = ( entry.get("first_prompt") - if entry.get("first_prompt_locked") - else entry.get("command_fallback") + if entry.get("_first_prompt_locked") + else entry.get("_command_fallback") ) or None custom_title = entry.get("custom_title") or entry.get("ai_title") or None summary = ( diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index ab1ffcad..24de74eb 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1166,6 +1166,10 @@ class SessionSummaryEntry(TypedDict, total=False): :func:`fold_session_summary` and return the full set from :meth:`SessionStore.list_session_summaries`. Every field is append-incremental (set-once or last-wins) so adapters never re-read. 
+ + Fields prefixed ``_`` are opaque fold state — stores MUST persist them + verbatim across :func:`fold_session_summary` calls but SHOULD NOT + interpret them. """ session_id: Required[str] @@ -1177,11 +1181,6 @@ class SessionSummaryEntry(TypedDict, total=False): cwd: str first_prompt: str """First meaningful user prompt, truncated to 200 chars.""" - first_prompt_locked: bool - """Internal: ``True`` once a non-command prompt has been found.""" - command_fallback: str - """Internal: first ```` seen, used when no real prompt - appears.""" custom_title: str ai_title: str last_prompt: str @@ -1190,6 +1189,11 @@ class SessionSummaryEntry(TypedDict, total=False): git_branch: str tag: str file_size: int + _first_prompt_locked: bool + """Opaque fold state: ``True`` once a non-command prompt has been found.""" + _command_fallback: str + """Opaque fold state: first ```` seen, used when no real + prompt appears.""" class SessionListSubkeysKey(TypedDict): diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 776220c5..b33db340 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -181,7 +181,7 @@ def test_first_prompt_skips_meta_tool_result_and_compact(self) -> None: ], ) assert s["first_prompt"] == "real first" - assert s["first_prompt_locked"] is True + assert s["_first_prompt_locked"] is True def test_first_prompt_command_fallback(self) -> None: s = fold_session_summary( @@ -192,12 +192,12 @@ def test_first_prompt_command_fallback(self) -> None: _user("/second"), ], ) - assert s.get("first_prompt_locked") is not True - assert s["command_fallback"] == "/init" + assert s.get("_first_prompt_locked") is not True + assert s["_command_fallback"] == "/init" # A later real prompt locks it. 
s2 = fold_session_summary(s, KEY, [_user("now real")]) assert s2["first_prompt"] == "now real" - assert s2["first_prompt_locked"] is True + assert s2["_first_prompt_locked"] is True def test_first_prompt_skip_pattern(self) -> None: s = fold_session_summary( @@ -246,8 +246,8 @@ def test_precedence_chain(self) -> None: "session_id": "s", "mtime": 1, "first_prompt": "fp", - "first_prompt_locked": True, - "command_fallback": "/cmd", + "_first_prompt_locked": True, + "_command_fallback": "/cmd", "summary_hint": "sh", "last_prompt": "lp", "ai_title": "ai", @@ -272,7 +272,7 @@ def test_precedence_chain(self) -> None: info = summary_entry_to_sdk_info(base, None) assert info is not None and info.summary == "fp" and info.first_prompt == "fp" - base["first_prompt_locked"] = False + base["_first_prompt_locked"] = False info = summary_entry_to_sdk_info(base, None) assert ( info is not None and info.summary == "/cmd" and info.first_prompt == "/cmd" From 0288f13e44c8a01489d29aafda45b63452c00ae5 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Sun, 19 Apr 2026 07:17:54 +0000 Subject: [PATCH 04/17] =?UTF-8?q?refactor:=20collapse=20SessionSummaryEntr?= =?UTF-8?q?y=20to=20{session=5Fid,=20mtime,=20data}=20=E2=80=94=20opaque?= =?UTF-8?q?=20to=20stores?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../_internal/session_summary.py | 81 ++++++------ .../testing/session_store_conformance.py | 15 ++- src/claude_agent_sdk/types.py | 42 ++---- tests/test_session_summary.py | 120 ++++++++++-------- 4 files changed, 130 insertions(+), 128 deletions(-) diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index a6786e23..f931bf2d 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -68,15 +68,15 @@ def _entry_text_blocks(entry: dict[str, Any]) -> list[str]: return texts -def _fold_first_prompt(summary: 
SessionSummaryEntry, entry: dict[str, Any]) -> None: +def _fold_first_prompt(data: dict[str, Any], entry: dict[str, Any]) -> None: """Replicate ``_extract_first_prompt_from_head`` for a single parsed entry. - Mutates ``summary`` in place: sets ``first_prompt`` + - ``_first_prompt_locked`` on a real match, or stashes a - ``_command_fallback`` for slash-command messages. Skips tool_result, - isMeta, isCompactSummary, and auto-generated patterns. + Mutates ``data`` in place: sets ``first_prompt`` + ``first_prompt_locked`` + on a real match, or stashes a ``command_fallback`` for slash-command + messages. Skips tool_result, isMeta, isCompactSummary, and auto-generated + patterns. """ - if summary.get("_first_prompt_locked"): + if data.get("first_prompt_locked"): return if entry.get("type") != "user": return @@ -97,15 +97,15 @@ def _fold_first_prompt(summary: SessionSummaryEntry, entry: dict[str, Any]) -> N continue cmd_match = _COMMAND_NAME_RE.search(result) if cmd_match: - if not summary.get("_command_fallback"): - summary["_command_fallback"] = cmd_match.group(1) + if not data.get("command_fallback"): + data["command_fallback"] = cmd_match.group(1) continue if _SKIP_FIRST_PROMPT_PATTERN.match(result): continue if len(result) > 200: result = result[:200].rstrip() + "\u2026" - summary["first_prompt"] = result - summary["_first_prompt_locked"] = True + data["first_prompt"] = result + data["first_prompt_locked"] = True return @@ -121,15 +121,19 @@ def fold_session_summary( transcript. ``prev`` is the previous summary for the same key (or ``None`` for the first append). - Set-once fields (``is_sidechain``, ``created_at``, ``cwd``, - ``first_prompt``) freeze on first sight; last-wins fields - (``custom_title``, ``ai_title``, ``last_prompt``, ``summary_hint``, - ``git_branch``, ``tag``, ``mtime``) overwrite on every appearance. + All derived state lives in the opaque ``data`` dict; stores persist it + verbatim and do not interpret it. 
``mtime`` stays top-level so stores + can index on it. """ if prev is not None: - summary = cast(SessionSummaryEntry, dict(prev)) + summary: SessionSummaryEntry = { + "session_id": prev["session_id"], + "mtime": prev["mtime"], + "data": dict(prev["data"]), + } else: - summary = {"session_id": key["session_id"], "mtime": 0} + summary = {"session_id": key["session_id"], "mtime": 0, "data": {}} + data = summary["data"] for raw in entries: # SessionStoreEntry is a permissive TypedDict; widen to a plain dict @@ -140,30 +144,30 @@ def fold_session_summary( if ms is not None and ms > summary["mtime"]: summary["mtime"] = ms - if "is_sidechain" not in summary: - summary["is_sidechain"] = entry.get("isSidechain") is True - if "created_at" not in summary and ms is not None: - summary["created_at"] = ms + if "is_sidechain" not in data: + data["is_sidechain"] = entry.get("isSidechain") is True + if "created_at" not in data and ms is not None: + data["created_at"] = ms - if "cwd" not in summary: + if "cwd" not in data: cwd = entry.get("cwd") if isinstance(cwd, str) and cwd: - summary["cwd"] = cwd + data["cwd"] = cwd - _fold_first_prompt(summary, entry) + _fold_first_prompt(data, entry) for src, dst in _LAST_WINS_FIELDS.items(): val = entry.get(src) if isinstance(val, str): - summary[dst] = val # type: ignore[literal-required] + data[dst] = val if entry.get("type") == "tag": tag_val = entry.get("tag") if isinstance(tag_val, str) and tag_val: - summary["tag"] = tag_val + data["tag"] = tag_val else: # Empty string or absent tag clears the tag. - summary.pop("tag", None) + data.pop("tag", None) return summary @@ -176,19 +180,20 @@ def summary_entry_to_sdk_info( Returns ``None`` for sidechain sessions or sessions with no extractable summary, matching ``_parse_session_info_from_lite``'s filtering. 
""" - if entry.get("is_sidechain"): + data = entry["data"] + if data.get("is_sidechain"): return None first_prompt = ( - entry.get("first_prompt") - if entry.get("_first_prompt_locked") - else entry.get("_command_fallback") + data.get("first_prompt") + if data.get("first_prompt_locked") + else data.get("command_fallback") ) or None - custom_title = entry.get("custom_title") or entry.get("ai_title") or None + custom_title = data.get("custom_title") or data.get("ai_title") or None summary = ( custom_title - or entry.get("last_prompt") - or entry.get("summary_hint") + or data.get("last_prompt") + or data.get("summary_hint") or first_prompt ) if not summary: @@ -198,11 +203,11 @@ def summary_entry_to_sdk_info( session_id=entry["session_id"], summary=summary, last_modified=entry["mtime"], - file_size=entry.get("file_size"), + file_size=data.get("file_size"), custom_title=custom_title, first_prompt=first_prompt, - git_branch=entry.get("git_branch") or None, - cwd=entry.get("cwd") or project_path or None, - tag=entry.get("tag") or None, - created_at=entry.get("created_at"), + git_branch=data.get("git_branch") or None, + cwd=data.get("cwd") or project_path or None, + tag=data.get("tag") or None, + created_at=data.get("created_at"), ) diff --git a/src/claude_agent_sdk/testing/session_store_conformance.py b/src/claude_agent_sdk/testing/session_store_conformance.py index c8fcd9cc..ca039c7f 100644 --- a/src/claude_agent_sdk/testing/session_store_conformance.py +++ b/src/claude_agent_sdk/testing/session_store_conformance.py @@ -167,7 +167,11 @@ async def fresh() -> SessionStore: # --- Optional: list_session_summaries ---------------------------------- if has_list_summaries: - # 14. list_session_summaries reflects incremental fold on append + # 14. list_session_summaries returns persisted fold output that + # round-trips through fold_session_summary again. Stores must NOT + # interpret ``data`` — only persist it verbatim. 
+ from .._internal.session_summary import fold_session_summary + store = await fresh() key: SessionKey = {"project_key": "proj", "session_id": "summ-sess"} await store.append( @@ -191,8 +195,13 @@ async def fresh() -> SessionStore: summ = by_id["summ-sess"] # mtime must be epoch-ms; >1e12 rules out epoch-seconds. assert math.isfinite(summ["mtime"]) and summ["mtime"] > 1e12 - # custom_title is last-wins across append calls. - assert summ.get("custom_title") == "second" + # data is opaque; the contract is that it round-trips into the fold. + assert isinstance(summ["data"], dict) + refolded = fold_session_summary( + summ, key, [_e({"timestamp": "2024-01-01T00:00:03.000Z"})] + ) + assert refolded["session_id"] == "summ-sess" + assert refolded["mtime"] >= summ["mtime"] assert await store.list_session_summaries("never-appended-project") == [] if has_delete: await store.delete(key) diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 24de74eb..3f2ec248 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1159,41 +1159,21 @@ class SessionStoreListEntry(TypedDict): modification time (e.g. Redis) must maintain their own index.""" -class SessionSummaryEntry(TypedDict, total=False): +class SessionSummaryEntry(TypedDict): """Incrementally-maintained session summary. - Stores update this on :meth:`SessionStore.append` via - :func:`fold_session_summary` and return the full set from - :meth:`SessionStore.list_session_summaries`. Every field is - append-incremental (set-once or last-wins) so adapters never re-read. - - Fields prefixed ``_`` are opaque fold state — stores MUST persist them - verbatim across :func:`fold_session_summary` calls but SHOULD NOT - interpret them. + Stores obtain this from :func:`fold_session_summary` inside + :meth:`SessionStore.append` and persist it verbatim; they return the + full set from :meth:`SessionStore.list_session_summaries`. 
The ``data`` + field is opaque SDK-owned state — stores MUST NOT interpret it. """ - session_id: Required[str] - mtime: Required[int] - """Last-modified time in Unix epoch milliseconds (last entry timestamp).""" - is_sidechain: bool - created_at: int - """First entry timestamp in Unix epoch milliseconds.""" - cwd: str - first_prompt: str - """First meaningful user prompt, truncated to 200 chars.""" - custom_title: str - ai_title: str - last_prompt: str - summary_hint: str - """Raw ``summary`` key from JSONL.""" - git_branch: str - tag: str - file_size: int - _first_prompt_locked: bool - """Opaque fold state: ``True`` once a non-command prompt has been found.""" - _command_fallback: str - """Opaque fold state: first ```` seen, used when no real - prompt appears.""" + session_id: str + mtime: int + """Last-modified time in Unix epoch milliseconds (last entry timestamp). + Stores may index on this.""" + data: dict[str, Any] + """Opaque SDK-owned summary state. Persist verbatim; do not interpret.""" class SessionListSubkeysKey(TypedDict): diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index b33db340..d70e5b1f 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -54,7 +54,7 @@ def _user( class TestFoldSessionSummary: def test_init_from_none(self) -> None: s = fold_session_summary(None, KEY, []) - assert s == {"session_id": KEY["session_id"], "mtime": 0} + assert s == {"session_id": KEY["session_id"], "mtime": 0, "data": {}} def test_set_once_fields_freeze(self) -> None: s = fold_session_summary( @@ -70,9 +70,9 @@ def test_set_once_fields_freeze(self) -> None: {"type": "x", "timestamp": "2024-01-01T00:00:05.000Z", "cwd": "/b"}, ], ) - assert s["created_at"] == 1704067200000 - assert s["cwd"] == "/a" - assert s["is_sidechain"] is False + assert s["data"]["created_at"] == 1704067200000 + assert s["data"]["cwd"] == "/a" + assert s["data"]["is_sidechain"] is False # Second append must not overwrite set-once fields. 
s2 = fold_session_summary( s, @@ -86,9 +86,9 @@ def test_set_once_fields_freeze(self) -> None: } ], ) - assert s2["created_at"] == 1704067200000 - assert s2["cwd"] == "/a" - assert s2["is_sidechain"] is False + assert s2["data"]["created_at"] == 1704067200000 + assert s2["data"]["cwd"] == "/a" + assert s2["data"]["is_sidechain"] is False def test_last_wins_overwrite(self) -> None: s = fold_session_summary( @@ -104,8 +104,8 @@ def test_last_wins_overwrite(self) -> None: {"type": "x", "timestamp": "2024-01-01T00:00:01Z", "customTitle": "t2"}, ], ) - assert s["custom_title"] == "t2" - assert s["git_branch"] == "main" + assert s["data"]["custom_title"] == "t2" + assert s["data"]["git_branch"] == "main" s2 = fold_session_summary( s, KEY, @@ -119,11 +119,11 @@ def test_last_wins_overwrite(self) -> None: } ], ) - assert s2["custom_title"] == "t2" - assert s2["ai_title"] == "ai" - assert s2["last_prompt"] == "lp" - assert s2["summary_hint"] == "sm" - assert s2["git_branch"] == "dev" + assert s2["data"]["custom_title"] == "t2" + assert s2["data"]["ai_title"] == "ai" + assert s2["data"]["last_prompt"] == "lp" + assert s2["data"]["summary_hint"] == "sm" + assert s2["data"]["git_branch"] == "dev" def test_mtime_takes_max(self) -> None: s = fold_session_summary( @@ -138,12 +138,12 @@ def test_mtime_takes_max(self) -> None: def test_tag_set_and_clear(self) -> None: s = fold_session_summary(None, KEY, [{"type": "tag", "tag": "wip"}]) - assert s["tag"] == "wip" + assert s["data"]["tag"] == "wip" s2 = fold_session_summary(s, KEY, [{"type": "tag", "tag": ""}]) - assert "tag" not in s2 + assert "tag" not in s2["data"] # Non-tag entries with a "tag" key (e.g. tool_use input) are ignored. 
s3 = fold_session_summary(s, KEY, [{"type": "user", "tag": "ignored"}]) - assert s3["tag"] == "wip" + assert s3["data"]["tag"] == "wip" def test_sidechain_from_first_entry(self) -> None: s = fold_session_summary( @@ -151,7 +151,7 @@ def test_sidechain_from_first_entry(self) -> None: KEY, [{"type": "x", "timestamp": "2024-01-01T00:00:00Z", "isSidechain": True}], ) - assert s["is_sidechain"] is True + assert s["data"]["is_sidechain"] is True def test_sidechain_latched_when_first_entry_lacks_timestamp(self) -> None: """Regression: is_sidechain must latch on entry 0 even if its timestamp @@ -164,9 +164,9 @@ def test_sidechain_latched_when_first_entry_lacks_timestamp(self) -> None: {"type": "x", "timestamp": "2024-01-01T00:00:00Z"}, ], ) - assert s["is_sidechain"] is True + assert s["data"]["is_sidechain"] is True # created_at still picks up the first parseable timestamp. - assert s["created_at"] == 1704067200000 + assert s["data"]["created_at"] == 1704067200000 def test_first_prompt_skips_meta_tool_result_and_compact(self) -> None: s = fold_session_summary( @@ -180,8 +180,8 @@ def test_first_prompt_skips_meta_tool_result_and_compact(self) -> None: _user("not me"), ], ) - assert s["first_prompt"] == "real first" - assert s["_first_prompt_locked"] is True + assert s["data"]["first_prompt"] == "real first" + assert s["data"]["first_prompt_locked"] is True def test_first_prompt_command_fallback(self) -> None: s = fold_session_summary( @@ -192,12 +192,12 @@ def test_first_prompt_command_fallback(self) -> None: _user("/second"), ], ) - assert s.get("_first_prompt_locked") is not True - assert s["_command_fallback"] == "/init" + assert s["data"].get("first_prompt_locked") is not True + assert s["data"]["command_fallback"] == "/init" # A later real prompt locks it. 
s2 = fold_session_summary(s, KEY, [_user("now real")]) - assert s2["first_prompt"] == "now real" - assert s2["_first_prompt_locked"] is True + assert s2["data"]["first_prompt"] == "now real" + assert s2["data"]["first_prompt_locked"] is True def test_first_prompt_skip_pattern(self) -> None: s = fold_session_summary( @@ -205,17 +205,17 @@ def test_first_prompt_skip_pattern(self) -> None: KEY, [_user(" some output"), _user("hello")], ) - assert s["first_prompt"] == "hello" + assert s["data"]["first_prompt"] == "hello" def test_first_prompt_truncated(self) -> None: s = fold_session_summary(None, KEY, [_user("x" * 300)]) - assert len(s["first_prompt"]) <= 201 - assert s["first_prompt"].endswith("\u2026") + assert len(s["data"]["first_prompt"]) <= 201 + assert s["data"]["first_prompt"].endswith("\u2026") def test_prev_is_not_mutated(self) -> None: - prev: SessionSummaryEntry = {"session_id": "a", "mtime": 5} + prev: SessionSummaryEntry = {"session_id": "a", "mtime": 5, "data": {}} fold_session_summary(prev, KEY, [{"type": "x", "customTitle": "t"}]) - assert prev == {"session_id": "a", "mtime": 5} + assert prev == {"session_id": "a", "mtime": 5, "data": {}} # --------------------------------------------------------------------------- @@ -230,8 +230,7 @@ def test_sidechain_returns_none(self) -> None: { "session_id": "s", "mtime": 1, - "is_sidechain": True, - "custom_title": "t", + "data": {"is_sidechain": True, "custom_title": "t"}, }, None, ) @@ -239,40 +238,42 @@ def test_sidechain_returns_none(self) -> None: ) def test_empty_summary_returns_none(self) -> None: - assert summary_entry_to_sdk_info({"session_id": "s", "mtime": 1}, None) is None + assert ( + summary_entry_to_sdk_info({"session_id": "s", "mtime": 1, "data": {}}, None) + is None + ) def test_precedence_chain(self) -> None: - base: SessionSummaryEntry = { - "session_id": "s", - "mtime": 1, + data: dict[str, Any] = { "first_prompt": "fp", - "_first_prompt_locked": True, - "_command_fallback": "/cmd", + 
"first_prompt_locked": True, + "command_fallback": "/cmd", "summary_hint": "sh", "last_prompt": "lp", "ai_title": "ai", "custom_title": "ct", } + base: SessionSummaryEntry = {"session_id": "s", "mtime": 1, "data": data} info = summary_entry_to_sdk_info(base, None) assert info is not None and info.summary == "ct" and info.custom_title == "ct" - del base["custom_title"] + del data["custom_title"] info = summary_entry_to_sdk_info(base, None) assert info is not None and info.summary == "ai" and info.custom_title == "ai" - del base["ai_title"] + del data["ai_title"] info = summary_entry_to_sdk_info(base, None) assert info is not None and info.summary == "lp" and info.custom_title is None - del base["last_prompt"] + del data["last_prompt"] info = summary_entry_to_sdk_info(base, None) assert info is not None and info.summary == "sh" - del base["summary_hint"] + del data["summary_hint"] info = summary_entry_to_sdk_info(base, None) assert info is not None and info.summary == "fp" and info.first_prompt == "fp" - base["_first_prompt_locked"] = False + data["first_prompt_locked"] = False info = summary_entry_to_sdk_info(base, None) assert ( info is not None and info.summary == "/cmd" and info.first_prompt == "/cmd" @@ -280,11 +281,16 @@ def test_precedence_chain(self) -> None: def test_cwd_fallback_to_project_path(self) -> None: info = summary_entry_to_sdk_info( - {"session_id": "s", "mtime": 1, "custom_title": "t"}, "/proj" + {"session_id": "s", "mtime": 1, "data": {"custom_title": "t"}}, "/proj" ) assert info is not None and info.cwd == "/proj" info2 = summary_entry_to_sdk_info( - {"session_id": "s", "mtime": 1, "custom_title": "t", "cwd": "/own"}, "/proj" + { + "session_id": "s", + "mtime": 1, + "data": {"custom_title": "t", "cwd": "/own"}, + }, + "/proj", ) assert info2 is not None and info2.cwd == "/own" @@ -293,11 +299,13 @@ def test_field_passthrough(self) -> None: { "session_id": "s", "mtime": 99, - "custom_title": "t", - "git_branch": "main", - "tag": "wip", - 
"created_at": 50, - "file_size": 1234, + "data": { + "custom_title": "t", + "git_branch": "main", + "tag": "wip", + "created_at": 50, + "file_size": 1234, + }, }, None, ) @@ -328,9 +336,9 @@ async def test_tracks_appends(self) -> None: s["session_id"]: s for s in await store.list_session_summaries(PROJECT_KEY) } assert set(summaries) == {"a", "b"} - assert summaries["a"]["custom_title"] == "Title A" - assert summaries["a"]["first_prompt"] == "hello a" - assert summaries["b"]["first_prompt"] == "hello b" + assert summaries["a"]["data"]["custom_title"] == "Title A" + assert summaries["a"]["data"]["first_prompt"] == "hello a" + assert summaries["b"]["data"]["first_prompt"] == "hello b" async def test_subpath_appends_ignored(self) -> None: store = InMemorySessionStore() @@ -346,8 +354,8 @@ async def test_subpath_appends_ignored(self) -> None: ) summaries = await store.list_session_summaries(PROJECT_KEY) assert len(summaries) == 1 - assert summaries[0]["first_prompt"] == "main prompt" - assert "custom_title" not in summaries[0] + assert summaries[0]["data"]["first_prompt"] == "main prompt" + assert "custom_title" not in summaries[0]["data"] async def test_delete_drops_summary(self) -> None: store = InMemorySessionStore() From c8942baff7f0fe362e25cb5fa322fcc6ffe36358 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Sun, 19 Apr 2026 21:36:19 +0000 Subject: [PATCH 05/17] fix: gap-fill list_session_summaries fast-path for sessions missing a sidecar --- src/claude_agent_sdk/_internal/sessions.py | 108 +++++++++++++-------- tests/test_session_summary.py | 76 +++++++++++++++ 2 files changed, 145 insertions(+), 39 deletions(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 7d95fdb6..8e652a71 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1514,6 +1514,53 @@ async def _load_store_entries_as_jsonl( return _entries_to_jsonl(entries) +async def 
_derive_infos_via_load( + session_store: SessionStore, + listing: list[Any], + directory: str | None, + project_path: str, +) -> list[SDKSessionInfo]: + """Derive ``SDKSessionInfo`` for each ``listing`` entry via per-session + ``store.load()`` + lite-parse. + + Loads run concurrently with a fixed bound so large listings don't exhaust + adapter connection pools or hit backend rate limits; adapter errors degrade + that row to an empty summary instead of failing the whole list. Sidechain + and no-summary sessions are dropped. + """ + sem = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY) + + async def _bounded_load(sid: str) -> str | None: + async with sem: + return await _load_store_entries_as_jsonl(session_store, sid, directory) + + settled = await asyncio.gather( + *(_bounded_load(e["session_id"]) for e in listing), + return_exceptions=True, + ) + results: list[SDKSessionInfo] = [] + for entry, outcome in zip(listing, settled, strict=True): + sid = entry["session_id"] + mtime = entry["mtime"] + if isinstance(outcome, BaseException): + results.append( + SDKSessionInfo(session_id=sid, summary="", last_modified=mtime) + ) + continue + if outcome is None: + continue + parsed = _parse_session_info_from_lite( + sid, _jsonl_to_lite(outcome, mtime), project_path + ) + if parsed is None: + # Sidechain or no extractable summary — drop, matching the + # filesystem path. + continue + parsed.last_modified = mtime + results.append(parsed) + return results + + async def list_sessions_from_store( session_store: SessionStore, directory: str | None = None, @@ -1556,6 +1603,7 @@ async def list_sessions_from_store( """ project_path = _canonicalize_path(str(directory) if directory is not None else ".") project_key = _sanitize_path(project_path) + has_list_sessions = _store_implements(session_store, "list_sessions") # Fast path: if the store maintains incremental summaries, fetch them in # one call instead of N per-session load()s. 
@@ -1572,54 +1620,36 @@ async def list_sessions_from_store( for s in summaries if (info := summary_entry_to_sdk_info(s, project_path)) is not None ] + # Gap-fill: a store may have entries for sessions appended before + # it adopted list_session_summaries (no sidecar yet). Enumerate + # via list_sessions() and derive the missing ones via the + # per-session load() path so they aren't silently dropped. + if has_list_sessions: + summary_ids = {s["session_id"] for s in summaries} + listing = list(await session_store.list_sessions(project_key)) + missing = [e for e in listing if e["session_id"] not in summary_ids] + if missing: + infos.extend( + await _derive_infos_via_load( + session_store, missing, directory, project_path + ) + ) return _apply_sort_limit_offset(infos, limit, offset) - if not _store_implements(session_store, "list_sessions"): + if not has_list_sessions: raise ValueError( "session_store does not implement list_sessions() -- cannot list " "sessions. Provide a store with a list_sessions() method." ) - raw = await session_store.list_sessions(project_key) # Copy — store.list_sessions() may return a reference to internal state. - listing = list(raw) - + listing = list(await session_store.list_sessions(project_key)) # Derive a real summary per session by loading its entries and reusing - # the filesystem path's lite-parse. Loads run concurrently with a fixed - # bound so large listings don't exhaust adapter connection pools or hit - # backend rate limits; adapter errors degrade that row instead of failing - # the whole list. Filtering (sidechain/empty drop) happens before - # pagination so ``limit``/``offset`` index the same filtered set as the - # disk path. 
- sem = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY) - - async def _bounded_load(sid: str) -> str | None: - async with sem: - return await _load_store_entries_as_jsonl(session_store, sid, directory) - - settled = await asyncio.gather( - *(_bounded_load(e["session_id"]) for e in listing), - return_exceptions=True, + # the filesystem path's lite-parse. Filtering (sidechain/empty drop) + # happens before pagination so ``limit``/``offset`` index the same + # filtered set as the disk path. + results = await _derive_infos_via_load( + session_store, listing, directory, project_path ) - results: list[SDKSessionInfo] = [] - for entry, outcome in zip(listing, settled, strict=True): - sid = entry["session_id"] - mtime = entry["mtime"] - if isinstance(outcome, BaseException): - results.append( - SDKSessionInfo(session_id=sid, summary="", last_modified=mtime) - ) - continue - if outcome is None: - continue - parsed = _parse_session_info_from_lite( - sid, _jsonl_to_lite(outcome, mtime), project_path - ) - if parsed is None: - # Sidechain or no extractable summary — drop, matching the - # filesystem path. - continue - parsed.last_modified = mtime - results.append(parsed) return _apply_sort_limit_offset(results, limit, offset) diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index d70e5b1f..620fc2cd 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -465,6 +465,82 @@ async def list_session_summaries(self, project_key: str): # noqa: ANN201 assert len(sessions) == 1 assert sessions[0].summary == "hi" + async def test_mixed_sessions_gap_filled(self) -> None: + """A store with summaries for only SOME sessions (e.g. 
adopted the + method mid-stream) must have the rest gap-filled via per-session + load() so old sessions aren't silently dropped.""" + sid_with = str(uuid_mod.uuid4()) + sid_without = str(uuid_mod.uuid4()) + + class PartialStore(InMemorySessionStore): + load_calls: list[str] = [] + + async def list_session_summaries(self, project_key: str): # noqa: ANN201 + full = await super().list_session_summaries(project_key) + return [s for s in full if s["session_id"] == sid_with] + + async def load(self, key): # noqa: ANN001, ANN201 + self.load_calls.append(key["session_id"]) + return await super().load(key) + + store = PartialStore() + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_with}, + [_user("has sidecar", ts="2024-01-02T00:00:00Z")], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_without}, + [_user("no sidecar", ts="2024-01-01T00:00:00Z")], + ) + + sessions = await list_sessions_from_store(store, directory=DIR) + by_id = {s.session_id: s for s in sessions} + assert set(by_id) == {sid_with, sid_without} + assert by_id[sid_with].summary == "has sidecar" + assert by_id[sid_without].summary == "no sidecar" + # Only the missing session should have been load()ed. 
+ assert store.load_calls == [sid_without] + + async def test_gap_fill_bounded_concurrency(self) -> None: + """Gap-fill reuses the bounded per-session load helper, so + ``_STORE_LIST_LOAD_CONCURRENCY`` applies to the missing-session set.""" + import asyncio + + from claude_agent_sdk._internal import sessions as _sessions + + in_flight = 0 + peak = 0 + gate = asyncio.Event() + + class PartialSlowStore(InMemorySessionStore): + async def list_session_summaries(self, project_key: str): # noqa: ANN201 + return [] # everything is "missing" + + async def load(self, key): # noqa: ANN001, ANN201 + nonlocal in_flight, peak + in_flight += 1 + peak = max(peak, in_flight) + await gate.wait() + in_flight -= 1 + return await super().load(key) + + store = PartialSlowStore() + n = _sessions._STORE_LIST_LOAD_CONCURRENCY * 2 + for i in range(n): + await InMemorySessionStore.append( + store, + {"project_key": PROJECT_KEY, "session_id": str(uuid_mod.uuid4())}, + [_user(f"p{i}")], + ) + + task = asyncio.create_task(list_sessions_from_store(store, directory=DIR)) + for _ in range(5): + await asyncio.sleep(0) + assert 0 < peak <= _sessions._STORE_LIST_LOAD_CONCURRENCY + gate.set() + await task + assert peak == _sessions._STORE_LIST_LOAD_CONCURRENCY + # --------------------------------------------------------------------------- # Parity: incremental fold == batch lite-parse From 2bf54aeef8e8f7bf786ce810e247951670cf09a3 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 06:33:00 +0000 Subject: [PATCH 06/17] docs: note sidecar write serialization + gap-fill list_sessions requirement --- src/claude_agent_sdk/_internal/sessions.py | 13 +++++++++++++ src/claude_agent_sdk/types.py | 7 +++++++ 2 files changed, 20 insertions(+) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 8e652a71..ad2f4c12 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -8,6 +8,7 @@ import asyncio 
import json +import logging import os import re import subprocess @@ -21,6 +22,8 @@ from ..types import SDKSessionInfo, SessionKey, SessionMessage, SessionStore from .session_store_validation import _store_implements +logger = logging.getLogger(__name__) + # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- @@ -1600,6 +1603,11 @@ async def list_sessions_from_store( (bounded at 16 concurrent), which on remote backends with many or large sessions can be expensive (e.g., S3 egress, Postgres large-row reads). + + Gap-fill requires ``list_sessions``: if the store implements + ``list_session_summaries`` but not ``list_sessions``, sessions + without a sidecar cannot be discovered and will be absent from the + result. """ project_path = _canonicalize_path(str(directory) if directory is not None else ".") project_key = _sanitize_path(project_path) @@ -1634,6 +1642,11 @@ async def list_sessions_from_store( session_store, missing, directory, project_path ) ) + else: + logger.debug( + "list_session_summaries without list_sessions: gap-fill " + "skipped; sessions lacking a sidecar will be omitted" + ) return _apply_sort_limit_offset(infos, limit, offset) if not has_list_sessions: diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 3f2ec248..2b6be263 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1261,6 +1261,13 @@ async def list_session_summaries( Optional — if unimplemented, ``list_sessions_from_store()`` falls back to ``list_sessions()`` + per-session ``load()``. + + .. note:: + Stores that maintain summaries inside ``append()`` MUST serialize + sidecar writes if ``append()`` calls can race for the same session + — e.g., wrap the read-fold-write in a transaction/CAS, or hold a + per-session lock. The SDK's :func:`fold_session_summary` is pure; + concurrency control is the store's responsibility. 
""" raise NotImplementedError From 3425fec55e6d8f1f45f9b9d14274a8921a9dab10 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 06:43:12 +0000 Subject: [PATCH 07/17] =?UTF-8?q?fix:=20address=20review=20comments=20on?= =?UTF-8?q?=20#847=20=E2=80=94=20correct=20fast-path=20call-count=20in=20c?= =?UTF-8?q?ost-note=20docstring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/claude_agent_sdk/_internal/sessions.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index ad2f4c12..7c9d8d54 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1598,11 +1598,13 @@ async def list_sessions_from_store( the store path — the store operates on a single ``project_key``. .. note:: - If the store implements ``list_session_summaries``, this is a single - store call. Otherwise falls back to one ``store.load()`` per session - (bounded at 16 concurrent), which on remote backends with many or - large sessions can be expensive (e.g., S3 egress, Postgres large-row - reads). + If the store implements ``list_session_summaries``, this is one batch + summary call plus one cheap ``list_sessions()`` enumeration to + gap-fill sessions missing a sidecar — zero per-session ``load()`` + calls when sidecars are complete. Otherwise falls back to one + ``store.load()`` per session (bounded at 16 concurrent), which on + remote backends with many or large sessions can be expensive (e.g., + S3 egress, Postgres large-row reads). 
Gap-fill requires ``list_sessions``: if the store implements ``list_session_summaries`` but not ``list_sessions``, sessions From a1e70f04758192a3beb2121e9c9f3c092dcbf98e Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 06:50:05 +0000 Subject: [PATCH 08/17] fix: paginate gap-fill before per-session load; drop dead file_size; align InMemory mtime clock (parity with TS review) --- .../_internal/session_store.py | 12 +++-- .../_internal/session_summary.py | 4 +- src/claude_agent_sdk/_internal/sessions.py | 50 +++++++++++++------ tests/test_session_summary.py | 46 ++++++++++++++++- 4 files changed, 90 insertions(+), 22 deletions(-) diff --git a/src/claude_agent_sdk/_internal/session_store.py b/src/claude_agent_sdk/_internal/session_store.py index e26d004c..c90d8217 100644 --- a/src/claude_agent_sdk/_internal/session_store.py +++ b/src/claude_agent_sdk/_internal/session_store.py @@ -48,15 +48,19 @@ def __init__(self) -> None: async def append(self, key: SessionKey, entries: list[SessionStoreEntry]) -> None: k = _key_to_string(key) self._store.setdefault(k, []).extend(entries) - self._mtimes[k] = int(time.time() * 1000) # Maintain the per-session summary sidecar incrementally so # list_session_summaries() never re-reads. Subagent subpaths don't # contribute to the main session's summary. if key.get("subpath") is None: sk = (key["project_key"], key["session_id"]) - self._summaries[sk] = fold_session_summary( - self._summaries.get(sk), key, entries - ) + folded = fold_session_summary(self._summaries.get(sk), key, entries) + self._summaries[sk] = folded + # Prefer the entry-timestamp clock so list_sessions() and + # list_session_summaries() sort on the same axis; fall back to + # wall-clock for synthetic entries with no timestamp. 
+ self._mtimes[k] = folded["mtime"] or int(time.time() * 1000) + else: + self._mtimes[k] = int(time.time() * 1000) async def load(self, key: SessionKey) -> list[SessionStoreEntry] | None: entries = self._store.get(_key_to_string(key)) diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index f931bf2d..57129d41 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -203,7 +203,9 @@ def summary_entry_to_sdk_info( session_id=entry["session_id"], summary=summary, last_modified=entry["mtime"], - file_size=data.get("file_size"), + # file_size is a JSONL byte count — meaningful only for the local-disk + # path (see SDKSessionInfo.file_size). Stores have no equivalent. + file_size=None, custom_title=custom_title, first_prompt=first_prompt, git_branch=data.get("git_branch") or None, diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 7c9d8d54..2a8735ea 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1625,31 +1625,51 @@ async def list_sessions_from_store( except NotImplementedError: pass else: - infos = [ - info + # Build a unified slot list — summaries get their info up front; + # sessions present in list_sessions() but missing a sidecar get a + # placeholder slot so they sort into the page correctly. + slots: list[dict[str, Any]] = [ + { + "mtime": s["mtime"], + "info": summary_entry_to_sdk_info(s, project_path), + } for s in summaries - if (info := summary_entry_to_sdk_info(s, project_path)) is not None ] - # Gap-fill: a store may have entries for sessions appended before - # it adopted list_session_summaries (no sidecar yet). Enumerate - # via list_sessions() and derive the missing ones via the - # per-session load() path so they aren't silently dropped. 
if has_list_sessions: summary_ids = {s["session_id"] for s in summaries} listing = list(await session_store.list_sessions(project_key)) - missing = [e for e in listing if e["session_id"] not in summary_ids] - if missing: - infos.extend( - await _derive_infos_via_load( - session_store, missing, directory, project_path - ) - ) + slots.extend( + {"mtime": e["mtime"], "session_id": e["session_id"], "info": None} + for e in listing + if e["session_id"] not in summary_ids + ) else: logger.debug( "list_session_summaries without list_sessions: gap-fill " "skipped; sessions lacking a sidecar will be omitted" ) - return _apply_sort_limit_offset(infos, limit, offset) + + # Paginate BEFORE per-session load so gap-fill load() count is + # bounded by page size, not total missing — 500 sessions lacking + # sidecars with limit=10 issues at most 10 load()s, not 500. + slots.sort(key=lambda sl: sl["mtime"], reverse=True) + page = slots[offset:] + if limit is not None and limit > 0: + page = page[:limit] + + to_fill = [sl for sl in page if sl["info"] is None and "session_id" in sl] + if to_fill: + filled = await _derive_infos_via_load( + session_store, to_fill, directory, project_path + ) + by_sid = {f.session_id: f for f in filled} + for sl in to_fill: + sl["info"] = by_sid.get(sl["session_id"]) + + # Slots whose info resolved to None (sidechain / no extractable + # summary) are dropped — matches the fallback path's + # post-derivation filtering semantics. 
+ return [sl["info"] for sl in page if sl["info"] is not None] if not has_list_sessions: raise ValueError( diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 620fc2cd..59357553 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -304,7 +304,6 @@ def test_field_passthrough(self) -> None: "git_branch": "main", "tag": "wip", "created_at": 50, - "file_size": 1234, }, }, None, @@ -315,7 +314,8 @@ def test_field_passthrough(self) -> None: assert info.git_branch == "main" assert info.tag == "wip" assert info.created_at == 50 - assert info.file_size == 1234 + # file_size is local-JSONL-only; store-backed summaries always None. + assert info.file_size is None # --------------------------------------------------------------------------- @@ -500,6 +500,48 @@ async def load(self, key): # noqa: ANN001, ANN201 assert by_id[sid_without].summary == "no sidecar" # Only the missing session should have been load()ed. assert store.load_calls == [sid_without] + # Merged result sorts on a single clock (entry timestamps). 
+ assert sessions[0].session_id == sid_with + + async def test_gap_fill_load_bounded_by_limit(self) -> None: + """Gap-fill paginates BEFORE per-session load(), so load() count is + bounded by page size, not total missing.""" + + class CountingStore(InMemorySessionStore): + def __init__(self) -> None: + super().__init__() + self.load_calls: list[str] = [] + + async def list_session_summaries(self, project_key: str): # noqa: ANN201 + full = await super().list_session_summaries(project_key) + return [s for s in full if s["session_id"] == sid_with] + + async def load(self, key): # noqa: ANN001, ANN201 + self.load_calls.append(key["session_id"]) + return await super().load(key) + + store = CountingStore() + sid_with = str(uuid_mod.uuid4()) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_with}, + [_user("with", ts="2024-01-10T00:00:00Z")], + ) + # 5 sessions without sidecars, all older than sid_with. + sids_without = [str(uuid_mod.uuid4()) for _ in range(5)] + for i, sid in enumerate(sids_without): + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid}, + [_user(f"without {i}", ts=f"2024-01-0{i + 1}T00:00:00Z")], + ) + + page = await list_sessions_from_store(store, directory=DIR, limit=2) + # Page = newest 2: sid_with (sidecar) + 1 missing. + assert len(page) == 2 + assert page[0].session_id == sid_with + # load() bounded by page size (≤2), not total missing (5). + assert len(store.load_calls) <= 2 + # Specifically, only the one placeholder in the page was loaded. 
+ assert len(store.load_calls) == 1 async def test_gap_fill_bounded_concurrency(self) -> None: """Gap-fill reuses the bounded per-session load helper, so From 17c87e91711662e1a676aaacc142ecd66212ea69 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 07:04:02 +0000 Subject: [PATCH 09/17] fix: thread project_path through gap-fill; align slot drop semantics with slow path (parity with TS review) --- src/claude_agent_sdk/_internal/sessions.py | 8 +++-- tests/test_session_summary.py | 34 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 2a8735ea..899fc1b9 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1667,8 +1667,12 @@ async def list_sessions_from_store( sl["info"] = by_sid.get(sl["session_id"]) # Slots whose info resolved to None (sidechain / no extractable - # summary) are dropped — matches the fallback path's - # post-derivation filtering semantics. + # summary) are dropped AFTER pagination — both summary-derived and + # gap-fill slots, so the page is internally consistent. This can + # return fewer than ``limit`` rows; that short-page is the price of + # bounding load() by page size instead of total session count (the + # slow path below drops-then-paginates and never short-pages, but + # at O(N) load cost). return [sl["info"] for sl in page if sl["info"] is not None] if not has_list_sessions: diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 59357553..8ba62680 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -543,6 +543,40 @@ async def load(self, key): # noqa: ANN001, ANN201 # Specifically, only the one placeholder in the page was loaded. 
assert len(store.load_calls) == 1 + async def test_sidechain_summary_short_pages(self) -> None: + """Fast-path drops sidechain/empty summaries AFTER pagination — same + as gap-fill placeholders — so a sidechain in the page yields a short + page. Locks in the paginate-then-drop trade-off (load() bounded by + page size; slow path is drop-then-paginate at O(N) load cost).""" + store = InMemorySessionStore() + sids = [str(uuid_mod.uuid4()) for _ in range(3)] + # Newest is a sidechain; next two are real. + await store.append( + {"project_key": PROJECT_KEY, "session_id": sids[0]}, + [ + { + "type": "user", + "timestamp": "2024-01-03T00:00:00Z", + "isSidechain": True, + "message": {"content": "x"}, + } + ], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sids[1]}, + [_user("real 1", ts="2024-01-02T00:00:00Z")], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sids[2]}, + [_user("real 2", ts="2024-01-01T00:00:00Z")], + ) + + page = await list_sessions_from_store(store, directory=DIR, limit=2) + # limit=2 picks the 2 newest slots; the sidechain one is dropped + # post-pagination → short page of 1. 
+ assert len(page) == 1 + assert page[0].session_id == sids[1] + async def test_gap_fill_bounded_concurrency(self) -> None: """Gap-fill reuses the bounded per-session load helper, so ``_STORE_LIST_LOAD_CONCURRENCY`` applies to the missing-session set.""" From 2cd5f973842b7766379fcecbf0f0feb629b86d62 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 07:10:35 +0000 Subject: [PATCH 10/17] =?UTF-8?q?fix:=20address=20review=20on=20#847=20?= =?UTF-8?q?=E2=80=94=20pre-filter=20summary-backed=20None=20slots;=20dedup?= =?UTF-8?q?e=20fallback=20docstring;=20correct=20Args/Raises=20contract?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/claude_agent_sdk/_internal/sessions.py | 32 ++++++++++++---------- src/claude_agent_sdk/types.py | 3 +- tests/test_session_helpers_store.py | 2 +- tests/test_session_summary.py | 19 +++++++------ 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 899fc1b9..f8f89d9f 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1579,7 +1579,8 @@ async def list_sessions_from_store( Args: session_store: The store to read from. Must implement - :meth:`SessionStore.list_sessions`. + :meth:`SessionStore.list_session_summaries` or + :meth:`SessionStore.list_sessions` (or both). directory: Project directory used to compute the ``project_key``. Defaults to the current working directory. limit: Maximum number of sessions to return. @@ -1590,7 +1591,8 @@ async def list_sessions_from_store( List of ``SDKSessionInfo`` sorted by ``last_modified`` descending. Raises: - ValueError: If ``session_store`` does not implement + ValueError: If ``session_store`` implements neither + :meth:`SessionStore.list_session_summaries` nor :meth:`SessionStore.list_sessions`. 
Note: @@ -1628,12 +1630,13 @@ async def list_sessions_from_store( # Build a unified slot list — summaries get their info up front; # sessions present in list_sessions() but missing a sidecar get a # placeholder slot so they sort into the page correctly. + # Summary-backed sidechain/empty sessions are dropped here (free — + # already determined) so they don't consume offset/limit positions, + # matching the disk and slow-path filter-then-paginate semantics. slots: list[dict[str, Any]] = [ - { - "mtime": s["mtime"], - "info": summary_entry_to_sdk_info(s, project_path), - } + {"mtime": s["mtime"], "info": info} for s in summaries + if (info := summary_entry_to_sdk_info(s, project_path)) is not None ] if has_list_sessions: summary_ids = {s["session_id"] for s in summaries} @@ -1666,19 +1669,18 @@ async def list_sessions_from_store( for sl in to_fill: sl["info"] = by_sid.get(sl["session_id"]) - # Slots whose info resolved to None (sidechain / no extractable - # summary) are dropped AFTER pagination — both summary-derived and - # gap-fill slots, so the page is internally consistent. This can - # return fewer than ``limit`` rows; that short-page is the price of - # bounding load() by page size instead of total session count (the - # slow path below drops-then-paginates and never short-pages, but - # at O(N) load cost). + # Gap-fill placeholders that resolved to None (sidechain / no + # extractable summary after load) are dropped here, AFTER + # pagination — that case alone can short-page. Summary-backed + # slots were already pre-filtered above, so a store with complete + # sidecars never short-pages. return [sl["info"] for sl in page if sl["info"] is not None] if not has_list_sessions: raise ValueError( - "session_store does not implement list_sessions() -- cannot list " - "sessions. Provide a store with a list_sessions() method." + "session_store implements neither list_session_summaries() nor " + "list_sessions() -- cannot list sessions. 
Provide a store with at " + "least one of those methods." ) # Copy — store.list_sessions() may return a reference to internal state. listing = list(await session_store.list_sessions(project_key)) diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 2b6be263..ef89ca5d 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1256,8 +1256,7 @@ async def list_session_summaries( """Return incrementally-maintained summaries for all sessions in one call. Stores should maintain these via :func:`fold_session_summary` inside - :meth:`append`. If not implemented, ``list_sessions_from_store()`` - falls back to per-session :meth:`load`. + :meth:`append`. Optional — if unimplemented, ``list_sessions_from_store()`` falls back to ``list_sessions()`` + per-session ``load()``. diff --git a/tests/test_session_helpers_store.py b/tests/test_session_helpers_store.py index 45454199..ba537e08 100644 --- a/tests/test_session_helpers_store.py +++ b/tests/test_session_helpers_store.py @@ -126,7 +126,7 @@ async def test_limit_and_offset(self) -> None: async def test_raises_when_store_lacks_list_sessions(self) -> None: store = _MinimalStore() - with pytest.raises(ValueError, match="does not implement list_sessions"): + with pytest.raises(ValueError, match="list_sessions"): await list_sessions_from_store(store, directory=DIR) async def test_drops_sidechain_sessions(self) -> None: diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 8ba62680..39095850 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -543,11 +543,12 @@ async def load(self, key): # noqa: ANN001, ANN201 # Specifically, only the one placeholder in the page was loaded. assert len(store.load_calls) == 1 - async def test_sidechain_summary_short_pages(self) -> None: - """Fast-path drops sidechain/empty summaries AFTER pagination — same - as gap-fill placeholders — so a sidechain in the page yields a short - page. 
Locks in the paginate-then-drop trade-off (load() bounded by - page size; slow path is drop-then-paginate at O(N) load cost).""" + async def test_sidechain_summary_does_not_consume_page_slot(self) -> None: + """Summary-backed sidechain/empty sessions are dropped BEFORE + pagination (free — already determined from the sidecar) so they don't + consume offset/limit positions. Matches the disk and slow-path + filter-then-paginate semantics. Only gap-fill placeholders that + resolve to None after load can short-page.""" store = InMemorySessionStore() sids = [str(uuid_mod.uuid4()) for _ in range(3)] # Newest is a sidechain; next two are real. @@ -572,10 +573,10 @@ async def test_sidechain_summary_short_pages(self) -> None: ) page = await list_sessions_from_store(store, directory=DIR, limit=2) - # limit=2 picks the 2 newest slots; the sidechain one is dropped - # post-pagination → short page of 1. - assert len(page) == 1 - assert page[0].session_id == sids[1] + # The sidechain summary is pre-filtered, so limit=2 returns both real + # sessions — full page, matching the slow path. 
+ assert len(page) == 2 + assert [s.session_id for s in page] == [sids[1], sids[2]] async def test_gap_fill_bounded_concurrency(self) -> None: """Gap-fill reuses the bounded per-session load helper, so From 1d6e4acd9a12741f9945cff834689f802fc68d58 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 07:24:57 +0000 Subject: [PATCH 11/17] docs: fold_session_summary subpath guard + conformance check that subagent appends don't affect main summary --- src/claude_agent_sdk/_internal/session_summary.py | 4 ++++ .../testing/session_store_conformance.py | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index 57129d41..e77a4b9e 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -121,6 +121,10 @@ def fold_session_summary( transcript. ``prev`` is the previous summary for the same key (or ``None`` for the first append). + Do not call this for keys with a ``subpath`` — subagent transcripts must + not contribute to the main session's summary. Guard with + ``if key.get("subpath") is None:`` before calling. + All derived state lives in the opaque ``data`` dict; stores persist it verbatim and do not interpret it. ``mtime`` stays top-level so stores can index on it. diff --git a/src/claude_agent_sdk/testing/session_store_conformance.py b/src/claude_agent_sdk/testing/session_store_conformance.py index ca039c7f..535178d4 100644 --- a/src/claude_agent_sdk/testing/session_store_conformance.py +++ b/src/claude_agent_sdk/testing/session_store_conformance.py @@ -202,6 +202,15 @@ async def fresh() -> SessionStore: ) assert refolded["session_id"] == "summ-sess" assert refolded["mtime"] >= summ["mtime"] + # Subagent appends must NOT affect the main session's summary. 
+ await store.append( + {**key, "subpath": "subagents/agent-1"}, + [_e({"timestamp": "2024-01-01T00:00:09.000Z", "customTitle": "subagent"})], + ) + after_sub = { + s["session_id"]: s for s in await store.list_session_summaries("proj") + } + assert after_sub["summ-sess"]["data"] == summ["data"] assert await store.list_session_summaries("never-appended-project") == [] if has_delete: await store.delete(key) From 8188b8b03e2b3d738e8f87dbdff4b7d691b53c2c Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 07:34:57 +0000 Subject: [PATCH 12/17] fix: mirror _apply_sort_limit_offset offset>0 guard in fast-path slot pagination --- src/claude_agent_sdk/_internal/sessions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index f8f89d9f..476ed883 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1656,7 +1656,10 @@ async def list_sessions_from_store( # bounded by page size, not total missing — 500 sessions lacking # sidecars with limit=10 issues at most 10 load()s, not 500. slots.sort(key=lambda sl: sl["mtime"], reverse=True) - page = slots[offset:] + # Mirror _apply_sort_limit_offset's guards so negative/zero + # offset and non-positive limit behave identically to the slow + # and disk paths. 
+ page = slots[offset:] if offset > 0 else slots if limit is not None and limit > 0: page = page[:limit] From 46fec6ef5a693a4fc7445861a3c1ee3ce9ecb1b4 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 07:50:07 +0000 Subject: [PATCH 13/17] docs: note created_at fold-vs-lite divergence for first-entry-missing-timestamp edge --- src/claude_agent_sdk/_internal/session_summary.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index e77a4b9e..60768a38 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -128,6 +128,12 @@ def fold_session_summary( All derived state lives in the opaque ``data`` dict; stores persist it verbatim and do not interpret it. ``mtime`` stays top-level so stores can index on it. + + ``created_at`` latches the first parseable entry timestamp; the disk + lite-parse only inspects the first line, so for streams whose first + entry lacks a timestamp (does not occur in CLI-produced transcripts) + the fold path yields a non-``None`` ``created_at`` where lite-parse + yields ``None``. 
""" if prev is not None: summary: SessionSummaryEntry = { From 6835c186d5a077345572a6911433c1e37f5a5ab7 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 20 Apr 2026 08:09:19 +0000 Subject: [PATCH 14/17] refactor: drop dead 'session_id in sl' guard in to_fill (vestigial after 2cd5f97 pre-filter) --- src/claude_agent_sdk/_internal/sessions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 476ed883..553b3a78 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1663,7 +1663,7 @@ async def list_sessions_from_store( if limit is not None and limit > 0: page = page[:limit] - to_fill = [sl for sl in page if sl["info"] is None and "session_id" in sl] + to_fill = [sl for sl in page if sl["info"] is None] if to_fill: filled = await _derive_infos_via_load( session_store, to_fill, directory, project_path From 6449a39566e4dd062b93d7a175fb2998075480bb Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Tue, 21 Apr 2026 00:51:17 +0000 Subject: [PATCH 15/17] feat(session-summary): gap-fill stale sidecars via mtime check Compare SessionSummaryEntry.mtime against the session's current mtime from list_sessions. When the sidecar lags the transcript, route the session into the existing gap-fill path so the SDK re-folds from source entries. Adds a conformance case and staleness docs. 
--- src/claude_agent_sdk/_internal/sessions.py | 49 ++++++++---- tests/test_session_summary.py | 90 ++++++++++++++++++++++ 2 files changed, 125 insertions(+), 14 deletions(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 553b3a78..0af78659 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1627,31 +1627,52 @@ async def list_sessions_from_store( except NotImplementedError: pass else: - # Build a unified slot list — summaries get their info up front; - # sessions present in list_sessions() but missing a sidecar get a - # placeholder slot so they sort into the page correctly. + # Build a unified slot list. Fresh summaries (mtime >= the + # session's current mtime from list_sessions) get their info up + # front; sessions present in list_sessions() but missing OR with a + # stale sidecar (summary.mtime < known mtime) get a placeholder + # slot routed through the same gap-fill path so the fold is + # recomputed from source entries. # Summary-backed sidechain/empty sessions are dropped here (free — # already determined) so they don't consume offset/limit positions, # matching the disk and slow-path filter-then-paginate semantics. 
- slots: list[dict[str, Any]] = [ - {"mtime": s["mtime"], "info": info} - for s in summaries - if (info := summary_entry_to_sdk_info(s, project_path)) is not None - ] if has_list_sessions: - summary_ids = {s["session_id"] for s in summaries} listing = list(await session_store.list_sessions(project_key)) - slots.extend( - {"mtime": e["mtime"], "session_id": e["session_id"], "info": None} - for e in listing - if e["session_id"] not in summary_ids - ) + known_mtimes = {e["session_id"]: e["mtime"] for e in listing} else: + listing = [] + known_mtimes = {} logger.debug( "list_session_summaries without list_sessions: gap-fill " "skipped; sessions lacking a sidecar will be omitted" ) + slots: list[dict[str, Any]] = [] + fresh_summary_ids: set[str] = set() + for s in summaries: + sid = s["session_id"] + if has_list_sessions: + known = known_mtimes.get(sid) + if known is None: + # Summary for a session list_sessions() no longer + # reports — drop it. + continue + if s["mtime"] < known: + # Stale sidecar — let gap-fill re-fold from source. + continue + info = summary_entry_to_sdk_info(s, project_path) + if info is None: + fresh_summary_ids.add(sid) + continue + slots.append({"mtime": s["mtime"], "info": info}) + fresh_summary_ids.add(sid) + if has_list_sessions: + slots.extend( + {"mtime": e["mtime"], "session_id": e["session_id"], "info": None} + for e in listing + if e["session_id"] not in fresh_summary_ids + ) + # Paginate BEFORE per-session load so gap-fill load() count is # bounded by page size, not total missing — 500 sessions lacking # sidecars with limit=10 issues at most 10 load()s, not 500. 
diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 39095850..72a4e3be 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -578,6 +578,96 @@ async def test_sidechain_summary_does_not_consume_page_slot(self) -> None: assert len(page) == 2 assert [s.session_id for s in page] == [sids[1], sids[2]] + async def test_stale_sidecar_triggers_gap_fill(self) -> None: + """A sidecar whose mtime lags the session's current mtime from + list_sessions must be treated as missing: route it through gap-fill so + the SDK re-folds from source entries and the result reflects fresh + transcript state, not the stale sidecar values.""" + sid = str(uuid_mod.uuid4()) + stale_mtime = 1_704_067_260_000 # 2024-01-01T00:01:00Z + fresh_mtime = 1_704_153_660_000 # 2024-01-02T00:01:00Z + + class StaleSidecarStore(InMemorySessionStore): + """Reports fresh transcript state but a stale summary sidecar.""" + + def __init__(self) -> None: + super().__init__() + self.load_calls: list[str] = [] + + async def list_session_summaries(self, project_key): # noqa: ANN001, ANN201 + # Serve a stale summary reflecting T1 state. + if project_key != PROJECT_KEY: + return [] + return [ + { + "session_id": sid, + "mtime": stale_mtime, + "data": { + "custom_title": "old", + "first_prompt": "old prompt", + "first_prompt_locked": True, + "created_at": stale_mtime, + }, + } + ] + + async def load(self, key): # noqa: ANN001, ANN201 + self.load_calls.append(key["session_id"]) + return await super().load(key) + + store = StaleSidecarStore() + # Populate the real transcript with fresh entries so list_sessions() + # reports the fresh mtime and a gap-fill load() yields fresh info. + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid}, + [ + _user("fresh prompt", ts="2024-01-02T00:00:00Z"), + { + "type": "x", + "timestamp": "2024-01-02T00:01:00Z", + "customTitle": "fresh", + }, + ], + ) + # Sanity-check the setup: list_sessions reports fresh mtime. 
+ listed = await InMemorySessionStore.list_sessions(store, PROJECT_KEY) + assert listed[0]["mtime"] > stale_mtime + + sessions = await list_sessions_from_store(store, directory=DIR) + assert len(sessions) == 1 + info = sessions[0] + assert info.session_id == sid + # Fresh transcript state wins — stale customTitle must NOT leak through. + assert info.custom_title == "fresh" + assert info.summary == "fresh" + assert info.last_modified >= fresh_mtime + # Stale entry was routed into gap-fill, so load() was called for it. + assert store.load_calls == [sid] + + async def test_summary_without_listing_is_dropped(self) -> None: + """A summary for a session that list_sessions() no longer reports must + be dropped from the fast-path result.""" + sid_real = str(uuid_mod.uuid4()) + sid_ghost = str(uuid_mod.uuid4()) + + class GhostStore(InMemorySessionStore): + async def list_sessions(self, project_key: str): # noqa: ANN201 + full = await super().list_sessions(project_key) + return [s for s in full if s["session_id"] != sid_ghost] + + store = GhostStore() + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_real}, + [_user("real", ts="2024-01-02T00:00:00Z")], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_ghost}, + [_user("ghost", ts="2024-01-01T00:00:00Z")], + ) + + sessions = await list_sessions_from_store(store, directory=DIR) + assert {s.session_id for s in sessions} == {sid_real} + async def test_gap_fill_bounded_concurrency(self) -> None: """Gap-fill reuses the bounded per-session load helper, so ``_STORE_LIST_LOAD_CONCURRENCY`` applies to the missing-session set.""" From e3ebe579927ede51cdac14ecd531709f2f163fb3 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Tue, 21 Apr 2026 01:15:23 +0000 Subject: [PATCH 16/17] fix(session-summary): sidecar mtime is storage write time, not entry time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Clarify and enforce that SessionSummaryEntry.mtime 
shares a clock with list_sessions().mtime for the same session — typically storage write time (file mtime, S3 LastModified, Postgres updated_at). Adapters using batched/deferred writes report storage times strictly later than the last entry's ISO timestamp, making every sidecar look stale under the previous semantics. Fold helper no longer sets mtime; adapter stamps it at persist time. Reference store updated to use a single clock for both list_sessions and list_session_summaries. Adds a test proving a storage-newer sidecar with entry-older timestamps stays on the fast path. --- .../_internal/session_store.py | 31 +++- .../_internal/session_summary.py | 16 +- src/claude_agent_sdk/types.py | 14 +- tests/test_session_summary.py | 143 +++++++++++++++--- 4 files changed, 172 insertions(+), 32 deletions(-) diff --git a/src/claude_agent_sdk/_internal/session_store.py b/src/claude_agent_sdk/_internal/session_store.py index c90d8217..bb6a2155 100644 --- a/src/claude_agent_sdk/_internal/session_store.py +++ b/src/claude_agent_sdk/_internal/session_store.py @@ -44,23 +44,41 @@ def __init__(self) -> None: self._store: dict[str, list[SessionStoreEntry]] = {} self._mtimes: dict[str, int] = {} self._summaries: dict[tuple[str, str], SessionSummaryEntry] = {} + self._last_mtime = 0 + + def _next_mtime(self) -> int: + """Storage write time for this adapter, in Unix epoch ms. + + Guaranteed strictly monotonically increasing across calls within the + process so back-to-back appends always produce distinct mtimes (real + storage backends — file mtime on modern filesystems, S3 + LastModified, Postgres updated_at — get this property for free from + their commit ordering). 
+ """ + now_ms = int(time.time() * 1000) + if now_ms <= self._last_mtime: + now_ms = self._last_mtime + 1 + self._last_mtime = now_ms + return now_ms async def append(self, key: SessionKey, entries: list[SessionStoreEntry]) -> None: k = _key_to_string(key) self._store.setdefault(k, []).extend(entries) + now_ms = self._next_mtime() # Maintain the per-session summary sidecar incrementally so # list_session_summaries() never re-reads. Subagent subpaths don't # contribute to the main session's summary. if key.get("subpath") is None: sk = (key["project_key"], key["session_id"]) folded = fold_session_summary(self._summaries.get(sk), key, entries) + # Stamp the sidecar with this adapter's storage write time — the + # SAME clock list_sessions() exposes below. SessionSummaryEntry. + # mtime is contractually storage write time (not entry time), so + # the fast-path staleness check (summary.mtime < list_sessions + # mtime) works correctly. + folded["mtime"] = now_ms self._summaries[sk] = folded - # Prefer the entry-timestamp clock so list_sessions() and - # list_session_summaries() sort on the same axis; fall back to - # wall-clock for synthetic entries with no timestamp. 
- self._mtimes[k] = folded["mtime"] or int(time.time() * 1000) - else: - self._mtimes[k] = int(time.time() * 1000) + self._mtimes[k] = now_ms async def load(self, key: SessionKey) -> list[SessionStoreEntry] | None: entries = self._store.get(_key_to_string(key)) @@ -125,6 +143,7 @@ def clear(self) -> None: self._store.clear() self._mtimes.clear() self._summaries.clear() + self._last_mtime = 0 def file_path_to_session_key(file_path: str, projects_dir: str) -> SessionKey | None: diff --git a/src/claude_agent_sdk/_internal/session_summary.py b/src/claude_agent_sdk/_internal/session_summary.py index 60768a38..3a509572 100644 --- a/src/claude_agent_sdk/_internal/session_summary.py +++ b/src/claude_agent_sdk/_internal/session_summary.py @@ -126,8 +126,18 @@ def fold_session_summary( ``if key.get("subpath") is None:`` before calling. All derived state lives in the opaque ``data`` dict; stores persist it - verbatim and do not interpret it. ``mtime`` stays top-level so stores - can index on it. + verbatim and do not interpret it. + + ``mtime`` is NOT touched by the fold — it is the sidecar's storage + write time and must be stamped by the adapter after persisting. It has + to share a clock with the ``mtime`` returned by + :meth:`SessionStore.list_sessions` for the same session (typically file + mtime, S3 ``LastModified``, Postgres ``updated_at``, or whatever native + timestamp the adapter surfaces); deriving it from entry ISO timestamps + would make every batched-write sidecar appear strictly older than the + session's current mtime, defeating the fast-path staleness check. For a + new session (``prev is None``) the fold returns ``mtime=0`` as a + placeholder; the adapter is expected to overwrite it. 
``created_at`` latches the first parseable entry timestamp; the disk lite-parse only inspects the first line, so for streams whose first @@ -151,8 +161,6 @@ def fold_session_summary( entry = cast("dict[str, Any]", raw) ms = _iso_to_epoch_ms(entry.get("timestamp")) - if ms is not None and ms > summary["mtime"]: - summary["mtime"] = ms if "is_sidechain" not in data: data["is_sidechain"] = entry.get("isSidechain") is True diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index ef89ca5d..64b5536b 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1170,8 +1170,18 @@ class SessionSummaryEntry(TypedDict): session_id: str mtime: int - """Last-modified time in Unix epoch milliseconds (last entry timestamp). - Stores may index on this.""" + """Storage write time of the sidecar, in Unix epoch milliseconds. Must use + the same clock source as the ``mtime`` returned by + :meth:`SessionStore.list_sessions` for this session — typically file + mtime, S3 ``LastModified``, Postgres ``updated_at``, or whatever native + timestamp the adapter surfaces. Do NOT derive this from entry ISO + timestamps: adapters that write in batches with any persist latency + (every real backend) would report storage times strictly later than the + last entry's timestamp, making every sidecar appear stale and defeating + the fast-path staleness check in ``list_sessions_from_store``. + :func:`fold_session_summary` preserves whatever ``mtime`` the caller + passes in via ``prev`` and does not set it itself; stamp it after + persisting.""" data: dict[str, Any] """Opaque SDK-owned summary state. 
Persist verbatim; do not interpret.""" diff --git a/tests/test_session_summary.py b/tests/test_session_summary.py index 72a4e3be..603014b8 100644 --- a/tests/test_session_summary.py +++ b/tests/test_session_summary.py @@ -125,7 +125,12 @@ def test_last_wins_overwrite(self) -> None: assert s2["data"]["summary_hint"] == "sm" assert s2["data"]["git_branch"] == "dev" - def test_mtime_takes_max(self) -> None: + def test_mtime_not_derived_from_entries(self) -> None: + """The fold must not touch mtime — it is the sidecar's storage write + time, stamped by the adapter at persist time, on the same clock as + list_sessions().mtime. Deriving it from entry ISO timestamps would + make every batched-write sidecar appear strictly older than the + session's current mtime, defeating the fast-path staleness check.""" s = fold_session_summary( None, KEY, @@ -134,7 +139,20 @@ def test_mtime_takes_max(self) -> None: {"type": "x", "timestamp": "2024-01-01T00:00:01.000Z"}, ], ) - assert s["mtime"] == 1704067205000 + # New session: fold returns mtime=0 placeholder, adapter must stamp. + assert s["mtime"] == 0 + + # Carry-over: prev mtime is preserved verbatim regardless of entry + # timestamps in the new batch. + prev: SessionSummaryEntry = { + "session_id": KEY["session_id"], + "mtime": 42, + "data": {}, + } + s2 = fold_session_summary( + prev, KEY, [{"type": "x", "timestamp": "2024-01-01T00:00:10.000Z"}] + ) + assert s2["mtime"] == 42 def test_tag_set_and_clear(self) -> None: s = fold_session_summary(None, KEY, [{"type": "tag", "tag": "wip"}]) @@ -500,8 +518,10 @@ async def load(self, key): # noqa: ANN001, ANN201 assert by_id[sid_without].summary == "no sidecar" # Only the missing session should have been load()ed. assert store.load_calls == [sid_without] - # Merged result sorts on a single clock (entry timestamps). - assert sessions[0].session_id == sid_with + # Merged result sorts on a single clock (storage write time). 
In + # InMemorySessionStore, that's a strictly monotonic counter, so + # sid_without (appended second) sorts newest. + assert sessions[0].session_id == sid_without async def test_gap_fill_load_bounded_by_limit(self) -> None: """Gap-fill paginates BEFORE per-session load(), so load() count is @@ -522,20 +542,23 @@ async def load(self, key): # noqa: ANN001, ANN201 store = CountingStore() sid_with = str(uuid_mod.uuid4()) - await store.append( - {"project_key": PROJECT_KEY, "session_id": sid_with}, - [_user("with", ts="2024-01-10T00:00:00Z")], - ) - # 5 sessions without sidecars, all older than sid_with. + # 5 sessions without sidecars. InMemorySessionStore stamps storage + # mtime strictly monotonically per append, so these first 5 appends + # are all older than sid_with below. sids_without = [str(uuid_mod.uuid4()) for _ in range(5)] for i, sid in enumerate(sids_without): await store.append( {"project_key": PROJECT_KEY, "session_id": sid}, [_user(f"without {i}", ts=f"2024-01-0{i + 1}T00:00:00Z")], ) + # Append sid_with last so storage mtime makes it the newest session. + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid_with}, + [_user("with", ts="2024-01-10T00:00:00Z")], + ) page = await list_sessions_from_store(store, directory=DIR, limit=2) - # Page = newest 2: sid_with (sidecar) + 1 missing. + # Page = newest 2: sid_with (sidecar) + newest 1 missing. assert len(page) == 2 assert page[0].session_id == sid_with # load() bounded by page size (≤2), not total missing (5). @@ -551,7 +574,17 @@ async def test_sidechain_summary_does_not_consume_page_slot(self) -> None: resolve to None after load can short-page.""" store = InMemorySessionStore() sids = [str(uuid_mod.uuid4()) for _ in range(3)] - # Newest is a sidechain; next two are real. + # Append order determines storage mtime (InMemorySessionStore + # monotonic counter). We append the two real sessions first and the + # sidechain LAST so the sidechain is the newest-by-storage-mtime. 
+ await store.append( + {"project_key": PROJECT_KEY, "session_id": sids[2]}, + [_user("real 2", ts="2024-01-01T00:00:00Z")], + ) + await store.append( + {"project_key": PROJECT_KEY, "session_id": sids[1]}, + [_user("real 1", ts="2024-01-02T00:00:00Z")], + ) await store.append( {"project_key": PROJECT_KEY, "session_id": sids[0]}, [ @@ -563,18 +596,11 @@ async def test_sidechain_summary_does_not_consume_page_slot(self) -> None: } ], ) - await store.append( - {"project_key": PROJECT_KEY, "session_id": sids[1]}, - [_user("real 1", ts="2024-01-02T00:00:00Z")], - ) - await store.append( - {"project_key": PROJECT_KEY, "session_id": sids[2]}, - [_user("real 2", ts="2024-01-01T00:00:00Z")], - ) page = await list_sessions_from_store(store, directory=DIR, limit=2) # The sidechain summary is pre-filtered, so limit=2 returns both real - # sessions — full page, matching the slow path. + # sessions — full page, matching the slow path. sids[1] was appended + # after sids[2] and is therefore newer by storage mtime. assert len(page) == 2 assert [s.session_id for s in page] == [sids[1], sids[2]] @@ -644,6 +670,83 @@ async def load(self, key): # noqa: ANN001, ANN201 # Stale entry was routed into gap-fill, so load() was called for it. assert store.load_calls == [sid] + async def test_fresh_sidecar_with_storage_newer_mtime_not_gap_filled(self) -> None: + """The sidecar's mtime is storage write time, not entry time. For + adapters that use native storage mtime (file mtime, S3 LastModified, + Postgres updated_at), every successful batched append records a + storage mtime strictly later than the last entry's ISO timestamp + (~100ms batch cadence + network latency). The staleness check must + NOT flag these fresh sidecars as stale — otherwise every slot routes + through gap-fill load() and the fast path's N->1 goal is defeated. + """ + sid = str(uuid_mod.uuid4()) + # T1: entry ISO timestamp embedded in the transcript. 
+ t1 = 1_704_067_200_000 # 2024-01-01T00:00:00Z + # T2: storage write time for both list_sessions and sidecar — + # strictly later than T1 (batcher + network delay). The point of + # this test is that T2 > T1 does NOT by itself mark the sidecar + # stale: both list_sessions().mtime and sidecar.mtime come from + # the same storage clock (T2), so summary.mtime == known.mtime. + t2 = 1_704_067_200_250 # +250ms of persist latency + + class StorageMtimeStore(InMemorySessionStore): + """Adapter that stamps both list_sessions and the sidecar with + storage-native mtime — strictly later than entry ISO timestamps. + """ + + def __init__(self) -> None: + super().__init__() + self.load_calls: list[str] = [] + + async def list_sessions(self, project_key: str): # noqa: ANN201 + # Pretend storage mtime is T2 for every session. + full = await super().list_sessions(project_key) + return [{"session_id": e["session_id"], "mtime": t2} for e in full] + + async def list_session_summaries(self, project_key: str): # noqa: ANN201 + # Sidecar mtime is also T2 (same clock). + full = await super().list_session_summaries(project_key) + return [ + {"session_id": s["session_id"], "mtime": t2, "data": s["data"]} + for s in full + ] + + async def load(self, key): # noqa: ANN001, ANN201 + self.load_calls.append(key["session_id"]) + return await super().load(key) + + store = StorageMtimeStore() + await store.append( + {"project_key": PROJECT_KEY, "session_id": sid}, + [ + _user("fresh prompt", ts="2024-01-01T00:00:00.000Z"), + { + "type": "x", + "timestamp": "2024-01-01T00:00:00.000Z", + "customTitle": "fresh", + }, + ], + ) + # Entries carry ISO timestamps at T1; storage records the write at T2. + # Sanity-check the adapter reports T2 from both surfaces. + listed = await store.list_sessions(PROJECT_KEY) + assert listed[0]["mtime"] == t2 + summ = await store.list_session_summaries(PROJECT_KEY) + assert summ[0]["mtime"] == t2 + # Entry timestamps are at T1, strictly older than T2. 
+        assert t2 > t1
+
+        sessions = await list_sessions_from_store(store, directory=DIR)
+        assert len(sessions) == 1
+        info = sessions[0]
+        assert info.session_id == sid
+        assert info.summary == "fresh"
+        assert info.last_modified == t2
+        # Fast path: load() must NOT have been called — summary.mtime ==
+        # known.mtime means not stale, so the slot returns its summary-derived
+        # info directly.
+        assert store.load_calls == []
+
     async def test_summary_without_listing_is_dropped(self) -> None:
         """A summary for a session that list_sessions() no longer reports must
         be dropped from the fast-path result."""

From 108cb6eaf046fd7401b8a89176ab07a0e1055b7e Mon Sep 17 00:00:00 2001
From: Qing Wang
Date: Tue, 21 Apr 2026 04:58:28 +0000
Subject: [PATCH 17/17] =?UTF-8?q?fix(session-summary):=20address=20yellow?=
 =?UTF-8?q?=20review=20=E2=80=94=20stale-sidecar=20docs,=20subpath=20skip?=
 =?UTF-8?q?=20note,=20conformance=20clock=20alignment?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- sessions.py: fast-path docstring note + inline return comment now say
  'complete and fresh' sidecars never short-page; a present-but-stale
  sidecar (summary.mtime < list_sessions.mtime) is routed through the
  same gap-fill as a missing one and can short-page.
- types.py: SessionStore.list_session_summaries docstring explicitly
  notes that subagent (subpath) keys must be skipped by the fold and
  that results are scoped to project_key like list_sessions.
- conformance: replace tautological 'refolded[mtime] >= summ[mtime]'
  check (the fold preserves prev mtime verbatim) with an explicit
  equality, and add a clock-alignment assertion under has_list_sessions
  that catches adapters deriving sidecar mtime from entry ISO
  timestamps (which would make every sidecar look stale to the
  fast-path freshness check).
- test: pin test_limit_offset_applied_after_sidechain_filter to the slow path (list_session_summaries -> NotImplementedError) so it keeps covering slow-path filter-THEN-paginate — the fast path deliberately locks in paginate-THEN-drop for summary-backed sidechain slots. --- src/claude_agent_sdk/_internal/sessions.py | 15 +++++++++------ .../testing/session_store_conformance.py | 16 +++++++++++++++- src/claude_agent_sdk/types.py | 6 +++++- tests/test_session_helpers_store.py | 16 ++++++++++++++-- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index 0af78659..e495d476 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -1602,11 +1602,12 @@ async def list_sessions_from_store( .. note:: If the store implements ``list_session_summaries``, this is one batch summary call plus one cheap ``list_sessions()`` enumeration to - gap-fill sessions missing a sidecar — zero per-session ``load()`` - calls when sidecars are complete. Otherwise falls back to one - ``store.load()`` per session (bounded at 16 concurrent), which on - remote backends with many or large sessions can be expensive (e.g., - S3 egress, Postgres large-row reads). + gap-fill sessions missing a sidecar or whose sidecar is stale + (``summary.mtime < list_sessions.mtime``) — zero per-session + ``load()`` calls when sidecars are complete and fresh. Otherwise + falls back to one ``store.load()`` per session (bounded at 16 + concurrent), which on remote backends with many or large sessions + can be expensive (e.g., S3 egress, Postgres large-row reads). Gap-fill requires ``list_sessions``: if the store implements ``list_session_summaries`` but not ``list_sessions``, sessions @@ -1697,7 +1698,9 @@ async def list_sessions_from_store( # extractable summary after load) are dropped here, AFTER # pagination — that case alone can short-page. 
Summary-backed # slots were already pre-filtered above, so a store with complete - # sidecars never short-pages. + # and fresh sidecars never short-pages; a present-but-stale + # sidecar is routed through gap-fill (same as a missing one) and + # can short-page if load() yields no extractable summary. return [sl["info"] for sl in page if sl["info"] is not None] if not has_list_sessions: diff --git a/src/claude_agent_sdk/testing/session_store_conformance.py b/src/claude_agent_sdk/testing/session_store_conformance.py index 535178d4..2b7f8a7f 100644 --- a/src/claude_agent_sdk/testing/session_store_conformance.py +++ b/src/claude_agent_sdk/testing/session_store_conformance.py @@ -195,13 +195,27 @@ async def fresh() -> SessionStore: summ = by_id["summ-sess"] # mtime must be epoch-ms; >1e12 rules out epoch-seconds. assert math.isfinite(summ["mtime"]) and summ["mtime"] > 1e12 + # Clock alignment: sidecar mtime is storage write time (adapter- + # stamped at persist), and must share a clock with + # list_sessions().mtime for the same session. Adapters that derive + # sidecar mtime from entry ISO timestamps would report a strictly + # older value than list_sessions()'s storage-time mtime and make + # every sidecar look stale to the fast-path freshness check in + # list_sessions_from_store(); this assertion catches that. + if has_list_sessions: + ls_by_id = { + e["session_id"]: e["mtime"] for e in await store.list_sessions("proj") + } + assert summ["mtime"] >= ls_by_id["summ-sess"] # data is opaque; the contract is that it round-trips into the fold. assert isinstance(summ["data"], dict) refolded = fold_session_summary( summ, key, [_e({"timestamp": "2024-01-01T00:00:03.000Z"})] ) assert refolded["session_id"] == "summ-sess" - assert refolded["mtime"] >= summ["mtime"] + # The fold preserves prev["mtime"] verbatim — mtime is stamped by + # the adapter after persisting, not by the fold. 
+ assert refolded["mtime"] == summ["mtime"] # Subagent appends must NOT affect the main session's summary. await store.append( {**key, "subpath": "subagents/agent-1"}, diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 64b5536b..26e25b89 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1266,7 +1266,11 @@ async def list_session_summaries( """Return incrementally-maintained summaries for all sessions in one call. Stores should maintain these via :func:`fold_session_summary` inside - :meth:`append`. + :meth:`append`. Skip the fold for keys with a ``subpath`` — subagent + transcripts must not contribute to the main session's summary. + + Like :meth:`list_sessions`, results are scoped to a single + ``project_key`` and exclude ``subpath`` entries. Optional — if unimplemented, ``list_sessions_from_store()`` falls back to ``list_sessions()`` + per-session ``load()``. diff --git a/tests/test_session_helpers_store.py b/tests/test_session_helpers_store.py index ba537e08..0e2859bf 100644 --- a/tests/test_session_helpers_store.py +++ b/tests/test_session_helpers_store.py @@ -157,8 +157,20 @@ async def test_limit_offset_applied_after_sidechain_filter(self) -> None: first and ``_apply_sort_limit_offset`` paginates the filtered set, so ``limit=N`` returns N rows even when sidechains exist. The store path must do the same — paginating before filtering would return short - pages and let sidechains consume page slots.""" - store = InMemorySessionStore() + pages and let sidechains consume page slots. + + Pinned to the slow path (``list_session_summaries`` suppressed) + because the fast path deliberately locks in paginate-THEN-drop for + sidechain-shaped summary slots (see + ``test_sidechain_summary_short_pages``); slow-path filter-THEN- + paginate is what this test covers. 
+ """ + + class SlowPathStore(InMemorySessionStore): + async def list_session_summaries(self, project_key): # type: ignore[override] + raise NotImplementedError + + store = SlowPathStore() valid_sids: list[str] = [] for _ in range(5): sid = str(uuid_mod.uuid4())