Skip to content

Commit c923782

Browse files
committed
feat(session_store): add session_store_flush option for eager mirroring
Adds ClaudeAgentOptions.session_store_flush ("batched" | "eager", default "batched"). With "eager", build_mirror_batcher() zeroes the TranscriptMirrorBatcher pending thresholds so every transcript_mirror frame schedules a background flush, delivering entries to SessionStore.append() in near real time instead of coalescing until the end-of-turn result message. Appends remain serialized in enqueue order; a slow adapter does not stall the read loop (frames coalesce while it is busy). Exports the SessionStoreFlushMode type alias.
1 parent b512f25 commit c923782

6 files changed

Lines changed: 164 additions & 3 deletions

File tree

src/claude_agent_sdk/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
SessionMessage,
114114
SessionStore,
115115
SessionStoreEntry,
116+
SessionStoreFlushMode,
116117
SessionStoreListEntry,
117118
SessionSummaryEntry,
118119
SettingSource,
@@ -610,6 +611,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
610611
"SessionKey",
611612
"SessionStore",
612613
"SessionStoreEntry",
614+
"SessionStoreFlushMode",
613615
"SessionStoreListEntry",
614616
"SessionSummaryEntry",
615617
"SessionListSubkeysKey",

src/claude_agent_sdk/_internal/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ async def _on_mirror_error(key: Any, error: str) -> None:
192192
materialized=materialized,
193193
env=configured_options.env,
194194
on_error=_on_mirror_error,
195+
flush_mode=configured_options.session_store_flush,
195196
)
196197
)
197198

src/claude_agent_sdk/_internal/session_resume.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,14 @@
3131
from pathlib import Path
3232
from typing import Any
3333

34-
from ..types import ClaudeAgentOptions, SessionKey, SessionStore
34+
from ..types import ClaudeAgentOptions, SessionKey, SessionStore, SessionStoreFlushMode
3535
from .session_store_validation import _store_implements
3636
from .sessions import _get_projects_dir, _validate_uuid, project_key_for_directory
37-
from .transcript_mirror_batcher import TranscriptMirrorBatcher
37+
from .transcript_mirror_batcher import (
38+
MAX_PENDING_BYTES,
39+
MAX_PENDING_ENTRIES,
40+
TranscriptMirrorBatcher,
41+
)
3842

3943
logger = logging.getLogger(__name__)
4044

@@ -87,23 +91,31 @@ def build_mirror_batcher(
8791
materialized: MaterializedResume | None,
8892
env: dict[str, str] | None,
8993
on_error: Callable[[SessionKey | None, str], Awaitable[None]],
94+
flush_mode: SessionStoreFlushMode = "batched",
9095
) -> TranscriptMirrorBatcher:
9196
"""Construct the :class:`TranscriptMirrorBatcher` for a session.
9297
9398
Resolves ``projects_dir`` to the materialized temp dir when present
9499
(so file_path → key resolution matches what the subprocess writes),
95100
otherwise to the standard projects directory under the effective
96101
``CLAUDE_CONFIG_DIR``.
102+
103+
``flush_mode="eager"`` zeroes the batcher's pending thresholds so every
104+
enqueued frame schedules a background flush; ``"batched"`` keeps the
105+
defaults (flush on ``result`` or 500-entry / 1 MiB overflow).
97106
"""
98107
projects_dir = (
99108
str(materialized.config_dir / "projects")
100109
if materialized is not None
101110
else str(_get_projects_dir(env))
102111
)
112+
eager = flush_mode == "eager"
103113
return TranscriptMirrorBatcher(
104114
store=store,
105115
projects_dir=projects_dir,
106116
on_error=on_error,
117+
max_pending_entries=0 if eager else MAX_PENDING_ENTRIES,
118+
max_pending_bytes=0 if eager else MAX_PENDING_BYTES,
107119
)
108120

109121

src/claude_agent_sdk/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ async def _on_mirror_error(key: Any, error: str) -> None:
248248
materialized=self._materialized,
249249
env=self.options.env,
250250
on_error=_on_mirror_error,
251+
flush_mode=self.options.session_store_flush,
251252
)
252253
)
253254

src/claude_agent_sdk/types.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,19 @@ class SessionListSubkeysKey(TypedDict):
12481248
session_id: str
12491249

12501250

1251+
SessionStoreFlushMode = Literal["batched", "eager"]
1252+
"""Controls when transcript-mirror entries are flushed to a :class:`SessionStore`.
1253+
1254+
- ``"batched"`` (default): buffer entries and flush once per turn (on the
1255+
``result`` message) or when the pending buffer exceeds 500 entries / 1 MiB.
1256+
Keeps adapter latency off the streaming hot path.
1257+
- ``"eager"``: trigger a background flush after every ``transcript_mirror``
1258+
frame so ``SessionStore.append()`` sees entries in near real time. Appends
1259+
are still serialized in enqueue order; a slow adapter will not stall the
1260+
read loop but will see frames coalesced while it is busy.
1261+
"""
1262+
1263+
12511264
class SessionStore(Protocol):
12521265
"""Adapter for mirroring session transcripts to external storage.
12531266
@@ -1758,6 +1771,16 @@ class ClaudeAgentOptions:
17581771
when the local file is absent.
17591772
"""
17601773

1774+
session_store_flush: SessionStoreFlushMode = "batched"
1775+
"""When to flush mirrored transcript entries to ``session_store``.
1776+
1777+
``"batched"`` (default) coalesces entries and flushes once per turn or when
1778+
the buffer exceeds 500 entries / 1 MiB. ``"eager"`` triggers a background
1779+
flush after every frame for near-real-time delivery (each flush still runs
1780+
off the read loop, so a slow adapter does not stall message streaming).
1781+
Ignored when ``session_store`` is ``None``.
1782+
"""
1783+
17611784
load_timeout_ms: int = 60_000
17621785
"""Timeout for each ``session_store.load()`` / ``list_subkeys()`` call during
17631786
resume materialization, in milliseconds.

tests/test_transcript_mirror.py

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
query,
2525
)
2626
from claude_agent_sdk._internal.query import Query
27+
from claude_agent_sdk._internal.session_resume import build_mirror_batcher
2728
from claude_agent_sdk._internal.session_store import file_path_to_session_key
2829
from claude_agent_sdk._internal.sessions import _get_projects_dir
2930
from claude_agent_sdk._internal.transcript_mirror_batcher import (
@@ -503,6 +504,62 @@ async def append(self, key, entries):
503504
assert order == [1, 2, 3]
504505

505506

507+
# ---------------------------------------------------------------------------
508+
# build_mirror_batcher / session_store_flush
509+
# ---------------------------------------------------------------------------
510+
511+
512+
class TestBuildMirrorBatcherFlushMode:
513+
"""``session_store_flush`` threads through ``build_mirror_batcher`` to the
514+
batcher's pending thresholds: ``"batched"`` keeps the defaults,
515+
``"eager"`` zeroes them so every enqueue schedules a background flush."""
516+
517+
@pytest.mark.parametrize(
518+
("kwargs", "want_entries", "want_bytes"),
519+
[
520+
({}, MAX_PENDING_ENTRIES, MAX_PENDING_BYTES),
521+
({"flush_mode": "batched"}, MAX_PENDING_ENTRIES, MAX_PENDING_BYTES),
522+
({"flush_mode": "eager"}, 0, 0),
523+
],
524+
ids=["default", "batched", "eager"],
525+
)
526+
def test_flush_mode_sets_thresholds(
527+
self, kwargs: dict[str, Any], want_entries: int, want_bytes: int
528+
) -> None:
529+
batcher = build_mirror_batcher(
530+
store=InMemorySessionStore(),
531+
materialized=None,
532+
env={"CLAUDE_CONFIG_DIR": str(Path(PROJECTS_DIR).parent)},
533+
on_error=_noop_error,
534+
**kwargs,
535+
)
536+
assert batcher.max_pending_entries == want_entries
537+
assert batcher.max_pending_bytes == want_bytes
538+
539+
@pytest.mark.asyncio
540+
async def test_eager_mode_flushes_per_frame(self) -> None:
541+
store = _RecordingStore()
542+
batcher = build_mirror_batcher(
543+
store=store,
544+
materialized=None,
545+
env={"CLAUDE_CONFIG_DIR": str(Path(PROJECTS_DIR).parent)},
546+
on_error=_noop_error,
547+
flush_mode="eager",
548+
)
549+
batcher.enqueue(_main_path(), [{"type": "user", "n": 1}])
550+
await asyncio.sleep(0)
551+
await asyncio.sleep(0)
552+
assert len(store.append_calls) == 1
553+
batcher.enqueue(_main_path(), [{"type": "assistant", "n": 2}])
554+
await asyncio.sleep(0)
555+
await asyncio.sleep(0)
556+
assert len(store.append_calls) == 2
557+
assert [e["n"] for c in store.append_calls for e in c[1]] == [1, 2]
558+
559+
def test_options_default_is_batched(self) -> None:
560+
assert ClaudeAgentOptions().session_store_flush == "batched"
561+
562+
506563
# ---------------------------------------------------------------------------
507564
# --session-mirror CLI flag
508565
# ---------------------------------------------------------------------------
@@ -533,11 +590,15 @@ def test_flag_absent_when_session_store_unset(self) -> None:
533590
# ---------------------------------------------------------------------------
534591

535592

536-
def _make_mock_transport(messages: list[dict[str, Any]]) -> Any:
593+
def _make_mock_transport(
594+
messages: list[dict[str, Any]], *, yield_between: bool = False
595+
) -> Any:
537596
mock_transport = AsyncMock()
538597

539598
async def mock_receive():
540599
for msg in messages:
600+
if yield_between:
601+
await anyio.sleep(0)
541602
yield msg
542603

543604
mock_transport.read_messages = mock_receive
@@ -710,6 +771,67 @@ async def _test() -> None:
710771

711772
anyio.run(_test)
712773

774+
def test_eager_flush_mode_appends_per_frame_before_result(self) -> None:
775+
"""With ``session_store_flush="eager"`` each ``transcript_mirror`` frame
776+
is flushed as it arrives, so the store sees one ``append()`` per frame
777+
rather than a single coalesced batch at ``result`` time."""
778+
779+
async def _test() -> None:
780+
store = _RecordingStore()
781+
frame1 = {
782+
"type": "transcript_mirror",
783+
"filePath": _main_path("p", "s"),
784+
"entries": [{"type": "user", "uuid": "u1"}],
785+
}
786+
frame2 = {
787+
"type": "transcript_mirror",
788+
"filePath": _main_path("p", "s"),
789+
"entries": [{"type": "assistant", "uuid": "a1"}],
790+
}
791+
# Yield to the event loop between frames so the eager background
792+
# drain scheduled by enqueue() can run before the next frame
793+
# arrives — models the await on real stdout I/O. Without this the
794+
# mock delivers both frames synchronously and they coalesce, which
795+
# is correct back-pressure behaviour but not what we're asserting.
796+
mock_transport = _make_mock_transport(
797+
[frame1, frame2, _ASSISTANT_MSG, _RESULT_MSG],
798+
yield_between=True,
799+
)
800+
801+
with (
802+
patch(
803+
"claude_agent_sdk._internal.client.SubprocessCLITransport"
804+
) as mock_cls,
805+
patch(
806+
"claude_agent_sdk._internal.query.Query.initialize",
807+
new_callable=AsyncMock,
808+
),
809+
patch(
810+
"claude_agent_sdk._internal.session_resume._get_projects_dir",
811+
return_value=PROJECTS_DIR,
812+
),
813+
):
814+
mock_cls.return_value = mock_transport
815+
appends_at_assistant = None
816+
async for msg in query(
817+
prompt="Hello",
818+
options=ClaudeAgentOptions(
819+
session_store=store, session_store_flush="eager"
820+
),
821+
):
822+
if isinstance(msg, AssistantMessage):
823+
appends_at_assistant = len(store.append_calls)
824+
825+
# Both frames flushed individually before the assistant message
826+
# was yielded (eager background flush ran while the read loop
827+
# awaited the next stdout line).
828+
assert appends_at_assistant == 2
829+
assert len(store.append_calls) == 2
830+
assert store.append_calls[0][1] == [{"type": "user", "uuid": "u1"}]
831+
assert store.append_calls[1][1] == [{"type": "assistant", "uuid": "a1"}]
832+
833+
anyio.run(_test)
834+
713835
def test_mirror_frames_dropped_when_no_session_store(self) -> None:
714836
"""Without a session_store the batcher isn't attached; frames are
715837
peeled and dropped (still not yielded), normal messages flow."""

0 commit comments

Comments
 (0)