Skip to content

Commit a7a5ead

Browse files
authored
sdk(session_store): bounded retry on mirror append + uuid idempotency docs (#857)
## Summary

Adds bounded retry (3 attempts, (0.2s, 0.8s) backoff) to `TranscriptMirrorBatcher._do_flush` so transient `SessionStore.append()` failures don't immediately drop the batch. Only after retries are exhausted is the batch dropped and `MirrorErrorMessage` emitted. Also documents that adapters should treat each entry's `uuid` as an idempotency key.

## Changes

- `_internal/transcript_mirror_batcher.py`: retry loop, constants `MIRROR_APPEND_MAX_ATTEMPTS=3` / `MIRROR_APPEND_BACKOFF_S=(0.2, 0.8)`
- `types.py`: `SessionStore.append` docstring
- `tests/test_transcript_mirror.py`: 2 new retry tests; patched `asyncio.sleep` in 3 existing failure tests

Paired TS SDK change: anthropics/claude-cli-internal#30980

<!-- CHANGELOG:START -->
- `SessionStore.append()` failures are now retried (up to 3 attempts total) with short backoff before the batch is dropped and `MirrorErrorMessage` is emitted
<!-- CHANGELOG:END -->
1 parent 32f09c1 commit a7a5ead

3 files changed

Lines changed: 182 additions & 14 deletions

File tree

src/claude_agent_sdk/_internal/transcript_mirror_batcher.py

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
MAX_PENDING_BYTES = 1 << 20 # 1 MiB
2828
SEND_TIMEOUT_SECONDS = 60.0
2929

30+
# Bounded retry for transient adapter failures. MIRROR_APPEND_BACKOFF_S must hold
31+
# MIRROR_APPEND_MAX_ATTEMPTS - 1 delays (one between each pair of consecutive attempts).
32+
MIRROR_APPEND_MAX_ATTEMPTS = 3
33+
MIRROR_APPEND_BACKOFF_S = (0.2, 0.8)
34+
3035

3136
@dataclass
3237
class _MirrorEntry:
@@ -44,9 +49,14 @@ class TranscriptMirrorBatcher:
4449
an eager flush fires in the background so memory stays flat during long
4550
turns where no ``result`` (and thus no explicit ``flush()``) arrives.
4651
47-
Adapter failures are reported via ``on_error`` and the failed batch is
48-
dropped (at-most-once delivery). Failures never raise — the local-disk
49-
transcript is already durable so the session must continue unaffected.
52+
Adapter failures are retried (``MIRROR_APPEND_MAX_ATTEMPTS`` attempts
53+
total) with short backoff; timeouts are not retried since the in-flight
54+
call may still land. Only after the final attempt fails is the batch
55+
dropped and reported via ``on_error``. Failures never raise — the
56+
local-disk transcript is already durable so the session must continue
57+
unaffected. Adapters should dedupe by ``entry["uuid"]`` when present
58+
(some entry types lack a uuid), since a retried batch may re-send entries
59+
that an earlier failed attempt already partially wrote.
5060
"""
5161

5262
store: SessionStore
@@ -163,12 +173,46 @@ async def _do_flush(
163173
self.projects_dir,
164174
)
165175
continue
166-
try:
167-
await asyncio.wait_for(
168-
self.store.append(key, entries), timeout=self.send_timeout
169-
)
170-
except Exception as e:
176+
last_err: Exception | None = None
177+
succeeded = False
178+
for attempt in range(MIRROR_APPEND_MAX_ATTEMPTS):
179+
if attempt > 0:
180+
await asyncio.sleep(MIRROR_APPEND_BACKOFF_S[attempt - 1])
181+
try:
182+
await asyncio.wait_for(
183+
self.store.append(key, entries), timeout=self.send_timeout
184+
)
185+
succeeded = True
186+
break
187+
except asyncio.TimeoutError as e:
188+
# Don't retry on timeout: wait_for cancels the task but
189+
# cancellation is best-effort for adapters wrapping
190+
# non-cancellable I/O, so the in-flight call may still
191+
# land — a retry would launch a concurrent duplicate.
192+
# Also caps the lock hold for a hung adapter at ~send_timeout rather
193+
# than ~3×send_timeout + backoff.
194+
last_err = e
195+
logger.debug(
196+
"[TranscriptMirrorBatcher] append timed out after "
197+
"%.1fs for %s — not retrying",
198+
self.send_timeout,
199+
file_path,
200+
)
201+
break
202+
except Exception as e: # noqa: BLE001 - adapter is user code
203+
last_err = e
204+
logger.debug(
205+
"[TranscriptMirrorBatcher] append attempt %d/%d failed "
206+
"for %s: %s",
207+
attempt + 1,
208+
MIRROR_APPEND_MAX_ATTEMPTS,
209+
file_path,
210+
e,
211+
)
212+
if not succeeded:
171213
logger.error(
172-
"[TranscriptMirrorBatcher] flush failed for %s: %s", file_path, e
214+
"[TranscriptMirrorBatcher] flush failed for %s: %s",
215+
file_path,
216+
last_err,
173217
)
174-
errors.append((key, str(e)))
218+
errors.append((key, str(last_err)))

src/claude_agent_sdk/types.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,8 +1274,13 @@ async def append(self, key: SessionKey, entries: list[SessionStoreEntry]) -> Non
12741274
Within a single process, persist entries in append-call order; across
12751275
concurrent processes, order is by storage commit time, not call time.
12761276
1277-
Exceptions are logged; the subprocess continues unaffected.
1278-
At-most-once delivery — failed batches are not retried.
1277+
Most entries carry a stable ``uuid`` that adapters should treat as an
1278+
idempotency key (upsert / ignore-duplicate). Entries without a
1279+
``uuid`` (e.g. titles, tags, mode markers) should be appended without
1280+
dedup. Exceptions are logged and the subprocess continues unaffected
1281+
— failed batches are retried (3 attempts total) with short backoff
1282+
before being dropped and surfaced as a ``MirrorErrorMessage``;
1283+
timeouts are not retried since the in-flight call may still land.
12791284
"""
12801285
...
12811286

tests/test_transcript_mirror.py

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ async def _noop_error(_key: SessionKey | None, _err: str) -> None:
151151
pass
152152

153153

154+
# Patch target for the retry backoff — the batcher does ``import asyncio`` so
155+
# patching this attribute swaps the global ``asyncio.sleep`` for the duration
156+
# of the ``with`` block.
157+
_BATCHER_SLEEP = "claude_agent_sdk._internal.transcript_mirror_batcher.asyncio.sleep"
158+
159+
154160
class _RecordingStore(InMemorySessionStore):
155161
"""InMemorySessionStore that records each append call separately."""
156162

@@ -260,17 +266,22 @@ async def on_error(key: SessionKey | None, err: str) -> None:
260266
store=FailingStore(), projects_dir=PROJECTS_DIR, on_error=on_error
261267
)
262268
batcher.enqueue(_main_path(), [{"type": "x"}])
263-
await batcher.flush() # must not raise
269+
with patch(_BATCHER_SLEEP, new=AsyncMock()):
270+
await batcher.flush() # must not raise
264271

265272
assert len(errors) == 1
266273
assert errors[0][0] == {"project_key": "proj", "session_id": "sess"}
267274
assert "boom" in errors[0][1]
268275

269276
@pytest.mark.asyncio
270277
async def test_append_timeout_calls_on_error(self) -> None:
278+
"""Timeout → on_error fires once, append is NOT retried (1 attempt)."""
279+
calls: list[int] = []
280+
271281
class HangingStore(InMemorySessionStore):
272282
async def append(self, key, entries):
273-
await asyncio.sleep(10)
283+
calls.append(1)
284+
await asyncio.Event().wait() # never resolves
274285

275286
errors: list[str] = []
276287

@@ -284,8 +295,115 @@ async def on_error(_key: SessionKey | None, err: str) -> None:
284295
send_timeout=0.05,
285296
)
286297
batcher.enqueue(_main_path(), [{"type": "x"}])
298+
sleep_mock = AsyncMock()
299+
with patch(_BATCHER_SLEEP, new=sleep_mock):
300+
await batcher.flush()
301+
assert len(calls) == 1 # not retried on timeout
302+
assert len(errors) == 1
303+
sleep_mock.assert_not_awaited() # no backoff sleep
304+
305+
@pytest.mark.asyncio
306+
async def test_append_timeout_no_concurrent_retry(self) -> None:
307+
"""A slow append that outlives send_timeout is attempted exactly once;
308+
no retry overlaps the still-in-flight first call."""
309+
in_flight = 0
310+
max_in_flight = 0
311+
calls = 0
312+
313+
class SlowStore(InMemorySessionStore):
314+
async def append(self, key, entries):
315+
nonlocal in_flight, max_in_flight, calls
316+
calls += 1
317+
in_flight += 1
318+
max_in_flight = max(max_in_flight, in_flight)
319+
try:
320+
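# Outlives send_timeout=0.02. shield() keeps the inner sleep running after
321+
# wait_for cancels this call, loosely modelling a non-cancellable adapter
322+
# (e.g. sync I/O in a thread); the await itself still raises CancelledError, so ``finally`` runs at cancellation time.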
323+
await asyncio.shield(asyncio.sleep(0.1))
324+
finally:
325+
in_flight -= 1
326+
327+
errors: list[str] = []
328+
329+
async def on_error(_key: SessionKey | None, err: str) -> None:
330+
errors.append(err)
331+
332+
batcher = TranscriptMirrorBatcher(
333+
store=SlowStore(),
334+
projects_dir=PROJECTS_DIR,
335+
on_error=on_error,
336+
send_timeout=0.02,
337+
)
338+
batcher.enqueue(_main_path(), [{"type": "x"}])
287339
await batcher.flush()
340+
# Let any (incorrectly) shielded/retried task observe overlap.
341+
await asyncio.sleep(0.15)
342+
343+
assert calls == 1
344+
assert max_in_flight == 1
345+
assert len(errors) == 1
346+
347+
@pytest.mark.asyncio
348+
async def test_append_retries_then_succeeds_no_error_reported(self) -> None:
349+
"""Transient outage: append raises twice then succeeds on the 3rd
350+
attempt — batch is delivered, no mirror error reported."""
351+
attempts: list[int] = []
352+
353+
class FlakyStore(InMemorySessionStore):
354+
async def append(self, key, entries):
355+
attempts.append(1)
356+
if len(attempts) < 3:
357+
raise RuntimeError("transient")
358+
await super().append(key, entries)
359+
360+
errors: list[tuple[SessionKey | None, str]] = []
361+
362+
async def on_error(key: SessionKey | None, err: str) -> None:
363+
errors.append((key, err))
364+
365+
store = FlakyStore()
366+
batcher = TranscriptMirrorBatcher(
367+
store=store, projects_dir=PROJECTS_DIR, on_error=on_error
368+
)
369+
batcher.enqueue(_main_path(), [{"type": "x"}])
370+
sleep_mock = AsyncMock()
371+
with patch(_BATCHER_SLEEP, new=sleep_mock):
372+
await batcher.flush()
373+
374+
assert len(attempts) == 3
375+
assert errors == []
376+
assert await store.load({"project_key": "proj", "session_id": "sess"}) == [
377+
{"type": "x"}
378+
]
379+
# Backoff schedule honoured between attempts.
380+
assert [c.args[0] for c in sleep_mock.await_args_list] == [0.2, 0.8]
381+
382+
@pytest.mark.asyncio
383+
async def test_append_retries_exhausted_reports_error_once(self) -> None:
384+
"""append raises on all 3 attempts → exactly one mirror error."""
385+
attempts: list[int] = []
386+
387+
class AlwaysFailingStore(InMemorySessionStore):
388+
async def append(self, key, entries):
389+
attempts.append(1)
390+
raise RuntimeError("boom")
391+
392+
errors: list[tuple[SessionKey | None, str]] = []
393+
394+
async def on_error(key: SessionKey | None, err: str) -> None:
395+
errors.append((key, err))
396+
397+
batcher = TranscriptMirrorBatcher(
398+
store=AlwaysFailingStore(), projects_dir=PROJECTS_DIR, on_error=on_error
399+
)
400+
batcher.enqueue(_main_path(), [{"type": "x"}])
401+
with patch(_BATCHER_SLEEP, new=AsyncMock()):
402+
await batcher.flush()
403+
404+
assert len(attempts) == 3
288405
assert len(errors) == 1
406+
assert "boom" in errors[0][1]
289407

290408
@pytest.mark.asyncio
291409
async def test_close_flushes_pending(self) -> None:
@@ -656,6 +774,7 @@ async def append(self, key, entries):
656774
"claude_agent_sdk._internal.session_resume._get_projects_dir",
657775
return_value=PROJECTS_DIR,
658776
),
777+
patch(_BATCHER_SLEEP, new=AsyncMock()),
659778
):
660779
mock_cls.return_value = mock_transport
661780
messages = [

0 commit comments

Comments
 (0)