Skip to content

Commit 5a757cb

Browse files
authored
Merge branch 'main' into store-multi-ts-add-args
2 parents a7c7efa + 4e9c2cb commit 5a757cb

9 files changed

Lines changed: 377 additions & 29 deletions

File tree

cwms/api.py

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from typing import Any, Optional, cast
3838

3939
from requests import Response, adapters
40+
from requests.exceptions import RetryError as RequestsRetryError
4041
from requests_toolbelt import sessions # type: ignore
4142
from requests_toolbelt.sessions import BaseUrlSession # type: ignore
4243
from urllib3.util.retry import Retry
@@ -55,12 +56,12 @@
5556
status_forcelist=[
5657
403,
5758
429,
58-
500,
5959
502,
6060
503,
6161
504,
6262
], # Example: also retry on these HTTP status codes
6363
allowed_methods=["GET", "PUT", "POST", "PATCH", "DELETE"], # Methods to retry
64+
raise_on_status=False,
6465
)
6566
SESSION = sessions.BaseUrlSession(base_url=API_ROOT)
6667
adapter = adapters.HTTPAdapter(
@@ -140,6 +141,27 @@ class PermissionError(ApiError):
140141
"""Raised when the CDA request is not authorized for the current caller."""
141142

142143

144+
def _unwrap_retry_error(error: RequestsRetryError) -> Exception:
145+
"""Return the original retry cause when requests wraps it in RetryError."""
146+
147+
current: Exception = error
148+
cause = error.__cause__
149+
while isinstance(cause, Exception):
150+
current = cause
151+
cause = cause.__cause__
152+
153+
if current is error and error.args:
154+
first_arg = error.args[0]
155+
if isinstance(first_arg, Exception):
156+
current = first_arg
157+
reason = getattr(current, "reason", None)
158+
while isinstance(reason, Exception):
159+
current = reason
160+
reason = getattr(current, "reason", None)
161+
162+
return current
163+
164+
143165
def init_session(
144166
*,
145167
api_root: Optional[str] = None,
@@ -308,11 +330,14 @@ def get(
308330
"""
309331

310332
headers = {"Accept": api_version_text(api_version)}
311-
with SESSION.get(endpoint, params=params, headers=headers) as response:
312-
if not response.ok:
313-
logging.error(f"CDA Error: response={response}")
314-
raise ApiError(response)
315-
return _process_response(response)
333+
try:
334+
with SESSION.get(endpoint, params=params, headers=headers) as response:
335+
if not response.ok:
336+
logging.error(f"CDA Error: response={response}")
337+
raise ApiError(response)
338+
return _process_response(response)
339+
except RequestsRetryError as error:
340+
raise _unwrap_retry_error(error) from None
316341

317342

318343
def get_with_paging(
@@ -367,11 +392,16 @@ def _post_function(
367392
headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}
368393
if isinstance(data, dict) or isinstance(data, list):
369394
data = json.dumps(data)
370-
with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
371-
if not response.ok:
372-
logging.error(f"CDA Error: response={response}")
373-
raise ApiError(response)
374-
return response
395+
try:
396+
with SESSION.post(
397+
endpoint, params=params, headers=headers, data=data
398+
) as response:
399+
if not response.ok:
400+
logging.error(f"CDA Error: response={response}")
401+
raise ApiError(response)
402+
return response
403+
except RequestsRetryError as error:
404+
raise _unwrap_retry_error(error) from None
375405

376406

377407
def post(
@@ -461,10 +491,15 @@ def patch(
461491

462492
if data and isinstance(data, dict) or isinstance(data, list):
463493
data = json.dumps(data)
464-
with SESSION.patch(endpoint, params=params, headers=headers, data=data) as response:
465-
if not response.ok:
466-
logging.error(f"CDA Error: response={response}")
467-
raise ApiError(response)
494+
try:
495+
with SESSION.patch(
496+
endpoint, params=params, headers=headers, data=data
497+
) as response:
498+
if not response.ok:
499+
logging.error(f"CDA Error: response={response}")
500+
raise ApiError(response)
501+
except RequestsRetryError as error:
502+
raise _unwrap_retry_error(error) from None
468503

469504

470505
def delete(
@@ -488,7 +523,10 @@ def delete(
488523
"""
489524

490525
headers = {"Accept": api_version_text(api_version)}
491-
with SESSION.delete(endpoint, params=params, headers=headers) as response:
492-
if not response.ok:
493-
logging.error(f"CDA Error: response={response}")
494-
raise ApiError(response)
526+
try:
527+
with SESSION.delete(endpoint, params=params, headers=headers) as response:
528+
if not response.ok:
529+
logging.error(f"CDA Error: response={response}")
530+
raise ApiError(response)
531+
except RequestsRetryError as error:
532+
raise _unwrap_retry_error(error) from None

cwms/catalog/catalog.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ def get_locations_catalog(
6767
"location-kind-like": location_kind_like,
6868
}
6969

70-
response = api.get(endpoint=endpoint, params=params, api_version=2)
70+
response = api.get_with_paging(
71+
endpoint=endpoint, selector="entries", params=params, api_version=2
72+
)
7173
return Data(response, selector="entries")
7274

7375

@@ -131,7 +133,9 @@ def get_timeseries_catalog(
131133
"include-extents": include_extents,
132134
}
133135

134-
response = api.get(endpoint=endpoint, params=params, api_version=2)
136+
response = api.get_with_paging(
137+
endpoint=endpoint, selector="entries", params=params, api_version=2
138+
)
135139
return Data(response, selector="entries")
136140

137141

cwms/timeseries/timeseries.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,14 @@ def combine_timeseries_results(results: List[Data]) -> Data:
233233
)
234234
combined_df["date-time"] = combined_df["date-time"].astype("Int64")
235235
combined_df = combined_df.reindex(columns=["date-time", "value", "quality-code"])
236+
237+
# Replace NaN in value column with None so they serialize as JSON null
238+
# rather than the invalid JSON literal NaN.
239+
combined_df["value"] = (
240+
combined_df["value"]
241+
.astype(object)
242+
.where(combined_df["value"].notna(), other=None)
243+
)
236244
# Update the "values" key in the JSON to include the combined data
237245
combined_json["values"] = combined_df.values.tolist()
238246

@@ -438,8 +446,11 @@ def timeseries_df_to_json(
438446
pd.Timestamp.isoformat
439447
)
440448
df = df.reindex(columns=["date-time", "value", "quality-code"])
441-
if df.isnull().values.any():
442-
raise ValueError("Null/NaN data must be removed from the dataframe")
449+
450+
# Replace NaN/NA/NaT in value column with None so they serialize as JSON
451+
# null rather than the invalid JSON literal NaN.
452+
df["value"] = df["value"].astype(object).where(df["value"].notna(), other=None)
453+
443454
if version_date:
444455
version_date_iso = version_date.isoformat()
445456
else:

cwms/users/users.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,66 @@ def get_user_profile() -> dict[str, Any]:
3535
return dict(response)
3636

3737

38+
def filter_users_by_office(data: dict[str, Any], office: str) -> dict[str, Any]:
39+
"""
40+
Filter users JSON to only include users that have roles for the specified office.
41+
Each user's roles dict will only contain the entry for that office.
42+
43+
Args:
44+
data: The full users JSON as a Python dict.
45+
office: The office key to filter by (e.g., 'MVP', 'LRL').
46+
47+
Returns:
48+
A new dict with the same structure, filtered to the specified office.
49+
"""
50+
filtered_users = []
51+
52+
for user in data.get("users", []):
53+
roles = user.get("roles", {})
54+
55+
if office in roles:
56+
# Build a copy of the user with only the target office's roles
57+
filtered_user = {k: v for k, v in user.items() if k != "roles"}
58+
filtered_user["roles"] = {office: roles[office]}
59+
filtered_users.append(filtered_user)
60+
61+
return {
62+
"page": data.get("page"),
63+
"page-size": data.get("page-size"),
64+
"total": len(filtered_users),
65+
"users": filtered_users,
66+
}
67+
68+
3869
def get_users(
3970
office_id: Optional[str] = None,
71+
username_like: Optional[str] = None,
72+
include_roles: Optional[bool] = None,
4073
page: Optional[str] = None,
41-
page_size: Optional[int] = None,
74+
page_size: Optional[int] = 5000,
4275
) -> Data:
4376
"""Retrieve users with optional office and paging filters."""
4477

45-
params = {"office": office_id, "page": page, "page-size": page_size}
78+
endpoint = "users"
79+
params = {
80+
"office": office_id,
81+
"username-like": username_like,
82+
"include-roles": include_roles,
83+
"page": page,
84+
"page-size": page_size,
85+
}
4686
try:
47-
response = api.get("users", params=params, api_version=1)
87+
response = api.get_with_paging(
88+
endpoint=endpoint, selector="users", params=params, api_version=1
89+
)
4890
except api.ApiError as error:
4991
_raise_user_management_error(error, "User list lookup")
92+
93+
# filter by office if office_id is provided since the API does not
94+
# currently support filtering by office on the backend. This is a
95+
# temporary workaround until the API supports office filtering.
96+
if office_id:
97+
response = filter_users_by_office(response, office_id)
5098
return Data(response, selector="users")
5199

52100

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "d9b66ebe",
7+
"metadata": {
8+
"vscode": {
9+
"languageId": "plaintext"
10+
}
11+
},
12+
"outputs": [],
13+
"source": [
14+
"import pandas as pd\n",
15+
"import cwms\n",
16+
"import os\n",
17+
"import logging\n",
18+
"\n",
19+
"\n",
20+
"TS_NAME = \"KEYS.Elev.Inst.1Hour.0.Ccp-Rev\"\n",
21+
"OFFICE = os.getenv(\"OFFICE\", \"SWT\")\n",
22+
"API_KEY_DEV = os.getenv(\"CDA_API_KEY_DEV\")\n",
23+
"API_ROOT_DEV = os.getenv(\"CDA_API_ROOT_DEV\")\n",
24+
"\n",
25+
"logging.basicConfig(\n",
26+
" level=logging.DEBUG, format=\"%(asctime)s - %(levelname)s - %(message)s\"\n",
27+
")\n",
28+
"\n",
29+
"\n",
30+
"if not API_ROOT_DEV:\n",
31+
" logging.error(\n",
32+
" \"CDA_API_KEY environment variable is not set. Please set it to your API key.\"\n",
33+
" )\n",
34+
"\n",
35+
"if not API_KEY_DEV:\n",
36+
" logging.error(\n",
37+
" \"CDA_API_ROOT environment variable is not set. Please set it to your API root URL.\"\n",
38+
" )\n",
39+
"\n",
40+
"logging.debug(\"Starting Script!\")\n",
41+
"\n",
42+
"\n",
43+
"logging.info(f\"Getting timeseries data for {TS_NAME} from office {OFFICE}\")\n",
44+
"\n",
45+
"cwms.init_session()\n",
46+
"ts = cwms.get_timeseries(ts_id=TS_NAME, office_id=OFFICE)\n",
47+
"shifted_df = ts.df.copy()\n",
48+
"\n",
49+
"if isinstance(shifted_df.index, pd.DatetimeIndex):\n",
50+
" shifted_df.index = shifted_df.index + pd.Timedelta(hours=10)\n",
51+
"else:\n",
52+
" shifted_df[\"date-time\"] = pd.to_datetime(shifted_df[\"date-time\"]) + pd.Timedelta(\n",
53+
" hours=10\n",
54+
" )\n",
55+
"\n",
56+
"ts_json = cwms.timeseries_df_to_json(\n",
57+
" data=shifted_df,\n",
58+
" office_id=OFFICE,\n",
59+
" ts_id=TS_NAME,\n",
60+
" units=ts.json.get(\"units\"),\n",
61+
")\n",
62+
"cwms.init_session(api_root=API_ROOT_DEV, api_key=API_KEY_DEV)\n",
63+
"cwms.store_timeseries(data=ts_json)\n",
64+
"\n",
65+
"\n",
66+
"logging.debug(\"Script Done\")\n"
67+
]
68+
}
69+
],
70+
"metadata": {
71+
"language_info": {
72+
"name": "python"
73+
}
74+
},
75+
"nbformat": 4,
76+
"nbformat_minor": 5
77+
}

pyproject.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
name = "cwms-python"
33
repository = "https://github.com/HydrologicEngineeringCenter/cwms-python"
44

5-
version = "1.0.4"
6-
5+
version = "1.0.7"
76

87
packages = [
98
{ include = "cwms" },

0 commit comments

Comments
 (0)