forked from DOI-USGS/dataretrieval-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
1156 lines (978 loc) · 37.2 KB
/
Copy pathutils.py
File metadata and controls
1156 lines (978 loc) · 37.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, get_args
import pandas as pd
import requests
from zoneinfo import ZoneInfo
from dataretrieval import __version__
from dataretrieval.utils import BaseMetadata
from dataretrieval.waterdata import filters
from dataretrieval.waterdata.types import (
PROFILE_LOOKUP,
PROFILES,
SERVICES,
)
# geopandas is optional: GEOPANDAS records whether it imported so the
# response parsers can fall back to plain pandas DataFrames without it.
try:
    import geopandas as gpd

    GEOPANDAS = True
except ImportError:
    GEOPANDAS = False

# Set up logger for this module
logger = logging.getLogger(__name__)

# Root of the USGS Water Data APIs and the versioned endpoints built from it.
BASE_URL = "https://api.waterdata.usgs.gov"
OGC_API_VERSION = "v0"
# OGC collection requests are built as f"{OGC_API_URL}/collections/{name}/...".
OGC_API_URL = f"{BASE_URL}/ogcapi/{OGC_API_VERSION}"
SAMPLES_URL = f"{BASE_URL}/samples-data"
STATISTICS_API_VERSION = "v0"
STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}"
def _switch_arg_id(ls: dict[str, Any], id_name: str, service: str):
"""
Switch argument id from its package-specific identifier to the standardized "id" key
that the API recognizes.
Sets the "id" key in the provided dictionary `ls`
with the value from either the service name or the expected id column name.
If neither key exists, "id" will be set to None.
Parameters
----------
ls : Dict[str, Any]
The dictionary containing identifier keys to be standardized.
id_name : str
The name of the specific identifier key to look for.
service : str
The service name.
Returns
-------
Dict[str, Any]
The modified dictionary with the "id" key set appropriately.
Examples
--------
For service "time-series-metadata", the function will look for either
"time_series_metadata_id" or "time_series_id" and change the key to simply
"id".
"""
service_id = service.replace("-", "_") + "_id"
if "id" not in ls:
if service_id in ls:
ls["id"] = ls[service_id]
elif id_name in ls:
ls["id"] = ls[id_name]
# Remove the original keys regardless of whether they were used
ls.pop(service_id, None)
ls.pop(id_name, None)
return ls
def _switch_properties_id(properties: list[str] | None, id_name: str, service: str):
"""
Switch properties id from its package-specific identifier to the
standardized "id" key that the API recognizes.
Sets the "id" key in the provided dictionary `ls` with the value from either
the service name or the expected id column name. If neither key exists, "id"
will be set to None.
Parameters
----------
properties : Optional[List[str]]
A list containing the properties or column names to be pulled from the
service, or None.
id_name : str
The name of the specific identifier key to look for.
service : str
The service name.
Returns
-------
List[str]
The modified list with the "id" key set appropriately.
Examples
--------
For service "monitoring-locations", it will look for
"monitoring_location_id" and change
it to "id".
"""
if not properties:
return []
service_id = service.replace("-", "_") + "_id"
last_letter = service[-1]
service_id_singular = ""
if last_letter == "s":
service_singular = service[:-1]
service_id_singular = service_singular.replace("-", "_") + "_id"
# Replace id fields with "id"
id_fields = [service_id, service_id_singular, id_name]
properties = ["id" if p in id_fields else p.replace("-", "_") for p in properties]
# Remove unwanted fields
return [p for p in properties if p not in ["geometry", service_id]]
def _format_api_dates(
datetime_input: str | list[str], date: bool = False
) -> str | None:
"""
Formats date or datetime input(s) for use with an API.
Handles single values or ranges, and converting to ISO 8601 or date-only
formats as needed.
Parameters
----------
datetime_input : Union[str, List[str]]
A single date/datetime string or a list of one or two date/datetime
strings. Accepts formats like "%Y-%m-%d %H:%M:%S", ISO 8601, or relative
periods (e.g., "P7D").
date : bool, optional
If True, uses only the date portion ("YYYY-MM-DD"). If False (default),
returns full datetime in UTC ISO 8601 format ("YYYY-MM-DDTHH:MM:SSZ").
Returns
-------
Union[str, None]
- If input is a single value, returns the formatted date/datetime string
or None if parsing fails.
- If input is a list of two values, returns a date/datetime range string
separated by "/" (e.g., "YYYY-MM-DD/YYYY-MM-DD" or
"YYYY-MM-DDTHH:MM:SSZ/YYYY-MM-DDTHH:MM:SSZ").
- Returns None if input is empty, all NA, or cannot be parsed.
Raises
------
ValueError
If `datetime_input` contains more than two values.
Notes
-----
- Handles blank or NA values by returning None.
- Supports relative period strings (e.g., "P7D") and passes them through
unchanged.
- Converts datetimes to UTC and formats as ISO 8601 with 'Z' suffix when
`date` is False.
- For date ranges, replaces "nan" with ".." in the output.
"""
# Get timezone
local_timezone = datetime.now().astimezone().tzinfo
# Convert single string to list for uniform processing
if isinstance(datetime_input, str):
datetime_input = [datetime_input]
# Check for null or all NA and return None
if all(pd.isna(dt) or dt == "" or dt is None for dt in datetime_input):
return None
if len(datetime_input) <= 2:
# If the list is of length 1, first look for things like "P7D" or dates
# already formatted in ISO08601. Otherwise, try to coerce to datetime
if len(datetime_input) == 1 and (
re.search(r"P", datetime_input[0], re.IGNORECASE)
or "/" in datetime_input[0]
):
return datetime_input[0]
# Otherwise, use list comprehension to parse dates
else:
try:
# Parse to naive datetime
parsed_dates = [
datetime.strptime(dt, "%Y-%m-%d %H:%M:%S") # noqa: DTZ007
for dt in datetime_input
]
except ValueError:
# Parse to date only
try:
parsed_dates = [
datetime.strptime(dt, "%Y-%m-%d") # noqa: DTZ007
for dt in datetime_input
]
except ValueError:
return None
# If the service only accepts dates for this input, not
# datetimes (e.g. "daily"), return just the dates separated by a
# "/", otherwise, return the datetime in UTC format.
if date:
return "/".join(dt.strftime("%Y-%m-%d") for dt in parsed_dates)
else:
parsed_locals = [
dt.replace(tzinfo=local_timezone) for dt in parsed_dates
]
formatted = "/".join(
dt.astimezone(ZoneInfo("UTC")).strftime("%Y-%m-%dT%H:%M:%SZ")
for dt in parsed_locals
)
return formatted
else:
raise ValueError("datetime_input should only include 1-2 values")
def _cql2_param(args: dict[str, Any]) -> str:
"""
Convert query parameters to CQL2 JSON format for POST requests.
Parameters
----------
args : Dict[str, Any]
Dictionary of query parameters to convert to CQL2 format.
Returns
-------
str
JSON string representation of the CQL2 query.
"""
filters = []
for key, values in args.items():
filters.append({"op": "in", "args": [{"property": key}, values]})
query = {"op": "and", "args": filters}
return json.dumps(query, indent=4)
def _default_headers():
    """
    Build the HTTP headers attached to every Water Data API request.

    Returns
    -------
    dict
        Headers 'Accept-Encoding', 'Accept', 'User-Agent' and 'lang'. When the
        'API_USGS_PAT' environment variable is non-empty, its value is added
        as the 'X-Api-Key' header.
    """
    user_agent = f"python-dataretrieval/{__version__}"
    headers = {
        "Accept-Encoding": "compress, gzip",
        "Accept": "application/json",
        "User-Agent": user_agent,
        "lang": "en-US",
    }
    if api_key := os.getenv("API_USGS_PAT"):
        headers["X-Api-Key"] = api_key
    return headers
def _check_ogc_requests(endpoint: str = "daily", req_type: str = "queryables"):
"""
Sends an HTTP GET request to the specified OGC endpoint and request type,
returning the JSON response.
Parameters
----------
endpoint : str, optional
The OGC collection endpoint to query (default is "daily").
req_type : str, optional
The type of request to make. Must be either "queryables" or "schema"
(default is "queryables").
Returns
-------
dict
The JSON response from the OGC endpoint.
Raises
------
ValueError
If req_type is not "queryables" or "schema".
requests.HTTPError
If the HTTP request returns an unsuccessful status code.
"""
if req_type not in ("queryables", "schema"):
raise ValueError(f"req_type must be 'queryables' or 'schema', got {req_type!r}")
url = f"{OGC_API_URL}/collections/{endpoint}/{req_type}"
resp = requests.get(url, headers=_default_headers())
resp.raise_for_status()
return resp.json()
def _error_body(resp: requests.Response):
"""
Provide more informative error messages based on the response status.
Parameters
----------
resp : requests.Response
The HTTP response object to extract the error message from.
Returns
-------
str
The extracted error message. For status code 429, returns the 'message'
field from the JSON error object. For status code 403, returns a
predefined message indicating possible reasons for denial. For other
status codes, returns the raw response text.
"""
status = resp.status_code
if status == 429:
return (
"429: Too many requests made. Please obtain an API token "
"or try again later."
)
elif status == 403:
return (
"403: Query request denied. Possible reasons include "
"query exceeding server limits."
)
j_txt = resp.json()
return (
f"{status}: {j_txt.get('code', 'Unknown type')}. "
f"{j_txt.get('description', 'No description provided')}."
)
def _construct_api_requests(
    service: str,
    properties: list[str] | None = None,
    bbox: list[float] | None = None,
    limit: int | None = None,
    skip_geometry: bool = False,
    **kwargs,
):
    """
    Build a prepared HTTP request against an OGC collection's items endpoint.

    Keyword arguments that carry more than one value (and are not temporal)
    are sent as a CQL2-JSON body in a POST request. Everything else becomes a
    URL query parameter, and the request is a GET when no such body is needed.

    Parameters
    ----------
    service : str
        The collection to query (e.g., "daily").
    properties : Optional[List[str]], optional
        Property names to request; sent comma-joined.
    bbox : Optional[List[float]], optional
        Bounding box coordinates; sent comma-joined.
    limit : Optional[int], optional
        Page size. None or anything above 50000 is capped at 50000.
    skip_geometry : bool, optional
        Sent as the camelCase ``skipGeometry`` parameter (default False).
    **kwargs
        Additional filters. Temporal keys ("datetime", "last_modified",
        "begin", "end", "time") are formatted with `_format_api_dates`.
        `filter_lang` is sent as the hyphenated ``filter-lang``.

    Returns
    -------
    requests.PreparedRequest
        The request, ready to be sent.
    """
    url = f"{OGC_API_URL}/collections/{service}/items"

    # Temporal keys always hold a single value or range, so they are never
    # moved into the CQL2 body; they are also the keys needing date formatting.
    temporal_keys = {"datetime", "last_modified", "begin", "end", "time"}

    body_filters = {}
    query = {}
    for key, value in kwargs.items():
        multi_valued = isinstance(value, (list, tuple)) and len(value) > 1
        if key not in temporal_keys and multi_valued:
            body_filters[key] = value
        else:
            query[key] = value

    query["skipGeometry"] = skip_geometry
    query["limit"] = 50000 if limit is None else min(limit, 50000)

    # Reassigning existing keys keeps their position in the query string.
    for key in temporal_keys & query.keys():
        dates_only = service == "daily" and key != "last_modified"
        query[key] = _format_api_dates(query[key], date=dates_only)

    if bbox:
        query["bbox"] = ",".join(str(coord) for coord in bbox)
    if properties:
        query["properties"] = ",".join(properties)
    # Python identifiers can't contain hyphens, but the API expects one.
    if "filter_lang" in query:
        query["filter-lang"] = query.pop("filter_lang")

    headers = _default_headers()
    common = {"url": url, "headers": headers, "params": query}
    if body_filters:
        headers["Content-Type"] = "application/query-cql-json"
        outgoing = requests.Request(
            method="POST", data=_cql2_param(body_filters), **common
        )
    else:
        outgoing = requests.Request(method="GET", **common)
    return outgoing.prepare()
def _next_req_url(resp: requests.Response) -> str | None:
"""
Extracts the URL for the next page of results from an HTTP response from a
water data endpoint.
Parameters
----------
resp : requests.Response
The HTTP response object containing JSON data and headers.
Returns
-------
Optional[str]
The URL for the next page of results if available, otherwise None.
Notes
-----
- If the environment variable "API_USGS_PAT" is set, logs the remaining
requests for the current hour.
- Logs the next URL if found at info level.
- Expects the response JSON to contain a "links" list with objects having
"rel" and "href" keys.
- Checks for the "next" relation in the "links" to determine the next URL.
"""
body = resp.json()
if not body.get("numberReturned"):
return None
header_info = resp.headers
if os.getenv("API_USGS_PAT", ""):
logger.info(
"Remaining requests this hour: %s",
header_info.get("x-ratelimit-remaining", ""),
)
for link in body.get("links", []):
if link.get("rel") == "next":
next_url = link.get("href")
logger.info("Next URL: %s", next_url)
return next_url
return None
def _get_resp_data(resp: requests.Response, geopd: bool) -> pd.DataFrame:
"""
Extracts and normalizes data from an HTTP response containing GeoJSON features.
Parameters
----------
resp : requests.Response
The HTTP response object expected to contain a JSON body
with a "features" key.
geopd : bool
Indicates whether geopandas is installed and should be used to
handle geometries.
Returns
-------
gpd.GeoDataFrame or pd.DataFrame
A geopandas GeoDataFrame if geometry is included, or a pandas DataFrame
containing the feature properties and each row's service-specific id.
Returns an empty pandas DataFrame if no features are returned.
"""
# Check if it's an empty response
body = resp.json()
if not body.get("numberReturned"):
return pd.DataFrame()
# If geopandas not installed, return a pandas dataframe
if not geopd:
df = pd.json_normalize(body["features"], sep="_")
df = df.drop(
columns=["type", "geometry", "AsGeoJSON(geometry)"], errors="ignore"
)
df.columns = [col.replace("properties_", "") for col in df.columns]
df.rename(columns={"geometry_coordinates": "geometry"}, inplace=True)
df = df.loc[:, ~df.columns.duplicated()]
return df
# Organize json into geodataframe and make sure id column comes along.
df = gpd.GeoDataFrame.from_features(body["features"])
df["id"] = pd.json_normalize(body["features"])["id"].values
df = df[["id"] + [col for col in df.columns if col != "id"]]
# If no geometry present, then return pandas dataframe. A geodataframe
# is not needed.
if df["geometry"].isnull().all():
df = pd.DataFrame(df.drop(columns="geometry"))
return df
def _walk_pages(
    geopd: bool,
    req: requests.PreparedRequest,
    client: requests.Session | None = None,
) -> tuple[pd.DataFrame, requests.Response]:
    """
    Iterates through paginated API responses and aggregates the results
    into a single DataFrame.

    Parameters
    ----------
    geopd : bool
        Indicates whether geopandas is installed and should be used for handling
        geometries.
    req : requests.PreparedRequest
        The initial HTTP request to send.
    client : Optional[requests.Session], default None
        An optional HTTP client to use for requests. If not provided, a new
        session is created and closed before returning.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the aggregated results from all pages fetched.
    requests.Response
        The initial response object containing metadata about the first request.

    Raises
    ------
    RuntimeError
        If the first request returns a non-200 status code.

    Notes
    -----
    A failure on any later page (a non-200 status, a transport error, or an
    unparsable body) is logged, and pagination stops. The pages already
    retrieved are still returned.
    """
    logger.info("Requesting: %s", req.url)
    if not geopd:
        logger.warning(
            "Geopandas not installed. Geometries will be flattened "
            "into pandas DataFrames."
        )
    # Only close the session if this function created it.
    close_client = client is None
    client = client or requests.Session()
    try:
        resp = client.send(req)
        if resp.status_code != 200:
            raise RuntimeError(_error_body(resp))
        # Store the initial response for metadata
        initial_response = resp

        # Follow-up pages reuse the original method, headers and, for POST,
        # the CQL2 body.
        method = req.method.upper()
        headers = dict(req.headers)
        content = req.body if method == "POST" else None

        dfs = [_get_resp_data(resp, geopd=geopd)]
        curr_url = _next_req_url(resp)
        while curr_url:
            try:
                resp = client.request(
                    method, curr_url, headers=headers, data=content
                )
                # Without this check, an error page (e.g. 429) parses as
                # "no data" and pagination ends silently with partial results.
                if resp.status_code != 200:
                    raise RuntimeError(_error_body(resp))
                dfs.append(_get_resp_data(resp, geopd=geopd))
                curr_url = _next_req_url(resp)
            except Exception as exc:  # noqa: BLE001 - keep pages already fetched
                # Report the actual failure rather than re-describing the
                # previous (successful) response.
                logger.error("Request incomplete. %s", exc)
                logger.warning(
                    "Request failed for URL: %s. Data download interrupted.", curr_url
                )
                curr_url = None
        # Concatenate all pages at once for efficiency
        return pd.concat(dfs, ignore_index=True), initial_response
    finally:
        if close_client:
            client.close()
def _deal_with_empty(
return_list: pd.DataFrame, properties: list[str] | None, service: str
) -> pd.DataFrame:
"""
Handles empty DataFrame results by returning a DataFrame with appropriate columns.
If `return_list` is empty, determines the column names to use:
- If `properties` is not provided or contains only NaN values,
retrieves schema properties from the specified service.
- Otherwise, uses the provided `properties` list as column names.
Parameters
----------
return_list : pd.DataFrame
The DataFrame to check for emptiness.
properties : Optional[List[str]]
List of property names to use as columns, or None.
service : str
The service endpoint to query for schema properties if needed.
Returns
-------
pd.DataFrame
The original DataFrame if not empty, otherwise an empty
DataFrame with the appropriate columns.
"""
if return_list.empty:
if not properties or all(pd.isna(properties)):
schema = _check_ogc_requests(endpoint=service, req_type="schema")
properties = list(schema.get("properties", {}).keys())
return pd.DataFrame(columns=properties)
return return_list
def _arrange_cols(
df: pd.DataFrame, properties: list[str] | None, output_id: str
) -> pd.DataFrame:
"""
Rearranges and renames columns in a DataFrame based on provided
properties and the service output id.
Parameters
----------
df : pd.DataFrame
The input DataFrame whose columns are to be rearranged or renamed.
properties : Optional[List[str]]
A list of column names to possibly rename. If None or contains
only NaN, the function renames 'id' to output_id.
output_id : str
The name to which the 'id' column should be renamed if applicable.
Returns
-------
pd.DataFrame or gpd.GeoDataFrame
The DataFrame with columns rearranged and/or renamed according
to the specified properties and output_id.
"""
# Rename id column to output_id
df = df.rename(columns={"id": output_id})
# If properties are provided, filter to only those columns,
# appending the geometry column when present in the DataFrame.
if properties and not all(pd.isna(properties)):
# Don't alias the caller's list — we mutate below.
local_properties = list(properties)
if "geometry" in df.columns and "geometry" not in local_properties:
local_properties.append("geometry")
# 'id' is a valid service column, but expose it under the
# service-specific output_id name instead.
if "id" in local_properties:
local_properties[local_properties.index("id")] = output_id
df = df.loc[:, [col for col in local_properties if col in df.columns]]
# Move meaningless-to-user, extra id columns to the end
# of the dataframe, if they exist
extra_id_col = set(df.columns).intersection(
{
"latest_continuous_id",
"latest_daily_id",
"daily_id",
"continuous_id",
"field_measurement_id",
}
)
# If the arbitrary id column is returned (either due to properties
# being none or NaN), then move it to the end of the dataframe, but
# if part of properties, keep in requested order
if extra_id_col and (properties is None or all(pd.isna(properties))):
id_col_order = [col for col in df.columns if col not in extra_id_col] + list(
extra_id_col
)
df = df.loc[:, id_col_order]
return df
def _type_cols(df: pd.DataFrame) -> pd.DataFrame:
"""
Casts columns into appropriate types.
Parameters
----------
df : pd.DataFrame
The input DataFrame containing water data.
Returns
-------
pd.DataFrame
The DataFrame with columns cast to appropriate types.
"""
cols = set(df.columns)
numerical_cols = [
"altitude",
"altitude_accuracy",
"contributing_drainage_area",
"drainage_area",
"hole_constructed_depth",
"value",
"well_constructed_depth",
]
time_cols = [
"begin",
"begin_utc",
"construction_date",
"end",
"end_utc",
"datetime", # unused
"last_modified",
"time",
]
for col in cols.intersection(time_cols):
df[col] = pd.to_datetime(df[col], errors="coerce")
for col in cols.intersection(numerical_cols):
df[col] = pd.to_numeric(df[col], errors="coerce")
return df
def _sort_rows(df: pd.DataFrame) -> pd.DataFrame:
"""
Sorts rows by 'time' and 'monitoring_location_id' columns if they
exist.
Parameters
----------
df : pd.DataFrame
The input DataFrame containing water data.
Returns
-------
pd.DataFrame
The DataFrame with rows ordered by time and site.
"""
if "time" in df.columns and "monitoring_location_id" in df.columns:
df = df.sort_values(by=["time", "monitoring_location_id"], ignore_index=True)
elif "time" in df.columns:
df = df.sort_values(by="time", ignore_index=True)
return df
def get_ogc_data(
    args: dict[str, Any], output_id: str, service: str
) -> tuple[pd.DataFrame, BaseMetadata]:
    """
    Retrieves OGC (Open Geospatial Consortium) API data from a collection and
    returns it as a pandas DataFrame with metadata.

    This function prepares request arguments, constructs API requests,
    handles pagination, processes the results, and formats output
    according to the specified parameters.

    Parameters
    ----------
    args : Dict[str, Any]
        Dictionary of request arguments for the OGC service. May include
        "properties" (requested column names) and "convert_type" (bool).
        Arguments whose value is None are dropped before the request.
    output_id : str
        The user-facing identifier name. It is accepted in `args`/properties
        in place of the API's "id", and the returned "id" column is renamed to
        it (e.g. "monitoring_location_id").
    service : str
        The OGC collection name used in ``/collections/{service}/items``
        (e.g. "daily", "monitoring-locations").

    Returns
    -------
    pd.DataFrame or gpd.GeoDataFrame
        A DataFrame containing the retrieved and processed OGC data.
    BaseMetadata
        A metadata object built from the initial HTTP response.

    Notes
    -----
    - The function does not mutate the input `args` dictionary.
    - When `convert_type` is truthy, known time/numeric columns are cast.
    - Applies column cleanup and reordering based on service and properties,
      then sorts rows by time (and monitoring location when present).
    """
    args = args.copy()
    args["service"] = service
    args = _switch_arg_id(args, id_name=output_id, service=service)
    # Capture the user-facing `properties` before they are rewritten to
    # wire-format names, so post-processing selects columns by the names the
    # caller asked for.
    properties = args.get("properties")
    args["properties"] = _switch_properties_id(
        properties, id_name=output_id, service=service
    )
    convert_type = args.pop("convert_type", False)
    # Drop arguments left as None.
    args = {k: v for k, v in args.items() if v is not None}
    return_list, response = _fetch_once(args)
    return_list = _deal_with_empty(return_list, properties, service)
    if convert_type:
        return_list = _type_cols(return_list)
    return_list = _arrange_cols(return_list, properties, output_id)
    return_list = _sort_rows(return_list)
    return return_list, BaseMetadata(response)
@filters.chunked(build_request=_construct_api_requests)
def _fetch_once(
    args: dict[str, Any],
) -> tuple[pd.DataFrame, requests.Response]:
    """
    Send a single OGC request built from `args` and return ``(frame, response)``.

    The request is built with `_construct_api_requests(**args)` and paged
    through with `_walk_pages`, using geopandas when it is available. The
    function is wrapped by `filters.chunked` (defined in
    `dataretrieval.waterdata.filters`), which receives
    `_construct_api_requests` as its request builder. See that module for how
    chunked filters are split and recombined.
    """
    prepared = _construct_api_requests(**args)
    frame, first_response = _walk_pages(geopd=GEOPANDAS, req=prepared)
    return frame, first_response
def _handle_stats_nesting(
body: dict[str, Any],
geopd: bool = False,
) -> pd.DataFrame:
"""
Takes nested json from stats service and flattens into a dataframe with
one row per monitoring location, parameter, and statistic.
Parameters
----------
body : Dict[str, Any]
The JSON response body from the statistics service containing nested data.
Returns
-------
pd.DataFrame
A DataFrame containing the flattened statistical data.
"""
if body is None:
return pd.DataFrame()
if not geopd:
logger.info(
"Geopandas not installed. Geometries will be flattened "
"into pandas DataFrames."
)
# If geopandas not installed, return a pandas dataframe
# otherwise return a geodataframe
if not geopd:
df = pd.json_normalize(body["features"]).drop(
columns=["type", "properties.data"], errors="ignore"
)
df.columns = df.columns.str.split(".").str[-1]
else:
df = gpd.GeoDataFrame.from_features(body["features"]).drop(
columns=["data"], errors="ignore"
)
# Unnest json features, properties, data, and values while retaining necessary
# metadata to merge with main dataframe.
dat = pd.json_normalize(
body,
record_path=["features", "properties", "data", "values"],
meta=[
["features", "properties", "monitoring_location_id"],
["features", "properties", "data", "parameter_code"],
["features", "properties", "data", "unit_of_measure"],
["features", "properties", "data", "parent_time_series_id"],
# ["features", "geometry", "coordinates"],
],
meta_prefix="",
errors="ignore",
)
dat.columns = dat.columns.str.split(".").str[-1]
return df.merge(dat, on="monitoring_location_id", how="left")
def _expand_percentiles(df: pd.DataFrame) -> pd.DataFrame:
"""
Takes percentile value and thresholds columns containing lists
of values and turns each list element into its own row in the
original dataframe. 'nan's are removed from the dataframe. If
no percentile data exist, it adds a percentile column and
populates column with percentile assigned to min, max, and
median.
Parameters
----------
df : pd.DataFrame
The dataframe returned from using one of the statistics services.
Returns
-------
pd.DataFrame
A DataFrame containing the flattened percentile data.
"""
if len(df) > 0:
if "percentile" in df["computation"].unique():
# Explode percentile lists into rows called "value" and "percentile"
percentiles = df.loc[df["computation"] == "percentile"]
percentiles_explode = percentiles[
["computation_id", "values", "percentiles"]
].explode(["values", "percentiles"], ignore_index=True)
percentiles_explode = percentiles_explode.loc[
percentiles_explode["values"] != "nan"
]
percentiles_explode["value"] = pd.to_numeric(percentiles_explode["values"])
percentiles_explode["percentile"] = pd.to_numeric(
percentiles_explode["percentiles"]
)
percentiles_explode = percentiles_explode.drop(
columns=["values", "percentiles"]
)
# Merge exploded values back to other metadata/geometry
percentiles = percentiles.drop(
columns=["values", "percentiles", "value"], errors="ignore"
).merge(percentiles_explode, on="computation_id", how="left")
# Concatenate back to original
dfs = pd.concat(
[df.loc[df["computation"] != "percentile"], percentiles]
).drop(columns=["values", "percentiles"])
else:
dfs = df
dfs["percentile"] = pd.NA
# Give min, max, median a percentile value
dfs.loc[dfs["computation"] == "maximum", "percentile"] = 100
dfs.loc[dfs["computation"] == "minimum", "percentile"] = 0
dfs.loc[dfs["computation"] == "median", "percentile"] = 50
# Make sure numeric
dfs["percentile"] = pd.to_numeric(dfs["percentile"])
# Move percentile column
cols = dfs.columns.tolist()
cols.remove("percentile")
col_index = cols.index("value") + 1
cols.insert(col_index, "percentile")
return dfs[cols]
else:
return df
def get_stats_data(
args: dict[str, Any],
service: str,
expand_percentiles: bool,
client: requests.Session | None = None,
) -> tuple[pd.DataFrame, BaseMetadata]:
"""
Retrieves statistical data from a specified endpoint and returns it