Skip to content

Commit 22a09c7

Browse files
thodson-usgsclaude
andcommitted
Add multi-value GET-parameter chunker for waterdata OGC API
Wraps _fetch_once with a cartesian-product chunker that sits OUTSIDE @filters.chunked. Splits multi-value list params (monitoring_location_id, parameter_code, statistic_id, etc.) across sub-requests so each URL fits the server's ~8 KB byte limit. Coordination with @filters.chunked: the planner's URL probe substitutes the filter with its longest top-level OR-clause via _filter_aware_probe_args, modeling the per-sub-request URL the inner filter chunker will actually emit. Without this coordination, a long OR-filter plus multi-value lists triggered premature RequestTooLarge even when the combined chunkers would have made things fit. Two safety guards: - max_chunks=1000 cap on cartesian-product size (matches USGS API hourly quota; raises RequestTooLarge with the actual count when exceeded). - QuotaExhausted abort: between sub-requests, reads x-ratelimit-remaining; if below quota_safety_floor (default 50), raises with the partial frame and chunk offset so callers can resume instead of crashing into a mid-call HTTP 429. 30 unit tests cover the planner, filter-aware coordination, the cap, and the quota-aware abort. Live tests in /tmp verify a 3-dim equivalence case (chunked == unchunked, 16 sub-requests, all axes split), 6 edge-case stress scenarios, and 3 mv/filter composition regimes. Depends on #273 (paginated silent-truncation fix) — this PR multiplies the frequency at which the silent-truncation bug class would have surfaced. Merge order: #273 -> #233 -> this PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fa78869 commit 22a09c7

3 files changed

Lines changed: 770 additions & 6 deletions

File tree

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
"""Multi-value GET-parameter chunking for the Water Data OGC getters.
2+
3+
PR 233 routes most services through GET with comma-separated values
4+
(e.g. ``monitoring_location_id=USGS-A,USGS-B,...``). Long lists can blow
5+
the server's ~8 KB URL byte limit. This module adds a decorator that
6+
sits OUTSIDE ``filters.chunked`` and splits multi-value list params
7+
across multiple sub-requests so each URL fits.
8+
9+
Design (orthogonal to filter chunking):
10+
11+
- N-dimensional cartesian product: for each chunkable list param, the
12+
values are partitioned into sub-lists; the planner emits the cartesian
13+
product of those partitions. Sub-chunks of the same dim never overlap,
14+
so frame concat needs no dedup across multi-value chunks.
15+
- Greedy halving of the largest chunk in any dim until the worst-case
16+
sub-request URL fits the limit. Minimises total request count.
17+
- Date params, ``bbox``, and ``properties`` are not chunked: dates are
18+
intervals not enumerable sets; bbox is a coord array; ``properties``
19+
determines output schema and chunking it would shard columns.
20+
21+
Coordination with ``filters.chunked``:
22+
The planner probes URL length using the SHORTEST top-level OR-clause
23+
when a chunkable filter is present, not the full filter. ``filters.
24+
chunked`` (inner) will split the filter per sub-request, so probing
25+
with the smallest clause models the per-sub-request URL the stack will
26+
actually produce. Without this, a long OR-filter plus multi-value
27+
lists would trigger a premature ``RequestTooLarge`` even though the
28+
combined chunkers would have made things fit.
29+
"""
30+
31+
from __future__ import annotations
32+
33+
import functools
34+
import itertools
35+
from collections.abc import Callable
36+
from typing import Any, TypeVar
37+
38+
import pandas as pd
39+
import requests
40+
41+
from . import filters
42+
from .filters import (
43+
_combine_chunk_frames,
44+
_combine_chunk_responses,
45+
_is_chunkable,
46+
_split_top_level_or,
47+
)
48+
49+
# Params that look like lists but must NOT be chunked. ``properties`` is
50+
# excluded because it defines the response schema; chunking it would
51+
# return frames with different columns per sub-request. ``bbox`` is a
52+
# fixed 4-element coord tuple. Date params are intervals not sets. The
53+
# CQL ``filter`` (and its ``filter_lang``) is a string that has its own
54+
# inner chunker (``filters.chunked``); if a caller passes ``filter`` as
55+
# a list, treating it as a multi-value param would emit malformed CQL.
56+
_NEVER_CHUNK = frozenset(
57+
{
58+
"properties",
59+
"bbox",
60+
"datetime",
61+
"last_modified",
62+
"begin",
63+
"begin_utc",
64+
"end",
65+
"end_utc",
66+
"time",
67+
"filter",
68+
"filter_lang",
69+
}
70+
)
71+
72+
# Default cap on the number of sub-requests a single chunked call may
73+
# emit. The USGS Water Data API rate-limits each HTTP request (including
74+
# pagination), so the true budget is ``hourly_quota / avg_pages_per_chunk``.
75+
# 1000 matches the default hourly quota and is a reasonable upper bound
76+
# for single-page sub-requests; tune lower if your queries paginate.
77+
# Override per-decorator via ``max_chunks=`` or by monkeypatching this
78+
# module attribute (read lazily in the wrapper).
79+
_DEFAULT_MAX_CHUNKS = 1000
80+
81+
# When ``x-ratelimit-remaining`` drops below this between sub-requests,
82+
# the chunker bails with ``QuotaExhausted`` rather than risk a mid-call
83+
# HTTP 429. Carries the partial result so callers can resume from a
84+
# known offset instead of retrying the whole chunked call from scratch.
85+
_DEFAULT_QUOTA_SAFETY_FLOOR = 50
86+
87+
88+
class RequestTooLarge(ValueError):
89+
"""Raised when a chunked request cannot be issued. Two cases:
90+
(1) URL exceeds the byte limit even with every multi-value param at
91+
a singleton chunk and any chunkable filter reduced to its smallest
92+
top-level OR-clause; (2) the cartesian-product plan would issue more
93+
than ``max_chunks`` sub-requests."""
94+
95+
96+
class QuotaExhausted(RuntimeError):
97+
"""Raised mid-chunked-call when the API's reported remaining quota
98+
(``x-ratelimit-remaining`` header) drops below the configured safety
99+
floor. The chunker stops before issuing the next sub-request to
100+
avoid a mid-call HTTP 429 that would silently truncate paginated
101+
results (see PR #273 for the pagination side of that bug).
102+
103+
The exception carries everything needed to resume: the combined
104+
partial frame from completed sub-requests, the metadata for the
105+
last successful sub-request, the number of chunks completed out of
106+
the plan total, and the last-observed ``remaining`` value.
107+
108+
Attributes
109+
----------
110+
partial_frame : pd.DataFrame
111+
Concatenated, deduplicated result of every sub-request that
112+
completed before the floor was crossed.
113+
partial_response : requests.Response
114+
Aggregated response (URL/headers of the first sub-request,
115+
summed ``elapsed``). Wrap in ``BaseMetadata`` to surface to
116+
the caller alongside the partial frame.
117+
completed_chunks : int
118+
Number of sub-requests successfully completed.
119+
total_chunks : int
120+
Total sub-requests in the cartesian-product plan.
121+
remaining : int
122+
Last observed ``x-ratelimit-remaining`` value.
123+
"""
124+
125+
def __init__(
126+
self,
127+
*,
128+
partial_frame: pd.DataFrame,
129+
partial_response: requests.Response,
130+
completed_chunks: int,
131+
total_chunks: int,
132+
remaining: int,
133+
) -> None:
134+
super().__init__(
135+
f"x-ratelimit-remaining dropped to {remaining} after "
136+
f"{completed_chunks}/{total_chunks} chunks; aborting to avoid "
137+
f"mid-call HTTP 429. Catch QuotaExhausted to access "
138+
f".partial_frame and resume from chunk {completed_chunks}."
139+
)
140+
self.partial_frame = partial_frame
141+
self.partial_response = partial_response
142+
self.completed_chunks = completed_chunks
143+
self.total_chunks = total_chunks
144+
self.remaining = remaining
145+
146+
147+
def _chunkable_params(args: dict[str, Any]) -> dict[str, list]:
148+
"""Return ``{name: list(values)}`` for every list/tuple kwarg with
149+
>1 element that is allowed to chunk."""
150+
return {
151+
k: list(v)
152+
for k, v in args.items()
153+
if k not in _NEVER_CHUNK and isinstance(v, (list, tuple)) and len(v) > 1
154+
}
155+
156+
157+
def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]:
158+
"""Substitute the filter with its shortest top-level OR-clause if the
159+
filter is chunkable, otherwise return ``args`` unchanged.
160+
161+
The inner ``filters.chunked`` decorator will reduce the filter per
162+
sub-request to at most one OR-clause (its hard floor — see
163+
``_chunk_cql_or``). Probing with that minimum models the per-sub-
164+
request URL the decorator stack will actually emit, so we don't
165+
plan around bytes the filter chunker has already promised to remove.
166+
"""
167+
filter_expr = args.get("filter")
168+
filter_lang = args.get("filter_lang")
169+
if not _is_chunkable(filter_expr, filter_lang):
170+
return args
171+
parts = _split_top_level_or(filter_expr)
172+
if len(parts) < 2:
173+
return args # one-clause filter — filter chunker can't shrink it
174+
return {**args, "filter": min(parts, key=len)}
175+
176+
177+
def _worst_case_args(
178+
probe_args: dict[str, Any], plan: dict[str, list[list]]
179+
) -> dict[str, Any]:
180+
"""Args dict using the LARGEST chunk from each dim — represents the
181+
most byte-heavy sub-request the plan will issue, with the filter
182+
already reduced to its filter-chunker floor."""
183+
out = dict(probe_args)
184+
for k, chunks in plan.items():
185+
out[k] = max(chunks, key=lambda c: len(",".join(map(str, c))))
186+
return out
187+
188+
189+
def _plan_chunks(
190+
args: dict[str, Any],
191+
build_request: Callable[..., Any],
192+
url_limit: int,
193+
max_chunks: int = _DEFAULT_MAX_CHUNKS,
194+
) -> dict[str, list[list]] | None:
195+
"""Greedy halving until the worst-case sub-request URL fits.
196+
197+
Returns ``None`` when no chunking is needed (request as-is fits or
198+
no chunkable lists). Raises ``RequestTooLarge`` when:
199+
- every multi-value param is already a singleton chunk AND the
200+
filter (if any) is already at its smallest OR-clause and the URL
201+
still exceeds ``url_limit`` (irreducible), or
202+
- the converged cartesian-product plan would issue more than
203+
``max_chunks`` sub-requests (hourly API budget).
204+
"""
205+
chunkable = _chunkable_params(args)
206+
if not chunkable:
207+
return None
208+
probe_args = _filter_aware_probe_args(args)
209+
if len(build_request(**probe_args).url) <= url_limit:
210+
return None
211+
212+
plan: dict[str, list[list]] = {k: [v] for k, v in chunkable.items()}
213+
214+
while True:
215+
worst = _worst_case_args(probe_args, plan)
216+
if len(build_request(**worst).url) <= url_limit:
217+
break
218+
219+
# Find the single biggest chunk across all dims and halve it.
220+
best: tuple[str, int, int] | None = None # (dim, chunk_index, size)
221+
for dim, dim_chunks in plan.items():
222+
for idx, chunk in enumerate(dim_chunks):
223+
if len(chunk) <= 1:
224+
continue
225+
size = len(",".join(map(str, chunk)))
226+
if best is None or size > best[2]:
227+
best = (dim, idx, size)
228+
229+
if best is None:
230+
raise RequestTooLarge(
231+
f"Request URL exceeds {url_limit} bytes even with every "
232+
f"multi-value parameter at a singleton chunk and any "
233+
f"chunkable filter reduced to one OR-clause. Reduce the "
234+
f"number of values or split the call manually."
235+
)
236+
dim, idx, _ = best
237+
big = plan[dim][idx]
238+
mid = len(big) // 2
239+
plan[dim] = plan[dim][:idx] + [big[:mid], big[mid:]] + plan[dim][idx + 1 :]
240+
241+
total = 1
242+
for chunks in plan.values():
243+
total *= len(chunks)
244+
if total > max_chunks:
245+
raise RequestTooLarge(
246+
f"Chunked plan would issue {total} sub-requests, exceeding "
247+
f"max_chunks={max_chunks} (USGS API's default hourly rate "
248+
f"limit per key). Reduce input list sizes, narrow the time "
249+
f"window, or raise max_chunks if you have a higher quota."
250+
)
251+
return plan
252+
253+
254+
_FetchOnce = TypeVar(
255+
"_FetchOnce",
256+
bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]],
257+
)
258+
259+
260+
def _read_remaining(response: requests.Response) -> int:
261+
"""Parse ``x-ratelimit-remaining`` from a response. Missing or
262+
malformed header → return a large sentinel so the safety check
263+
treats it as 'plenty of quota' (don't abort on header glitches)."""
264+
raw = response.headers.get("x-ratelimit-remaining")
265+
if raw is None:
266+
return 10**9
267+
try:
268+
return int(raw)
269+
except (TypeError, ValueError):
270+
return 10**9
271+
272+
273+
def multi_value_chunked(
274+
*,
275+
build_request: Callable[..., Any],
276+
url_limit: int | None = None,
277+
max_chunks: int | None = None,
278+
quota_safety_floor: int | None = None,
279+
) -> Callable[[_FetchOnce], _FetchOnce]:
280+
"""Decorator that splits multi-value list params across sub-requests so
281+
each URL fits ``url_limit`` bytes (defaults to ``filters._WATERDATA_
282+
URL_BYTE_LIMIT``) and the cartesian-product plan stays ≤ ``max_chunks``
283+
sub-requests (defaults to ``_DEFAULT_MAX_CHUNKS``). All defaults are
284+
resolved at call time so tests/users that patch the module constants
285+
affect this decorator uniformly.
286+
287+
Between sub-requests the wrapper reads ``x-ratelimit-remaining`` from
288+
each response. If it drops below ``quota_safety_floor`` (default
289+
``_DEFAULT_QUOTA_SAFETY_FLOOR``), the wrapper raises ``QuotaExhausted``
290+
carrying the combined partial result and the chunk offset so callers
291+
can resume after the hourly window resets, instead of crashing into
292+
a mid-pagination HTTP 429 (which the upstream pagination loop in
293+
``_walk_pages`` historically truncated silently — see PR #273).
294+
295+
Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop,
296+
filter-chunking is the inner loop. The wrapped function has the same
297+
signature as ``filters.chunked`` expects — ``(args: dict) -> (frame,
298+
response)`` — so the two decorators compose cleanly. The planner is
299+
filter-aware so it doesn't raise prematurely when the inner filter
300+
chunker would have shrunk the per-sub-request URL on its own.
301+
"""
302+
303+
def decorator(fetch_once: _FetchOnce) -> _FetchOnce:
304+
@functools.wraps(fetch_once)
305+
def wrapper(
306+
args: dict[str, Any],
307+
) -> tuple[pd.DataFrame, requests.Response]:
308+
limit = (
309+
url_limit
310+
if url_limit is not None
311+
else filters._WATERDATA_URL_BYTE_LIMIT
312+
)
313+
cap = max_chunks if max_chunks is not None else _DEFAULT_MAX_CHUNKS
314+
floor = (
315+
quota_safety_floor
316+
if quota_safety_floor is not None
317+
else _DEFAULT_QUOTA_SAFETY_FLOOR
318+
)
319+
plan = _plan_chunks(args, build_request, limit, cap)
320+
if plan is None:
321+
return fetch_once(args)
322+
323+
keys = list(plan)
324+
total = 1
325+
for k in keys:
326+
total *= len(plan[k])
327+
frames: list[pd.DataFrame] = []
328+
responses: list[requests.Response] = []
329+
for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))):
330+
sub_args = {**args, **dict(zip(keys, combo))}
331+
frame, response = fetch_once(sub_args)
332+
frames.append(frame)
333+
responses.append(response)
334+
# Quota check happens BETWEEN sub-requests: skip on the
335+
# last iteration because there's nothing left to abort.
336+
if i < total - 1:
337+
remaining = _read_remaining(response)
338+
if remaining < floor:
339+
raise QuotaExhausted(
340+
partial_frame=_combine_chunk_frames(frames),
341+
partial_response=_combine_chunk_responses(responses),
342+
completed_chunks=i + 1,
343+
total_chunks=total,
344+
remaining=remaining,
345+
)
346+
347+
return (
348+
_combine_chunk_frames(frames),
349+
_combine_chunk_responses(responses),
350+
)
351+
352+
return wrapper # type: ignore[return-value]
353+
354+
return decorator

dataretrieval/waterdata/utils.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from dataretrieval import __version__
1616
from dataretrieval.utils import BaseMetadata
17-
from dataretrieval.waterdata import filters
17+
from dataretrieval.waterdata import chunking, filters
1818
from dataretrieval.waterdata.types import (
1919
PROFILE_LOOKUP,
2020
PROFILES,
@@ -912,17 +912,20 @@ def get_ogc_data(
912912
return return_list, BaseMetadata(response)
913913

914914

915+
@chunking.multi_value_chunked(build_request=_construct_api_requests)
915916
@filters.chunked(build_request=_construct_api_requests)
916917
def _fetch_once(
917918
args: dict[str, Any],
918919
) -> tuple[pd.DataFrame, requests.Response]:
919920
"""Send one prepared-args OGC request; return the frame + response.
920921
921-
Filter chunking is added orthogonally by the ``@filters.chunked``
922-
decorator: with no filter (or an un-chunkable one) the decorator
923-
passes ``args`` through to this body; with a chunkable filter it
924-
fans out and calls this body once per sub-filter, then combines.
925-
Either way the return shape is ``(frame, response)``.
922+
Two orthogonal chunkers wrap this body. ``@chunking.multi_value_chunked``
923+
(outer) splits multi-value list params (e.g. ``monitoring_location_id``)
924+
across sub-requests so each URL fits the server byte limit; the
925+
cartesian product of per-dim chunks is iterated. ``@filters.chunked``
926+
(inner) splits long cql-text filters at top-level ``OR``. With no
927+
chunkable inputs both pass through unchanged. Either way the return
928+
shape is ``(frame, response)``.
926929
"""
927930
req = _construct_api_requests(**args)
928931
return _walk_pages(geopd=GEOPANDAS, req=req)

0 commit comments

Comments
 (0)