Skip to content

Commit 95413fd

Browse files
authored
feat(session_store): add import_session_to_store() for local→store replay (#858)
## Summary Ports the TypeScript SDK's `importSessionToStore()` (`@alpha`, `agentSdk.ts`) to Python as `import_session_to_store()`. Streams a local `~/.claude/projects/<dir>/<sessionId>.jsonl` line-by-line into `SessionStore.append()` in batches (default 500 entries / 1 MiB), optionally including subagent transcripts under `<sessionId>/subagents/**` and their `.meta.json` sidecars (appended as `{"type": "agent_metadata", ...}` so `materialize_resume_session()` can recreate them). This is the inverse of `materialize_resume_session()` (store → temp disk). Key derivation uses `project_key_for_directory()` and the subpath construction matches `file_path_to_session_key()`, so an imported session is indistinguishable from one that was live-mirrored via `TranscriptMirrorBatcher` and is resumable via `query(options=ClaudeAgentOptions(session_store=store, resume=session_id))` from the same cwd. Useful for migrating existing local sessions to a remote store, or catching a store up after a `MirrorErrorMessage` indicated a live-mirror gap. Pairs with #857's uuid-idempotency docs — adapters should treat `entry["uuid"]` as an idempotency key so re-import is duplicate-safe. ## API ```python async def import_session_to_store( session_id: str, store: SessionStore, *, directory: str | None = None, include_subagents: bool = True, batch_size: int = 500, ) -> None: ``` (Uses `directory=` to match `list_sessions`/`get_session_info`/`fork_session` rather than TS's `dir`, which shadows a Python builtin.) 
## Tests 12 tests in `tests/test_session_import.py`: - main transcript import → `InMemorySessionStore`, entries match - `batch_size=2` with 5 entries → 3 `append()` calls (2+2+1), spy-verified - blank lines skipped; `batch_size<=0` falls back to default - subagent transcripts imported with correct `subpath` (flat + nested `workflows/run-1/`) - `.meta.json` sidecar imported as `agent_metadata` entry - `include_subagents=False` skips subagent files - missing subagents dir is a no-op - invalid UUID → `ValueError`; missing session → `FileNotFoundError` - key parity with `file_path_to_session_key()` (round-trip with `TranscriptMirrorBatcher`) `pytest` 12/12, `ruff` clean, `mypy src/` clean. <!-- CHANGELOG:START --> - Added `import_session_to_store()` to replay a local `~/.claude` session transcript into a `SessionStore` (parity with TypeScript SDK `importSessionToStore`) <!-- CHANGELOG:END -->
1 parent a7a5ead commit 95413fd

4 files changed

Lines changed: 494 additions & 22 deletions

File tree

e2e-tests/test_agents_and_settings.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
"""End-to-end tests for agents and setting sources with real Claude API calls."""
22

3-
import asyncio
43
import json
5-
import sys
64
import tempfile
75
from pathlib import Path
86

@@ -154,7 +152,9 @@ async def test_filesystem_agent_loading():
154152
The bug in #406 causes the iterator to complete after only the
155153
init SystemMessage, never yielding AssistantMessage or ResultMessage.
156154
"""
157-
with tempfile.TemporaryDirectory() as tmpdir:
155+
# ignore_cleanup_errors: on Windows the CLI subprocess (cwd=tmpdir) may
156+
# still hold a handle when __exit__ tries to rmdir; cleanup is best-effort.
157+
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
158158
# Create a temporary project with a filesystem agent
159159
project_dir = Path(tmpdir)
160160
agents_dir = project_dir / ".claude" / "agents"
@@ -207,16 +207,14 @@ async def test_filesystem_agent_loading():
207207
)
208208
break
209209

210-
# On Windows, wait for file handles to be released before cleanup
211-
if sys.platform == "win32":
212-
await asyncio.sleep(0.5)
213-
214210

215211
@pytest.mark.e2e
216212
@pytest.mark.asyncio
217213
async def test_setting_sources_default():
218214
"""Test that default (no setting_sources) lets CLI load all settings normally."""
219-
with tempfile.TemporaryDirectory() as tmpdir:
215+
# ignore_cleanup_errors: on Windows the CLI subprocess (cwd=tmpdir) may
216+
# still hold a handle when __exit__ tries to rmdir; cleanup is best-effort.
217+
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
220218
# Create a temporary project with local settings
221219
project_dir = Path(tmpdir)
222220
claude_dir = project_dir / ".claude"
@@ -245,16 +243,14 @@ async def test_setting_sources_default():
245243
)
246244
break
247245

248-
# On Windows, wait for file handles to be released before cleanup
249-
if sys.platform == "win32":
250-
await asyncio.sleep(0.5)
251-
252246

253247
@pytest.mark.e2e
254248
@pytest.mark.asyncio
255249
async def test_setting_sources_user_only():
256250
"""Test that setting_sources=['user'] excludes project settings."""
257-
with tempfile.TemporaryDirectory() as tmpdir:
251+
# ignore_cleanup_errors: on Windows the CLI subprocess (cwd=tmpdir) may
252+
# still hold a handle when __exit__ tries to rmdir; cleanup is best-effort.
253+
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
258254
# Create a temporary project with a slash command
259255
project_dir = Path(tmpdir)
260256
commands_dir = project_dir / ".claude" / "commands"
@@ -289,16 +285,14 @@ async def test_setting_sources_user_only():
289285
)
290286
break
291287

292-
# On Windows, wait for file handles to be released before cleanup
293-
if sys.platform == "win32":
294-
await asyncio.sleep(0.5)
295-
296288

297289
@pytest.mark.e2e
298290
@pytest.mark.asyncio
299291
async def test_setting_sources_project_included():
300292
"""Test that setting_sources=['user', 'project'] includes project settings."""
301-
with tempfile.TemporaryDirectory() as tmpdir:
293+
# ignore_cleanup_errors: on Windows the CLI subprocess (cwd=tmpdir) may
294+
# still hold a handle when __exit__ tries to rmdir; cleanup is best-effort.
295+
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
302296
# Create a temporary project with local settings
303297
project_dir = Path(tmpdir)
304298
claude_dir = project_dir / ".claude"
@@ -327,10 +321,6 @@ async def test_setting_sources_project_included():
327321
)
328322
break
329323

330-
# On Windows, wait for file handles to be released before cleanup
331-
if sys.platform == "win32":
332-
await asyncio.sleep(0.5)
333-
334324

335325
@pytest.mark.e2e
336326
@pytest.mark.asyncio

src/claude_agent_sdk/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
CLINotFoundError,
2626
ProcessError,
2727
)
28+
from ._internal.session_import import import_session_to_store
2829
from ._internal.session_mutations import (
2930
ForkSessionResult,
3031
delete_session,
@@ -616,6 +617,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
616617
"fold_session_summary",
617618
"MirrorErrorMessage",
618619
"project_key_for_directory",
620+
"import_session_to_store",
619621
# Session listing (SessionStore-backed async variants)
620622
"list_sessions_from_store",
621623
"get_session_info_from_store",
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""Replay a local on-disk session transcript into a :class:`SessionStore`.
2+
3+
This is the inverse of :mod:`session_resume` — where ``materialize_resume_session``
4+
reads a store and writes a temp ``~/.claude`` tree, ``import_session_to_store``
5+
reads the local ``~/.claude/projects/<dir>/<sessionId>.jsonl`` (plus subagent
6+
transcripts) and replays each line into ``store.append()``.
7+
8+
Mirrors the TypeScript SDK's ``importSessionToStore``.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import errno
14+
import json
15+
from collections.abc import Iterator
16+
from pathlib import Path
17+
18+
from ..types import SessionKey, SessionStore, SessionStoreEntry
19+
from .sessions import (
20+
_resolve_session_file_path,
21+
_validate_uuid,
22+
)
23+
from .transcript_mirror_batcher import MAX_PENDING_BYTES, MAX_PENDING_ENTRIES
24+
25+
__all__ = ["import_session_to_store"]
26+
27+
28+
async def import_session_to_store(
    session_id: str,
    store: SessionStore,
    *,
    directory: str | None = None,
    include_subagents: bool = True,
    batch_size: int = MAX_PENDING_ENTRIES,
) -> None:
    """Replay a local session transcript into a :class:`SessionStore`.

    Streams the on-disk JSONL line-by-line and calls ``store.append(key, batch)``
    every ``batch_size`` entries (or once the accumulated line text reaches
    ``MAX_PENDING_BYTES``, whichever comes first). Useful for migrating
    existing local sessions to a remote store, or for catching a store up
    after a :class:`MirrorErrorMessage` indicated a live-mirror gap. Adapters
    should treat ``entry["uuid"]`` as an idempotency key so re-import is
    duplicate-safe.

    The destination ``project_key`` is the name of the on-disk project
    directory the session file was found in — the same key
    :func:`file_path_to_session_key` (and thus ``TranscriptMirrorBatcher``)
    would have produced for the same file — so an imported session is
    indistinguishable from a live-mirrored one and resumable via
    ``query(options=ClaudeAgentOptions(session_store=store, resume=session_id))``
    from the original ``cwd``.

    Args:
        session_id: UUID of the session to import.
        store: Destination :class:`SessionStore`.
        directory: Project directory path (same semantics as
            :func:`list_sessions`). When omitted, all project directories are
            searched for the session file.
        include_subagents: If ``True`` (default), also import subagent
            transcripts under ``<sessionId>/subagents/**`` and their
            ``.meta.json`` sidecars.
        batch_size: Maximum entries per ``store.append()`` call. Values
            ``<= 0`` fall back to ``MAX_PENDING_ENTRIES``.

    Raises:
        ValueError: If ``session_id`` is not a valid UUID. JSON decoding
            errors from a transcript line or a sidecar also propagate
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        FileNotFoundError: If the session JSONL cannot be found on disk.
        OSError: If a ``.meta.json`` sidecar exists but cannot be read
            (only a missing sidecar is tolerated).
    """
    if not _validate_uuid(session_id):
        raise ValueError(f"Invalid session_id: {session_id}")

    resolved = _resolve_session_file_path(session_id, directory)
    if resolved is None:
        raise FileNotFoundError(f"Session {session_id} not found")

    # Key under the on-disk project directory name — matches
    # file_path_to_session_key() / TranscriptMirrorBatcher even when the
    # resolver's search (directory=None) or worktree fallback found the file
    # somewhere other than `directory`.
    project_key = resolved.parent.name
    if batch_size <= 0:
        batch_size = MAX_PENDING_ENTRIES

    main_key: SessionKey = {"project_key": project_key, "session_id": session_id}
    await _append_jsonl_file_in_batches(resolved, main_key, store, batch_size)

    if not include_subagents:
        return

    # Subagent transcripts live at <projectDir>/<sessionId>/subagents/**.
    session_dir = resolved.with_suffix("")
    subagents_dir = session_dir / "subagents"
    for file_path in _collect_jsonl_files(subagents_dir):
        # subpath is the path relative to session_dir, '/'-joined, sans .jsonl —
        # e.g. subagents/agent-abc or subagents/workflows/run-1/agent-def.
        # Matches file_path_to_session_key() so list_subkeys() and
        # get_subagent_messages_from_store() round-trip.
        rel_parts = list(file_path.relative_to(session_dir).parts)
        rel_parts[-1] = rel_parts[-1][: -len(".jsonl")]
        sub_key: SessionKey = {
            "project_key": project_key,
            "session_id": session_id,
            "subpath": "/".join(rel_parts),
        }
        await _append_jsonl_file_in_batches(file_path, sub_key, store, batch_size)

        # The on-disk .jsonl does NOT contain agent_metadata entries — those
        # are only sent to live mirrors and persisted in the .meta.json
        # sidecar. Import the sidecar so materialize_resume_session() can
        # recreate it and resumed subagents keep their agentType/worktreePath.
        meta_path = file_path.with_name(file_path.name[: -len(".jsonl")] + ".meta.json")
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except OSError as e:
            # A missing sidecar is normal; any other I/O failure is surfaced.
            if e.errno != errno.ENOENT:
                raise
        else:
            meta_entry: SessionStoreEntry = {"type": "agent_metadata"}
            meta_entry.update(meta)
            # Re-assert the discriminator: a sidecar that happens to carry its
            # own "type" field must not turn this into a different entry kind.
            meta_entry["type"] = "agent_metadata"
            await store.append(sub_key, [meta_entry])
120+
121+
122+
async def _append_jsonl_file_in_batches(
    file_path: Path,
    key: SessionKey,
    store: SessionStore,
    batch_size: int,
) -> None:
    """Stream a JSONL file into ``store.append()`` in bounded batches.

    Each non-blank line is parsed with ``json.loads`` and buffered. The buffer
    is flushed with ``await store.append(key, batch)`` as soon as it holds
    ``batch_size`` entries or the buffered line text reaches
    ``MAX_PENDING_BYTES``, and once more at end of file if anything remains.
    Empty and whitespace-only lines are skipped.

    Args:
        file_path: JSONL transcript to read (decoded as UTF-8).
        key: Session key passed unchanged to every ``store.append()`` call.
        store: Destination store.
        batch_size: Maximum number of entries per ``store.append()`` call.

    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    batch: list[SessionStoreEntry] = []
    nbytes = 0
    with file_path.open(encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.rstrip("\n")
            # Treat whitespace-only lines as blank too; feeding them to
            # json.loads would abort the import part-way through the file.
            if not line.strip():
                continue
            batch.append(json.loads(line))
            # NOTE: len() counts characters, not encoded bytes, so the size
            # cap is approximate for non-ASCII content.
            nbytes += len(line)
            if len(batch) >= batch_size or nbytes >= MAX_PENDING_BYTES:
                await store.append(key, batch)
                # Rebind rather than clear(): the store may have kept a
                # reference to the list it was handed.
                batch = []
                nbytes = 0
    if batch:
        await store.append(key, batch)
147+
148+
149+
def _collect_jsonl_files(base_dir: Path) -> Iterator[Path]:
    """Walk ``base_dir`` depth-first and yield every ``*.jsonl`` file in it.

    Children of each directory are visited in name order, so the sequence of
    yielded paths is deterministic. If listing a directory raises ``OSError``
    (for example because it does not exist), nothing is yielded for it.
    """
    try:
        children = list(base_dir.iterdir())
    except OSError:
        return
    children.sort(key=lambda child: child.name)

    for child in children:
        if child.is_dir():
            # Descend before moving on to the next sibling.
            yield from _collect_jsonl_files(child)
            continue
        if child.is_file() and child.name.endswith(".jsonl"):
            yield child

0 commit comments

Comments
 (0)