From c92378296453388464c2de6e9e587d705baef9d7 Mon Sep 17 00:00:00 2001 From: Lorenzo Bernasconi Date: Sun, 3 May 2026 13:52:29 +0100 Subject: [PATCH] feat(session_store): add session_store_flush option for eager mirroring Adds ClaudeAgentOptions.session_store_flush ("batched" | "eager", default "batched"). With "eager", build_mirror_batcher() zeroes the TranscriptMirrorBatcher pending thresholds so every transcript_mirror frame schedules a background flush, delivering entries to SessionStore.append() in near real time instead of coalescing until the end-of-turn result message. Appends remain serialized in enqueue order; a slow adapter does not stall the read loop (frames coalesce while it is busy). Exports the SessionStoreFlushMode type alias. --- src/claude_agent_sdk/__init__.py | 2 + src/claude_agent_sdk/_internal/client.py | 1 + .../_internal/session_resume.py | 16 ++- src/claude_agent_sdk/client.py | 1 + src/claude_agent_sdk/types.py | 23 ++++ tests/test_transcript_mirror.py | 124 +++++++++++++++++- 6 files changed, 164 insertions(+), 3 deletions(-) diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 3e60d5bd..22658d5e 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -113,6 +113,7 @@ SessionMessage, SessionStore, SessionStoreEntry, + SessionStoreFlushMode, SessionStoreListEntry, SessionSummaryEntry, SettingSource, @@ -610,6 +611,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any: "SessionKey", "SessionStore", "SessionStoreEntry", + "SessionStoreFlushMode", "SessionStoreListEntry", "SessionSummaryEntry", "SessionListSubkeysKey", diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index 2d0029a9..44bdbc2a 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ b/src/claude_agent_sdk/_internal/client.py @@ -192,6 +192,7 @@ async def _on_mirror_error(key: Any, error: str) -> None: materialized=materialized, env=configured_options.env, on_error=_on_mirror_error, + flush_mode=configured_options.session_store_flush, ) ) diff --git a/src/claude_agent_sdk/_internal/session_resume.py b/src/claude_agent_sdk/_internal/session_resume.py index 1bbace38..7bfa9baf 100644 --- a/src/claude_agent_sdk/_internal/session_resume.py +++ b/src/claude_agent_sdk/_internal/session_resume.py @@ -31,10 +31,14 @@ from pathlib import Path from typing import Any -from ..types import ClaudeAgentOptions, SessionKey, SessionStore +from ..types import ClaudeAgentOptions, SessionKey, SessionStore, SessionStoreFlushMode from .session_store_validation import _store_implements from .sessions import _get_projects_dir, _validate_uuid, project_key_for_directory -from .transcript_mirror_batcher import TranscriptMirrorBatcher +from .transcript_mirror_batcher import ( + MAX_PENDING_BYTES, + MAX_PENDING_ENTRIES, + TranscriptMirrorBatcher, +) logger = logging.getLogger(__name__) @@ -87,6 +91,7 @@ def build_mirror_batcher( materialized: MaterializedResume | None, env: dict[str, str] | None, on_error: Callable[[SessionKey | None, str], Awaitable[None]], + flush_mode: SessionStoreFlushMode = "batched", ) -> TranscriptMirrorBatcher: """Construct the :class:`TranscriptMirrorBatcher` for a session. @@ -94,16 +99,23 @@ def build_mirror_batcher( (so file_path → key resolution matches what the subprocess writes), otherwise to the standard projects directory under the effective ``CLAUDE_CONFIG_DIR``. + + ``flush_mode="eager"`` zeroes the batcher's pending thresholds so every + enqueued frame schedules a background flush; ``"batched"`` keeps the + defaults (flush on ``result`` or 500-entry / 1 MiB overflow). """ projects_dir = ( str(materialized.config_dir / "projects") if materialized is not None else str(_get_projects_dir(env)) ) + eager = flush_mode == "eager" return TranscriptMirrorBatcher( store=store, projects_dir=projects_dir, on_error=on_error, + max_pending_entries=0 if eager else MAX_PENDING_ENTRIES, + max_pending_bytes=0 if eager else MAX_PENDING_BYTES, ) diff --git a/src/claude_agent_sdk/client.py b/src/claude_agent_sdk/client.py index 25d4d353..58c3a9b8 100644 --- a/src/claude_agent_sdk/client.py +++ b/src/claude_agent_sdk/client.py @@ -248,6 +248,7 @@ async def _on_mirror_error(key: Any, error: str) -> None: materialized=self._materialized, env=self.options.env, on_error=_on_mirror_error, + flush_mode=self.options.session_store_flush, ) ) diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 9c2be63f..c76b5f00 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1248,6 +1248,19 @@ class SessionListSubkeysKey(TypedDict): session_id: str +SessionStoreFlushMode = Literal["batched", "eager"] +"""Controls when transcript-mirror entries are flushed to a :class:`SessionStore`. + +- ``"batched"`` (default): buffer entries and flush once per turn (on the + ``result`` message) or when the pending buffer exceeds 500 entries / 1 MiB. + Keeps adapter latency off the streaming hot path. +- ``"eager"``: trigger a background flush after every ``transcript_mirror`` + frame so ``SessionStore.append()`` sees entries in near real time. Appends + are still serialized in enqueue order; a slow adapter will not stall the + read loop but will see frames coalesced while it is busy. +""" + + class SessionStore(Protocol): """Adapter for mirroring session transcripts to external storage. @@ -1758,6 +1771,16 @@ class ClaudeAgentOptions: when the local file is absent. """ + session_store_flush: SessionStoreFlushMode = "batched" + """When to flush mirrored transcript entries to ``session_store``. + + ``"batched"`` (default) coalesces entries and flushes once per turn or when + the buffer exceeds 500 entries / 1 MiB. ``"eager"`` triggers a background + flush after every frame for near-real-time delivery (each flush still runs + off the read loop, so a slow adapter does not stall message streaming). + Ignored when ``session_store`` is ``None``. + """ + load_timeout_ms: int = 60_000 """Timeout for each ``session_store.load()`` / ``list_subkeys()`` call during resume materialization, in milliseconds. diff --git a/tests/test_transcript_mirror.py b/tests/test_transcript_mirror.py index 1b9608b3..72852127 100644 --- a/tests/test_transcript_mirror.py +++ b/tests/test_transcript_mirror.py @@ -24,6 +24,7 @@ query, ) from claude_agent_sdk._internal.query import Query +from claude_agent_sdk._internal.session_resume import build_mirror_batcher from claude_agent_sdk._internal.session_store import file_path_to_session_key from claude_agent_sdk._internal.sessions import _get_projects_dir from claude_agent_sdk._internal.transcript_mirror_batcher import ( @@ -503,6 +504,62 @@ async def append(self, key, entries): assert order == [1, 2, 3] +# --------------------------------------------------------------------------- +# build_mirror_batcher / session_store_flush +# --------------------------------------------------------------------------- + + +class TestBuildMirrorBatcherFlushMode: + """``session_store_flush`` threads through ``build_mirror_batcher`` to the + batcher's pending thresholds: ``"batched"`` keeps the defaults, + ``"eager"`` zeroes them so every enqueue schedules a background flush.""" + + @pytest.mark.parametrize( + ("kwargs", "want_entries", "want_bytes"), + [ + ({}, MAX_PENDING_ENTRIES, MAX_PENDING_BYTES), + ({"flush_mode": "batched"}, MAX_PENDING_ENTRIES, MAX_PENDING_BYTES), + ({"flush_mode": "eager"}, 0, 0), + ], + ids=["default", "batched", "eager"], + ) + def test_flush_mode_sets_thresholds( + self, kwargs: dict[str, Any], want_entries: int, want_bytes: int + ) -> None: + batcher = build_mirror_batcher( + store=InMemorySessionStore(), + materialized=None, + env={"CLAUDE_CONFIG_DIR": str(Path(PROJECTS_DIR).parent)}, + on_error=_noop_error, + **kwargs, + ) + assert batcher.max_pending_entries == want_entries + assert batcher.max_pending_bytes == want_bytes + + @pytest.mark.asyncio + async def test_eager_mode_flushes_per_frame(self) -> None: + store = _RecordingStore() + batcher = build_mirror_batcher( + store=store, + materialized=None, + env={"CLAUDE_CONFIG_DIR": str(Path(PROJECTS_DIR).parent)}, + on_error=_noop_error, + flush_mode="eager", + ) + batcher.enqueue(_main_path(), [{"type": "user", "n": 1}]) + await asyncio.sleep(0) + await asyncio.sleep(0) + assert len(store.append_calls) == 1 + batcher.enqueue(_main_path(), [{"type": "assistant", "n": 2}]) + await asyncio.sleep(0) + await asyncio.sleep(0) + assert len(store.append_calls) == 2 + assert [e["n"] for c in store.append_calls for e in c[1]] == [1, 2] + + def test_options_default_is_batched(self) -> None: + assert ClaudeAgentOptions().session_store_flush == "batched" + + # --------------------------------------------------------------------------- # --session-mirror CLI flag # --------------------------------------------------------------------------- @@ -533,11 +590,15 @@ def test_flag_absent_when_session_store_unset(self) -> None: # --------------------------------------------------------------------------- -def _make_mock_transport(messages: list[dict[str, Any]]) -> Any: +def _make_mock_transport( + messages: list[dict[str, Any]], *, yield_between: bool = False +) -> Any: mock_transport = AsyncMock() async def mock_receive(): for msg in messages: + if yield_between: + await anyio.sleep(0) yield msg mock_transport.read_messages = mock_receive @@ -710,6 +771,67 @@ async def _test() -> None: anyio.run(_test) + def test_eager_flush_mode_appends_per_frame_before_result(self) -> None: + """With ``session_store_flush="eager"`` each ``transcript_mirror`` frame + is flushed as it arrives, so the store sees one ``append()`` per frame + rather than a single coalesced batch at ``result`` time.""" + + async def _test() -> None: + store = _RecordingStore() + frame1 = { + "type": "transcript_mirror", + "filePath": _main_path("p", "s"), + "entries": [{"type": "user", "uuid": "u1"}], + } + frame2 = { + "type": "transcript_mirror", + "filePath": _main_path("p", "s"), + "entries": [{"type": "assistant", "uuid": "a1"}], + } + # Yield to the event loop between frames so the eager background + # drain scheduled by enqueue() can run before the next frame + # arrives — models the await on real stdout I/O. Without this the + # mock delivers both frames synchronously and they coalesce, which + # is correct back-pressure behaviour but not what we're asserting. + mock_transport = _make_mock_transport( + [frame1, frame2, _ASSISTANT_MSG, _RESULT_MSG], + yield_between=True, + ) + + with ( + patch( + "claude_agent_sdk._internal.client.SubprocessCLITransport" + ) as mock_cls, + patch( + "claude_agent_sdk._internal.query.Query.initialize", + new_callable=AsyncMock, + ), + patch( + "claude_agent_sdk._internal.session_resume._get_projects_dir", + return_value=PROJECTS_DIR, + ), + ): + mock_cls.return_value = mock_transport + appends_at_assistant = None + async for msg in query( + prompt="Hello", + options=ClaudeAgentOptions( + session_store=store, session_store_flush="eager" + ), + ): + if isinstance(msg, AssistantMessage): + appends_at_assistant = len(store.append_calls) + + # Both frames flushed individually before the assistant message + # was yielded (eager background flush ran while the read loop + # awaited the next stdout line). + assert appends_at_assistant == 2 + assert len(store.append_calls) == 2 + assert store.append_calls[0][1] == [{"type": "user", "uuid": "u1"}] + assert store.append_calls[1][1] == [{"type": "assistant", "uuid": "a1"}] + + anyio.run(_test) + def test_mirror_frames_dropped_when_no_session_store(self) -> None: """Without a session_store the batcher isn't attached; frames are peeled and dropped (still not yielded), normal messages flow."""