
Commit 7850186

thodson-usgs and claude committed
refactor(waterdata): Hone OO shape — ChunkPlan.__init__, _ChunkExecutor, axis-symmetric docstring
Addresses three PR DOI-USGS#283 review comments:

- **Module docstring reframed for axis symmetry.** The previous text
  read as "filter is the outer loop, list dims are inner," which
  obscured that both axis kinds are chunkable dimensions. The new
  framing leads with "every multi-value list parameter and the filter
  are chunkable axes" and explains *why* the algorithm enumerates
  filter counts in the outer loop (filter chunking is discrete in
  OR-clause cardinality; list dims are continuously halvable) rather
  than presenting the asymmetry as arbitrary.

- **``ChunkPlan.from_args`` → ``ChunkPlan.__init__``.** Now that the
  passthrough case is just a trivial plan (never ``None``), the
  classmethod-constructor pattern was unjustified. ``__init__`` does
  the planning directly: ``ChunkPlan(args, build_request, url_limit)``
  reads as "construct a plan for these args." Dropped ``@dataclass``;
  the fields are still simple attributes, just assigned in
  ``__init__``. Extracted the search loop to a free helper
  ``_search_best_chunking`` so ``__init__`` stays readable.

- **``_ChunkExecution`` → ``_ChunkExecutor``.** Classes should be
  nouns; "Execution" reads as an event, "Executor" as an actor. Pairs
  cleanly with ``ChunkPlan``: the plan is the recipe, the executor
  runs it.

The wrapper is unchanged in shape:

    return ChunkPlan(args, build_request, limit).execute(fetch_once)

Tests updated to use the direct constructor; all 145 unit tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
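Reviewer note: the joint search summarized in the first bullet can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not the module's implementation: ``toy_url_bytes``, ``filter_chunk_counts``, ``balanced_groups``, ``plan_sites`` and ``search_best`` are hypothetical names, the 200-byte base cost is invented, there is only one list parameter, and the final candidate ``k = n_clauses`` is assumed to be included even when it is not a power of two.

```python
from typing import Optional
from urllib.parse import quote_plus

OR_SEP = " OR "


def toy_url_bytes(sites: list, filter_expr: Optional[str], base: int = 200) -> int:
    """Hypothetical cost model: a fixed base plus the URL-encoded values."""
    cost = base + len(quote_plus(",".join(sites)))
    if filter_expr is not None:
        cost += len(quote_plus(filter_expr))
    return cost


def filter_chunk_counts(n_clauses: int) -> list:
    """Candidate filter chunk counts: 1, 2, 4, ... and finally n_clauses."""
    counts, k = [], 1
    while k < n_clauses:
        counts.append(k)
        k *= 2
    counts.append(max(n_clauses, 1))
    return counts


def balanced_groups(items: list, k: int) -> list:
    """Split items into k contiguous groups whose sizes differ by at most one."""
    q, r = divmod(len(items), k)
    groups, start = [], 0
    for i in range(k):
        size = q + (1 if i < r else 0)
        groups.append(items[start:start + size])
        start += size
    return groups


def plan_sites(sites: list, worst_filter: Optional[str], url_limit: int):
    """Greedy halving: double the chunk count until every chunk fits.

    Returns the chunks, or None when even singleton chunks overflow.
    """
    n = 1
    while True:
        chunks = balanced_groups(sites, n)
        if all(toy_url_bytes(c, worst_filter) <= url_limit for c in chunks):
            return chunks
        if n >= len(sites):
            return None
        n = min(n * 2, len(sites))


def search_best(sites: list, clauses: list, url_limit: int):
    """Outer loop over filter chunk counts, inner greedy halving of the list."""
    best = None
    for k in filter_chunk_counts(len(clauses)):
        if clauses:
            groups = [OR_SEP.join(g) for g in balanced_groups(clauses, k)]
            # Plan against the longest encoded group so every group fits.
            worst = max(groups, key=lambda g: len(quote_plus(g)))
        else:
            groups, worst = [None], None
        site_chunks = plan_sites(sites, worst, url_limit)
        if site_chunks is None:
            continue
        total = len(site_chunks) * len(groups)
        if best is None or total < best[0]:
            best = (total, site_chunks, groups)
    if best is None:
        raise ValueError("no candidate fits the URL budget")
    return best


sites = [f"USGS-{i:08d}" for i in range(8)]
clauses = [f"value > {i}" for i in range(4)]
total, site_chunks, filter_groups = search_best(sites, clauses, url_limit=300)
print(total, len(site_chunks), len(filter_groups))
```

The outer loop is over the filter because its split points are whole clauses, so only a few chunk counts are worth trying; the list axis can then be halved as far as the remaining budget requires.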
1 parent 5d931fa commit 7850186

2 files changed

Lines changed: 129 additions & 141 deletions


dataretrieval/waterdata/chunking.py

Lines changed: 122 additions & 134 deletions
@@ -1,22 +1,25 @@
 """Joint URL-byte chunking for the Water Data OGC getters.
 
-Long multi-value list params (sites, parameter codes, ...) and long
-top-level-``OR`` CQL filters independently risk overflowing the
-server's ~8 KB URL byte limit. ``multi_value_chunked`` builds a
-``ChunkPlan`` that plans both chunking dimensions together, picks the
-allocation that minimizes total sub-requests, and iterates the joint
-cartesian product so every sub-request URL fits. Requests that
-already fit get a trivial single-step plan — the wrapper has one code
-path either way.
-
-Planning: for a filter with ``n_clauses`` top-level OR clauses, try
-candidate filter chunk counts ``k = 1, 2, 4, ..., n_clauses``. For
-each, partition clauses into ``k`` count-balanced groups joined by
-``OR``, take the longest (URL-encoded) group as the worst-case filter,
-then plan list-dim chunking by greedy halving against the remaining
-budget. Keep the candidate with the smallest ``list_count × k``.
-
-Quota: after the first sub-request the execution reads
+A Water Data query has several chunkable axes — every multi-value list
+parameter (sites, parameter codes, …) and the cql-text ``filter``
+(splittable along its top-level OR clauses) — each of which can fan
+the URL past the server's ~8 KB byte limit. ``ChunkPlan`` picks a
+fan-out for each axis that minimizes total sub-requests under the URL
+budget; ``_ChunkExecutor`` iterates the joint cartesian product so
+every sub-request URL fits. Requests that already fit get a trivial
+single-step plan — the executor has one code path either way.
+
+Planning treats the two axis kinds symmetrically as "dimensions to
+split," but their cardinalities differ: list dims can be halved
+continuously, while filter chunking is discrete in OR-clause
+cardinality (you can only split into whole-clause groups). The
+planner therefore enumerates candidate filter chunk counts
+(``k = 1, 2, 4, …, n_clauses``); for each, it commits the worst-case
+(longest, URL-encoded) clause group as the filter and greedy-halves
+the list dims against the remaining budget. The candidate with the
+smallest total sub-request count (``list_count × k``) wins.
+
+Quota: after the first sub-request the executor reads
 ``x-ratelimit-remaining``; if the rest of the plan won't fit, it
 raises ``RequestExceedsQuota`` before burning more budget. A 429
 on any sub-request surfaces as ``QuotaExhausted`` carrying whatever
@@ -36,7 +39,6 @@
 import itertools
 import math
 from collections.abc import Callable, Iterator
-from dataclasses import dataclass
 from typing import Any
 from urllib.parse import quote_plus
 
@@ -364,17 +366,62 @@ def _filter_candidates(
         yield [_OR_SEP.join(g) for g in groups], _OR_SEP.join(worst)
 
 
-@dataclass(frozen=True)
-class ChunkPlan:
-    """A precomputed strategy for issuing one user-level request as a
-    sequence of sub-requests whose URLs each fit ``url_limit``.
+def _search_best_chunking(
+    args: dict[str, Any],
+    build_request: Callable[..., Any],
+    url_limit: int,
+    clauses: list[str],
+    filter_expr: str | None,
+) -> tuple[dict[str, list[list[Any]]], list[str | None]]:
+    """Enumerate filter chunk counts and greedy-halve list dims for each;
+    return the ``(list_chunks, filter_chunks)`` pair with the smallest
+    total sub-request count. Raises ``RequestTooLarge`` if no candidate
+    fits ``url_limit``."""
+    best: tuple[int, dict[str, list[list[Any]]], list[str | None]] | None = None
+    last_error: RequestTooLarge | None = None
+
+    for filter_chunks, worst_filter in _filter_candidates(clauses, filter_expr):
+        plan_args = (
+            args if worst_filter is None else {**args, _FILTER_KEY: worst_filter}
+        )
+        try:
+            list_chunks = _plan_list_chunks(plan_args, build_request, url_limit)
+        except RequestTooLarge as exc:
+            last_error = exc
+            continue
+        if list_chunks is None:
+            list_chunks = {}
+        # ``_plan_list_chunks`` returns ``None`` both when no list dims
+        # are chunkable AND when the request fits. Filter chunking
+        # alone has to close the gap — verify before committing to a
+        # list-empty candidate.
+        if not list_chunks and _request_bytes(build_request(**plan_args)) > url_limit:
+            continue
+        list_count = math.prod((len(c) for c in list_chunks.values()), start=1)
+        total = list_count * len(filter_chunks)
+        if best is None or total < best[0]:
+            best = (total, list_chunks, filter_chunks)
+
+    if best is None:
+        raise last_error or RequestTooLarge(
+            "No filter-chunking candidate produces a fitting plan. "
+            "Reduce list sizes or simplify the filter."
+        )
+    return best[1], best[2]
 
-    ``ChunkPlan.from_args`` always returns a plan, even when no
-    chunking is needed: the passthrough case is represented by empty
-    ``list_chunks`` and a single-element ``filter_chunks=[None]`` so
-    ``total == 1`` and ``iter_sub_args`` yields the original args
-    unchanged. The wrapper's loop is therefore the same shape whether
-    chunking was needed or not.
+
+class ChunkPlan:
+    """A strategy for issuing one user-level request as a sequence of
+    sub-requests whose URLs each fit ``url_limit``. Constructing a plan
+    *is* planning: ``ChunkPlan(args, build_request, url_limit)`` runs
+    the joint search and stores the result, raising ``RequestTooLarge``
+    only when chunking is needed but no candidate plan fits.
+
+    Passthrough requests (nothing to chunk, or already fitting) are
+    represented as a trivial plan with ``list_chunks={}``,
+    ``filter_chunks=[None]``, and ``total == 1``; ``iter_sub_args``
+    yields the original args unchanged. The executor's loop has one
+    shape either way.
 
     Attributes
     ----------
@@ -392,15 +439,46 @@ class ChunkPlan:
         URL of the full original request, used to overwrite the first
         chunk's ``response.url`` so ``BaseMetadata`` reflects the
         user's full query. ``None`` on the nothing-to-chunk passthrough
-        path: ``fetch_once``'s response already carries the canonical
-        URL, so the override is skipped to avoid an extra
+        path: ``fetch_once``'s response already carries the canonical
+        URL there, so the executor skips the override to avoid an extra
         ``build_request`` call on the hot path.
     """
 
-    args: dict[str, Any]
-    list_chunks: dict[str, list[list[Any]]]
-    filter_chunks: list[str | None]
-    canonical_url: str | None
+    def __init__(
+        self,
+        args: dict[str, Any],
+        build_request: Callable[..., Any],
+        url_limit: int,
+    ) -> None:
+        self.args = args
+        # Defaults model the trivial-passthrough shape; the branches
+        # below promote them when chunking is actually needed.
+        self.list_chunks: dict[str, list[list[Any]]] = {}
+        self.filter_chunks: list[str | None] = [None]
+        self.canonical_url: str | None = None
+
+        filter_expr = args.get(_FILTER_KEY)
+        clauses: list[str] = []
+        if _is_chunkable(filter_expr, args.get("filter_lang")):
+            _check_numeric_filter_pitfall(filter_expr)
+            clauses = _split_top_level_or(filter_expr)
+
+        # Trivial passthrough: chunking has no leverage. Skip the
+        # ``build_request`` call entirely — the common Water Data call
+        # shape doesn't pay for an unused request prep.
+        if not _chunkable_params(args) and len(clauses) < 2:
+            return
+
+        initial_request = build_request(**args)
+        self.canonical_url = initial_request.url
+
+        # Already-fits passthrough: chunking is possible but unnecessary.
+        if _request_bytes(initial_request) <= url_limit:
+            return
+
+        self.list_chunks, self.filter_chunks = _search_best_chunking(
+            args, build_request, url_limit, clauses, filter_expr
+        )
 
     @property
     def total(self) -> int:
@@ -433,98 +511,8 @@ def iter_sub_args(self) -> Iterator[dict[str, Any]]:
 
     def execute(self, fetch_once: _FetchOnce) -> tuple[pd.DataFrame, requests.Response]:
         """Run the plan and return the combined result. See
-        ``_ChunkExecution`` for the per-sub-request semantics."""
-        return _ChunkExecution(self, fetch_once).run()
-
-    @classmethod
-    def from_args(
-        cls,
-        args: dict[str, Any],
-        build_request: Callable[..., Any],
-        url_limit: int,
-    ) -> ChunkPlan:
-        """Compute the cheapest joint plan for ``args``. Returns a
-        passthrough plan when the request already fits or nothing's
-        chunkable; raises ``RequestTooLarge`` only when chunking *is*
-        needed but no candidate plan fits ``url_limit``.
-
-        Algorithm: enumerate filter chunk counts ``k = 1, 2, 4, ...,
-        n_clauses``; for each, partition clauses into ``k``
-        count-balanced groups joined by ``OR`` and pick the worst
-        (longest URL-encoded) group; substitute that as the filter
-        and plan list chunking with greedy halving. Keep the candidate
-        whose ``list_count × k`` is smallest.
-        """
-        filter_expr = args.get(_FILTER_KEY)
-        clauses: list[str] = []
-        if _is_chunkable(filter_expr, args.get("filter_lang")):
-            _check_numeric_filter_pitfall(filter_expr)
-            clauses = _split_top_level_or(filter_expr)
-
-        # Trivial passthrough: no multi-value lists and no top-level-OR
-        # filter to split, so chunking has no leverage. Skip the
-        # ``build_request`` call entirely — ``fetch_once``'s response
-        # will carry the canonical URL already (set by
-        # ``_finalize_paginated_response``), so the wrapper can elide
-        # the override. This is the common Water Data call shape, so
-        # the saved request prep is worth a small branch here.
-        if not _chunkable_params(args) and len(clauses) < 2:
-            return cls(
-                args=args, list_chunks={}, filter_chunks=[None], canonical_url=None
-            )
-
-        initial_request = build_request(**args)
-        canonical_url = initial_request.url
-
-        # Already-fits passthrough: chunking is possible but unnecessary.
-        if _request_bytes(initial_request) <= url_limit:
-            return cls(
-                args=args,
-                list_chunks={},
-                filter_chunks=[None],
-                canonical_url=canonical_url,
-            )
-
-        best: tuple[int, dict[str, list[list[Any]]], list[str | None]] | None = None
-        last_error: RequestTooLarge | None = None
-
-        for filter_chunks, worst_filter in _filter_candidates(clauses, filter_expr):
-            plan_args = (
-                args if worst_filter is None else {**args, _FILTER_KEY: worst_filter}
-            )
-            try:
-                list_chunks = _plan_list_chunks(plan_args, build_request, url_limit)
-            except RequestTooLarge as exc:
-                last_error = exc
-                continue
-            if list_chunks is None:
-                list_chunks = {}
-            # ``_plan_list_chunks`` returns ``None`` both when no list
-            # dims are chunkable AND when the request fits. Filter
-            # chunking alone has to close the gap — verify before
-            # committing to a list-empty candidate.
-            if (
-                not list_chunks
-                and _request_bytes(build_request(**plan_args)) > url_limit
-            ):
-                continue
-            list_count = math.prod((len(c) for c in list_chunks.values()), start=1)
-            total = list_count * len(filter_chunks)
-            if best is None or total < best[0]:
-                best = (total, list_chunks, filter_chunks)
-
-        if best is None:
-            raise last_error or RequestTooLarge(
-                "No filter-chunking candidate produces a fitting plan. "
-                "Reduce list sizes or simplify the filter."
-            )
-
-        return cls(
-            args=args,
-            list_chunks=best[1],
-            filter_chunks=best[2],
-            canonical_url=canonical_url,
-        )
+        ``_ChunkExecutor`` for the per-sub-request semantics."""
+        return _ChunkExecutor(self, fetch_once).run()
 
 
 def _read_remaining(response: requests.Response) -> int | None:
@@ -602,12 +590,12 @@ def _combine_chunk_responses(
     return head
 
 
-class _ChunkExecution:
-    """In-flight execution of a ``ChunkPlan``. Issues each sub-request,
-    accumulates frames and responses, translates 429s into
-    ``QuotaExhausted`` with the partial state captured so far, and
-    raises ``RequestExceedsQuota`` after the first sub-request if the
-    rest of the plan won't fit the current rate-limit window."""
+class _ChunkExecutor:
+    """Runs a ``ChunkPlan`` against a ``fetch_once`` callable. Issues
+    each sub-request, accumulates frames and responses, translates 429s
+    into ``QuotaExhausted`` carrying the partial state captured so far,
+    and raises ``RequestExceedsQuota`` after the first sub-request when
+    the rest of the plan won't fit the current rate-limit window."""
 
     def __init__(self, plan: ChunkPlan, fetch_once: _FetchOnce) -> None:
         self.plan = plan
@@ -673,7 +661,7 @@ def multi_value_chunked(
     requests are a trivial single-step plan, so there's one code path
     either way.
 
-    See ``ChunkPlan`` and ``_ChunkExecution`` for planning and
+    See ``ChunkPlan`` and ``_ChunkExecutor`` for planning and
     rate-limit semantics. Exceptions: ``RequestTooLarge`` if no plan
     fits, ``RequestExceedsQuota`` if the remaining plan can't fit the
     current rate-limit window, ``QuotaExhausted`` on a 429 mid-execution.
@@ -685,7 +673,7 @@ def wrapper(
             args: dict[str, Any],
         ) -> tuple[pd.DataFrame, requests.Response]:
             limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
-            return ChunkPlan.from_args(args, build_request, limit).execute(fetch_once)
+            return ChunkPlan(args, build_request, limit).execute(fetch_once)
 
         return wrapper
 
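Reviewer note: the plan/executor split can be pictured with a small stand-alone sketch. It is illustrative only and makes several assumptions: ``iter_sub_args`` and ``run_plan`` are free functions here rather than the module's methods, ``fetch_once`` returns a ``(payload, remaining)`` pair instead of a DataFrame and a response whose ``x-ratelimit-remaining`` header is read, and ``RequestExceedsQuota`` is a local stand-in for the module's exception.

```python
import itertools
import math


class RequestExceedsQuota(RuntimeError):
    """Local stand-in for the module's exception of the same name."""


def iter_sub_args(args, list_chunks, filter_chunks):
    """Yield one args dict per cell of the joint cartesian product."""
    keys = list(list_chunks)
    # With no list chunks, product() yields a single empty combination,
    # so a passthrough plan produces exactly one sub-request.
    for combo in itertools.product(*(list_chunks[k] for k in keys)):
        for filter_chunk in filter_chunks:
            sub = {**args, **dict(zip(keys, combo))}
            if filter_chunk is not None:
                sub["filter"] = filter_chunk
            yield sub


def run_plan(args, list_chunks, filter_chunks, fetch_once):
    """Issue every sub-request, checking the quota after the first one."""
    total = math.prod(len(c) for c in list_chunks.values()) * len(filter_chunks)
    results = []
    for i, sub in enumerate(iter_sub_args(args, list_chunks, filter_chunks)):
        payload, remaining = fetch_once(sub)
        results.append(payload)
        still_needed = total - (i + 1)
        # Assumed policy: bail out early rather than spend quota on a
        # plan that cannot finish in the current rate-limit window.
        if i == 0 and remaining is not None and still_needed > remaining:
            raise RequestExceedsQuota(
                f"{still_needed} sub-requests left, quota allows {remaining}"
            )
    return results


args = {"site": ["A", "B", "C", "D"], "filter": "a = 1 OR b = 2", "limit": 10}
list_chunks = {"site": [["A", "B"], ["C", "D"]]}
filter_chunks = ["a = 1", "b = 2"]
for sub in iter_sub_args(args, list_chunks, filter_chunks):
    print(sub)
```

Because the trivial plan is just ``list_chunks={}`` with ``filter_chunks=[None]``, the same loop handles chunked and passthrough requests without a special case.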

tests/waterdata_chunking_test.py

Lines changed: 7 additions & 7 deletions
@@ -144,7 +144,7 @@ def test_chunk_plan_fans_out_filter_when_list_alone_cannot_fit():
     }
     # Singleton list + full filter ≈ 200 + 10 + 86 = 296 (over limit 240).
     # Joint planner must split the filter into k >= 2 groups.
-    plan = ChunkPlan.from_args(args, _fake_build, url_limit=240)
+    plan = ChunkPlan(args, _fake_build, url_limit=240)
     # Either the filter was chunked, the list was chunked, or both.
     assert len(plan.filter_chunks) > 1 or any(
         len(v) > 1 for v in plan.list_chunks.values()
@@ -165,7 +165,7 @@ def test_chunk_plan_minimizes_total_sub_requests():
         "filter": " OR ".join(clauses),
     }
     # Tight limit forces both dims to participate.
-    plan = ChunkPlan.from_args(args, _fake_build, url_limit=380)
+    plan = ChunkPlan(args, _fake_build, url_limit=380)
     # Plan must beat the bail-floor-style worst case (8 singletons × 16
     # filter chunks = 128 sub-requests) by a healthy margin.
     assert plan.total < 128
@@ -182,15 +182,15 @@ def test_chunk_plan_raises_when_smallest_plan_doesnt_fit():
     # Base 200 + singleton site (10) + singleton clause (9) = 219; limit
     # below 219 → no joint plan can fit.
     with pytest.raises(RequestTooLarge):
-        ChunkPlan.from_args(args, _fake_build, url_limit=210)
+        ChunkPlan(args, _fake_build, url_limit=210)
 
 
 def test_chunk_plan_passthrough_when_request_fits():
     """A request that already fits gets a trivial single-step plan:
     no list chunks, ``filter_chunks=[None]``, ``total == 1``. The
     wrapper still iterates it through one fetch_once call."""
     args = {"monitoring_location_id": ["A", "B", "C"]}
-    plan = ChunkPlan.from_args(args, _fake_build, url_limit=8000)
+    plan = ChunkPlan(args, _fake_build, url_limit=8000)
     assert plan.list_chunks == {}
     assert plan.filter_chunks == [None]
     assert plan.total == 1
@@ -202,7 +202,7 @@ def test_chunk_plan_passthrough_when_nothing_chunkable():
     the limit (the server may 414, but the chunker has nothing to
     split)."""
     args = {"monitoring_location_id": "scalar-only"}
-    plan = ChunkPlan.from_args(args, _fake_build, url_limit=10)
+    plan = ChunkPlan(args, _fake_build, url_limit=10)
     assert plan.list_chunks == {}
     assert plan.filter_chunks == [None]
     assert plan.total == 1
@@ -212,7 +212,7 @@ def test_chunk_plan_iter_sub_args_passthrough_yields_original_args_once():
     """The passthrough plan's ``iter_sub_args`` yields exactly one
     sub-args dict equal to the original args (modulo dict identity)."""
     args = {"monitoring_location_id": ["A", "B", "C"], "limit": 100}
-    plan = ChunkPlan.from_args(args, _fake_build, url_limit=8000)
+    plan = ChunkPlan(args, _fake_build, url_limit=8000)
     subs = list(plan.iter_sub_args())
     assert len(subs) == 1
     assert subs[0] == args
@@ -627,7 +627,7 @@ def test_joint_planner_url_construction_long_filter_and_long_sites():
     }
     url_limit = 8000
 
-    plan = ChunkPlan.from_args(args, _construct_api_requests, url_limit)
+    plan = ChunkPlan(args, _construct_api_requests, url_limit)
     assert plan.total > 1, "expected non-trivial plan for over-limit request"
     list_plan = plan.list_chunks
     filter_chunks = plan.filter_chunks
