Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/claude_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
SessionMessage,
SessionStore,
SessionStoreEntry,
SessionStoreFlushMode,
SessionStoreListEntry,
SessionSummaryEntry,
SettingSource,
Expand Down Expand Up @@ -610,6 +611,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"SessionKey",
"SessionStore",
"SessionStoreEntry",
"SessionStoreFlushMode",
"SessionStoreListEntry",
"SessionSummaryEntry",
"SessionListSubkeysKey",
Expand Down
1 change: 1 addition & 0 deletions src/claude_agent_sdk/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async def _on_mirror_error(key: Any, error: str) -> None:
materialized=materialized,
env=configured_options.env,
on_error=_on_mirror_error,
flush_mode=configured_options.session_store_flush,
)
)

Expand Down
16 changes: 14 additions & 2 deletions src/claude_agent_sdk/_internal/session_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@
from pathlib import Path
from typing import Any

from ..types import ClaudeAgentOptions, SessionKey, SessionStore
from ..types import ClaudeAgentOptions, SessionKey, SessionStore, SessionStoreFlushMode
from .session_store_validation import _store_implements
from .sessions import _get_projects_dir, _validate_uuid, project_key_for_directory
from .transcript_mirror_batcher import TranscriptMirrorBatcher
from .transcript_mirror_batcher import (
MAX_PENDING_BYTES,
MAX_PENDING_ENTRIES,
TranscriptMirrorBatcher,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,23 +91,31 @@ def build_mirror_batcher(
materialized: MaterializedResume | None,
env: dict[str, str] | None,
on_error: Callable[[SessionKey | None, str], Awaitable[None]],
flush_mode: SessionStoreFlushMode = "batched",
) -> TranscriptMirrorBatcher:
"""Construct the :class:`TranscriptMirrorBatcher` for a session.

Resolves ``projects_dir`` to the materialized temp dir when present
(so file_path → key resolution matches what the subprocess writes),
otherwise to the standard projects directory under the effective
``CLAUDE_CONFIG_DIR``.

``flush_mode="eager"`` zeroes the batcher's pending thresholds so every
enqueued frame schedules a background flush; ``"batched"`` keeps the
defaults (flush on ``result`` or 500-entry / 1 MiB overflow).
"""
projects_dir = (
str(materialized.config_dir / "projects")
if materialized is not None
else str(_get_projects_dir(env))
)
eager = flush_mode == "eager"
return TranscriptMirrorBatcher(
store=store,
projects_dir=projects_dir,
on_error=on_error,
max_pending_entries=0 if eager else MAX_PENDING_ENTRIES,
max_pending_bytes=0 if eager else MAX_PENDING_BYTES,
)


Expand Down
1 change: 1 addition & 0 deletions src/claude_agent_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ async def _on_mirror_error(key: Any, error: str) -> None:
materialized=self._materialized,
env=self.options.env,
on_error=_on_mirror_error,
flush_mode=self.options.session_store_flush,
)
)

Expand Down
23 changes: 23 additions & 0 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,19 @@ class SessionListSubkeysKey(TypedDict):
session_id: str


SessionStoreFlushMode = Literal["batched", "eager"]
"""Controls when transcript-mirror entries are flushed to a :class:`SessionStore`.

- ``"batched"`` (default): buffer entries and flush once per turn (on the
``result`` message) or when the pending buffer exceeds 500 entries / 1 MiB.
Keeps adapter latency off the streaming hot path.
- ``"eager"``: trigger a background flush after every ``transcript_mirror``
frame so ``SessionStore.append()`` sees entries in near real time. Appends
are still serialized in enqueue order; a slow adapter will not stall the
read loop but will see frames coalesced while it is busy.
"""


class SessionStore(Protocol):
"""Adapter for mirroring session transcripts to external storage.

Expand Down Expand Up @@ -1758,6 +1771,16 @@ class ClaudeAgentOptions:
when the local file is absent.
"""

session_store_flush: SessionStoreFlushMode = "batched"
"""When to flush mirrored transcript entries to ``session_store``.

``"batched"`` (default) coalesces entries and flushes once per turn or when
the buffer exceeds 500 entries / 1 MiB. ``"eager"`` triggers a background
flush after every frame for near-real-time delivery (each flush still runs
off the read loop, so a slow adapter does not stall message streaming).
Ignored when ``session_store`` is ``None``.
"""

load_timeout_ms: int = 60_000
"""Timeout for each ``session_store.load()`` / ``list_subkeys()`` call during
resume materialization, in milliseconds.
Expand Down
124 changes: 123 additions & 1 deletion tests/test_transcript_mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
query,
)
from claude_agent_sdk._internal.query import Query
from claude_agent_sdk._internal.session_resume import build_mirror_batcher
from claude_agent_sdk._internal.session_store import file_path_to_session_key
from claude_agent_sdk._internal.sessions import _get_projects_dir
from claude_agent_sdk._internal.transcript_mirror_batcher import (
Expand Down Expand Up @@ -503,6 +504,62 @@ async def append(self, key, entries):
assert order == [1, 2, 3]


# ---------------------------------------------------------------------------
# build_mirror_batcher / session_store_flush
# ---------------------------------------------------------------------------


class TestBuildMirrorBatcherFlushMode:
"""``session_store_flush`` threads through ``build_mirror_batcher`` to the
batcher's pending thresholds: ``"batched"`` keeps the defaults,
``"eager"`` zeroes them so every enqueue schedules a background flush."""

@pytest.mark.parametrize(
("kwargs", "want_entries", "want_bytes"),
[
({}, MAX_PENDING_ENTRIES, MAX_PENDING_BYTES),
({"flush_mode": "batched"}, MAX_PENDING_ENTRIES, MAX_PENDING_BYTES),
({"flush_mode": "eager"}, 0, 0),
],
ids=["default", "batched", "eager"],
)
def test_flush_mode_sets_thresholds(
self, kwargs: dict[str, Any], want_entries: int, want_bytes: int
) -> None:
batcher = build_mirror_batcher(
store=InMemorySessionStore(),
materialized=None,
env={"CLAUDE_CONFIG_DIR": str(Path(PROJECTS_DIR).parent)},
on_error=_noop_error,
**kwargs,
)
assert batcher.max_pending_entries == want_entries
assert batcher.max_pending_bytes == want_bytes

@pytest.mark.asyncio
async def test_eager_mode_flushes_per_frame(self) -> None:
store = _RecordingStore()
batcher = build_mirror_batcher(
store=store,
materialized=None,
env={"CLAUDE_CONFIG_DIR": str(Path(PROJECTS_DIR).parent)},
on_error=_noop_error,
flush_mode="eager",
)
batcher.enqueue(_main_path(), [{"type": "user", "n": 1}])
await asyncio.sleep(0)
await asyncio.sleep(0)
assert len(store.append_calls) == 1
batcher.enqueue(_main_path(), [{"type": "assistant", "n": 2}])
await asyncio.sleep(0)
await asyncio.sleep(0)
assert len(store.append_calls) == 2
assert [e["n"] for c in store.append_calls for e in c[1]] == [1, 2]

def test_options_default_is_batched(self) -> None:
assert ClaudeAgentOptions().session_store_flush == "batched"


# ---------------------------------------------------------------------------
# --session-mirror CLI flag
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -533,11 +590,15 @@ def test_flag_absent_when_session_store_unset(self) -> None:
# ---------------------------------------------------------------------------


def _make_mock_transport(messages: list[dict[str, Any]]) -> Any:
def _make_mock_transport(
messages: list[dict[str, Any]], *, yield_between: bool = False
) -> Any:
mock_transport = AsyncMock()

async def mock_receive():
for msg in messages:
if yield_between:
await anyio.sleep(0)
yield msg

mock_transport.read_messages = mock_receive
Expand Down Expand Up @@ -710,6 +771,67 @@ async def _test() -> None:

anyio.run(_test)

def test_eager_flush_mode_appends_per_frame_before_result(self) -> None:
"""With ``session_store_flush="eager"`` each ``transcript_mirror`` frame
is flushed as it arrives, so the store sees one ``append()`` per frame
rather than a single coalesced batch at ``result`` time."""

async def _test() -> None:
store = _RecordingStore()
frame1 = {
"type": "transcript_mirror",
"filePath": _main_path("p", "s"),
"entries": [{"type": "user", "uuid": "u1"}],
}
frame2 = {
"type": "transcript_mirror",
"filePath": _main_path("p", "s"),
"entries": [{"type": "assistant", "uuid": "a1"}],
}
# Yield to the event loop between frames so the eager background
# drain scheduled by enqueue() can run before the next frame
# arrives — models the await on real stdout I/O. Without this the
# mock delivers both frames synchronously and they coalesce, which
# is correct back-pressure behaviour but not what we're asserting.
mock_transport = _make_mock_transport(
[frame1, frame2, _ASSISTANT_MSG, _RESULT_MSG],
yield_between=True,
)

with (
patch(
"claude_agent_sdk._internal.client.SubprocessCLITransport"
) as mock_cls,
patch(
"claude_agent_sdk._internal.query.Query.initialize",
new_callable=AsyncMock,
),
patch(
"claude_agent_sdk._internal.session_resume._get_projects_dir",
return_value=PROJECTS_DIR,
),
):
mock_cls.return_value = mock_transport
appends_at_assistant = None
async for msg in query(
prompt="Hello",
options=ClaudeAgentOptions(
session_store=store, session_store_flush="eager"
),
):
if isinstance(msg, AssistantMessage):
appends_at_assistant = len(store.append_calls)

# Both frames flushed individually before the assistant message
# was yielded (eager background flush ran while the read loop
# awaited the next stdout line).
assert appends_at_assistant == 2
assert len(store.append_calls) == 2
assert store.append_calls[0][1] == [{"type": "user", "uuid": "u1"}]
assert store.append_calls[1][1] == [{"type": "assistant", "uuid": "a1"}]

anyio.run(_test)

def test_mirror_frames_dropped_when_no_session_store(self) -> None:
"""Without a session_store the batcher isn't attached; frames are
peeled and dropped (still not yielded), normal messages flow."""
Expand Down
Loading