forked from DOI-USGS/dataretrieval-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
2042 lines (1775 loc) · 73.2 KB
/
Copy pathutils.py
File metadata and controls
2042 lines (1775 loc) · 73.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations

import copy
import functools
import json
import logging
import math
import numbers
import os
import re
from collections.abc import (
    AsyncIterator,
    Awaitable,
    Callable,
    Iterable,
    Iterator,
    Mapping,
)
from contextlib import asynccontextmanager, contextmanager
from contextvars import ContextVar
from datetime import datetime, timedelta
from typing import Any, TypeVar, get_args
from zoneinfo import ZoneInfo

import httpx
import pandas as pd
from anyio.from_thread import start_blocking_portal

from dataretrieval import __version__
from dataretrieval.utils import HTTPX_DEFAULTS, BaseMetadata
from dataretrieval.waterdata import _progress, chunking
from dataretrieval.waterdata.chunking import (
    _QUOTA_HEADER,
    RateLimited,
    ServiceUnavailable,
    _safe_elapsed,
    get_active_client,
)
from dataretrieval.waterdata.types import (
    PROFILE_LOOKUP,
    PROFILES,
    SERVICES,
)
# geopandas is an optional dependency: record whether it imported so the
# result-shaping helpers can choose between GeoDataFrame and DataFrame output.
try:
    import geopandas as gpd
except ImportError:
    GEOPANDAS = False
else:
    GEOPANDAS = True

# Module-wide logger.
logger = logging.getLogger(__name__)

# geopandas availability cannot change while the process runs, so the notice
# is emitted once at import. Emitting it per query/chunk would repeat it on
# every call and interleave it with the progress line's carriage-return
# rewrites.
if not GEOPANDAS:
    logger.warning(
        "Geopandas not installed. Geometries will be flattened into pandas DataFrames."
    )
BASE_URL = "https://api.waterdata.usgs.gov"
OGC_API_VERSION = "v0"
OGC_API_URL = f"{BASE_URL}/ogcapi/{OGC_API_VERSION}"
SAMPLES_URL = f"{BASE_URL}/samples-data"
STATISTICS_API_VERSION = "v0"
STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}"
# Maps each OGC waterdata service to its user-facing ``id`` column (the name the
# typed getters rename the wire ``id`` to, e.g. ``daily`` -> ``daily_id``).
# ``get_cql`` validates its ``service`` argument against these keys and
# uses the value as the ``output_id`` for result shaping. Keep in sync with the
# ``types.WATERDATA_SERVICES`` Literal (same keys).
_OUTPUT_ID_BY_SERVICE: dict[str, str] = {
"channel-measurements": "channel_measurements_id",
"combined-metadata": "combined_meta_id",
"continuous": "continuous_id",
"daily": "daily_id",
"field-measurements": "field_measurement_id",
"field-measurements-metadata": "field_series_id",
"latest-continuous": "latest_continuous_id",
"latest-daily": "latest_daily_id",
"monitoring-locations": "monitoring_location_id",
"peaks": "peak_id",
"time-series-metadata": "time_series_id",
}
# Every service's output id EXCEPT the two that are genuinely user-facing
# (``monitoring_location_id`` and ``time_series_id``). The rest are synthetic
# per-record ids that ``_arrange_cols`` moves to the end of a result frame.
# Derived from ``_OUTPUT_ID_BY_SERVICE`` so adding a service can't silently
# leave a stray id column at the front again.
_EXTRA_ID_COLS = set(_OUTPUT_ID_BY_SERVICE.values()) - {
"monitoring_location_id",
"time_series_id",
}
def _switch_arg_id(ls: dict[str, Any], id_name: str, service: str):
"""
Switch argument id from its package-specific identifier to the standardized "id" key
that the API recognizes.
Sets the "id" key in the provided dictionary `ls`
with the value from either the service name or the expected id column name.
If neither key exists, "id" will be set to None.
Parameters
----------
ls : Dict[str, Any]
The dictionary containing identifier keys to be standardized.
id_name : str
The name of the specific identifier key to look for.
service : str
The service name.
Returns
-------
Dict[str, Any]
The modified dictionary with the "id" key set appropriately.
Examples
--------
For service "time-series-metadata", the function will look for either
"time_series_metadata_id" or "time_series_id" and change the key to simply
"id".
"""
service_id = service.replace("-", "_") + "_id"
if "id" not in ls:
if service_id in ls:
ls["id"] = ls[service_id]
elif id_name in ls:
ls["id"] = ls[id_name]
# Remove the original keys regardless of whether they were used
ls.pop(service_id, None)
ls.pop(id_name, None)
return ls
def _switch_properties_id(properties: list[str] | None, id_name: str, service: str):
"""
Switch properties id from its package-specific identifier to the
standardized "id" key that the API recognizes.
Sets the "id" key in the provided dictionary `ls` with the value from either
the service name or the expected id column name. If neither key exists, "id"
will be set to None.
Parameters
----------
properties : Optional[List[str]]
A list containing the properties or column names to be pulled from the
service, or None.
id_name : str
The name of the specific identifier key to look for.
service : str
The service name.
Returns
-------
List[str]
The modified list with the "id" key set appropriately.
Examples
--------
For service "monitoring-locations", it will look for
"monitoring_location_id" and change
it to "id".
"""
if not properties:
return []
service_id = service.replace("-", "_") + "_id"
last_letter = service[-1]
service_id_singular = ""
if last_letter == "s":
service_singular = service[:-1]
service_id_singular = service_singular.replace("-", "_") + "_id"
# Replace id fields with "id"
id_fields = [service_id, service_id_singular, id_name]
properties = ["id" if p in id_fields else p.replace("-", "_") for p in properties]
# Remove unwanted fields
return [p for p in properties if p not in ["geometry", service_id]]
_DATETIME_FORMATS = (
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S.%f",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d",
)
# Anchored to ``[Pp]\d`` so a normal word containing ``p`` (e.g. ``"Apr"``)
# doesn't get mis-classified as an ISO 8601 duration; the optional ``T``
# admits time-only forms like ``PT36H``.
_DURATION_RE = re.compile(r"^[Pp]T?\d")
# OGC API parameters that carry a date/datetime value (single string,
# two-element range, or interval/duration string) rather than a multi-value
# string list. Used by ``_construct_api_requests`` to keep them out of the
# POST/CQL2 multi-value path and to route them through ``_format_api_dates``,
# and by ``_NO_NORMALIZE_PARAMS`` to bypass string-iterable normalization.
_DATE_RANGE_PARAMS = frozenset(
{"datetime", "last_modified", "begin", "begin_utc", "end", "end_utc", "time"}
)
# Services that don't support comma-separated values for multi-value GET
# parameters and require POST with CQL2 JSON instead.
_CQL2_REQUIRED_SERVICES = frozenset({"monitoring-locations"})
def _parse_datetime(value: str) -> datetime | None:
    """Parse ``value`` with the first matching entry of ``_DATETIME_FORMATS``.

    Returns the parsed ``datetime`` (timezone-aware only when the input carried
    a UTC offset or ``Z``), or ``None`` when no supported layout matches.
    """
    # Rewrite a trailing ``Z`` as an explicit ``+00:00`` offset before matching.
    # Python 3.7+'s ``%z`` also accepts ``Z`` directly, so this is mainly
    # defensive.
    text = f"{value[:-1]}+00:00" if value.endswith("Z") else value
    for layout in _DATETIME_FORMATS:
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            continue
        return parsed
    return None
def _format_one(dt, *, date: bool) -> str | None:
"""Format a single datetime element for inclusion in the API time arg."""
if pd.isna(dt) or dt == "" or dt is None:
return ".."
parsed = _parse_datetime(dt)
if parsed is None:
return None
if date:
return parsed.strftime("%Y-%m-%d")
# Naive inputs are interpreted in the system local zone (for backwards
# compatibility). Use ``.astimezone()`` rather than a fixed offset so each
# value is resolved against the DST rules for ITS OWN date — a frozen
# ``datetime.now()`` offset shifted off-season inputs by an hour.
aware = parsed if parsed.tzinfo is not None else parsed.astimezone()
return aware.astimezone(ZoneInfo("UTC")).strftime("%Y-%m-%dT%H:%M:%SZ")
def _format_api_dates(
datetime_input: str | list[str | None] | None, date: bool = False
) -> str | None:
"""
Formats date or datetime input(s) for use with an API.
Handles single values or ranges, and converting to ISO 8601 or date-only
formats as needed.
Parameters
----------
datetime_input : Union[str, List[Optional[str]], None]
A single date/datetime string or a list of one or two date/datetime
strings. Accepts formats like "%Y-%m-%d %H:%M:%S", ISO 8601 (with or
without ``Z``/numeric offset), or relative periods (e.g., "P7D" /
"PT36H"). Range endpoints may be ``None``/``NaN``/empty to denote a
half-bounded range.
date : bool, optional
If True, uses only the date portion ("YYYY-MM-DD"). If False (default),
returns full datetime in UTC ISO 8601 format ("YYYY-MM-DDTHH:MM:SSZ").
Returns
-------
Union[str, None]
- If input is a single value, returns the formatted date/datetime string
or None if parsing fails.
- If input is a list of two values, returns a date/datetime range string
separated by "/" (e.g., "YYYY-MM-DD/YYYY-MM-DD" or
"YYYY-MM-DDTHH:MM:SSZ/YYYY-MM-DDTHH:MM:SSZ").
- Returns None if input is empty, all NA, or cannot be parsed.
Raises
------
ValueError
If `datetime_input` contains more than two values.
Notes
-----
- A single blank/NA value returns None. In a two-value range, a blank/NA
endpoint is rendered as ``".."`` to denote an open bound (e.g.
``"2024-01-01/.."``); the range is only None when *every* element is
blank/NA or any non-NA element fails to parse.
- Supports ISO 8601 durations such as "P7D" and "PT36H" and pre-formatted
intervals containing ``"/"``; both are passed through unchanged.
- Converts datetimes to UTC and formats as ISO 8601 with 'Z' suffix when
`date` is False. Inputs with an explicit offset (``Z`` or ``+HH:MM``) are
converted from that offset to UTC; naive inputs are interpreted in the
local time zone for backwards compatibility.
"""
if datetime_input is None:
return None
# Convert single string to list for uniform processing
if isinstance(datetime_input, str):
datetime_input = [datetime_input]
elif isinstance(datetime_input, Mapping):
# `list(mapping)` returns keys, which silently accepts the wrong shape.
raise TypeError(
f"date input must be a string or sequence of strings, "
f"not {type(datetime_input).__name__}."
)
elif not isinstance(datetime_input, (list, tuple)):
# Materialize any other iterable (pandas.Series, numpy.ndarray,
# generator, ...) so the len()/subscript operations below work.
datetime_input = list(datetime_input)
# Check for null or all NA and return None
if all(pd.isna(dt) or dt == "" or dt is None for dt in datetime_input):
return None
if len(datetime_input) > 2:
raise ValueError("datetime_input should only include 1-2 values")
# Pass through duration ("P7D", "PT36H") and pre-formatted interval ("a/b")
# strings untouched.
if len(datetime_input) == 1 and isinstance(datetime_input[0], str):
single = datetime_input[0]
if _DURATION_RE.match(single) or "/" in single:
return single
# Half-bounded ranges: NA endpoints render as ".."; any unparseable non-NA
# element invalidates the range.
formatted = [_format_one(dt, date=date) for dt in datetime_input]
if any(f is None for f in formatted):
return None
return "/".join(formatted)
def _cql2_param(args: dict[str, Any]) -> str:
"""
Convert query parameters to CQL2 JSON format for POST requests.
Parameters
----------
args : Dict[str, Any]
Dictionary of query parameters to convert to CQL2 format.
Returns
-------
str
Compact JSON string representation of the CQL2 query.
Notes
-----
Serialized with the tightest separators (no indentation or
whitespace). The body counts against the server's ~8 KB request-size
limit and against :func:`chunking._request_bytes` when planning
chunks, so every saved byte fits more values per POST: compact
encoding roughly halves the per-value cost versus pretty-printing,
which roughly doubles how many monitoring-location ids fit in one
sub-request and so halves the chunk count for large id lists.
"""
filters = []
for key, values in args.items():
filters.append({"op": "in", "args": [{"property": key}, values]})
query = {"op": "and", "args": filters}
return json.dumps(query, separators=(",", ":"))
def _default_headers():
    """
    Generate default HTTP headers for API requests.

    Returns
    -------
    dict
        'Accept-Encoding', 'Accept', 'User-Agent' and 'lang' headers. When the
        'API_USGS_PAT' environment variable is set to a non-empty value, it is
        also included as the 'X-Api-Key' header.

    Notes
    -----
    ``Accept-Encoding`` advertises only codings that httpx decodes without
    optional extras (``gzip`` and ``deflate``). ``compress`` (LZW) is
    deliberately NOT offered. httpx has no decoder for it and silently skips
    unknown ``Content-Encoding`` values, so a server honoring it would return
    raw compressed bytes that then fail JSON parsing.
    """
    headers = {
        "Accept-Encoding": "gzip, deflate",
        "Accept": "application/json",
        "User-Agent": f"python-dataretrieval/{__version__}",
        "lang": "en-US",
    }
    # An empty string counts as "no token", so no blank key header is sent.
    token = os.getenv("API_USGS_PAT")
    if token:
        headers["X-Api-Key"] = token
    return headers
def _check_ogc_requests(endpoint: str = "daily", req_type: str = "queryables"):
"""
Sends an HTTP GET request to the specified OGC endpoint and request type,
returning the JSON response.
Parameters
----------
endpoint : str, optional
The OGC collection endpoint to query (default is "daily").
req_type : str, optional
The type of request to make. Must be either "queryables" or "schema"
(default is "queryables").
Returns
-------
dict
The JSON response from the OGC endpoint.
Raises
------
ValueError
If req_type is not "queryables" or "schema".
RateLimited, ServiceUnavailable, RuntimeError
From :func:`_raise_for_non_200` on any non-200 — same typed
contract as the main data path so callers can use one
``except`` clause everywhere.
"""
if req_type not in ("queryables", "schema"):
raise ValueError(f"req_type must be 'queryables' or 'schema', got {req_type!r}")
url = f"{OGC_API_URL}/collections/{endpoint}/{req_type}"
resp = httpx.get(url, headers=_default_headers(), **HTTPX_DEFAULTS)
_raise_for_non_200(resp)
return resp.json()
def _error_body(resp: httpx.Response):
"""
Build an informative error message from an HTTP response.
Parameters
----------
resp : httpx.Response
The HTTP response object to extract the error message from.
Returns
-------
str
An error message string assembled per status code:
* **429** — predefined message describing the rate-limit and pointing
at the API-token path; the response body is not consulted.
* **403** — predefined message describing the most common cause
(query exceeding server limits); the response body is not
consulted.
* **other statuses** — attempts ``resp.json()`` and renders
``"<status>: <code>. <description>."`` from the JSON error
envelope. If the body is not JSON (e.g. an HTML 502 from a
gateway), falls back to ``"<status>: <reason>. <snippet>"`` with
the first 200 characters of ``resp.text``; an empty body
degrades to ``"<status>: <reason>."``.
"""
status = resp.status_code
if status == 429:
return (
"429: Too many requests made. Please obtain an API token "
"or try again later."
)
elif status == 403:
return (
"403: Query request denied. Possible reasons include "
"query exceeding server limits."
)
try:
j_txt = resp.json()
except ValueError:
snippet = (resp.text or "").strip()[:200]
reason = resp.reason_phrase or "Error"
if snippet:
return f"{status}: {reason}. {snippet}"
return f"{status}: {reason}."
return (
f"{status}: {j_txt.get('code', 'Unknown type')}. "
f"{j_txt.get('description', 'No description provided')}."
)
def _parse_retry_after(value: str | None) -> float | None:
"""
Parse a USGS ``Retry-After`` header into seconds.
Parameters
----------
value : str or None
The raw header value, or ``None`` if absent.
Returns
-------
float or None
Non-negative delta-seconds, clamped at zero. ``None`` when the
header is absent or unparseable; ``ChunkedCall`` treats
``None`` as "fall back to my own retry policy".
Notes
-----
USGS sends ``Retry-After`` as integer delta-seconds (empirically
verified — e.g. ``Retry-After: 2619``). The HTTP spec also allows
HTTP-date form, but USGS doesn't use it, so this function doesn't
bother parsing it.
"""
if not value:
return None
try:
return max(0.0, float(value.strip()))
except ValueError:
return None
def _raise_for_non_200(resp: httpx.Response) -> None:
"""
Raise a typed exception for any non-200 response.
Routes through :func:`_error_body` (USGS-API-aware: handles
429/403 specially, extracts ``code``/``description`` from JSON
error bodies) rather than ``Response.raise_for_status``, which
raises ``HTTPStatusError`` with a generic message.
Parameters
----------
resp : httpx.Response
The HTTP response to inspect.
Raises
------
RateLimited
On HTTP 429 — typed so ``ChunkedCall`` can wrap as a resumable
:class:`~dataretrieval.waterdata.chunking.QuotaExhausted`.
ServiceUnavailable
On HTTP 5xx — typed so ``ChunkedCall`` can wrap as a resumable
:class:`~dataretrieval.waterdata.chunking.ServiceInterrupted`.
RuntimeError
On any other non-200 (4xx other than 429) — these are
programmer errors that retry won't fix.
"""
status = resp.status_code
if status == 200:
return
body = _error_body(resp)
retry_after = _parse_retry_after(resp.headers.get("Retry-After"))
if status == 429:
raise RateLimited(body, retry_after=retry_after)
if 500 <= status < 600:
raise ServiceUnavailable(body, retry_after=retry_after)
raise RuntimeError(body)
def _paginated_failure_message(pages_collected: int, cause: BaseException) -> str:
    """
    Compose the user-facing message for a failure partway through pagination.

    The API offers no resume cursor, so the only recovery is to re-run the
    whole call. The message lists the practical knobs for that, with advice
    that depends on whether the cause was a rate limit (429) or something
    else.

    Parameters
    ----------
    pages_collected : int
        How many pages were fetched successfully before the failure.
    cause : BaseException
        The exception that interrupted pagination.

    Returns
    -------
    str
        Text for the ``RuntimeError`` that ``_walk_pages`` and
        ``get_stats_data`` raise from the original exception.
    """
    detail = str(cause).removesuffix(".")
    # Some exceptions (e.g. a bare ``httpx.TimeoutException()``) stringify to
    # an empty string; use the class name so the message stays informative.
    if not detail.strip():
        detail = type(cause).__name__

    action = (
        "wait for the rate-limit window to reset and retry"
        if isinstance(cause, RateLimited)
        else "retry the request (possibly after a short backoff)"
    )
    return (
        f"Paginated request failed after collecting {pages_collected} "
        f"page(s): {detail}. To recover: {action}, reduce the "
        f"request size (e.g. fewer locations, a shorter time range, or "
        f"a smaller ``limit``), or obtain an API token."
    )
def _ogc_query_params(
params: dict[str, Any],
*,
properties: list[str] | None,
bbox: list[float] | None,
limit: int | None,
skip_geometry: bool | None,
) -> dict[str, Any]:
"""Add the shared OGC query knobs to ``params`` (mutated in place).
Factors out the ``skipGeometry``/``limit``/``bbox``/``properties`` block
common to every OGC request so the typed getters
(:func:`_construct_api_requests`) and the generalized CQL2 path
(:func:`_construct_cql_request`) build identical URL parameters.
``skip_geometry=None`` leaves ``skipGeometry`` unset (the server defaults to
including geometry); the typed getters always pass a bool, so their behavior
is unchanged.
"""
if skip_geometry is not None:
params["skipGeometry"] = skip_geometry
params["limit"] = 50000 if limit is None or limit > 50000 else limit
# `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`.
if bbox is not None and len(bbox) > 0:
params["bbox"] = ",".join(map(str, bbox))
if properties:
params["properties"] = ",".join(properties)
return params
def _construct_api_requests(
    service: str,
    properties: list[str] | None = None,
    bbox: list[float] | None = None,
    limit: int | None = None,
    skip_geometry: bool = False,
    **kwargs,
) -> httpx.Request:
    """
    Constructs an HTTP request object for the specified water data API service.

    For most services, list parameters are comma-joined and sent as a single
    GET request (e.g. ``parameter_code=["00060","00010"]`` becomes
    ``parameter_code=00060,00010`` in the URL). Some services do not support
    comma-separated values; currently the only one is ``monitoring-locations``.
    For those, every list/tuple parameter with more than one element is sent
    in a POST body as CQL2 JSON, and the remaining parameters stay on the URL.

    Parameters
    ----------
    service : str
        The name of the API service to query (e.g., "daily").
    properties : Optional[List[str]], optional
        List of property names to include in the request.
    bbox : Optional[List[float]], optional
        Bounding box coordinates as a list of floats.
    limit : Optional[int], optional
        Maximum number of results to return per request (capped at 50000).
    skip_geometry : bool, optional
        Whether to exclude geometry from the response (default is False).
    **kwargs
        Additional query parameters, including date/time filters and other
        API-specific options.

    Returns
    -------
    httpx.Request
        The constructed HTTP request object ready to be sent.

    Notes
    -----
    - Date/time parameters (``_DATE_RANGE_PARAMS``) are formatted via
      ``_format_api_dates``. For the ``daily`` service they become date-only,
      except ``last_modified``, which is always a full UTC timestamp.
    - A ``filter_lang`` kwarg is sent as the ``filter-lang`` URL parameter.
    """
    service_url = f"{OGC_API_URL}/collections/{service}/items"

    # Format date/time parameters first — both routing paths need them.
    for key in _DATE_RANGE_PARAMS:
        if key in kwargs:
            kwargs[key] = _format_api_dates(
                kwargs[key],
                date=(service == "daily" and key != "last_modified"),
            )

    if service in _CQL2_REQUIRED_SERVICES:
        # POST with CQL2 JSON: multi-value params go in the request body.
        # The loop above has already collapsed every _DATE_RANGE_PARAMS value
        # to a string (or None), so the list/tuple test below never matches
        # them.
        post_params = {
            k: v
            for k, v in kwargs.items()
            if isinstance(v, (list, tuple)) and len(v) > 1
        }
        params = {k: v for k, v in kwargs.items() if k not in post_params}
    else:
        # GET with comma-separated values: join each list/tuple into one
        # string. Empty lists/tuples are skipped so the parameter is omitted,
        # rather than sent as a filterless ``&param=`` (which the server reads
        # as "match empty").
        post_params = {}
        params = {
            k: ",".join(str(x) for x in v) if isinstance(v, (list, tuple)) else v
            for k, v in kwargs.items()
            if not (isinstance(v, (list, tuple)) and len(v) == 0)
        }

    _ogc_query_params(
        params,
        properties=properties,
        bbox=bbox,
        limit=limit,
        skip_geometry=skip_geometry,
    )

    # The Python kwarg is ``filter_lang`` because hyphens aren't valid in
    # identifiers; the OGC API expects ``filter-lang``.
    if "filter_lang" in params:
        params["filter-lang"] = params.pop("filter_lang")

    headers = _default_headers()
    if post_params:
        headers["Content-Type"] = "application/query-cql-json"
        return httpx.Request(
            method="POST",
            url=service_url,
            headers=headers,
            content=_cql2_param(post_params),
            params=params,
        )
    return httpx.Request(
        method="GET",
        url=service_url,
        headers=headers,
        params=params,
    )
def _construct_cql_request(
    service: str,
    cql_body: str,
    *,
    properties: list[str] | None = None,
    bbox: list[float] | None = None,
    limit: int | None = None,
    skip_geometry: bool | None = None,
) -> httpx.Request:
    """Build a POST request whose body is a caller-supplied CQL2 document.

    This is the counterpart of :func:`_construct_api_requests` for the
    generalized :func:`~dataretrieval.waterdata.api.get_cql` path. The caller
    provides already-serialized CQL2 JSON (any predicate the grammar allows),
    which is sent unchanged. The ``properties``, ``bbox``, ``limit`` and
    ``skip_geometry`` knobs go on the URL through the shared
    :func:`_ogc_query_params`, so a generalized query and the equivalent typed
    getter end up with the same URL parameters.

    Parameters
    ----------
    service : str
        OGC collection name (e.g. ``"daily"``).
    cql_body : str
        Serialized CQL2 JSON document, used verbatim as the POST body.
    properties, bbox, limit, skip_geometry
        See :func:`_ogc_query_params`. ``properties`` must already be in wire
        format (``id``-translated).

    Returns
    -------
    httpx.Request
        A POST request carrying ``Content-Type: application/query-cql-json``.
    """
    query = _ogc_query_params(
        {},
        properties=properties,
        bbox=bbox,
        limit=limit,
        skip_geometry=skip_geometry,
    )
    headers = {**_default_headers(), "Content-Type": "application/query-cql-json"}
    return httpx.Request(
        method="POST",
        url=f"{OGC_API_URL}/collections/{service}/items",
        headers=headers,
        content=cql_body,
        params=query,
    )
def _next_req_url(
resp: httpx.Response, *, body: dict[str, Any] | None = None
) -> str | None:
"""
Extracts the URL for the next page of results from an HTTP response from a
water data endpoint.
Parameters
----------
resp : httpx.Response
The HTTP response object containing JSON data and headers.
body : dict, optional
Pre-parsed JSON body for ``resp``. When provided, skips the
``resp.json()`` call — useful when the caller has already
decoded the body for its own use (avoids a second parse pass).
Returns
-------
Optional[str]
The URL for the next page of results if available, otherwise None.
Notes
-----
- Returns None when the response carries no features.
- Expects the response JSON to contain a "links" list with objects having
"rel" and "href" keys.
- Checks for the "next" relation in the "links" to determine the next URL.
"""
if body is None:
body = resp.json()
if not body.get("numberReturned"):
return None
for link in body.get("links", []):
if link.get("rel") != "next":
continue
href = link.get("href")
if not href:
return None
# Refuse to follow a next-page link to a different host —
# the request's headers/auth were minted for the original
# host and shouldn't leak to whatever a poisoned response
# body might supply. Guarded against mock-shaped ``resp.url``
# attributes (tests sometimes set strings or ``MagicMock``)
# by falling open when host extraction isn't reliable.
try:
next_host = httpx.URL(href).host
resp_url = (
resp.url
if isinstance(resp.url, httpx.URL)
else httpx.URL(str(resp.url))
)
cur_host = resp_url.host
except (httpx.InvalidURL, TypeError):
next_host = cur_host = None
if next_host and cur_host and next_host != cur_host:
raise RuntimeError(
f"Refusing to follow cross-host next-page URL: "
f"{next_host} != {cur_host}"
)
return href
return None
def _get_resp_data(
resp: httpx.Response,
geopd: bool,
*,
body: dict[str, Any] | None = None,
) -> pd.DataFrame:
"""
Extracts and normalizes data from an HTTP response containing GeoJSON features.
Parameters
----------
resp : httpx.Response
The HTTP response object expected to contain a JSON body
with a "features" key.
geopd : bool
Indicates whether geopandas is installed and should be used to
handle geometries.
body : dict, optional
Pre-parsed JSON body for ``resp``. When provided, skips the
``resp.json()`` call — useful when the caller has already
decoded the body for its own use (avoids a second parse pass).
Returns
-------
gpd.GeoDataFrame or pd.DataFrame
A ``GeoDataFrame`` when ``geopd`` is True; otherwise a plain
``DataFrame`` carrying the feature properties plus an ``id``
column and a ``geometry`` column (coordinates list) where the
response includes them. Returns an empty ``DataFrame`` when no
features are returned.
Notes
-----
The non-geopandas branch builds the frame directly from each
feature's ``properties`` dict, plus the top-level ``id`` and
``geometry.coordinates`` columns — but adds the ``id`` and
``geometry`` columns only when at least one feature actually
carries them. This skips the GeoJSON envelope entirely, so
newly-added Feature-level fields (e.g. ``geometry.type`` after
USGS migrated to full GeoJSON geometry objects) can't leak into
the result frame; no reactive drop-list needs maintenance every
time the upstream schema grows.
"""
if body is None:
body = resp.json()
if not body.get("numberReturned"):
# Preserve the GeoDataFrame type on empty short-circuit so a
# downstream ``pd.concat([empty_page, geo_page])`` doesn't
# downgrade the geopd-installed user's result to a plain
# DataFrame (stripping geometry/CRS).
return gpd.GeoDataFrame() if geopd else pd.DataFrame()
# Defensive: a 200 with ``numberReturned > 0`` but missing
# ``features`` is a real schema-drift shape (mirrors the guard in
# ``_handle_stats_nesting``). Treat as empty rather than crash with
# ``KeyError`` — the wrapped failure would otherwise look like a
# transient transport error to ``_paginate``'s exception handler.
features = body.get("features") or []
if not features:
return gpd.GeoDataFrame() if geopd else pd.DataFrame()
if not geopd:
df = pd.json_normalize([f.get("properties") or {} for f in features], sep="_")
# Always materialize the ``id`` column (may be all-None) so
# ``_arrange_cols``'s ``df.rename(columns={"id": output_id})``
# produces the documented service-specific output_id column
# (daily_id, channel_measurements_id, …) even if the upstream
# response carried no feature-level id.
df["id"] = [f.get("id") for f in features]
geoms = [(f.get("geometry") or {}).get("coordinates") for f in features]
if any(g is not None for g in geoms):
df["geometry"] = geoms
return df
# Organize json into geodataframe and make sure id column comes along.
df = gpd.GeoDataFrame.from_features(features)
# Mirror the non-geopandas branch's defensive ``f.get("id")`` so a feature
# missing a top-level ``id`` yields None rather than a KeyError.
df["id"] = [f.get("id") for f in features]
df = df[["id"] + [col for col in df.columns if col != "id"]]
# If no geometry present, then return pandas dataframe. A geodataframe
# is not needed.
if df["geometry"].isnull().all():
df = pd.DataFrame(df.drop(columns="geometry"))
return df
@asynccontextmanager
async def _client_for(
    client: httpx.AsyncClient | None,
) -> AsyncIterator[httpx.AsyncClient]:
    """
    Yield an async HTTP client, preferring one that already exists.

    Sources, in priority order:

    1. ``client``, when the caller passes one. It is borrowed and never closed
       here; the caller owns its lifecycle.
    2. The shared async client of an active
       :class:`~dataretrieval.waterdata.chunking.ChunkedCall` run, as reported
       by :func:`chunking.get_active_client`. It is also borrowed; the chunker
       closes it on exit.
    3. A new ``httpx.AsyncClient`` created with ``HTTPX_DEFAULTS``, which is
       closed when this context exits.

    Parameters
    ----------
    client : httpx.AsyncClient or None
        A caller-owned client to borrow, or ``None`` to defer to the chunker's
        shared client or a temporary one.

    Yields
    ------
    httpx.AsyncClient
        The selected client.
    """
    # ``get_active_client`` is consulted only when no explicit client is given.
    borrowed = client if client is not None else get_active_client()
    if borrowed is not None:
        yield borrowed
        return

    async with httpx.AsyncClient(**HTTPX_DEFAULTS) as fresh:
        yield fresh
def _aggregate_paginated_response(
initial: httpx.Response,
last: httpx.Response,
total_elapsed: timedelta,
) -> httpx.Response:
"""
Build a single response covering a paginated call.
Returns a shallow copy of ``initial`` with ``.headers`` set to the
LAST page's (so downstream sees current ``x-ratelimit-remaining``)
and ``.elapsed`` set to total wall-clock. The canonical
``initial.url`` is preserved (it's the user's original query).
Both ``initial`` and ``last`` are left unmutated, mirroring the
convention of
:func:`dataretrieval.waterdata.chunking._combine_chunk_responses`.
Parameters
----------
initial : httpx.Response
First-page response (the canonical one for ``md.url``).
last : httpx.Response
Last-page response — supplies the headers to copy over.
total_elapsed : datetime.timedelta
Cumulative wall-clock across every page, including ``initial``.
Returns
-------