Commit 57ff5bd
feat(waterdata): Add async parallel chunker over httpx.AsyncClient
Adds a parallel fan-out path to `multi_value_chunked`. When
`API_USGS_CONCURRENT` resolves to >1 (default: 16), the decorator
runs the sub-requests of an over-budget plan concurrently under
one shared `httpx.AsyncClient`, instead of issuing them serially.
Falls back to the serial sync path (with a one-time UserWarning)
when no async fetch sibling is wired or when an asyncio event
loop is already running (Jupyter, IPython, async apps —
`asyncio.run` would otherwise raise).
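The fallback rule above can be sketched roughly as follows. This is a minimal illustration, not the code in `chunking.py`: `run_plan`, `_loop_is_running`, and the warning text are hypothetical names chosen for the example, and the real bridge may differ in detail.

```python
import asyncio
import warnings

_warned = False  # module-level flag so the warning fires only once


def _loop_is_running() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def run_plan(plan, fetch_once, fetch_async=None):
    """Run every sub-request in `plan`, concurrently when that is safe."""
    global _warned
    # Hypothetical fallback: no async sibling, or asyncio.run would raise.
    if fetch_async is None or _loop_is_running():
        if not _warned:
            warnings.warn(
                "falling back to serial chunked requests",
                UserWarning,
                stacklevel=2,
            )
            _warned = True
        return [fetch_once(item) for item in plan]

    async def _gather():
        return await asyncio.gather(*(fetch_async(item) for item in plan))

    return asyncio.run(_gather())
```

Calling `run_plan` from a plain script takes the concurrent branch; calling it from a notebook cell, where a loop is already running, takes the serial branch and warns once.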
Architecture (`dataretrieval/waterdata/chunking.py`):
* `_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)`
is the orchestrator. Probe-first: issues sub-request 0 alone via
`_probe_first` so its `x-ratelimit-remaining` header gates the
rest of the plan; if the window can't fit, raises
`RequestExceedsQuota` before the burst goes out. Then
`_fan_out_rest` dispatches indices 1..N-1 concurrently via
`asyncio.gather(return_exceptions=True)`. Completed pairs survive
a sibling's transient failure, so partial state stays recoverable
through `ChunkedCall.resume()` on the sync path.
* Failure precedence in `_fan_out_rest`:
1. Cancellation/interrupt signals (CancelledError,
KeyboardInterrupt, SystemExit) propagate unmodified — never
wrapped as transients. Cancellation is asyncio's abort
signal; rewriting it as ChunkInterrupted would silently
consume the user's stop request.
2. Recognized transients (RateLimited, ServiceUnavailable, bare
httpx.HTTPError) wrap as ChunkInterrupted so the user gets
a resumable handle even when a non-transient bug landed
earlier in submission order.
3. Otherwise raise the first failure in submission order,
preserving its type.
* `_execute_in_parallel` owns the sync→async bridge:
`asyncio.run` dispatch with the `fetch_async is None` and
running-event-loop fallbacks (one-time UserWarning + serial).
* `_publish_async_session` / `get_active_async_session` /
`_chunked_async_session` ContextVar let async paginated-loop
helpers (`_walk_pages_async`, `_paginate_async`) reuse one
`AsyncClient` connection pool across every concurrent
sub-request.
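A simplified sketch of the probe-first fan-out and the failure precedence described above. The exception classes here are local stand-ins for the library's own, and the assumption that each fetch returns a `(result, remaining_quota)` tuple is made up for the example; the actual code reads the `x-ratelimit-remaining` header.

```python
import asyncio


class Transient(Exception):
    """Stand-in for RateLimited / ServiceUnavailable / httpx.HTTPError."""


class RequestExceedsQuota(Exception):
    """Stand-in: the rate-limit window cannot fit the rest of the plan."""


class ChunkInterrupted(Exception):
    """Stand-in for the resumable wrapper; carries the completed results."""

    def __init__(self, completed, cause):
        super().__init__(f"interrupted by {cause!r}")
        self.completed = completed
        self.cause = cause


async def fan_out(plan, fetch_async, *, max_concurrent=16):
    # Probe: run sub-request 0 alone and inspect the quota it reports.
    first, remaining = await fetch_async(plan[0])
    if remaining < len(plan) - 1:
        raise RequestExceedsQuota(f"need {len(plan) - 1}, have {remaining}")

    gate = asyncio.Semaphore(max_concurrent)

    async def bounded(item):
        async with gate:
            result, _ = await fetch_async(item)
            return result

    # return_exceptions=True keeps finished siblings when one fails.
    outcomes = await asyncio.gather(
        *(bounded(item) for item in plan[1:]), return_exceptions=True
    )

    completed = {0: first}
    failures = []
    for index, outcome in enumerate(outcomes, start=1):
        if isinstance(outcome, BaseException):
            failures.append(outcome)
        else:
            completed[index] = outcome

    # 1. Cancellation and interrupt signals pass through untouched.
    for exc in failures:
        if isinstance(exc, (asyncio.CancelledError, KeyboardInterrupt, SystemExit)):
            raise exc
    # 2. Any recognized transient becomes a resumable ChunkInterrupted.
    for exc in failures:
        if isinstance(exc, Transient):
            raise ChunkInterrupted(completed, exc) from exc
    # 3. Otherwise the first failure in submission order, type preserved.
    if failures:
        raise failures[0]
    return [completed[i] for i in range(len(plan))]
```

Scanning the failure list once per precedence level, rather than raising on the first failure seen, is what lets a transient at index 3 win over a `ValueError` at index 2.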
Wiring (`dataretrieval/waterdata/utils.py`):
* `_walk_pages_async`, `_paginate_async`, `_async_session`,
`_fetch_once_async` — async siblings of the sync paginate path.
* The `@chunking.multi_value_chunked(fetch_async=_fetch_once_async)`
decorator on `_fetch_once` wires the async sibling so the
parallel path is available to every Water Data OGC getter.
Concurrency cap (`API_USGS_CONCURRENT`):
* Integer N >= 1: bounded fan-out gated by a semaphore; N=1 forces
the serial sync path.
* `unbounded`: no per-call cap (`Semaphore(sys.maxsize)`).
* Unset: defaults to 16, chosen as a server-friendly level of
parallelism.
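One way the three settings could be resolved is sketched below. `resolve_concurrency` and `make_gate` are hypothetical helpers; the case-insensitive match and the `ValueError` on invalid input are assumptions, since the commit message does not say how bad values are handled.

```python
import asyncio
import os
import sys

DEFAULT_CONCURRENT = 16


def resolve_concurrency(environ=os.environ):
    """Return the fan-out cap, or None when the cap is 'unbounded'."""
    raw = environ.get("API_USGS_CONCURRENT", "").strip().lower()
    if not raw:
        return DEFAULT_CONCURRENT
    if raw == "unbounded":
        return None
    value = int(raw)  # raises ValueError on non-integer input (assumed)
    if value < 1:
        raise ValueError("API_USGS_CONCURRENT must be >= 1 or 'unbounded'")
    return value


def make_gate(cap):
    """Build the semaphore that bounds concurrent sub-requests."""
    return asyncio.Semaphore(sys.maxsize if cap is None else cap)
```

A caller would treat a resolved cap of 1 as the signal to skip the async path entirely and use the serial one.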
Tests: 6 async-path tests in `tests/waterdata_chunking_test.py`
(one-call-per-sub-request, probe quota check, mid-fan-out
transient yields resumable ChunkInterrupted, fallback-to-serial
parametrized over running-loop and missing-fetch_async,
cancellation-wins-over-transient-sibling regression).
`tests/waterdata_progress_test.py` adds a progress-reporter
integration test for `_fan_out_async` and a `_paginate_async`
test. `tests/waterdata_utils_test.py` adds a `_walk_pages_async`
initial-parse-error test.
Test count: 420 passing (mocked), 2 skipped, ruff clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent d6bf7bb commit 57ff5bd
5 files changed
Lines changed: 859 additions & 9 deletions