Skip to content

Commit e00bb69

Browse files
committed
modularize stats function, fix paging
1 parent c067597 commit e00bb69

3 files changed

Lines changed: 74 additions & 78 deletions

File tree

dataretrieval/waterdata/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
get_latest_daily,
2121
get_monitoring_locations,
2222
get_samples,
23+
get_statistics,
2324
get_time_series_metadata,
2425
)
2526
from .types import (
@@ -38,6 +39,7 @@
3839
"get_latest_daily",
3940
"get_monitoring_locations",
4041
"get_samples",
42+
"get_statistics",
4143
"get_time_series_metadata",
4244
"_check_profiles",
4345
"CODE_SERVICES",

dataretrieval/waterdata/api.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,8 +1653,8 @@ def get_statistics(
16531653
country_code: Optional[str] = None,
16541654
state_code: Optional[str] = None,
16551655
county_code: Optional[str] = None,
1656-
start_date: Optional[Union[str, datetime]] = None,
1657-
end_date: Optional[Union[str, datetime]] = None,
1656+
start_date: Optional[Union[str, pd.datetime]] = None,
1657+
end_date: Optional[Union[str, pd.datetime]] = None,
16581658
monitoring_location_id: Optional[str] = None,
16591659
page_size: int = 1000,
16601660
parent_timeseries_id: Optional[str] = None,
@@ -1741,12 +1741,12 @@ def get_statistics(
17411741
params = {
17421742
k: v
17431743
for k, v in locals().items()
1744-
if k not in ["service"] and v is not None
1744+
if k not in ["service", "valid_services"] and v is not None
17451745
}
17461746

17471747
return get_stats_data(
17481748
args=params,
1749-
service=service,
1749+
service=service
17501750
)
17511751

17521752

dataretrieval/waterdata/utils.py

Lines changed: 68 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ def _walk_pages(
588588
dfs = pd.concat([dfs, df1], ignore_index=True)
589589
curr_url = _next_req_url(resp)
590590
except Exception:
591+
error_text = _error_body(resp)
591592
warnings.warn(f"{error_text}. Data request incomplete.")
592593
logger.error("Request incomplete. %s", error_text)
593594
logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
@@ -822,10 +823,62 @@ def get_ogc_data(
822823
metadata = BaseMetadata(response)
823824
return return_list, metadata
824825

826+
def _handle_stats_nesting(
827+
body: Dict[str, Any],
828+
geopd: bool = False,
829+
) -> pd.DataFrame:
830+
"""
831+
Takes nested json from stats service and flattens into a dataframe with
832+
one row per monitoring location, parameter, and statistic.
833+
834+
Parameters
835+
----------
836+
body : Dict[str, Any]
837+
The JSON response body from the statistics service containing nested data.
838+
839+
Returns
840+
-------
841+
pd.DataFrame
842+
A DataFrame containing the flattened statistical data.
843+
"""
844+
if body is None:
845+
return pd.DataFrame()
846+
847+
if not geopd:
848+
logger.info(
849+
"Geopandas not installed. Geometries will be flattened into pandas DataFrames."
850+
)
851+
852+
# If geopandas not installed, return a pandas dataframe
853+
# otherwise return a geodataframe
854+
if not geopd:
855+
df = pd.json_normalize(body['features'])
856+
else:
857+
df = gpd.GeoDataFrame.from_features(body["features"]).drop(columns=['data'])
858+
859+
# Unnest json features, properties, data, and values while retaining necessary
860+
# metadata to merge with main dataframe.
861+
dat = pd.json_normalize(
862+
body,
863+
record_path=["features", "properties", "data", "values"],
864+
meta=[
865+
["features", "properties", "monitoring_location_id"],
866+
["features", "properties", "data", "parameter_code"],
867+
["features", "properties", "data", "unit_of_measure"],
868+
["features", "properties", "data", "parent_time_series_id"],
869+
["features", "geometry", "coordinates"],
870+
],
871+
meta_prefix="",
872+
errors="ignore",
873+
)
874+
dat.columns = dat.columns.str.split('.').str[-1]
875+
876+
return df.merge(dat, on='monitoring_location_id', how='left')
877+
878+
825879
def get_stats_data(
826880
args: Dict[str, Any],
827881
service: str,
828-
geopd: bool,
829882
client: Optional[requests.Session] = None,
830883
) -> Tuple[pd.DataFrame, BaseMetadata]:
831884
"""
@@ -840,9 +893,6 @@ def get_stats_data(
840893
Dictionary of request arguments for the statistics service.
841894
service : str
842895
The statistics service type (e.g., "observationNormals", "observationIntervals").
843-
geopd : bool, optional
844-
If True, returns a GeoDataFrame if geometries are present; otherwise, returns a pandas DataFrame.
845-
Defaults to False.
846896
847897
Returns
848898
-------
@@ -854,11 +904,6 @@ def get_stats_data(
854904

855905
url = f"{STATISTICS_API_URL}/{service}"
856906

857-
if not geopd:
858-
logger.info(
859-
"Geopandas not installed. Geometries will be flattened into pandas DataFrames."
860-
)
861-
862907
headers = _default_headers()
863908

864909
request = requests.Request(
@@ -867,12 +912,11 @@ def get_stats_data(
867912
headers=headers,
868913
params=args,
869914
)
870-
871915
req = request.prepare()
872916
logger.info("Request: %s", req.url)
873917

874-
# Get first response from client
875-
# using GET or POST call
918+
# create temp client if not provided
919+
# and close it after the request is done
876920
close_client = client is None
877921
client = client or requests.Session()
878922

@@ -889,82 +933,32 @@ def get_stats_data(
889933
method = req.method.upper()
890934
headers = dict(req.headers)
891935

892-
# Check if it's an empty response
893936
body = resp.json()
894-
if body is None:
895-
return pd.DataFrame()
896-
897-
# If geopandas not installed, return a pandas dataframe
898-
# otherwise return a geodataframe
899-
if not geopd:
900-
df = pd.json_normalize(resp['features'])
901-
else:
902-
df = gpd.GeoDataFrame.from_features(resp["features"]).drop(columns=['data'])
903-
904-
dat = pd.json_normalize(
905-
resp,
906-
record_path=["features", "properties", "data", "values"],
907-
meta=[
908-
["features", "properties", "monitoring_location_id"],
909-
["features", "properties", "data", "parameter_code"],
910-
["features", "properties", "data", "unit_of_measure"],
911-
["features", "properties", "data", "parent_time_series_id"],
912-
["features", "geometry", "coordinates"],
913-
],
914-
meta_prefix="",
915-
errors="ignore",
916-
)
917-
dat.columns = dat.columns.str.split('.').str[-1]
937+
dfs = _handle_stats_nesting(body, geopd=GEOPANDAS)
918938

919-
dfs = df.merge(dat, on='monitoring_location_id', how='left')
939+
# Look for a next code in the response body
940+
next_token = body['next']
920941

921-
curr_url = body['next']
942+
while next_token:
943+
args['next_token'] = next_token
922944

923-
while curr_url:
924945
try:
925946
resp = client.request(
926947
method,
927-
curr_url,
948+
url=url,
949+
params=args,
928950
headers=headers,
929951
)
930-
if resp.status_code != 200:
931-
error_text = _error_body(resp)
932-
raise Exception(error_text)
933-
# Check if it's an empty response
934952
body = resp.json()
935-
if body is None:
936-
return pd.DataFrame()
937-
938-
# If geopandas not installed, return a pandas dataframe
939-
# otherwise return a geodataframe
940-
if not geopd:
941-
df1 = pd.json_normalize(resp['features'])
942-
else:
943-
df1 = gpd.GeoDataFrame.from_features(resp["features"]).drop(columns=['data'])
944-
945-
dat = pd.json_normalize(
946-
resp,
947-
record_path=["features", "properties", "data", "values"],
948-
meta=[
949-
["features", "properties", "monitoring_location_id"],
950-
["features", "properties", "data", "parameter_code"],
951-
["features", "properties", "data", "unit_of_measure"],
952-
["features", "properties", "data", "parent_time_series_id"],
953-
["features", "geometry", "coordinates"],
954-
],
955-
meta_prefix="",
956-
errors="ignore",
957-
)
958-
dat.columns = dat.columns.str.split('.').str[-1]
959-
960-
df1 = df1.merge(dat, on='monitoring_location_id', how='left')
953+
df1 = _handle_stats_nesting(body, geopd=GEOPANDAS)
961954
dfs = pd.concat([dfs, df1], ignore_index=True)
962-
curr_url = body['next']
955+
next_token = body['next']
963956
except Exception:
957+
error_text = _error_body(resp)
964958
warnings.warn(f"{error_text}. Data request incomplete.")
965959
logger.error("Request incomplete. %s", error_text)
966-
logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
967-
curr_url = None
960+
logger.warning("Request failed for URL: %s. Data download interrupted.", resp.url)
961+
next_token = None
968962
return dfs, BaseMetadata(initial_response)
969963
finally:
970964
if close_client:

0 commit comments

Comments
 (0)