Skip to content

Commit aeb0f0a

Browse files
thodson-usgs and claude committed
Simplify chunking module: shared helpers, idiomatic max(), tighter types
Five targeted cleanups from review, no behavior change:

- Drop the duplicate ``_FetchOnce`` TypeVar in chunking.py; import the one already defined in filters.py. The two had identical bodies.
- Extract ``_max_per_clause_encoding_ratio(parts)`` in filters.py. Both ``_effective_filter_budget`` and the outer ``_filter_aware_probe_args`` need the same worst-case ratio formula; pinning it in one place keeps them from drifting.
- Replace the manual ``best: tuple | None`` sentinel + nested-loop scan in ``_plan_chunks`` with a generator + ``max(..., key=..., default=None)``. Removes the sentinel, the conditional-update branch, and the post-loop ``if best is None`` check.
- Extract ``_finalize_paginated_response`` in utils.py so the 4-line "carry last page's headers + cumulative elapsed onto the initial response" pattern lives in one spot instead of duplicated across ``_walk_pages`` and the stats helper.
- Tighten parametrized type hints from ``dict[str, list]`` to ``dict[str, list[Any]]`` (and the planner's return type) per PEP 585.

Also trimmed the 17-line ``_filter_aware_probe_args`` docstring to 9 lines; the substance is preserved, the prose is leaner.

All 209 waterdata tests pass; ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 82087ad commit aeb0f0a

3 files changed

Lines changed: 66 additions & 55 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import itertools
4040
import math
4141
from collections.abc import Callable
42-
from typing import Any, TypeVar
42+
from typing import Any
4343
from urllib.parse import quote_plus
4444

4545
import pandas as pd
@@ -49,7 +49,9 @@
4949
from .filters import (
5050
_combine_chunk_frames,
5151
_combine_chunk_responses,
52+
_FetchOnce,
5253
_is_chunkable,
54+
_max_per_clause_encoding_ratio,
5355
_split_top_level_or,
5456
)
5557

@@ -161,7 +163,7 @@ def __init__(
161163
self.remaining = remaining
162164

163165

164-
def _chunkable_params(args: dict[str, Any]) -> dict[str, list]:
166+
def _chunkable_params(args: dict[str, Any]) -> dict[str, list[Any]]:
165167
"""Return ``{name: list(values)}`` for every list/tuple kwarg with
166168
>1 element that is allowed to chunk."""
167169
return {
@@ -173,24 +175,16 @@ def _chunkable_params(args: dict[str, Any]) -> dict[str, list]:
173175

174176
def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]:
175177
"""Substitute the filter with a synthetic ASCII clause sized to the
176-
inner chunker's bail floor if the filter is chunkable, otherwise
177-
return ``args`` unchanged.
178-
179-
The inner ``filters.chunked`` decorator splits a filter into chunks
180-
each whose URL-encoded length is ≤ the per-sub-request budget, but
181-
bails (emits the full filter unchanged) when ANY single OR-clause's
182-
URL-encoded length exceeds the budget. Mirroring ``filters._
183-
effective_filter_budget``, the bail floor on the longest clause is
184-
``len(longest) * max(per_clause_encoding_ratio)``: even a clause
185-
whose own ratio is low inherits the worst per-call ratio because
186-
the budget is computed against the heaviest-encoding clause.
187-
188-
Substituting a synthetic ASCII clause of that exact length (ASCII
189-
has a 1:1 encoding ratio, so ``quote_plus`` is a no-op) makes the
190-
planner's URL probe and the inner chunker's bail condition agree
191-
on worst-case size — the planner won't approve a plan the inner
192-
chunker would then refuse to emit, and won't prematurely raise
193-
when the inner chunker could have made it fit.
178+
inner chunker's bail floor, so the planner's URL probe matches what
179+
the inner chunker would emit.
180+
181+
The inner ``filters.chunked`` bails (emits the full filter) when any
182+
single OR-clause's URL-encoded length exceeds the per-sub-request
183+
budget. Mirroring ``filters._effective_filter_budget``, that floor
184+
is ``len(longest_clause) * max(per-clause encoding ratio)``.
185+
Substituting an ASCII clause of that exact length makes
186+
``quote_plus`` a no-op, so the URL builder sees exactly the
187+
bail-floor byte count.
194188
"""
195189
filter_expr = args.get("filter")
196190
filter_lang = args.get("filter_lang")
@@ -199,9 +193,8 @@ def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]:
199193
parts = _split_top_level_or(filter_expr)
200194
if len(parts) < 2:
201195
return args # one-clause filter — inner chunker can't shrink it
202-
encoding_ratio_max = max(len(quote_plus(p)) / len(p) for p in parts)
203196
longest_raw = max(len(p) for p in parts)
204-
probe_size = math.ceil(longest_raw * encoding_ratio_max)
197+
probe_size = math.ceil(longest_raw * _max_per_clause_encoding_ratio(parts))
205198
return {**args, "filter": "x" * probe_size}
206199

207200

@@ -239,7 +232,7 @@ def _request_bytes(req: requests.PreparedRequest) -> int:
239232

240233

241234
def _worst_case_args(
242-
probe_args: dict[str, Any], plan: dict[str, list[list]]
235+
probe_args: dict[str, Any], plan: dict[str, list[list[Any]]]
243236
) -> dict[str, Any]:
244237
"""Args dict using the LARGEST chunk from each dim — represents the
245238
most byte-heavy sub-request the plan will issue, with the filter
@@ -255,7 +248,7 @@ def _plan_chunks(
255248
build_request: Callable[..., Any],
256249
url_limit: int,
257250
max_chunks: int | None = None,
258-
) -> dict[str, list[list]] | None:
251+
) -> dict[str, list[list[Any]]] | None:
259252
"""Greedy halving until the worst-case sub-request URL fits.
260253
261254
Returns ``None`` when no chunking is needed (request as-is fits or
@@ -280,34 +273,31 @@ def _plan_chunks(
280273
if _request_bytes(build_request(**probe_args)) <= url_limit:
281274
return None
282275

283-
plan: dict[str, list[list]] = {k: [v] for k, v in chunkable.items()}
276+
plan: dict[str, list[list[Any]]] = {k: [v] for k, v in chunkable.items()}
284277

285278
while True:
286279
worst = _worst_case_args(probe_args, plan)
287280
if _request_bytes(build_request(**worst)) <= url_limit:
288281
return plan
289282

290-
# Find the single biggest chunk across all dims and halve it.
291-
best: tuple[str, int, int] | None = None # (dim, chunk_index, size)
292-
for dim, dim_chunks in plan.items():
293-
for idx, chunk in enumerate(dim_chunks):
294-
if len(chunk) <= 1:
295-
continue
296-
size = _chunk_bytes(chunk)
297-
if best is None or size > best[2]:
298-
best = (dim, idx, size)
299-
300-
if best is None:
283+
# Largest splittable chunk across all dims, by URL-encoded bytes.
284+
splittable = (
285+
(dim, idx, chunk)
286+
for dim, dim_chunks in plan.items()
287+
for idx, chunk in enumerate(dim_chunks)
288+
if len(chunk) > 1
289+
)
290+
biggest = max(splittable, key=lambda t: _chunk_bytes(t[2]), default=None)
291+
if biggest is None:
301292
raise RequestTooLarge(
302293
f"Request exceeds {url_limit} bytes (URL + body) even "
303294
f"with every multi-value parameter at a singleton chunk "
304295
f"and any chunkable filter reduced to one OR-clause. "
305296
f"Reduce the number of values or split the call manually."
306297
)
307-
dim, idx, _ = best
308-
big = plan[dim][idx]
309-
mid = len(big) // 2
310-
plan[dim] = plan[dim][:idx] + [big[:mid], big[mid:]] + plan[dim][idx + 1 :]
298+
dim, idx, chunk = biggest
299+
mid = len(chunk) // 2
300+
plan[dim] = plan[dim][:idx] + [chunk[:mid], chunk[mid:]] + plan[dim][idx + 1 :]
311301

312302
# Each split only grows the cartesian product, so once we
313303
# cross max_chunks we can never come back under. Bail now
@@ -323,12 +313,6 @@ def _plan_chunks(
323313
)
324314

325315

326-
_FetchOnce = TypeVar(
327-
"_FetchOnce",
328-
bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]],
329-
)
330-
331-
332316
def _read_remaining(response: requests.Response) -> int:
333317
"""Parse ``x-ratelimit-remaining`` from a response. Missing or
334318
malformed header → return ``_QUOTA_UNKNOWN`` so the safety check

dataretrieval/waterdata/filters.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,18 @@ def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]:
152152
return chunks
153153

154154

155+
def _max_per_clause_encoding_ratio(parts: list[str]) -> float:
156+
"""Worst per-clause ``len(quote_plus(p)) / len(p)`` across OR-clauses.
157+
158+
Any sub-request chunk could end up containing only the heavier-encoding
159+
clauses, so per-sub-request byte budgets must be sized against the
160+
worst (not average) ratio to avoid overflow. Used by both this
161+
module's filter chunker and the outer ``chunking._filter_aware_probe_args``;
162+
pinning the formula here keeps the two from drifting.
163+
"""
164+
return max(len(quote_plus(p)) / len(p) for p in parts)
165+
166+
155167
def _effective_filter_budget(
156168
args: dict[str, Any],
157169
filter_expr: str,
@@ -163,8 +175,7 @@ def _effective_filter_budget(
163175
non-filter URL bytes by building the request with a 1-byte placeholder
164176
filter, subtract from the URL limit to get the bytes available for the
165177
encoded filter, then convert back to raw CQL bytes via the *maximum*
166-
per-clause encoding ratio (a chunk could contain only the heavier-encoding
167-
clauses, so budgeting by the average ratio could overflow).
178+
per-clause encoding ratio.
168179
"""
169180
# Fast path: encoded filter clearly fits with room for any plausible
170181
# non-filter URL. Skips the PreparedRequest build and splitter scan.
@@ -179,7 +190,7 @@ def _effective_filter_budget(
179190
# the caller sees one 414 instead of N parallel sub-request failures.
180191
return len(filter_expr) + 1
181192
parts = _split_top_level_or(filter_expr) or [filter_expr]
182-
encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts)
193+
encoding_ratio = _max_per_clause_encoding_ratio(parts)
183194
return max(100, int(available_url_bytes / encoding_ratio))
184195

185196

dataretrieval/waterdata/utils.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,26 @@ def _get_resp_data(resp: requests.Response, geopd: bool) -> pd.DataFrame:
618618
return df
619619

620620

621+
def _finalize_paginated_response(
622+
initial: requests.Response,
623+
last: requests.Response,
624+
total_elapsed,
625+
) -> None:
626+
"""Carry the last page's headers + cumulative elapsed onto the initial
627+
response in place.
628+
629+
The initial response stays canonical for ``md.url`` (user's original
630+
query), but its ``.headers`` and ``.elapsed`` are overwritten so the
631+
multi-value chunker's ``QuotaExhausted`` guard sees current
632+
``x-ratelimit-remaining`` and ``md.query_time`` reflects total
633+
wall-clock across pages. No-op when ``initial is last`` (single page).
634+
"""
635+
if last is initial:
636+
return
637+
initial.headers = last.headers
638+
initial.elapsed = total_elapsed
639+
640+
621641
def _walk_pages(
622642
geopd: bool,
623643
req: requests.PreparedRequest,
@@ -703,9 +723,7 @@ def _walk_pages(
703723
)
704724
curr_url = None
705725

706-
if resp is not initial_response:
707-
initial_response.headers = resp.headers
708-
initial_response.elapsed = total_elapsed
726+
_finalize_paginated_response(initial_response, resp, total_elapsed)
709727

710728
# Concatenate all pages at once for efficiency
711729
return pd.concat(dfs, ignore_index=True), initial_response
@@ -1180,9 +1198,7 @@ def get_stats_data(
11801198
)
11811199
next_token = None
11821200

1183-
if resp is not initial_response:
1184-
initial_response.headers = resp.headers
1185-
initial_response.elapsed = total_elapsed
1201+
_finalize_paginated_response(initial_response, resp, total_elapsed)
11861202

11871203
dfs = pd.concat(all_dfs, ignore_index=True) if len(all_dfs) > 1 else all_dfs[0]
11881204

0 commit comments

Comments
 (0)