
feat(session_store): add session_store_flush option for eager mirroring #905

Merged
qing-ant merged 1 commit into anthropics:main from lorenzojb:feat/session-store-eager-flush
May 3, 2026

Conversation

@lorenzojb
Contributor

Summary

Adds ClaudeAgentOptions.session_store_flush: Literal["batched", "eager"] (default "batched") so callers can opt into near-real-time SessionStore.append() delivery instead of waiting for the end-of-turn flush.

Today the TranscriptMirrorBatcher buffers transcript_mirror frames and only flushes when the result message arrives (or on 500-entry / 1 MiB overflow). That keeps adapter latency off the streaming hot path, but it means an external store can't observe a turn until it's finished — a problem for live-tailing UIs, cross-process resume, or crash-durability use cases that want the mirror to track the on-disk JSONL closely.

With "eager", build_mirror_batcher() zeroes both pending thresholds so every enqueued frame schedules a background drain. The drain still runs off the read loop via asyncio.ensure_future, so a slow adapter does not stall message streaming — it just sees frames coalesced while it's busy. Append ordering is preserved by the existing batcher lock.
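The threshold mechanics described above can be sketched roughly as follows. This is a toy model, not the SDK's TranscriptMirrorBatcher: the class name `SketchMirrorBatcher`, its constructor arguments, and the `>` overflow comparison are assumptions made for illustration, and the byte-size threshold is left out.

```python
import asyncio


class SketchMirrorBatcher:
    """Toy stand-in for a transcript mirror batcher (hypothetical, not the SDK class)."""

    def __init__(self, append, max_pending_entries=500):
        self._append = append                    # async callable taking a list of frames
        self._max_pending = max_pending_entries  # 0 models "eager"
        self._pending = []
        self._lock = asyncio.Lock()              # serializes appends in enqueue order
        self._tasks = set()

    def enqueue(self, frame):
        self._pending.append(frame)
        # With a threshold of 0, every enqueue overflows and schedules a drain.
        if len(self._pending) > self._max_pending:
            task = asyncio.ensure_future(self._drain())
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _drain(self):
        async with self._lock:
            # Frames that arrived while a previous append was busy are
            # picked up together here, which is the coalescing behavior.
            batch, self._pending = self._pending, []
            if batch:
                await self._append(batch)

    async def flush(self):
        # End-of-turn flush: wait for in-flight drains, then drain the rest.
        if self._tasks:
            await asyncio.gather(*list(self._tasks))
        await self._drain()


async def demo(max_pending_entries):
    calls = []

    async def append(batch):
        calls.append(list(batch))

    batcher = SketchMirrorBatcher(append, max_pending_entries)
    for frame in ("a", "b"):
        batcher.enqueue(frame)
        for _ in range(5):
            await asyncio.sleep(0)  # let any background drain run
    await batcher.flush()
    return calls
```

In this sketch `demo(0)` delivers each frame in its own append call, while `demo(500)` delivers both frames in a single call at flush time.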

Also exports the SessionStoreFlushMode type alias for callers that thread the value through their own config.

API

ClaudeAgentOptions(
    session_store=my_store,
    session_store_flush="eager",  # default: "batched"
)

Tests

6 new tests in tests/test_transcript_mirror.py:

  • TestBuildMirrorBatcherFlushMode::test_flush_mode_sets_thresholds[default|batched|eager] — parametrized: omitted/"batched" keep MAX_PENDING_* defaults, "eager" zeroes both
  • TestBuildMirrorBatcherFlushMode::test_eager_mode_flushes_per_frame — two enqueues → two separate append() calls without an explicit flush()
  • TestBuildMirrorBatcherFlushMode::test_options_default_is_batched: the ClaudeAgentOptions() default
  • TestReceiveLoopFramePeeling::test_eager_flush_mode_appends_per_frame_before_result — end-to-end through query(): with session_store_flush="eager" and a transport that yields between frames, the store sees one append() per frame before the AssistantMessage is yielded (vs. a single coalesced batch in the default mode)

_make_mock_transport() gains a yield_between kwarg so the integration test can model the await on real stdout I/O between frames.

Test plan

  • python -m ruff check src/ tests/
  • python -m ruff format src/ tests/
  • python -m mypy src/
  • python -m pytest tests/ (734 passed, 4 skipped)

Adds ClaudeAgentOptions.session_store_flush ("batched" | "eager",
default "batched"). With "eager", build_mirror_batcher() zeroes the
TranscriptMirrorBatcher pending thresholds so every transcript_mirror
frame schedules a background flush, delivering entries to
SessionStore.append() in near real time instead of coalescing until the
end-of-turn result message. Appends remain serialized in enqueue order;
a slow adapter does not stall the read loop (frames coalesce while it
is busy).

Exports the SessionStoreFlushMode type alias.
@codecov-commenter


Codecov Report

✅ All modified and coverable lines are covered by tests.
⚠️ Please upload report for BASE (main@b512f25). Learn more about missing BASE report.

Additional details and impacted files
@@           Coverage Diff           @@
##             main     #905   +/-   ##
=======================================
  Coverage        ?   88.24%           
=======================================
  Files           ?       23           
  Lines           ?     3904           
  Branches        ?        0           
=======================================
  Hits            ?     3445           
  Misses          ?      459           
  Partials        ?        0           


@qing-ant
Contributor

qing-ant commented May 3, 2026

/sdk-e2e-proof — PR #905

Manual end-to-end proof against the real CLI for ClaudeAgentOptions.session_store_flush.

Setup: in-memory SessionStore recording (timestamp, len(entries)) on every append(); observed append-call counts mid-stream vs after ResultMessage.

Case A: query() eager — PASS

  • append() calls before ResultMessage: 3; total append() calls: 3; total entries: 8; per-message snapshots (msg#, appends): [(1, 0), (2, 2), (3, 3), (4, 3)]

Case B: query() batched — PASS

  • append() calls before ResultMessage: 0; total append() calls: 1; total entries: 8; per-message snapshots (msg#, appends): [(1, 0), (2, 0), (3, 0), (4, 1)]

Case C: client eager multi-turn — PASS

  • append() calls per turn: [3, 3]; grand total append() calls: 6; total entries: 13

Overall: PASS

Eager mode produced multiple append() calls before the result message (near-real-time mirroring); batched mode coalesced into an end-of-turn flush as expected.
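For readers who want to reproduce this kind of proof, a recording store along the lines of the setup above might look like the sketch below. The `append(key, entries)` signature and the class name `RecordingStore` are assumptions for illustration; check the SDK's SessionStore protocol for the real method shapes.

```python
import asyncio
import time


class RecordingStore:
    """In-memory recorder (hypothetical); the append(key, entries) shape is assumed."""

    def __init__(self):
        self.calls = []  # one (timestamp, len(entries)) tuple per append()

    async def append(self, key, entries):
        self.calls.append((time.monotonic(), len(entries)))

    @property
    def total_entries(self):
        return sum(count for _, count in self.calls)


async def demo():
    store = RecordingStore()
    # Two separate deliveries, as an eager-mode turn might produce.
    await store.append("session-1", [{"type": "user"}, {"type": "assistant"}])
    await store.append("session-1", [{"type": "assistant"}])
    return len(store.calls), store.total_entries
```

Comparing `len(store.calls)` at the moment each message is yielded against the count after the result message gives the per-message snapshots reported above.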

qing-ant enabled auto-merge (squash) May 3, 2026 17:57
qing-ant merged commit 0a69e94 into anthropics:main May 3, 2026
7 checks passed
qing-ant added a commit that referenced this pull request May 5, 2026
#908)

Fixes #859.

## Problem

`Query.close()` deliberately leaves `_message_receive` open so a
concurrently-draining consumer can read buffered messages (anyio's
`receive_nowait()` checks `_closed` before the buffer, so closing it
inside `close()` would make a non-parked consumer hit
`ClosedResourceError` and drop them — see
`test_buffered_messages_drain_after_close_*`). But nothing ever closed
the receive side afterward, so anyio's `__del__` emitted

```
ResourceWarning: Unclosed <MemoryObjectReceiveStream at ...>
```

under `PYTHONDEVMODE` for every `ClaudeSDKClient` context-manager exit
and every `query()` call.

## Fix

Add `Query.close_receive_stream()` and call it from the two consumers —
`ClaudeSDKClient.disconnect()` and the `query()` generator's `finally` —
once iteration is done. This preserves the buffered-drain-after-close
invariant (`close()` itself still doesn't touch the receive side) while
ensuring the stream is released in the normal teardown path.
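The ordering argument can be illustrated with a toy model, using only the standard library. `ToyReceiveStream` and the local `ClosedResourceError` are hypothetical stand-ins, not anyio's classes; the only property carried over is that the closed flag is checked before the buffer.

```python
from collections import deque


class ClosedResourceError(Exception):
    """Local stand-in for the error raised when reading a closed stream."""


class ToyReceiveStream:
    """Toy model: the closed flag is checked before the buffer."""

    def __init__(self, items):
        self._buffer = deque(items)
        self._closed = False

    def receive_nowait(self):
        if self._closed:  # checked first, so buffered items become unreachable
            raise ClosedResourceError
        return self._buffer.popleft()

    def close(self):
        self._closed = True


def drain(stream):
    """Read until the buffer is empty or the stream reports closed."""
    out = []
    try:
        while True:
            out.append(stream.receive_nowait())
    except (ClosedResourceError, IndexError):
        pass
    return out


# Closing too early: buffered messages are lost to the consumer.
early = ToyReceiveStream(["m1", "m2"])
early.close()
lost = drain(early)

# Closing at the consumer boundary: drain first, then release the stream.
late = ToyReceiveStream(["m1", "m2"])
kept = drain(late)
late.close()
```

Here `lost` ends up empty while `kept` holds both messages, which is the invariant the fix is meant to preserve.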

## Relationship to #657

#657 closes `_message_receive` inside `Query.close()`, which would
regress `test_buffered_messages_drain_after_close_*`. This PR closes it
at the consumer boundary instead, so that test stays green.

## Tests

- New
`TestClaudeSDKClientResourceCleanup::test_disconnect_closes_receive_stream`
— fails on `main` (`receive_nowait()` raises `EndOfStream`, i.e.
open-but-exhausted), passes here (`ClosedResourceError`).
- Existing `test_buffered_messages_drain_after_close_{asyncio,trio}`
still pass.
- 711/713 unit tests pass; the 2 failures are pre-existing on `main`
(`test_transcript_mirror.py` eager-flush, from #905) and unrelated.
- ruff / mypy clean.

---
_Generated by [Claude
Code](https://claude.ai/code/session_014NKFGXAKK5knopg1q8BhPQ)_

---------

Co-authored-by: Claude <noreply@anthropic.com>
qing-ant added a commit that referenced this pull request May 5, 2026
…911)

## Summary

`PostToolUseHookSpecificOutput` was missing the `updatedToolOutput`
field that the CLI already accepts. This field replaces the output of
**any** tool (Bash, Read, Edit, MCP, …) before it reaches the model,
whereas the existing `updatedMCPToolOutput` only applies to MCP tools.

The TypeScript SDK already declares this field on its
`PostToolUseHookSpecificOutput` type, so this is a parity fix — Python
users currently only see `updatedMCPToolOutput` in autocomplete and have
no type-safe way to express the all-tools variant.

Relates to #781 (where the reporter is using `updatedMCPToolOutput` and
hitting shape issues; `updatedToolOutput` is the recommended field per
the CLI's own schema description: "Prefer updatedToolOutput, which works
for all tools").

## Change

- `types.py`: add `updatedToolOutput: NotRequired[Any]` with a docstring
noting that for built-in tools the value must match the tool's output
schema (e.g. `{"stdout", "stderr", "interrupted"}` for Bash) — a
mismatched shape is rejected and the original output is kept.
- `types.py`: add the "prefer updatedToolOutput" docstring to the
existing `updatedMCPToolOutput` field.
- `tests/test_types.py`: type-level test mirroring the existing
`updatedMCPToolOutput` test.

No runtime code changes — the field already round-trips through
`_convert_hook_output_for_cli` since it's a passthrough.
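As a rough illustration, a PostToolUse hook using the new field could look like the sketch below. The callback signature, the `tool_name` / `tool_response` input keys, and the `hookEventName` wrapper are assumptions based on how SDK hooks are commonly written; the function name `redact_bash_output` is hypothetical.

```python
import asyncio


async def redact_bash_output(input_data, tool_use_id, context):
    """PostToolUse hook sketch: replace Bash output before the model sees it."""
    if input_data.get("tool_name") != "Bash":
        return {}  # leave other tools untouched
    original = input_data.get("tool_response")
    if not isinstance(original, dict):
        original = {}
    return {
        "hookSpecificOutput": {
            "hookEventName": "PostToolUse",
            # Shape follows the Bash output schema mentioned above;
            # a mismatched shape would be rejected by the CLI.
            "updatedToolOutput": {
                "stdout": "[redacted]",
                "stderr": original.get("stderr", ""),
                "interrupted": original.get("interrupted", False),
            },
        }
    }


sample_input = {
    "tool_name": "Bash",
    "tool_response": {"stdout": "secret", "stderr": "", "interrupted": False},
}
result = asyncio.run(redact_bash_output(sample_input, "toolu_example", None))
```

The same function returns an empty dict for non-Bash tools, so their output passes through unchanged.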

## Tests

- 717 passed, 2 pre-existing failures in `test_transcript_mirror.py`
(eager-flush, from #905) — same baseline as #908/#909.
- ruff / mypy: clean (2 pre-existing `_task_compat.py` trio
import-not-found, none introduced).
- Live e2e proof in PR comment below.

---
_Generated by [Claude
Code](https://claude.ai/code/session_01T7Xiu2PBZNHNyT38bcGrK6)_
qing-ant added a commit that referenced this pull request May 5, 2026
## Summary

The previous `can_use_tool` docstring said it is "called before each
tool execution", which led users to expect a universal pre-tool
interceptor (#469 has 6 commenters over 4 months hitting this). In
practice the CLI only emits the `can_use_tool` control request when its
permission rules evaluate to **"ask"** — tool calls already permitted by
`allowed_tools`, `permission_mode` (e.g. `acceptEdits` /
`bypassPermissions`), or `permissions.allow` rules in settings never
reach the callback. The new docstring explains this and points users at
`PreToolUse` hooks for the universal-interceptor use case.

Also clarifies that `ToolPermissionContext.tool_use_id` is always a
non-empty string when delivered via `can_use_tool` (the wire schema
marks it required); the `Optional` typing exists only for dataclass
field-ordering compatibility, so callers don't need a `None` fallback
(#844).

Relates to #469, #844.

## Why docs and not a behavior change

`can_use_tool` is the SDK replacement for the *interactive permission
prompt* — by design it's only consulted when the CLI would have asked a
human. Changing it to fire on every tool call would silently change
behavior for existing consumers (suddenly prompting for
previously-allowed calls). Users who want to observe/gate every call
already have `PreToolUse` hooks, which are documented for exactly that.
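A deliberately simplified model of the difference is sketched below. The functions `needs_callback` and `dispatch` are hypothetical, and the rule evaluation here only looks at `allowed_tools` and `bypassPermissions`; the CLI's real permission logic is richer than this.

```python
def needs_callback(tool_name, allowed_tools=(), permission_mode="default"):
    """Toy rule check: True only when the call would fall through to "ask"."""
    if permission_mode == "bypassPermissions":
        return False
    if tool_name in allowed_tools:
        return False
    return True


def dispatch(calls, allowed_tools=(), permission_mode="default"):
    """Record which tool calls each mechanism would observe in this toy model."""
    seen_by_hook, seen_by_callback = [], []
    for tool_name in calls:
        seen_by_hook.append(tool_name)          # PreToolUse: every call
        if needs_callback(tool_name, allowed_tools, permission_mode):
            seen_by_callback.append(tool_name)  # can_use_tool: "ask" only
    return seen_by_hook, seen_by_callback


hook_calls, callback_calls = dispatch(["Read", "Bash"], allowed_tools={"Read"})
```

With `Read` pre-allowed, the hook list contains both calls and the callback list contains only `Bash`.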

## Tests

- 716 passed, 2 pre-existing failures in `test_transcript_mirror.py`
(eager-flush, from #905 — same baseline as #908/#909/#911).
- ruff / mypy clean (2 pre-existing `_task_compat.py` trio
import-not-found, none introduced).
- Live e2e proof in PR comment below.

---
_Generated by [Claude
Code](https://claude.ai/code/session_01QVkHJ8kBykwJHjbTHmZTD8)_
qing-ant added a commit that referenced this pull request May 5, 2026
## Summary

Adds `"xhigh"` to the `effort` `Literal` on both `ClaudeAgentOptions`
and `AgentDefinition`, matching the CLI (`claude --help` lists `low,
medium, high, xhigh, max`) and the TypeScript SDK's `EffortLevel` type.
The docstring is updated to note that `xhigh` is Opus 4.7-specific and
falls back to `high` on other models.

Previously, type checkers rejected `ClaudeAgentOptions(effort="xhigh")`
even though the CLI supports it:

```
error: Argument "effort" to "ClaudeAgentOptions" has incompatible type
"Literal['xhigh']"; expected "Literal['low', 'medium', 'high', 'max'] | None"
```

Addresses the typing half of #834. The error-surfacing half (CLI
validation failures surface as opaque `ProcessError` without stderr)
overlaps with #798.
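The typing change amounts to widening a `Literal`. A minimal sketch, assuming a standalone alias named `EffortLevel` (borrowed from the TypeScript SDK's naming; the Python SDK may inline the `Literal` instead) and a hypothetical `validate_effort` helper:

```python
from typing import Literal, get_args

# Widened set of effort levels, matching the list quoted from `claude --help`.
EffortLevel = Literal["low", "medium", "high", "xhigh", "max"]


def validate_effort(value: str) -> str:
    """Runtime check mirroring what the Literal expresses to a type checker."""
    if value not in get_args(EffortLevel):
        raise ValueError(f"unsupported effort level: {value!r}")
    return value
```

A type checker accepts `"xhigh"` wherever `EffortLevel` is expected, and `validate_effort` gives the same answer at runtime.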

## Relationship to #835

#835 also adds `xhigh` plus stderr retention, but is based on a pre-#885
main and now conflicts (it touches `_stderr_task_group`, which #885
removed). This PR is the minimal, conflict-free typing-only fix; the
stderr work can land via #828 or a rebased #835.

## Tests

- New `test_effort_accepts_xhigh_level` (AgentDefinition serialization)
and `test_build_command_with_effort_xhigh` (CLI flag construction).
- Full suite: 718 passed, 2 pre-existing failures in
`test_transcript_mirror.py` (eager-flush, from #905) — same baseline as
#908/#909/#911/#912.
- `ruff check` / `ruff format`: clean.
- `mypy src/`: clean.
- Live e2e proof in PR comment below.

## Changelog

### New Features

- Added `"xhigh"` to the `effort` Literal on `ClaudeAgentOptions` and
`AgentDefinition` (#914)

---
_Generated by [Claude
Code](https://claude.ai/code/session_012Qn9YazWceoUqnjM74eJvU)_

Co-authored-by: Claude <noreply@anthropic.com>
qing-ant added a commit that referenced this pull request May 7, 2026
Fixes #913.

## Problem

When the CLI emits a result message with `is_error: True` (e.g.
`subtype=error_max_turns`, `error_during_execution`,
`error_max_budget_usd`) it then exits with a non-zero code. The
transport's `_read_messages_impl` raises `ProcessError` on that exit
code, which `Query._read_messages` converts to a `{"type": "error"}`
stream message, and `receive_messages()` re-raises as a bare `Exception`
— *after* the consumer has already received the structured
`ResultMessage`:

```
[msg] ResultMessage subtype=error_max_turns is_error=True num_turns=2
Exception: Command failed with exit code 1 (exit code: 1)
Error output: Check stderr output for details
```

The consumer has all the information it needs in the `ResultMessage`
(subtype, turn count, cost). The trailing exception is redundant noise
that forces every consumer to wrap their `async for` in a defensive
`try/except Exception` and inspect turn counts to distinguish "hit my
own `max_turns`" from "real transport error" — see the workaround in
#913.

## Fix

Track whether an error result was delivered. If so, treat a subsequent
`ProcessError` as expected termination and let the `finally` block send
`{"type": "end"}` so the consumer's `async for` exits cleanly.
`ProcessError` raised *before* any result, or after a *success* result,
still propagates (those indicate genuinely unexpected failures).

The general "exception flattening / stderr swallowed" problem from #798
is unchanged by this PR — that's a broader fix tracked at #828. This PR
only addresses the specific case where the CLI's exit code is the
*expected consequence* of the result it just emitted.
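The tracking logic can be sketched with plain async generators. Everything here is a stand-in: the local `ProcessError`, `fake_transport`, and `read_messages` are hypothetical and only model the control flow, not the SDK's `Query` internals.

```python
import asyncio


class ProcessError(Exception):
    """Local stand-in for the transport's exit-code error."""


async def fake_transport(messages, exit_code):
    """Yield messages, then fail the way a non-zero CLI exit would."""
    for msg in messages:
        yield msg
    if exit_code != 0:
        raise ProcessError(f"exit code {exit_code}")


async def read_messages(transport_messages):
    """Yield messages; swallow a ProcessError that follows an error result."""
    saw_error_result = False
    try:
        async for msg in transport_messages:
            if msg.get("type") == "result":
                saw_error_result = bool(msg.get("is_error"))
            yield msg
    except ProcessError:
        if not saw_error_result:
            raise  # before any result, or after a success result


async def collect(messages, exit_code):
    return [m async for m in read_messages(fake_transport(messages, exit_code))]
```

In this model a consumer iterating after an `is_error: True` result simply reaches the end of the stream, while the other two cases still raise.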

## Tests

- `test_process_error_after_error_result_is_suppressed` — fails on
`main` (`Exception: Command failed with exit code 1`), passes here.
- `test_process_error_without_result_still_raises` — `ProcessError`
before any result still surfaces.
- `test_process_error_after_success_result_still_raises` —
`ProcessError` after `is_error: False` still surfaces.
- Full suite: 719 passed, 2 pre-existing failures in
`test_transcript_mirror.py` (eager-flush, #905) — same baseline as
#908/#909/#911/#912/#914.
- `ruff check` / `ruff format` / `mypy src/`: clean.
- Live e2e proof in PR comment below.

---
_Generated by [Claude
Code](https://claude.ai/code/session_01QHcQB9ZxPcZiCt8p95tZJo)_
ashwin-ant pushed a commit that referenced this pull request May 14, 2026
…ic wait (#933)

## Summary

Fixes #928.

The two eager-flush tests assumed 2 `await asyncio.sleep(0)` yields
between consecutive `enqueue` calls were enough for each drain to
complete and append. Under lock contention between drains the path from
`enqueue` to `store.append` needs ~4 turns (drain releases lock → next
drain acquires it → `wait_for(store.append)` schedules its inner task →
record). Both tests fail 5/5 locally on Python 3.11.14 / macOS arm64; CI
got lucky on event-loop scheduling at merge time of #905. See #928 for
the full probe and yield-count sweep.

## Changes

### Unit-level test (`test_eager_mode_flushes_per_frame`)

Replace fixed `sleep(0)` count with a new `_wait_until(predicate,
timeout=1.0)` helper that yields until `len(store.append_calls)` reaches
the expected value, with a 1-second deadline. Deterministic — works
regardless of Python / pytest-asyncio / OS scheduling differences.
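A helper of this kind might look like the sketch below. The name and `timeout=1.0` default come from the description above; the body, the error message, and the `demo` harness are assumptions rather than the code in the PR.

```python
import asyncio


async def _wait_until(predicate, timeout=1.0):
    """Yield to the event loop until predicate() is true or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not predicate():
        if loop.time() >= deadline:
            raise AssertionError(f"condition not met within {timeout}s")
        await asyncio.sleep(0)


async def demo():
    append_calls = []

    async def slow_append():
        for _ in range(7):  # arbitrary number of loop turns before recording
            await asyncio.sleep(0)
        append_calls.append("batch")

    task = asyncio.ensure_future(slow_append())
    # No fixed yield count: wait for the observable effect instead.
    await _wait_until(lambda: len(append_calls) == 1)
    await task
    return append_calls
```

The test no longer depends on how many loop turns the drain needs, only on it finishing within the deadline.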

### Integration-level test (`test_eager_flush_mode_appends_per_frame_before_result`)

Convert `_make_mock_transport`'s `yield_between: bool` to
`yields_between: int` (default `0`) and pass `yields_between=10` for
this test, so the mock yields the loop enough times between frames for
each eager flush to drain before the next frame arrives. Robust headroom
— 4 was the observed minimum, 10 leaves room for slower environments.

The signature change touches only one caller (this same test); other
callers omit the parameter and behave identically to before.

## Test plan

- [x] `for i in 1 2 3 4 5; do uv run pytest <both tests> -q; done` → 5/5
passed (was 5/5 failed before)
- [x] `uv run pytest tests/test_transcript_mirror.py` → 42/42 passed
- [x] `ruff check / ruff format` clean

## Related issues / PRs

- Filed alongside two other fixes from the same audit pass: #929 (stderr
callback swallow → PR #932), #930 (cancellation log noise → PR #931).
Independent of those.

Co-authored-by: Xian Zheng <xian.zheng@challenger.gauntletai.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
3 participants