Skip to content

Commit 6aa6f4b

Browse files
d-v-bclaude
andcommitted
test(core): verify consumer-break cancels pending fetches
Drives many slow ranges with a small max_concurrency, breaks out of the async-for after the first yield, and verifies that at least one still-running fetch was cancelled rather than being left to run to completion. Cancellation is observed via a counter in the fetch's CancelledError branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5b1d8cd commit 6aa6f4b

1 file changed

Lines changed: 42 additions & 0 deletions

File tree

tests/test_coalesce.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,48 @@ async def test_key_missing_on_uncoalescable_input_yields_nothing(
365365
assert groups == []
366366

367367

368+
async def test_consumer_break_cancels_pending_fetches() -> None:
369+
# Kick off many slow ranges with small max_concurrency, break after the
370+
# first yielded group, and verify the remaining tasks are cancelled rather
371+
# than allowed to run to completion.
372+
completed_calls = 0
373+
cancelled_calls = 0
374+
375+
async def fetch(byte_range: ByteRequest | None) -> Buffer | None:
376+
nonlocal completed_calls, cancelled_calls
377+
assert isinstance(byte_range, RangeByteRequest)
378+
start = byte_range.start
379+
try:
380+
# First fetch returns fast so the async for body runs and can break.
381+
# Later fetches sleep long enough that cancellation has room to land.
382+
await asyncio.sleep(0.001 if start == 0 else 2.0)
383+
except asyncio.CancelledError:
384+
cancelled_calls += 1
385+
raise
386+
completed_calls += 1
387+
return _buf(b"x")
388+
389+
opts: CoalesceOptions = {
390+
"max_gap_bytes": -1, # no merging
391+
"max_coalesced_bytes": 1 << 20,
392+
"max_concurrency": 3,
393+
}
394+
ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(6)]
395+
396+
agen = coalesced_get(fetch, ranges, options=opts)
397+
# Break after receiving the first yield.
398+
async for _group in agen:
399+
break
400+
# Make sure the async generator is fully closed so its finally runs.
401+
await agen.aclose()
402+
403+
# At least one slow fetch was actually running under the semaphore and got
404+
# cancelled (rather than running to completion).
405+
assert cancelled_calls >= 1
406+
# And the first range's fetch completed normally (no spurious cancels there).
407+
assert completed_calls >= 1
408+
409+
368410
async def test_fetch_raises_propagates() -> None:
369411
fetch = FakeFetch(
370412
b"x" * 100,

0 commit comments

Comments
 (0)