44this module: the ``FILTER_LANG`` type alias, the top-level ``OR``
55splitter / chunker, the per-request URL-budget probe, the
66lexicographic-pitfall guard (see the module comment on
7- ``_NUMERIC_COMPARE_RE`` for why), and the fan-out orchestrator used
8- by ``get_ogc_data``.
9-
10- The feature is intentionally isolated here so it can be rolled back
11- cleanly: if the ``filter`` passthrough is ever dropped, deleting this
12- file leaves only a handful of lines to trim in ``utils.py`` (the
13- ``fetch_combined`` call + import) and ``api.py`` (the eight
14- ``filter`` / ``filter_lang`` kwargs + ``FILTER_LANG`` import). No
15- other module depends on this one.
7+ ``_NUMERIC_COMPARE_RE`` for why), and the ``chunked`` decorator that
8+ ``utils.py`` applies to its single-request fetch function.
9+
10+ Isolation contract (rolling the feature back):
11+
12+ - ``dataretrieval/waterdata/filters.py`` and
13+ ``tests/waterdata_filters_test.py`` can be deleted wholesale.
14+ - ``utils.py``: drop the ``from . import filters`` import and the
15+ ``@filters.chunked(...)`` decorator on ``_fetch_once``. The two
16+ function bodies themselves (``_fetch_once``, ``get_ogc_data``) are
17+ already filter-unaware and need no changes. The two-line
18+ ``filter_lang`` → ``filter-lang`` translation inside
19+ ``_construct_api_requests`` becomes dead code but is harmless.
20+ - ``api.py``: drop the ``from .filters import FILTER_LANG`` import and
21+ the eight ``filter`` / ``filter_lang`` kwarg pairs on the OGC
22+ getters.
23+ - ``__init__.py``: drop the ``FILTER_LANG`` re-export.
24+
25+ Exports:
26+ - ``FILTER_LANG`` — type alias used by ``api.py`` and re-exported.
27+ - ``chunked`` — decorator used by ``utils.py`` on its fetch helper.
28+
29+ Everything else in this module is private (leading underscore).
1630"""
1731
1832from __future__ import annotations
1933
34+ import functools
2035import re
2136from collections .abc import Callable , Iterator
2237from typing import Any , Literal
2540import pandas as pd
2641import requests
2742
28- from dataretrieval .utils import BaseMetadata
29-
3043# ---------------------------------------------------------------------
3144# Public types
3245# ---------------------------------------------------------------------
3952# ---------------------------------------------------------------------
4053
4154# Conservative fallback budget (characters) for a single CQL ``filter``
42- # query parameter, used when the caller invokes ``_chunk_cql_or`` without
43- # a ``max_len``. ``fetch_combined `` computes a tighter per-request budget
44- # from ``_WATERDATA_URL_BYTE_LIMIT`` below.
55+ # query parameter, used when ``_chunk_cql_or`` is called without a
56+ # ``max_len``. The ``chunked `` decorator computes a tighter
57+ # per-request budget from ``_WATERDATA_URL_BYTE_LIMIT`` below.
4558_CQL_FILTER_CHUNK_LEN = 5000
4659
4760# Total URL byte limit the Water Data API will accept before replying
@@ -332,54 +345,17 @@ def _raise_pitfall(field: str, offense: str) -> None:
332345
333346
334347# ---------------------------------------------------------------------
335- # Plan + fetch + combine orchestration
348+ # Chunked fan-out (decorator)
336349# ---------------------------------------------------------------------
337350
338351
339- def _plan_filter_chunks (
340- args : dict [str , Any ],
341- build_request : Callable [..., Any ],
342- ) -> list [str | None ]:
343- """Decide how to fan ``args["filter"]`` out across HTTP calls.
344-
345- Returns one entry per request to send. A ``None`` entry means "send
346- ``args`` as-is" — either there is no filter, or the filter language
347- is not one we can safely split (only cql-text top-level ``OR``
348- chains are chunkable). Otherwise each string entry is a chunked
349- cql-text expression that replaces ``args["filter"]`` for its
350- sub-request. Overlapping user OR-clauses are deduplicated by feature
351- id later in ``_combine_chunk_frames``.
352- """
353- filter_expr = args .get ("filter" )
354- filter_lang = args .get ("filter_lang" )
355- chunkable = (
352+ def _is_chunkable (filter_expr : Any , filter_lang : Any ) -> bool :
353+ """Only non-empty cql-text filters can be safely split at top-level OR."""
354+ return (
356355 isinstance (filter_expr , str )
357- and filter_expr
356+ and bool ( filter_expr )
358357 and filter_lang in {None , "cql-text" }
359358 )
360- if not chunkable :
361- return [None ]
362- _check_numeric_filter_pitfall (filter_expr )
363- raw_budget = _effective_filter_budget (args , filter_expr , build_request )
364- return _chunk_cql_or (filter_expr , max_len = raw_budget )
365-
366-
367- def _fetch_chunks (
368- args : dict [str , Any ],
369- chunks : list [str | None ],
370- build_request : Callable [..., Any ],
371- walk_pages : Callable [[Any ], tuple [pd .DataFrame , requests .Response ]],
372- ) -> tuple [list [pd .DataFrame ], list [requests .Response ]]:
373- """Send one request per chunk; return the per-chunk frames and responses."""
374- frames : list [pd .DataFrame ] = []
375- responses : list [requests .Response ] = []
376- for chunk in chunks :
377- chunk_args = args if chunk is None else {** args , "filter" : chunk }
378- req = build_request (** chunk_args )
379- frame , response = walk_pages (req )
380- frames .append (frame )
381- responses .append (response )
382- return frames , responses
383359
384360
385361def _combine_chunk_frames (frames : list [pd .DataFrame ]) -> pd .DataFrame :
@@ -403,40 +379,85 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
403379 return combined
404380
405381
406- def _aggregate_response_metadata (
382+ def _aggregate_chunk_responses (
407383 responses : list [requests .Response ],
408- ) -> BaseMetadata :
409- """Build metadata from the first response, summing elapsed across chunks.
410-
411- The first response's URL and headers are the representative ones to
412- return. When the filter was fanned across multiple chunks, replace
413- its elapsed with the sum so ``query_time`` reflects the full
414- operation rather than just the first sub-request.
384+ ) -> requests .Response :
385+ """Return a response whose URL/headers come from the first chunk and
386+ whose ``elapsed`` is the sum across all chunks.
387+
388+ Mutates the first response in place (adjusting only ``elapsed``) so
389+ the caller can still wrap it in ``BaseMetadata`` as it would any
390+ single-request response — the decorator's output shape matches the
391+ undecorated function's output shape.
415392 """
416393 metadata_response = responses [0 ]
417394 if len (responses ) > 1 :
418395 metadata_response .elapsed = sum (
419396 (r .elapsed for r in responses [1 :]),
420397 start = metadata_response .elapsed ,
421398 )
422- return BaseMetadata (metadata_response )
423-
424-
425- def fetch_combined (
426- args : dict [str , Any ],
427- * ,
428- build_request : Callable [..., Any ],
429- walk_pages : Callable [[Any ], tuple [pd .DataFrame , requests .Response ]],
430- ) -> tuple [pd .DataFrame , BaseMetadata ]:
431- """Plan chunks, fetch each, combine frames, aggregate metadata.
432-
433- One-call entry point used by ``utils.get_ogc_data``. When the
434- request has no ``filter`` (or a non-chunkable one), this falls
435- through to a single-request path with no concat / dedup / elapsed
436- aggregation overhead. That means deleting this module only
437- requires replacing the single call site with an inline
438- ``build_request(**args)`` + ``walk_pages(req)`` + ``BaseMetadata(response)``.
399+ return metadata_response
400+
401+
402+ def chunked (
403+ * , build_request : Callable [..., Any ]
404+ ) -> Callable [
405+ [Callable [[dict [str , Any ]], tuple [pd .DataFrame , requests .Response ]]],
406+ Callable [[dict [str , Any ]], tuple [pd .DataFrame , requests .Response ]],
407+ ]:
408+ """Decorator that adds CQL-filter chunking to a single-request fetch.
409+
410+ The wrapped function must have signature
411+ ``(args: dict) -> (pd.DataFrame, requests.Response)`` and represent
412+ one HTTP round-trip (build a request, walk its pages). The
413+ decorator inspects ``args``:
414+
415+ - If no chunkable filter is present, it calls the wrapped function
416+ once and returns the result unchanged.
417+ - If a chunkable cql-text filter is present, it validates the
418+ filter against the lexicographic-comparison pitfall, splits it
419+ into URL-length-safe sub-expressions, calls the wrapped function
420+ once per chunk with ``{**args, "filter": chunk}``, concatenates
421+ the resulting frames (dropping empties, dedup'ing by feature
422+ ``id``), and returns an aggregated response (first chunk's
423+ URL/headers + summed ``elapsed``).
424+
425+ Either way the return type matches the wrapped function's — the
426+ caller wraps the response in ``BaseMetadata`` the same way in
427+ both paths. That's what lets the feature be removed by dropping
428+ just the decorator line.
429+
430+ ``build_request`` is injected so the decorator can probe URL
431+ length for budget computation without importing any specific HTTP
432+ builder. It receives the same kwargs the wrapped function's
433+ ``args`` would, and returns a prepared-request-like object with a
434+ ``.url`` attribute.
439435 """
440- chunks = _plan_filter_chunks (args , build_request )
441- frames , responses = _fetch_chunks (args , chunks , build_request , walk_pages )
442- return _combine_chunk_frames (frames ), _aggregate_response_metadata (responses )
436+
437+ def decorator (
438+ fetch_once : Callable [[dict [str , Any ]], tuple [pd .DataFrame , requests .Response ]],
439+ ) -> Callable [[dict [str , Any ]], tuple [pd .DataFrame , requests .Response ]]:
440+ @functools .wraps (fetch_once )
441+ def wrapper (
442+ args : dict [str , Any ],
443+ ) -> tuple [pd .DataFrame , requests .Response ]:
444+ filter_expr = args .get ("filter" )
445+ if not _is_chunkable (filter_expr , args .get ("filter_lang" )):
446+ return fetch_once (args )
447+
448+ _check_numeric_filter_pitfall (filter_expr )
449+ budget = _effective_filter_budget (args , filter_expr , build_request )
450+ chunks = _chunk_cql_or (filter_expr , max_len = budget )
451+
452+ frames : list [pd .DataFrame ] = []
453+ responses : list [requests .Response ] = []
454+ for chunk in chunks :
455+ frame , response = fetch_once ({** args , "filter" : chunk })
456+ frames .append (frame )
457+ responses .append (response )
458+
459+ return _combine_chunk_frames (frames ), _aggregate_chunk_responses (responses )
460+
461+ return wrapper
462+
463+ return decorator
0 commit comments