Skip to content

Commit f16555d

Browse files
thodson-usgsclaude
andcommitted
refactor(waterdata): Address PR #283 review — relocate chunker helpers, clarify docs
Three review responses bundled together: - chunking.py module docstring: define ``k`` as the candidate filter chunk count before using it in the planner description. - ``QuotaExhausted`` docstring: drop the stale "silently truncate" framing. PR #273 / #279 already raise on a mid-pagination 429, so this exception is the structured-recovery alternative (partial frames in hand) rather than a defense against silent truncation. - Move chunker-only orphans from filters.py to chunking.py: ``_WATERDATA_URL_BYTE_LIMIT`` (the URL byte ceiling), ``_FetchOnce`` TypeVar, ``_combine_chunk_frames``, and ``_combine_chunk_responses``. filters.py was a leftover home from the pre-unification two-decorator stack; these helpers have no callers outside the chunker. Test ``test_multi_value_chunked_lazy_url_limit`` now monkeypatches the constant on its new module. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1348304 commit f16555d

3 files changed

Lines changed: 78 additions & 84 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
- Chunkable dims include multi-value list params (sites, parameter
1313
codes, ...) and the cql-text ``filter`` (split at top-level ``OR``
1414
to keep each chunk valid CQL).
15-
- The planner enumerates candidate filter chunk counts
16-
(``k = 1, 2, 4, ..., total_clauses``); for each, partitions clauses
17-
into ``k`` roughly-balanced groups joined by ``OR``, substitutes the
18-
worst (longest, URL-encoded) group as the filter, then plans list
15+
- For a filter with ``n_clauses`` top-level OR clauses, the planner
16+
enumerates candidate filter chunk counts ``k`` (the number of
17+
sub-filters to split into) at powers of two from 1 through
18+
``n_clauses``. For each ``k``, it partitions clauses into ``k``
19+
roughly-balanced groups joined by ``OR``, substitutes the worst
20+
(longest, URL-encoded) group as the filter, then plans list
1921
chunking by greedy halving. The candidate that minimizes
20-
``list_count × k`` wins.
22+
``list_count × k`` (total sub-request count) wins.
2123
- Sub-chunks of the same list dim never overlap, so frame concat needs
2224
no dedup across list chunks. Filter sub-chunks can match overlapping
2325
records (a row matching both ``a=1`` and ``b=2`` returns from both),
@@ -35,22 +37,23 @@
3537
import itertools
3638
import math
3739
from collections.abc import Callable, Iterator
38-
from typing import Any
40+
from typing import Any, TypeVar
3941
from urllib.parse import quote_plus
4042

4143
import pandas as pd
4244
import requests
4345

44-
from . import filters
4546
from .filters import (
4647
_check_numeric_filter_pitfall,
47-
_combine_chunk_frames,
48-
_combine_chunk_responses,
49-
_FetchOnce,
5048
_is_chunkable,
5149
_split_top_level_or,
5250
)
5351

52+
# Empirically the API replies HTTP 414 above ~8200 bytes of full URL —
53+
# matches nginx's default ``large_client_header_buffers`` of 8 KB. 8000
54+
# leaves ~200 bytes for request-line framing and proxy variance.
55+
_WATERDATA_URL_BYTE_LIMIT = 8000
56+
5457
# Default rule: any list-shaped kwarg with >1 element is chunked across
5558
# sub-requests — each chunk becomes a comma-joined sub-list in the URL.
5659
# The OGC getters expose ~90 such list-shaped params (IDs, codes,
@@ -114,6 +117,12 @@
114117
_OR_SEP = " OR "
115118

116119

120+
_FetchOnce = TypeVar(
121+
"_FetchOnce",
122+
bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]],
123+
)
124+
125+
117126
class RequestTooLarge(ValueError):
118127
"""Raised when a chunked request cannot be issued. Either the URL
119128
exceeds the byte limit even at the smallest reducible plan (every
@@ -125,9 +134,14 @@ class RequestTooLarge(ValueError):
125134
class QuotaExhausted(RuntimeError):
126135
"""Raised mid-chunked-call when the API's reported remaining quota
127136
(``x-ratelimit-remaining`` header) drops below the configured safety
128-
floor. The chunker stops before issuing the next sub-request to
129-
avoid a mid-call HTTP 429 that would silently truncate paginated
130-
results.
137+
floor. The chunker stops before issuing the next sub-request and
138+
surfaces the partial result so callers can resume after the hourly
139+
window resets.
140+
141+
A bare 429 raised by ``_walk_pages`` would also abort the call but
142+
discard the chunks completed so far; this exception is the
143+
structured-recovery alternative, triggered pre-emptively while the
144+
accumulated frames are still in hand.
131145
132146
Attributes
133147
----------
@@ -472,6 +486,49 @@ def _iter_sub_args(
472486
yield base if filter_chunk is None else {**base, "filter": filter_chunk}
473487

474488

489+
def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
490+
"""Concatenate per-chunk frames, dropping empties and deduping by ``id``.
491+
492+
``_get_resp_data`` returns a plain ``pd.DataFrame()`` on empty responses;
493+
concat'ing it with real GeoDataFrames downgrades the result to plain
494+
DataFrame and strips geometry/CRS, so empties are dropped first. Dedup
495+
on the pre-rename feature ``id`` keeps overlapping user OR-clauses from
496+
producing duplicate rows across chunks.
497+
"""
498+
non_empty = [f for f in frames if not f.empty]
499+
if not non_empty:
500+
return pd.DataFrame()
501+
if len(non_empty) == 1:
502+
return non_empty[0]
503+
combined = pd.concat(non_empty, ignore_index=True)
504+
if "id" in combined.columns:
505+
combined = combined.drop_duplicates(subset="id", ignore_index=True)
506+
return combined
507+
508+
509+
def _combine_chunk_responses(
510+
responses: list[requests.Response],
511+
) -> requests.Response:
512+
"""Return one response with the last chunk's headers (for current
513+
rate-limit state) and summed ``elapsed`` (for total wall-clock).
514+
515+
The returned response's ``.url`` is the *first chunk's* URL, which
516+
only reflects the first slice of the user's query. ``_finalize_response``
517+
overwrites ``.url`` with the canonical original-query URL so
518+
``BaseMetadata`` reflects the user's request, not the first sub-chunk.
519+
520+
Mutates the first response in place: ``.headers`` is replaced with
521+
the last response's headers and ``.elapsed`` is accumulated across
522+
all chunks. Downstream reads ``.url``, ``.headers``, and
523+
``.elapsed`` (via ``BaseMetadata``).
524+
"""
525+
head = responses[0]
526+
if len(responses) > 1:
527+
head.headers = responses[-1].headers
528+
head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed)
529+
return head
530+
531+
475532
def _finalize_response(
476533
responses: list[requests.Response], canonical_url: str
477534
) -> requests.Response:
@@ -491,7 +548,7 @@ def multi_value_chunked(
491548
) -> Callable[[_FetchOnce], _FetchOnce]:
492549
"""Decorator that splits multi-value list params and cql-text
493550
filters across sub-requests so each sub-request URL fits
494-
``url_limit`` bytes (defaults to ``filters._WATERDATA_URL_BYTE_LIMIT``)
551+
``url_limit`` bytes (defaults to ``_WATERDATA_URL_BYTE_LIMIT``)
495552
and the joint cartesian-product plan stays ≤ ``max_chunks``
496553
sub-requests (defaults to ``_DEFAULT_MAX_CHUNKS``). All defaults are
497554
resolved at call time so tests/users that patch the module constants
@@ -522,9 +579,7 @@ def decorator(fetch_once: _FetchOnce) -> _FetchOnce:
522579
def wrapper(
523580
args: dict[str, Any],
524581
) -> tuple[pd.DataFrame, requests.Response]:
525-
limit = (
526-
filters._WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
527-
)
582+
limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
528583
floor = (
529584
_DEFAULT_QUOTA_SAFETY_FLOOR
530585
if quota_safety_floor is None

dataretrieval/waterdata/filters.py

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
77
Internal helpers used by ``chunking.multi_value_chunked``'s joint
88
planner: ``_split_top_level_or`` (clause partitioning),
9-
``_is_chunkable`` (filter-language gate), ``_check_numeric_filter_pitfall``
10-
(the lexicographic-comparison guard), ``_combine_chunk_frames`` /
11-
``_combine_chunk_responses`` (aggregation), and the constant
12-
``_WATERDATA_URL_BYTE_LIMIT``.
9+
``_is_chunkable`` (filter-language gate), and
10+
``_check_numeric_filter_pitfall`` (the lexicographic-comparison guard).
1311
1412
Other CQL shapes (``AND``, ``NOT``, ``LIKE``, spatial/temporal
1513
predicates, function calls) are forwarded verbatim — only top-level
@@ -20,19 +18,10 @@
2018
from __future__ import annotations
2119

2220
import re
23-
from collections.abc import Callable
24-
from typing import Any, Literal, TypeVar
25-
26-
import pandas as pd
27-
import requests
21+
from typing import Any, Literal
2822

2923
FILTER_LANG = Literal["cql-text", "cql-json"]
3024

31-
# Empirically the API replies HTTP 414 above ~8200 bytes of full URL —
32-
# matches nginx's default ``large_client_header_buffers`` of 8 KB. 8000
33-
# leaves ~200 bytes for request-line framing and proxy variance.
34-
_WATERDATA_URL_BYTE_LIMIT = 8000
35-
3625

3726
_NUM = r"-?(?:\d+(?:\.\d+)?|\.\d+)(?:[eE][+-]?\d+)?"
3827
_IDENT = r"[A-Za-z_]\w*"
@@ -171,53 +160,3 @@ def _is_chunkable(filter_expr: Any, filter_lang: Any) -> bool:
171160
and bool(filter_expr)
172161
and filter_lang in {None, "cql-text"}
173162
)
174-
175-
176-
def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
177-
"""Concatenate per-chunk frames, dropping empties and deduping by ``id``.
178-
179-
``_get_resp_data`` returns a plain ``pd.DataFrame()`` on empty responses;
180-
concat'ing it with real GeoDataFrames downgrades the result to plain
181-
DataFrame and strips geometry/CRS, so empties are dropped first. Dedup
182-
on the pre-rename feature ``id`` keeps overlapping user OR-clauses from
183-
producing duplicate rows across chunks.
184-
"""
185-
non_empty = [f for f in frames if not f.empty]
186-
if not non_empty:
187-
return pd.DataFrame()
188-
if len(non_empty) == 1:
189-
return non_empty[0]
190-
combined = pd.concat(non_empty, ignore_index=True)
191-
if "id" in combined.columns:
192-
combined = combined.drop_duplicates(subset="id", ignore_index=True)
193-
return combined
194-
195-
196-
def _combine_chunk_responses(
197-
responses: list[requests.Response],
198-
) -> requests.Response:
199-
"""Return one response with the last chunk's headers (for current
200-
rate-limit state) and summed ``elapsed`` (for total wall-clock).
201-
202-
The returned response's ``.url`` is the *first chunk's* URL, which
203-
only reflects the first slice of the user's query. Callers wanting
204-
the canonical original-query URL on ``BaseMetadata`` must overwrite
205-
``.url`` themselves; ``chunking.multi_value_chunked``'s wrapper does
206-
this via ``build_request(**original_args).url``.
207-
208-
Mutates the first response in place: ``.headers`` is replaced with
209-
the last response's headers and ``.elapsed`` is accumulated across
210-
all chunks. Downstream reads ``.url``, ``.headers``, and
211-
``.elapsed`` (via ``BaseMetadata``).
212-
"""
213-
head = responses[0]
214-
if len(responses) > 1:
215-
head.headers = responses[-1].headers
216-
head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed)
217-
return head
218-
219-
220-
_FetchOnce = TypeVar(
221-
"_FetchOnce",
222-
bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]],
223-
)

tests/waterdata_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
if sys.version_info < (3, 10):
1414
pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True)
1515

16-
from dataretrieval.waterdata import filters as _filters
16+
from dataretrieval.waterdata import chunking as _chunking
1717
from dataretrieval.waterdata import (
1818
get_channel,
1919
get_combined_metadata,
@@ -475,7 +475,7 @@ def fetch(args):
475475

476476

477477
def test_multi_value_chunked_lazy_url_limit(monkeypatch):
478-
"""``url_limit=None`` → resolve filters._WATERDATA_URL_BYTE_LIMIT at call
478+
"""``url_limit=None`` → resolve chunking._WATERDATA_URL_BYTE_LIMIT at call
479479
time, so tests that patch the constant affect this decorator too."""
480480
calls = []
481481

@@ -484,7 +484,7 @@ def fetch(args):
484484
calls.append(args)
485485
return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1))
486486

487-
monkeypatch.setattr(_filters, "_WATERDATA_URL_BYTE_LIMIT", 240)
487+
monkeypatch.setattr(_chunking, "_WATERDATA_URL_BYTE_LIMIT", 240)
488488
# 4 sites of 10 chars → exceeds 240 → planner splits.
489489
fetch({"sites": ["S" * 10 + str(i) for i in range(4)]})
490490
assert len(calls) > 1, "patched constant should drive chunking"

0 commit comments

Comments
 (0)