Skip to content

Commit c8942ba

Browse files
committed
fix: gap-fill list_session_summaries fast-path for sessions missing a sidecar
1 parent 0288f13 commit c8942ba

2 files changed

Lines changed: 145 additions & 39 deletions

File tree

src/claude_agent_sdk/_internal/sessions.py

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,6 +1514,53 @@ async def _load_store_entries_as_jsonl(
15141514
return _entries_to_jsonl(entries)
15151515

15161516

1517+
async def _derive_infos_via_load(
    session_store: SessionStore,
    listing: list[Any],
    directory: str | None,
    project_path: str,
) -> list[SDKSessionInfo]:
    """Build ``SDKSessionInfo`` rows for ``listing`` by loading each session.

    Every entry in ``listing`` (a mapping carrying ``"session_id"`` and
    ``"mtime"``) is loaded through ``_load_store_entries_as_jsonl`` and then
    lite-parsed. At most ``_STORE_LIST_LOAD_CONCURRENCY`` loads are in flight
    at any moment; the cap is there so a large listing does not exhaust
    adapter connection pools or trip backend rate limits.

    Per-entry outcomes:

    * the load raised -> the row is kept with an empty ``summary`` rather
      than failing the whole listing;
    * the load returned ``None`` -> the row is skipped;
    * the lite-parse returned ``None`` (sidechain, or nothing to summarise)
      -> the row is skipped, matching the filesystem path;
    * otherwise -> the parsed info is kept, with ``last_modified`` overwritten
      by the listing's ``mtime``.

    Returns:
        The surviving rows, in ``listing`` order. Sorting and pagination are
        left to the caller.
    """
    limiter = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY)

    async def _load_one(session_id: str) -> str | None:
        # Hold a semaphore slot for the whole duration of the store round-trip.
        async with limiter:
            return await _load_store_entries_as_jsonl(
                session_store, session_id, directory
            )

    pending = [_load_one(row["session_id"]) for row in listing]
    # return_exceptions=True: a failing load comes back as a value in its
    # slot instead of aborting the sibling loads.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    infos: list[SDKSessionInfo] = []
    for row, loaded in zip(listing, outcomes, strict=True):
        session_id, modified = row["session_id"], row["mtime"]

        if isinstance(loaded, BaseException):
            # Degraded row: keep the session visible, with no summary text.
            infos.append(
                SDKSessionInfo(
                    session_id=session_id, summary="", last_modified=modified
                )
            )
        elif loaded is not None:
            info = _parse_session_info_from_lite(
                session_id, _jsonl_to_lite(loaded, modified), project_path
            )
            # None means sidechain or no extractable summary; drop it,
            # matching the filesystem path.
            if info is not None:
                info.last_modified = modified
                infos.append(info)

    return infos
1562+
1563+
15171564
async def list_sessions_from_store(
15181565
session_store: SessionStore,
15191566
directory: str | None = None,
@@ -1556,6 +1603,7 @@ async def list_sessions_from_store(
15561603
"""
15571604
project_path = _canonicalize_path(str(directory) if directory is not None else ".")
15581605
project_key = _sanitize_path(project_path)
1606+
has_list_sessions = _store_implements(session_store, "list_sessions")
15591607

15601608
# Fast path: if the store maintains incremental summaries, fetch them in
15611609
# one call instead of N per-session load()s.
@@ -1572,54 +1620,36 @@ async def list_sessions_from_store(
15721620
for s in summaries
15731621
if (info := summary_entry_to_sdk_info(s, project_path)) is not None
15741622
]
1623+
# Gap-fill: a store may have entries for sessions appended before
1624+
# it adopted list_session_summaries (no sidecar yet). Enumerate
1625+
# via list_sessions() and derive the missing ones via the
1626+
# per-session load() path so they aren't silently dropped.
1627+
if has_list_sessions:
1628+
summary_ids = {s["session_id"] for s in summaries}
1629+
listing = list(await session_store.list_sessions(project_key))
1630+
missing = [e for e in listing if e["session_id"] not in summary_ids]
1631+
if missing:
1632+
infos.extend(
1633+
await _derive_infos_via_load(
1634+
session_store, missing, directory, project_path
1635+
)
1636+
)
15751637
return _apply_sort_limit_offset(infos, limit, offset)
15761638

1577-
if not _store_implements(session_store, "list_sessions"):
1639+
if not has_list_sessions:
15781640
raise ValueError(
15791641
"session_store does not implement list_sessions() -- cannot list "
15801642
"sessions. Provide a store with a list_sessions() method."
15811643
)
1582-
raw = await session_store.list_sessions(project_key)
15831644
# Copy — store.list_sessions() may return a reference to internal state.
1584-
listing = list(raw)
1585-
1645+
listing = list(await session_store.list_sessions(project_key))
15861646
# Derive a real summary per session by loading its entries and reusing
1587-
# the filesystem path's lite-parse. Loads run concurrently with a fixed
1588-
# bound so large listings don't exhaust adapter connection pools or hit
1589-
# backend rate limits; adapter errors degrade that row instead of failing
1590-
# the whole list. Filtering (sidechain/empty drop) happens before
1591-
# pagination so ``limit``/``offset`` index the same filtered set as the
1592-
# disk path.
1593-
sem = asyncio.Semaphore(_STORE_LIST_LOAD_CONCURRENCY)
1594-
1595-
async def _bounded_load(sid: str) -> str | None:
1596-
async with sem:
1597-
return await _load_store_entries_as_jsonl(session_store, sid, directory)
1598-
1599-
settled = await asyncio.gather(
1600-
*(_bounded_load(e["session_id"]) for e in listing),
1601-
return_exceptions=True,
1647+
# the filesystem path's lite-parse. Filtering (sidechain/empty drop)
1648+
# happens before pagination so ``limit``/``offset`` index the same
1649+
# filtered set as the disk path.
1650+
results = await _derive_infos_via_load(
1651+
session_store, listing, directory, project_path
16021652
)
1603-
results: list[SDKSessionInfo] = []
1604-
for entry, outcome in zip(listing, settled, strict=True):
1605-
sid = entry["session_id"]
1606-
mtime = entry["mtime"]
1607-
if isinstance(outcome, BaseException):
1608-
results.append(
1609-
SDKSessionInfo(session_id=sid, summary="", last_modified=mtime)
1610-
)
1611-
continue
1612-
if outcome is None:
1613-
continue
1614-
parsed = _parse_session_info_from_lite(
1615-
sid, _jsonl_to_lite(outcome, mtime), project_path
1616-
)
1617-
if parsed is None:
1618-
# Sidechain or no extractable summary — drop, matching the
1619-
# filesystem path.
1620-
continue
1621-
parsed.last_modified = mtime
1622-
results.append(parsed)
16231653
return _apply_sort_limit_offset(results, limit, offset)
16241654

16251655

tests/test_session_summary.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,82 @@ async def list_session_summaries(self, project_key: str): # noqa: ANN201
465465
assert len(sessions) == 1
466466
assert sessions[0].summary == "hi"
467467

468+
async def test_mixed_sessions_gap_filled(self) -> None:
    """Sessions without a summary sidecar are gap-filled, not dropped.

    The store reports a summary for only one of two sessions (as if it had
    adopted ``list_session_summaries`` mid-stream). The other session must
    still be listed, derived through a per-session ``load()``.
    """
    covered_id = str(uuid_mod.uuid4())
    uncovered_id = str(uuid_mod.uuid4())

    class PartialStore(InMemorySessionStore):
        # Session ids passed to load(), in call order.
        load_calls: list[str] = []

        async def list_session_summaries(self, project_key: str):  # noqa: ANN201
            # Pretend only ``covered_id`` ever got a sidecar.
            everything = await super().list_session_summaries(project_key)
            return [row for row in everything if row["session_id"] == covered_id]

        async def load(self, key):  # noqa: ANN001, ANN201
            self.load_calls.append(key["session_id"])
            return await super().load(key)

    store = PartialStore()
    seeds = (
        (covered_id, "has sidecar", "2024-01-02T00:00:00Z"),
        (uncovered_id, "no sidecar", "2024-01-01T00:00:00Z"),
    )
    for session_id, text, stamp in seeds:
        await store.append(
            {"project_key": PROJECT_KEY, "session_id": session_id},
            [_user(text, ts=stamp)],
        )

    listed = await list_sessions_from_store(store, directory=DIR)
    summaries = {info.session_id: info.summary for info in listed}
    assert summaries == {
        covered_id: "has sidecar",
        uncovered_id: "no sidecar",
    }
    # Only the missing session should have been load()ed.
    assert store.load_calls == [uncovered_id]
503+
504+
async def test_gap_fill_bounded_concurrency(self) -> None:
    """Gap-fill loads respect ``_STORE_LIST_LOAD_CONCURRENCY``.

    Every session is "missing" from the summaries, so all of them go through
    the bounded per-session load helper. Each ``load()`` parks on an event so
    the number of simultaneously running loads can be observed.
    """
    import asyncio

    from claude_agent_sdk._internal import sessions as _sessions

    cap = _sessions._STORE_LIST_LOAD_CONCURRENCY
    active = 0
    high_water = 0
    release = asyncio.Event()

    class PartialSlowStore(InMemorySessionStore):
        async def list_session_summaries(self, project_key: str):  # noqa: ANN201
            return []  # everything is "missing"

        async def load(self, key):  # noqa: ANN001, ANN201
            nonlocal active, high_water
            active += 1
            if active > high_water:
                high_water = active
            # Park here until the test releases the gate.
            await release.wait()
            active -= 1
            return await super().load(key)

    store = PartialSlowStore()
    # Twice the cap, so an unbounded implementation would overshoot it.
    for index in range(cap * 2):
        await store.append(
            {"project_key": PROJECT_KEY, "session_id": str(uuid_mod.uuid4())},
            [_user(f"p{index}")],
        )

    listing_task = asyncio.create_task(
        list_sessions_from_store(store, directory=DIR)
    )
    # Yield to the event loop a few times so the loads start and block.
    for _ in range(5):
        await asyncio.sleep(0)
    assert 0 < high_water <= cap
    release.set()
    await listing_task
    assert high_water == cap
543+
468544

469545
# ---------------------------------------------------------------------------
470546
# Parity: incremental fold == batch lite-parse

0 commit comments

Comments
 (0)