Skip to content

Commit a37cb5c

Browse files
thodson-usgsclaude
andcommitted
refactor(waterdata): simplify chunking for global coherence
Quality cleanup of the chunking machinery after its several iterations — no behavior change (live bit-for-bit chunked-vs-unchunked still identical for the GET getters; 165 affected tests + README/time-conventions doctests pass). - Delete the vestigial `ChunkPlan.execute` pass-through (one caller, no tests) and inline `ChunkedCall(plan, fetch, postprocess).resume()` into the decorator — removes a hop and a ~35-line docstring that duplicated ChunkedCall's contract. - `ChunkedCall.resume()` reads its own `partial_frame` / `partial_response` accessors instead of re-deriving the combine inline — one definition of "the combined result". - Add a `_ChunkedFetch` alias so the decorator's return type is honest (`multi_value_chunked` no longer claims to return a bare `_FetchOnce`; the wrapper takes `*, postprocess` and returns `(df, Any)`). - Collapse the finalizer rationale to one home (`_PostProcess`); trim the repeated copies in `ChunkedCall.__init__` and `get_ogc_data._postprocess`. - Fix stale/garbled comments: the `copy.copy` snapshot rationale (false on the single-response path), the `_combine_chunk_frames` all-empty comment (it referenced a removed `pd.concat([result, geo_page])` and had broken grammar), and the dropped `ChunkPlan.execute` docstring reference. Net -45 lines. ruff clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent dee1d5d commit a37cb5c

2 files changed

Lines changed: 30 additions & 75 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 27 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ def get_active_client() -> httpx.Client | None:
151151
# default) returns the raw combined ``(frame, response)``.
152152
_PostProcess = Callable[[pd.DataFrame, httpx.Response], tuple[pd.DataFrame, Any]]
153153

154+
# The callable ``multi_value_chunked`` returns: like ``_FetchOnce`` but with an
155+
# optional keyword-only ``postprocess`` finalizer, returning either the raw
156+
# combined ``(frame, response)`` or the finalizer's output.
157+
_ChunkedFetch = Callable[..., tuple[pd.DataFrame, Any]]
158+
154159

155160
class _RetryableTransportError(RuntimeError):
156161
"""
@@ -320,10 +325,11 @@ def __init__(
320325
# empty-frame schema fetch, or BaseMetadata reading an unset
321326
# ``elapsed``) could mask this interruption and destroy the
322327
# resumable ``.call`` handle.
323-
# ``partial_frame`` gets a defensive ``.copy()`` because
324-
# ``_combine_chunk_frames`` may return a chunk frame verbatim
325-
# in the single-completed-chunk fast path; ``partial_response``
326-
# already comes via ``copy.copy`` from ``_combine_chunk_responses``.
328+
# ``partial_frame`` gets a defensive ``.copy()`` so the snapshot is
329+
# independent of the live combine (``_combine_chunk_frames`` can
330+
# return an underlying chunk frame verbatim). The aggregated
331+
# ``partial_response`` is only ever read, never mutated, so it
332+
# needs no copy.
327333
if call is None:
328334
self.partial_frame: pd.DataFrame = pd.DataFrame()
329335
self.partial_response: httpx.Response | None = None
@@ -784,46 +790,6 @@ def iter_sub_args(self) -> Iterator[dict[str, Any]]:
784790
sub_args[axis.arg_key] = axis.render(chunk)
785791
yield sub_args
786792

787-
def execute(
788-
self,
789-
fetch_once: _FetchOnce,
790-
postprocess: _PostProcess | None = None,
791-
) -> tuple[pd.DataFrame, Any]:
792-
"""
793-
Run the plan and return the combined result.
794-
795-
Thin wrapper around
796-
``ChunkedCall(self, fetch_once, postprocess).resume()``; see
797-
:class:`ChunkedCall` for the per-sub-request semantics.
798-
799-
Parameters
800-
----------
801-
fetch_once : Callable
802-
Function that issues a single sub-request, given the
803-
substituted args dict, and returns ``(frame, response)``.
804-
postprocess : Callable, optional
805-
Finalizer applied to the combined result on successful
806-
completion (see :data:`_PostProcess`). ``None`` (default)
807-
returns the raw combined ``(frame, response)``.
808-
809-
Returns
810-
-------
811-
df : pandas.DataFrame
812-
Combined data from every successful sub-request.
813-
response : httpx.Response or BaseMetadata
814-
The aggregated response, or the finalizer's output when
815-
``postprocess`` is supplied.
816-
817-
Raises
818-
------
819-
ChunkInterrupted
820-
On a mid-stream transient failure
821-
(:class:`QuotaExhausted` for 429,
822-
:class:`ServiceInterrupted` for 5xx). The resumable handle
823-
is on ``exc.call``.
824-
"""
825-
return ChunkedCall(self, fetch_once, postprocess).resume()
826-
827793

828794
def _classify_chunk_error(
829795
exc: BaseException,
@@ -908,12 +874,11 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
908874
"""
909875
non_empty = [f for f in frames if not f.empty]
910876
if not non_empty:
911-
# Preserve the frame type (GeoDataFrame vs DataFrame) of the
912-
# input even when every chunk is empty — ``_get_resp_data``
913-
# returns ``gpd.GeoDataFrame()`` on empty geopd responses, and
914-
# returning a plain ``pd.DataFrame()`` here would downgrade
915-
# the type a downstream ``pd.concat([result, geo_page])`` to a
916-
# plain DataFrame and strip geometry/CRS.
877+
# Preserve the input frame's type (GeoDataFrame vs DataFrame)
878+
# even when every chunk is empty: ``_get_resp_data`` returns a
879+
# ``gpd.GeoDataFrame()`` for empty geopandas responses, and
880+
# returning a bare ``pd.DataFrame()`` here would strip
881+
# geometry/CRS for callers that concat the result with geo frames.
917882
return frames[0] if frames else pd.DataFrame()
918883
if len(non_empty) == 1:
919884
# Single-completed-chunk fast path. Return a copy so callers
@@ -1005,8 +970,8 @@ class ChunkedCall:
1005970
Holds the in-flight state (per-sub-request frames and responses)
1006971
and exposes a single :meth:`resume` entry point that drives the
1007972
call from wherever it is to completion — used both for the first
1008-
invocation (from :meth:`ChunkPlan.execute`) and for subsequent
1009-
retries after a :class:`ChunkInterrupted`.
973+
invocation and for subsequent retries after a
974+
:class:`ChunkInterrupted`.
1010975
1011976
A ``ChunkedCall`` is created internally when a :class:`ChunkPlan`
1012977
executes; callers reach it via :attr:`ChunkInterrupted.call` on
@@ -1054,13 +1019,10 @@ def __init__(
10541019
) -> None:
10551020
self.plan = plan
10561021
self.fetch_once = fetch_once
1057-
# Optional finalizer (see ``_PostProcess``), bound here at
1058-
# construction by the caller that owns post-processing. It rides
1059-
# on this persistent call object, so it survives repeated
1022+
# Optional finalizer (see ``_PostProcess``), bound at construction
1023+
# so it rides on this persistent call object and survives repeated
10601024
# interruptions — every ``ChunkInterrupted`` re-exposes the same
1061-
# call with its finalizer intact — and ``resume()`` applies it
1062-
# exactly once, on successful completion. ``None`` returns the raw
1063-
# combined ``(frame, response)``.
1025+
# call with its finalizer intact, applied once on success.
10641026
self.postprocess = postprocess
10651027
# Completed (frame, response) pairs keyed by sub-args index;
10661028
# ``resume()`` skips indices already present.
@@ -1172,13 +1134,11 @@ def resume(self) -> tuple[pd.DataFrame, Any]:
11721134
if reporter is not None:
11731135
reporter.start_chunk(i + 1)
11741136
self._issue(i, sub_args)
1175-
ordered = self._ordered_chunks()
1176-
frame = _combine_chunk_frames([f for f, _ in ordered])
1177-
response = _combine_chunk_responses(
1178-
[r for _, r in ordered], self.plan.canonical_url
1179-
)
1137+
# The combined result is exactly what the live ``partial_*``
1138+
# accessors expose — every sub-request has completed here.
1139+
frame, response = self.partial_frame, self.partial_response
11801140
# Post-process ONLY on successful completion (and only when a
1181-
# closure was supplied). The partial/error path keeps the raw
1141+
# finalizer was bound). The partial/error path keeps the raw
11821142
# combined pair, so an interruption can never be masked by a
11831143
# failure inside post-processing.
11841144
if self.postprocess is None:
@@ -1221,7 +1181,7 @@ def multi_value_chunked(
12211181
*,
12221182
build_request: Callable[..., httpx.Request],
12231183
url_limit: int | None = None,
1224-
) -> Callable[[_FetchOnce], _FetchOnce]:
1184+
) -> Callable[[_FetchOnce], _ChunkedFetch]:
12251185
"""
12261186
Decorate a fetch function to transparently chunk over-budget requests.
12271187
@@ -1265,7 +1225,7 @@ def multi_value_chunked(
12651225
ChunkedCall : Per-sub-request execution and resume semantics.
12661226
"""
12671227

1268-
def decorator(fetch_once: _FetchOnce) -> _FetchOnce:
1228+
def decorator(fetch_once: _FetchOnce) -> _ChunkedFetch:
12691229
@functools.wraps(fetch_once)
12701230
def wrapper(
12711231
args: dict[str, Any],
@@ -1274,7 +1234,7 @@ def wrapper(
12741234
) -> tuple[pd.DataFrame, Any]:
12751235
limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
12761236
plan = ChunkPlan(args, build_request, limit)
1277-
return plan.execute(fetch_once, postprocess)
1237+
return ChunkedCall(plan, fetch_once, postprocess).resume()
12781238

12791239
return wrapper
12801240

dataretrieval/waterdata/utils.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,14 +1294,9 @@ def _postprocess(
12941294
) -> tuple[pd.DataFrame, BaseMetadata]:
12951295
"""Coerce dtypes, arrange/rename columns, sort rows, and wrap the
12961296
response as ``BaseMetadata`` — the shared tail every OGC result
1297-
passes through.
1298-
1299-
Handed to the chunker as the call's finalizer, so it runs once in
1300-
``ChunkedCall.resume()`` on successful completion — whether the
1301-
call ran straight through or was resumed by the caller after a
1302-
``ChunkInterrupted``. That single binding keeps a resumed result
1303-
identical in shape to a non-interrupted one instead of leaking
1304-
raw, string-typed chunk data with a bare ``httpx.Response``.
1297+
passes through. Passed to the chunker as the call's finalizer (see
1298+
``chunking._PostProcess``) so a resumed result is shaped identically
1299+
to a non-interrupted one.
13051300
"""
13061301
frame = _deal_with_empty(frame, properties, service)
13071302
if convert_type:

0 commit comments

Comments
 (0)