Skip to content

Commit 5b1d8cd

Browse files
d-v-bclaude
andcommitted
test(core): cover mid-stream miss with concurrency > 1
Exercises the concurrent path where a missing key is observed while other fetches are still in flight. Uses an asyncio.Event to gate late arrivals until after the miss has been processed, giving the drain loop an opportunity to observe and discard post-stop completions, and verifies the iterator terminates cleanly without hanging or raising. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a4c3330 commit 5b1d8cd

1 file changed

Lines changed: 59 additions & 0 deletions

File tree

tests/test_coalesce.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,65 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None:
287287
assert len(groups[0]) == 1
288288

289289

290+
async def test_key_missing_mid_stream_with_concurrency_drains_late_arrivals() -> None:
291+
# Schedule multiple non-mergeable fetches under max_concurrency=3.
292+
# - Fetch #0 completes FIRST (short sleep) -> at least one yield observed.
293+
# - Fetch #1 returns None shortly after -> triggers the miss path.
294+
# - Fetches #2..#N are gated on an asyncio.Event so they only unblock
295+
# AFTER the miss has been observed, producing late arrivals that
296+
# exercise the `if stopped: continue` discard branch in the drain loop.
297+
late_gate = asyncio.Event()
298+
miss_fired = asyncio.Event()
299+
300+
async def fetch(byte_range: ByteRequest | None) -> Buffer | None:
301+
assert isinstance(byte_range, RangeByteRequest)
302+
start = byte_range.start
303+
if start == 0:
304+
# First to complete: small sleep so it arrives before the miss.
305+
await asyncio.sleep(0.01)
306+
return _buf(b"ok")
307+
if start == 1000:
308+
# Miss: a little later than #0 so #0 yields first.
309+
await asyncio.sleep(0.03)
310+
miss_fired.set()
311+
return None
312+
# Late arrivals: wait until the miss has been processed, then return
313+
# a buffer so the drain loop sees them post-stop.
314+
await asyncio.wait_for(miss_fired.wait(), timeout=5.0)
315+
await asyncio.wait_for(late_gate.wait(), timeout=5.0)
316+
return _buf(b"ok")
317+
318+
opts: CoalesceOptions = {
319+
"max_gap_bytes": -1, # force no merging (each range its own fetch)
320+
"max_coalesced_bytes": 1 << 20,
321+
"max_concurrency": 3,
322+
}
323+
# Stride by 1000 to avoid merging. 7 items fits within max_concurrency=3
324+
# while producing pending work after the miss.
325+
ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(7)]
326+
327+
groups: list[list[tuple[int, Buffer | None]]] = []
328+
agen = coalesced_get(fetch, ranges, options=opts)
329+
try:
330+
async for group in agen:
331+
groups.append(list(group))
332+
# After the first yield, release the late gate so remaining
333+
# in-flight tasks can complete and arrive post-stop.
334+
late_gate.set()
335+
finally:
336+
# Guard against a bug preventing any yield: unblock waiters anyway.
337+
late_gate.set()
338+
339+
# We observed exactly the one pre-miss yield.
340+
assert len(groups) == 1
341+
assert len(groups[0]) == 1
342+
idx, buf = groups[0][0]
343+
assert idx == 0
344+
assert buf is not None
345+
# The iterator completed cleanly without raising.
346+
assert miss_fired.is_set()
347+
348+
290349
@pytest.mark.parametrize(
291350
"byte_range",
292351
[

0 commit comments

Comments
 (0)