Skip to content

Commit f7960c2

Browse files
thodson-usgsclaude
andcommitted
feat(waterdata): async parallel chunker over httpx.AsyncClient
Multi-value Water Data queries (many monitoring locations, many parameter codes, large CQL2 filters) can exceed the server's ~8 KB URL/body limit and need to be split into multiple sub-requests. This adds an ``async``-only chunker that: * Plans the fan-out: every multi-value list parameter and the cql-text ``filter`` (along its top-level ``OR`` clauses) is modeled as a chunkable axis; ``ChunkPlan`` greedy-halves the biggest axis until every sub-request URL fits the byte budget, then iterates the cartesian product. * Dispatches concurrently: ``ChunkedCall._run`` gathers every pending sub-request through one shared ``httpx.AsyncClient`` with the connection pool sized from ``API_USGS_CONCURRENT`` (default 16; ``unbounded`` removes the cap). A single ``anyio`` blocking portal lets the sync facade work from inside event loops (Jupyter, async apps). * Survives transients: typed ``RateLimited`` (429) and ``ServiceUnavailable`` (5xx) trigger bounded retry-with-backoff (``API_USGS_RETRIES`` default 4; full jitter; honors ``Retry-After`` up to a 60 s cap). Anything still failing escalates to a resumable ``ChunkInterrupted`` subclass carrying ``.call`` — call ``.call.resume()`` once the underlying condition clears; only the still-pending sub-requests are re-issued. * Combines and finalizes: the OGC getters inject ``utils._finalize_ogc`` (type coercion, column arrangement, ``max_rows`` truncation, ``BaseMetadata``) through the chunker's ``finalize`` hook so a successful first call and a resumed call yield the same shape. Surfaces and integration: * ``dataretrieval/waterdata/chunking.py`` (new module): ``RetryPolicy``, ``ChunkPlan``, ``ChunkedCall``, ``ChunkInterrupted`` / ``QuotaExhausted`` / ``ServiceInterrupted``, ``multi_value_chunked`` decorator. * ``utils._fetch_once`` is the decorated async fetcher; pagination helpers ``_paginate`` / ``_walk_pages`` are async and share the chunker's client via a ``ContextVar``. * ``api.get_reference_table(... max_rows=...)``: new preview cap. * ``_progress``: per-call status line (chunk count, pages, rows, rate-limit remaining); ``API_USGS_PROGRESS`` opt-in/off. Deps: add ``geopandas>=0.10`` + ``mapclassify`` to ``[doc]`` extras so ``WaterData_demo.ipynb``'s ``.set_crs().explore()`` cell executes (the plain-pandas frame lacks ``.set_crs``). Tests: full async chunker suite (planning, retry taxonomy, resume, client-sharing, progress reporter, finalize injection) + live-API regression tests covering every public getter. 298 offline + 63 live tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 906c859 commit f7960c2

11 files changed

Lines changed: 2034 additions & 447 deletions

dataretrieval/waterdata/_progress.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ def __init__(
121121
# The hourly request quota (``x-ratelimit-limit``), shown as the
122122
# denominator when the server reports it.
123123
self.rate_limit: str | None = None
124+
# Transient note shown while a sub-request backs off before a
125+
# retry; cleared by the next page/chunk so it doesn't linger.
126+
self.retry_note: str | None = None
124127
self._last_len = 0
125128
# Whether anything was actually written to the stream — drives whether
126129
# close() needs a terminating newline. (``current_chunk`` is a poor
@@ -140,13 +143,33 @@ def start_chunk(self, index: int) -> None:
140143
avoids a premature "0 pages" frame before the first page arrives.
141144
"""
142145
self.current_chunk = index
146+
self.retry_note = None
143147
if self.total_chunks > 1:
144148
self._render()
145149

146150
def add_page(self, rows: int = 0) -> None:
147151
"""Record one fetched page carrying ``rows`` rows and redraw."""
148152
self.pages += 1
149153
self.rows += int(rows)
154+
self.retry_note = None
155+
self._render()
156+
157+
def note_retry(self, *, attempt: int, wait: float) -> None:
158+
"""Show that a sub-request is backing off before retry ``attempt``.
159+
160+
Cleared by the next :meth:`add_page` / :meth:`start_chunk` (or by
161+
:meth:`close`) so the line returns to normal once the retry resolves.
162+
"""
163+
# Keep sub-second waits explicit (avoid misleading ``0s``) while
164+
# rendering whole-second waits without unnecessary ``.0`` noise.
165+
# ``float()`` to support Python 3.9-3.11: ``round(int, 1)`` returns an
166+
# int and ``int.is_integer()`` (used below) only exists on 3.12+.
167+
wait_1dp = round(float(wait), 1)
168+
if wait_1dp < 1 or not wait_1dp.is_integer():
169+
secs = f"{wait_1dp:.1f}s"
170+
else:
171+
secs = f"{wait_1dp:.0f}s"
172+
self.retry_note = f"retrying (attempt {attempt}, waiting {secs})"
150173
self._render()
151174

152175
def set_rate_remaining(
@@ -179,6 +202,8 @@ def _format(self) -> str:
179202
else:
180203
segment = f"{remaining} requests remaining"
181204
parts.append(segment)
205+
if self.retry_note is not None:
206+
parts.append(self.retry_note)
182207
if self.service:
183208
return f"Retrieving: {self.service} · " + " · ".join(parts)
184209
return "Progress: " + " · ".join(parts)
@@ -209,6 +234,13 @@ def close(self) -> None:
209234
"""
210235
if self._closed:
211236
return
237+
# A retry note set during the final backoff would otherwise freeze as
238+
# the persisted last line of a call that has since completed or given
239+
# up; clear it and redraw (while still un-closed, so ``_render`` runs)
240+
# so the final state isn't a stale "retrying".
241+
if self.enabled and self._rendered and self.retry_note is not None:
242+
self.retry_note = None
243+
self._render()
212244
self._closed = True
213245
if not (self.enabled and self._rendered):
214246
return

dataretrieval/waterdata/api.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2022,6 +2022,7 @@ def get_reference_table(
20222022
collection: str,
20232023
limit: int | None = None,
20242024
query: dict | None = None,
2025+
max_rows: int | None = None,
20252026
) -> tuple[pd.DataFrame, BaseMetadata]:
20262027
"""Get metadata reference tables for the USGS Water Data API.
20272028
@@ -2046,6 +2047,12 @@ def get_reference_table(
20462047
query: dictionary, optional
20472048
The optional args parameter can be used to pass a dictionary of
20482049
query parameters to the collection API call.
2050+
max_rows : int, optional
2051+
Cap the total number of rows returned, stopping pagination early
2052+
instead of downloading the whole table. Useful for cheaply
2053+
previewing large tables (e.g. ``hydrologic-unit-codes`` has ~125k
2054+
rows). Unlike ``limit`` (the per-page size), this bounds the total
2055+
result. The default (None) downloads every page.
20492056
20502057
Returns
20512058
-------
@@ -2092,7 +2099,9 @@ def get_reference_table(
20922099
query_args = dict(query) if query else {}
20932100
if limit is not None:
20942101
query_args["limit"] = limit
2095-
return get_ogc_data(args=query_args, output_id=output_id, service=collection)
2102+
return get_ogc_data(
2103+
args=query_args, output_id=output_id, service=collection, max_rows=max_rows
2104+
)
20962105

20972106

20982107
def get_codes(code_service: CODE_SERVICES) -> pd.DataFrame:

0 commit comments

Comments
 (0)