Skip to content

Commit c0be417

Browse files
thodson-usgsclaude
andcommitted
feat(waterdata): add get_cql for generalized CQL2 queries
Adds get_cql(service, cql, ...), a single public entry point for querying any Water Data OGC API collection with an arbitrary CQL2 filter — for predicates the typed getters (get_daily, get_continuous, …) can't express: a top-level or, like with % wildcards, comparison operators, nested boolean trees, and geometry predicates beyond a bounding box. The CQL2 body (str or dict) is POSTed verbatim; the result is shaped like the typed getters (wire id renamed, columns ordered/sorted, dtypes coerced) and returned as (DataFrame, BaseMetadata). Like get_stats_data, it's a single request (the CQL body is opaque, so nothing to chunk); server-side CQL errors surface as the module's standard typed errors. Reuses existing machinery rather than duplicating it: _construct_cql_request shares the skipGeometry/limit/bbox/properties URL block with _construct_api_requests via _ogc_query_params; the non-chunked anyio-portal fetch path (_run_sync) is shared with get_stats_data; result shaping goes through the existing _finalize_ogc hook. WATERDATA_SERVICES enumerates the 11 OGC collections, kept in sync with _OUTPUT_ID_BY_SERVICE (guarded by a test). Also adopts _OUTPUT_ID_BY_SERVICE across the typed getters: get_ogc_data derives output_id from service via the map (single source of truth) instead of each getter hardcoding it; output_id becomes an optional override that get_reference_table still passes for its metadata collections. Tests: unit (request construction, skip_geometry omission, service validation, the WATERDATA_SERVICES/_OUTPUT_ID_BY_SERVICE sync invariant) + live (compound AND/IN, str/dict equivalence, id translation, LIKE wildcard); all typed getters re-verified against the live API after the refactor. Addresses #198 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 484a86a commit c0be417

5 files changed

Lines changed: 466 additions & 42 deletions

File tree

dataretrieval/waterdata/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
get_codes,
1616
get_combined_metadata,
1717
get_continuous,
18+
get_cql,
1819
get_daily,
1920
get_field_measurements,
2021
get_field_measurements_metadata,
@@ -37,6 +38,7 @@
3738
PROFILE_LOOKUP,
3839
PROFILES,
3940
SERVICES,
41+
WATERDATA_SERVICES,
4042
)
4143

4244
__all__ = [
@@ -45,10 +47,12 @@
4547
"PROFILES",
4648
"PROFILE_LOOKUP",
4749
"SERVICES",
50+
"WATERDATA_SERVICES",
4851
"get_channel",
4952
"get_codes",
5053
"get_combined_metadata",
5154
"get_continuous",
55+
"get_cql",
5256
"get_daily",
5357
"get_field_measurements",
5458
"get_field_measurements_metadata",

dataretrieval/waterdata/api.py

Lines changed: 161 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,22 @@
2828
METADATA_COLLECTIONS,
2929
PROFILES,
3030
SERVICES,
31+
WATERDATA_SERVICES,
3132
)
3233
from dataretrieval.waterdata.utils import (
34+
_OUTPUT_ID_BY_SERVICE,
35+
GEOPANDAS,
3336
SAMPLES_URL,
37+
_as_str_list,
3438
_check_profiles,
39+
_construct_cql_request,
3540
_default_headers,
41+
_finalize_ogc,
3642
_get_args,
3743
_raise_for_non_200,
44+
_run_sync,
45+
_switch_properties_id,
46+
_walk_pages,
3847
get_ogc_data,
3948
get_stats_data,
4049
)
@@ -252,12 +261,11 @@ def get_daily(
252261
... )
253262
"""
254263
service = "daily"
255-
output_id = "daily_id"
256264

257265
# Build argument dictionary, omitting None values
258266
args = _get_args(locals())
259267

260-
return get_ogc_data(args, output_id, service)
268+
return get_ogc_data(args, service)
261269

262270

263271
def get_continuous(
@@ -440,12 +448,11 @@ def get_continuous(
440448
... )
441449
"""
442450
service = "continuous"
443-
output_id = "continuous_id"
444451

445452
# Build argument dictionary, omitting None values
446453
args = _get_args(locals())
447454

448-
return get_ogc_data(args, output_id, service)
455+
return get_ogc_data(args, service)
449456

450457

451458
def get_monitoring_locations(
@@ -738,12 +745,11 @@ def get_monitoring_locations(
738745
... )
739746
"""
740747
service = "monitoring-locations"
741-
output_id = "monitoring_location_id"
742748

743749
# Build argument dictionary, omitting None values
744750
args = _get_args(locals())
745751

746-
return get_ogc_data(args, output_id, service)
752+
return get_ogc_data(args, service)
747753

748754

749755
def get_time_series_metadata(
@@ -961,12 +967,11 @@ def get_time_series_metadata(
961967
... )
962968
"""
963969
service = "time-series-metadata"
964-
output_id = "time_series_id"
965970

966971
# Build argument dictionary, omitting None values
967972
args = _get_args(locals())
968973

969-
return get_ogc_data(args, output_id, service)
974+
return get_ogc_data(args, service)
970975

971976

972977
def get_combined_metadata(
@@ -1194,11 +1199,10 @@ def get_combined_metadata(
11941199
11951200
"""
11961201
service = "combined-metadata"
1197-
output_id = "combined_meta_id"
11981202

11991203
args = _get_args(locals())
12001204

1201-
return get_ogc_data(args, output_id, service)
1205+
return get_ogc_data(args, service)
12021206

12031207

12041208
def get_latest_continuous(
@@ -1388,12 +1392,11 @@ def get_latest_continuous(
13881392
... )
13891393
"""
13901394
service = "latest-continuous"
1391-
output_id = "latest_continuous_id"
13921395

13931396
# Build argument dictionary, omitting None values
13941397
args = _get_args(locals())
13951398

1396-
return get_ogc_data(args, output_id, service)
1399+
return get_ogc_data(args, service)
13971400

13981401

13991402
def get_latest_daily(
@@ -1584,12 +1587,11 @@ def get_latest_daily(
15841587
... )
15851588
"""
15861589
service = "latest-daily"
1587-
output_id = "latest_daily_id"
15881590

15891591
# Build argument dictionary, omitting None values
15901592
args = _get_args(locals())
15911593

1592-
return get_ogc_data(args, output_id, service)
1594+
return get_ogc_data(args, service)
15931595

15941596

15951597
def get_field_measurements(
@@ -1774,12 +1776,11 @@ def get_field_measurements(
17741776
... )
17751777
"""
17761778
service = "field-measurements"
1777-
output_id = "field_measurement_id"
17781779

17791780
# Build argument dictionary, omitting None values
17801781
args = _get_args(locals())
17811782

1782-
return get_ogc_data(args, output_id, service)
1783+
return get_ogc_data(args, service)
17831784

17841785

17851786
def get_field_measurements_metadata(
@@ -1892,11 +1893,10 @@ def get_field_measurements_metadata(
18921893
18931894
"""
18941895
service = "field-measurements-metadata"
1895-
output_id = "field_series_id"
18961896

18971897
args = _get_args(locals())
18981898

1899-
return get_ogc_data(args, output_id, service)
1899+
return get_ogc_data(args, service)
19001900

19011901

19021902
def get_peaks(
@@ -2012,11 +2012,10 @@ def get_peaks(
20122012
20132013
"""
20142014
service = "peaks"
2015-
output_id = "peak_id"
20162015

20172016
args = _get_args(locals())
20182017

2019-
return get_ogc_data(args, output_id, service)
2018+
return get_ogc_data(args, service)
20202019

20212020

20222021
def get_reference_table(
@@ -2846,8 +2845,148 @@ def get_channel(
28462845
... )
28472846
"""
28482847
service = "channel-measurements"
2849-
output_id = "channel_measurements_id"
28502848

28512849
args = _get_args(locals())
28522850

2853-
return get_ogc_data(args, output_id, service)
2851+
return get_ogc_data(args, service)
2852+
2853+
2854+
def get_cql(
2855+
service: WATERDATA_SERVICES,
2856+
cql: str | dict,
2857+
*,
2858+
properties: str | Iterable[str] | None = None,
2859+
bbox: list[float] | None = None,
2860+
limit: int | None = None,
2861+
skip_geometry: bool | None = None,
2862+
convert_type: bool = True,
2863+
) -> tuple[pd.DataFrame, BaseMetadata]:
2864+
"""Query a Water Data OGC API collection with an arbitrary CQL2 filter.
2865+
2866+
Sends ``cql`` as a CQL2 filter against ``service`` and returns the matching
2867+
features, shaped like the typed getters (``get_daily``, ``get_continuous``,
2868+
…): the wire ``id`` renamed to the service's id column, columns ordered and
2869+
sorted, and dtypes coerced. Use it when you need a predicate the typed
2870+
getters can't express — a top-level ``or``, ``like`` with ``%`` wildcards,
2871+
comparison operators, nested boolean trees, or a geometry predicate beyond a
2872+
bounding box; prefer a typed getter when one covers the query.
2873+
2874+
The request is a single POST with the ``cql`` body sent verbatim, so there
2875+
are no multi-value arguments to chunk: narrow a query whose URL or body
2876+
would exceed the server's size cap rather than relying on automatic
2877+
chunking.
2878+
2879+
The CQL2 grammar is documented at
2880+
https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/.
2881+
2882+
Parameters
2883+
----------
2884+
service : str
2885+
OGC collection name. Must be one of
2886+
:data:`dataretrieval.waterdata.types.WATERDATA_SERVICES`
2887+
(e.g. ``"daily"``, ``"monitoring-locations"``).
2888+
cql : str or dict
2889+
CQL2 query. A ``dict`` is JSON-serialized for transport; a ``str`` is
2890+
sent through unchanged. The query goes into the HTTP POST body with
2891+
``Content-Type: application/query-cql-json``.
2892+
properties : str or iterable of str, optional
2893+
Server-side property whitelist (passed as ``properties=`` on the URL).
2894+
Reduces payload size. ``"id"`` resolves to the service's ``output_id``
2895+
(e.g. ``daily_id``) the same way it does in the typed wrappers.
2896+
bbox : list of float, optional
2897+
Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. Combines with the
2898+
CQL filter as an additional spatial predicate.
2899+
limit : int, optional
2900+
Page size, clamped server-side to 50,000.
2901+
skip_geometry : bool, optional
2902+
If True, the server omits geometry from each feature
2903+
(``skipGeometry=true``).
2904+
convert_type : bool, default True
2905+
Coerce date/datetime/numeric columns to typed dtypes after the
2906+
DataFrame is built.
2907+
2908+
Returns
2909+
-------
2910+
df : pandas.DataFrame or geopandas.GeoDataFrame
2911+
Result of the query. GeoDataFrame when ``geopandas`` is installed and
2912+
geometry is present.
2913+
md : :class:`dataretrieval.utils.BaseMetadata`
2914+
Request metadata (URL, query time, response headers).
2915+
2916+
Examples
2917+
--------
2918+
.. code::
2919+
2920+
>>> # Daily values for two parameter codes at two sites
2921+
>>> # (compound AND-of-INs).
2922+
>>> from dataretrieval import waterdata
2923+
>>> cql = {
2924+
... "op": "and",
2925+
... "args": [
2926+
... {
2927+
... "op": "in",
2928+
... "args": [
2929+
... {"property": "parameter_code"},
2930+
... ["00060", "00065"],
2931+
... ],
2932+
... },
2933+
... {
2934+
... "op": "in",
2935+
... "args": [
2936+
... {"property": "monitoring_location_id"},
2937+
... ["USGS-07367300", "USGS-03277200"],
2938+
... ],
2939+
... },
2940+
... ],
2941+
... }
2942+
>>> df, md = waterdata.get_cql(service="daily", cql=cql)
2943+
2944+
>>> # Monitoring locations whose HUC starts with "02070010"
2945+
>>> # (LIKE with the CQL2 ``%`` wildcard).
2946+
>>> df, md = waterdata.get_cql(
2947+
... service="monitoring-locations",
2948+
... cql='{"op": "like", "args": ['
2949+
... '{"property": "hydrologic_unit_code"},'
2950+
... ' "02070010%"]}',
2951+
... )
2952+
"""
2953+
if service not in _OUTPUT_ID_BY_SERVICE:
2954+
raise ValueError(
2955+
f"Unknown service {service!r}. Valid services: "
2956+
f"{sorted(_OUTPUT_ID_BY_SERVICE)}."
2957+
)
2958+
output_id = _OUTPUT_ID_BY_SERVICE[service]
2959+
2960+
# ``dict`` is the pythonic input — serialize on the way out. ``str`` is sent
2961+
# verbatim so callers who already have a CQL2 doc (e.g. imported from a
2962+
# config file) don't need to re-parse it.
2963+
body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql
2964+
2965+
properties_list = _as_str_list(properties, "properties")
2966+
2967+
# Translate user-facing names (``daily_id``/``id``) to the wire ``id`` the
2968+
# OGC API expects, matching the typed getters.
2969+
wire_properties = _switch_properties_id(properties_list, output_id, service)
2970+
2971+
req = _construct_cql_request(
2972+
service,
2973+
body,
2974+
properties=wire_properties,
2975+
bbox=bbox,
2976+
limit=limit,
2977+
skip_geometry=skip_geometry,
2978+
)
2979+
2980+
async def _run() -> tuple[pd.DataFrame, httpx.Response]:
2981+
return await _walk_pages(geopd=GEOPANDAS, req=req)
2982+
2983+
df, response = _run_sync(_run, service=service)
2984+
2985+
return _finalize_ogc(
2986+
df,
2987+
response,
2988+
properties=properties_list,
2989+
output_id=output_id,
2990+
convert_type=convert_type,
2991+
service=service,
2992+
)

dataretrieval/waterdata/types.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,24 @@
4040
"results",
4141
]
4242

43+
# OGC API time-series/monitoring collections queryable via ``get_cql``.
44+
# Keep in sync with ``utils._OUTPUT_ID_BY_SERVICE`` (same keys): that dict maps
45+
# each service to its user-facing ``id`` column and is the runtime source of
46+
# truth ``get_cql`` validates against.
47+
WATERDATA_SERVICES = Literal[
48+
"channel-measurements",
49+
"combined-metadata",
50+
"continuous",
51+
"daily",
52+
"field-measurements",
53+
"field-measurements-metadata",
54+
"latest-continuous",
55+
"latest-daily",
56+
"monitoring-locations",
57+
"peaks",
58+
"time-series-metadata",
59+
]
60+
4361
PROFILES = Literal[
4462
"actgroup",
4563
"actmetric",

0 commit comments

Comments
 (0)