Skip to content

Commit 88fd692

Browse files
thodson-usgsclaude
andcommitted
fix(chunking): Propagate CancelledError ahead of transient siblings
`_fan_out_rest` previously walked `failures` looking for the first recognized transient and wrapped it as `ChunkInterrupted`. If a sibling sub-request happened to raise `asyncio.CancelledError` alongside (or after) a transient HTTPError, the transient was classified first and the cancellation was silently consumed — the user's stop signal got rewritten as a "retry me" handle. Adds a pre-walk that propagates any non-`Exception` BaseException (`CancelledError`, `KeyboardInterrupt`, `SystemExit`) unmodified before the transient-classification walk runs. asyncio's cancellation contract takes precedence over the chunker's retry semantics. Regression test: `test_async_fan_out_cancellation_wins_over_transient_sibling` deterministically sequences a 3-call fan-out (probe + transient + CancelledError) by relying on gather's submission-order scheduling when the inner fetch_async has no awaits. Pre-fix the test got `ServiceInterrupted`; post-fix it gets `CancelledError`. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 6028dd5 commit 88fd692

2 files changed

Lines changed: 54 additions & 6 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,18 +1462,29 @@ async def _fan_out_rest(
14621462
14631463
Completed pairs survive a sibling's transient failure via
14641464
``return_exceptions=True``, so the partial result stays
1465-
recoverable through :meth:`ChunkedCall.resume`. On any failure,
1466-
prefer raising the first *recognized transient* — so the user
1467-
still gets a resumable :class:`ChunkInterrupted` even when a
1468-
non-transient bug landed earlier in submission order. Fall back
1469-
to the first failure (preserving its type) when nothing is
1470-
transient.
1465+
recoverable through :meth:`ChunkedCall.resume`. Failure
1466+
precedence, in order:
1467+
1468+
1. **Cancellation / interrupt signals** (``asyncio.CancelledError``,
1469+
``KeyboardInterrupt``, ``SystemExit`` — anything that isn't an
1470+
:class:`Exception` subclass) propagate unmodified. Wrapping
1471+
them as a transient :class:`ChunkInterrupted` would silently
1472+
swallow the user's stop signal in favor of a resumable handle.
1473+
2. **Recognized transients** (``RateLimited``, ``ServiceUnavailable``,
1474+
bare ``httpx.HTTPError``) wrap as :class:`ChunkInterrupted`, so
1475+
the user still gets a resumable handle even when a non-transient
1476+
failure landed earlier in submission order.
1477+
3. **Otherwise** raise the first failure (submission order),
1478+
preserving its type.
14711479
"""
14721480
results = await asyncio.gather(
14731481
*(track(i, args) for i, args in enumerate(sub_args_rest, start=1)),
14741482
return_exceptions=True,
14751483
)
14761484
failures = [r for r in results if isinstance(r, BaseException)]
1485+
for exc in failures:
1486+
if not isinstance(exc, Exception):
1487+
raise exc
14771488
for exc in failures:
14781489
if (interrupted := call.wrap_failure(exc)) is not None:
14791490
raise interrupted from exc

tests/waterdata_chunking_test.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,6 +1464,43 @@ def fetch(args):
14641464
assert len(df) == len(sync_calls)
14651465

14661466

1467+
def test_async_fan_out_cancellation_wins_over_transient_sibling(monkeypatch):
1468+
"""``asyncio.CancelledError`` raised by any sub-request must
1469+
propagate unmodified, even when a sibling raises a recognized
1470+
transient (which would otherwise wrap as a resumable
1471+
:class:`ChunkInterrupted`). Cancellation is asyncio's abort
1472+
signal — letting a transient-classification path consume it
1473+
would silently swallow the user's stop request.
1474+
1475+
fetch_async has no ``await`` inside its body, so gather schedules
1476+
the tasks in submission order and each runs synchronously to its
1477+
raise — making ``call_count`` deterministic for this test:
1478+
1 = probe, 2 = first fan-out task (transient), 3 = second
1479+
fan-out task (cancellation).
1480+
"""
1481+
call_count = {"async": 0}
1482+
1483+
async def fetch_async(args):
1484+
call_count["async"] += 1
1485+
if call_count["async"] == 1:
1486+
return pd.DataFrame({"id": [_atom_id(args)]}), _ok_response(remaining=99)
1487+
if call_count["async"] == 2:
1488+
raise ServiceUnavailable("503: transient sibling")
1489+
if call_count["async"] == 3:
1490+
raise asyncio.CancelledError("user cancel")
1491+
return pd.DataFrame({"id": [_atom_id(args)]}), _ok_response(remaining=99)
1492+
1493+
fetch = _async_chunked_fetch(monkeypatch, fetch_async)
1494+
1495+
# 8 × 20-byte sites force the planner to >=3 sub-args under
1496+
# url_limit=240, so the fan-out gather sees at least the
1497+
# transient (call 2) AND the cancellation (call 3).
1498+
sites = [f"S{i}" * 10 for i in range(1, 9)]
1499+
1500+
with pytest.raises(asyncio.CancelledError):
1501+
fetch({"sites": sites})
1502+
1503+
14671504
def test_combine_chunk_responses_does_not_mutate_input_urls():
14681505
"""Regression for the _set_response_url aliasing bug.
14691506

0 commit comments

Comments
 (0)