Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions tests/test_transcript_mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import asyncio
import os
import time
from collections.abc import Callable
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
Expand Down Expand Up @@ -152,6 +154,23 @@ async def _noop_error(_key: SessionKey | None, _err: str) -> None:
pass


async def _wait_until(predicate: Callable[[], bool], *, timeout: float = 1.0) -> None:
"""Yield to the event loop until ``predicate()`` returns truthy.

Replaces fixed ``await asyncio.sleep(0)`` counts in eager-flush tests
where the path from ``enqueue`` to ``store.append`` requires multiple
event-loop turns under lock contention. See #928 for the original
flakiness analysis.
"""
deadline = time.monotonic() + timeout
while not predicate():
if time.monotonic() > deadline:
raise AssertionError(
f"_wait_until predicate did not become truthy within {timeout}s"
)
await asyncio.sleep(0)


# Patch target for the retry backoff — the batcher does ``import asyncio`` so
# patching this attribute swaps the global ``asyncio.sleep`` for the duration
# of the ``with`` block.
Expand Down Expand Up @@ -546,14 +565,13 @@ async def test_eager_mode_flushes_per_frame(self) -> None:
on_error=_noop_error,
flush_mode="eager",
)
# Use _wait_until rather than a fixed ``sleep(0)`` count: the path
# from enqueue() to store.append() needs multiple event-loop turns
# under lock contention between consecutive drains. See #928.
batcher.enqueue(_main_path(), [{"type": "user", "n": 1}])
await asyncio.sleep(0)
await asyncio.sleep(0)
assert len(store.append_calls) == 1
await _wait_until(lambda: len(store.append_calls) == 1)
batcher.enqueue(_main_path(), [{"type": "assistant", "n": 2}])
await asyncio.sleep(0)
await asyncio.sleep(0)
assert len(store.append_calls) == 2
await _wait_until(lambda: len(store.append_calls) == 2)
assert [e["n"] for c in store.append_calls for e in c[1]] == [1, 2]

def test_options_default_is_batched(self) -> None:
Expand Down Expand Up @@ -591,13 +609,17 @@ def test_flag_absent_when_session_store_unset(self) -> None:


def _make_mock_transport(
messages: list[dict[str, Any]], *, yield_between: bool = False
messages: list[dict[str, Any]], *, yields_between: int = 0
) -> Any:
"""Mock transport. ``yields_between`` inserts that many ``sleep(0)``
cycles before every frame (including the first) — needed by eager-flush
tests so background drain tasks have enough event-loop turns to
complete before the next frame arrives. See #928."""
mock_transport = AsyncMock()

async def mock_receive():
for msg in messages:
if yield_between:
for _ in range(yields_between):
await anyio.sleep(0)
yield msg

Expand Down Expand Up @@ -788,14 +810,14 @@ async def _test() -> None:
"filePath": _main_path("p", "s"),
"entries": [{"type": "assistant", "uuid": "a1"}],
}
# Yield to the event loop between frames so the eager background
# drain scheduled by enqueue() can run before the next frame
# arrives — models the await on real stdout I/O. Without this the
# mock delivers both frames synchronously and they coalesce, which
# is correct back-pressure behaviour but not what we're asserting.
# Yield to the event loop multiple times between frames so the
# eager background drain scheduled by enqueue() can complete
# before the next frame arrives — models the await on real
# stdout I/O. Two yields aren't enough under lock contention
# between consecutive drains (~4 needed). See #928.
mock_transport = _make_mock_transport(
[frame1, frame2, _ASSISTANT_MSG, _RESULT_MSG],
yield_between=True,
yields_between=10,
)

with (
Expand Down