Skip to content

Commit 9a120e3

Browse files
krowvin and Copilot authored
feat: Timeseries Loader (#137)
# New Sub Command for Time Series

Takes in a ts group or tsid(s) and copies the data from one CDA instance to another.

`cwms-cli load timeseries data -h` gives

```bash
Usage: cwms-cli load timeseries data [OPTIONS]

  Copy timeseries data for a single location in a target CDA from a source CDA.

Options:
  -v, --verbose  Increase verbosity (repeat for more detail).
  --dry-run / --no-dry-run  Show what would be written without storing to target. [default: no-dry-run]
  --target-api-key TEXT  Target API key (if required by the target CDA).
  --target-cda TEXT  Target CWMS Data API root. Default: http://localhost:8081/cwms-data/ [required]
  --source-office TEXT  Source office ID (e.g. SWT, SWL). [required]
  --source-cda TEXT  Source CWMS Data API root. Default: https://cwms-data.usace.army.mil/cwms-data/ [required]
  --ts-id TEXT  ID of the timeseries to copy (e.g. 'LocID.*').
  --ts-group TEXT  ID of the timeseries group to copy (e.g. 'GroupID.*').
  --ts-group-category-id TEXT  ID of the timeseries group category to copy.
  --ts-group-category-office-id TEXT  ID of the timeseries group category office to copy.
  --begin [%Y-%m-%dT%H:%M:%S%z|%Y-%m-%dT%H:%M:%S %Z]  Start date for timeseries data (e.g. '2022-01-01T00:00:00-06:00').
  --end [%Y-%m-%dT%H:%M:%S%z|%Y-%m-%dT%H:%M:%S %Z]  End date for timeseries data (e.g. '2022-01-31T00:00:00-0600').
  -h, --help  Show this message and exit.
```

This pull request introduces a new CLI command for copying timeseries data between CWMS Data API (CDA) instances, along with several supporting improvements and fixes. The main focus is on enabling users to copy timeseries data for one or more timeseries IDs or for a timeseries group, with robust input validation and dry-run support. Additional changes include dependency updates and minor bug fixes.
**New timeseries data copy functionality:**

* Added a new `data` subcommand to the `timeseries` CLI group, allowing users to copy timeseries data for a specific timeseries ID (or a comma-delimited list of IDs) or a timeseries group from a source CDA to a target CDA, with options for specifying time range and group/category details. Input validation ensures correct option usage. (`cwmscli/load/timeseries/timeseries.py`, `cwmscli/load/timeseries/timeseries_data.py`) [[1]](diffhunk://#diff-bd8fc5ed7f637c754f4f37d0841bdeb960ca74246d2889fdb801df2b29a8b66bR57-R147) [[2]](diffhunk://#diff-dbec006c6c9b478f5e30303a51327ea335a1bbf4d46355384afdc4b4f1b2c903R1-R109)

**Supporting improvements and fixes:**

* Updated the required `cwms-python` client library version from `1.0.3` to `1.0.7` to ensure compatibility with new features. (`cwmscli/utils/deps.py`)
* Moved the target-CDA `cwms.init_session` call in `load_timeseries_ids` to immediately before the store loop, so the session is switched to the target CDA only when storing timeseries data begins. (`cwmscli/load/timeseries/timeseries_ids.py`) [[1]](diffhunk://#diff-34fd913442e086124d05e5474a9842b5aecc6bcdfb014a2c89bdf19a6eccd0b1L30-L31) [[2]](diffhunk://#diff-34fd913442e086124d05e5474a9842b5aecc6bcdfb014a2c89bdf19a6eccd0b1R40)
* Removed the `import logging` line and an accidental `from turtle import pd` import from `timeseries_ids.py`. (`cwmscli/load/timeseries/timeseries_ids.py`)

**Minor CLI output adjustment:**

* Removed the `verbose` check around the final "Done." message in the `load_locations` command, so the message is now always printed (via `click.echo`), regardless of verbosity. (`cwmscli/load/location/location_ids.py`)

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: krowvin <23619282+krowvin@users.noreply.github.com>
1 parent 29f43dc commit 9a120e3

File tree

6 files changed

+439
-7
lines changed

6 files changed

+439
-7
lines changed

cwmscli/load/location/location_ids.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,5 +100,5 @@ def load_locations(
100100

101101
if errors:
102102
raise click.ClickException(f"Completed with {errors} error(s).")
103-
if verbose:
104-
logger.info("Done.")
103+
104+
click.echo("Done.")

cwmscli/load/timeseries/timeseries.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime, timedelta
12
from typing import Optional
23

34
import click
@@ -57,3 +58,93 @@ def load_timeseries_ids_all(
5758
timeseries_id_regex=timeseries_id_regex,
5859
dry_run=dry_run,
5960
)
61+
62+
63+
@timeseries.command(
    "data",
    help="Copy timeseries data (by timeseries ID or timeseries group) into a target CDA from a source CDA.",
)
@shared_source_target_options
@click.option(
    "--ts-id",
    "ts_id",
    default=None,
    type=str,
    help="Timeseries ID to copy, or a comma-delimited list of IDs.",
)
@click.option(
    "--ts-group",
    "ts_group",
    default=None,
    type=str,
    help="ID of the timeseries group to copy (e.g. 'GroupID.*').",
)
@click.option(
    "--ts-group-category-id",
    "ts_group_category_id",
    default=None,
    type=str,
    help="Optional category filter when matching timeseries groups.",
)
@click.option(
    "--ts-group-category-office-id",
    "ts_group_category_office_id",
    default=None,
    type=str,
    help="Optional category office filter when matching timeseries groups.",
)
# NOTE: the --begin/--end defaults are evaluated once, when this module is
# imported (i.e. "now" and "now - 1 day" relative to CLI start-up).
@click.option(
    "--begin",
    "begin",
    default=(datetime.now().astimezone() - timedelta(days=1)).replace(microsecond=0),
    type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S %Z"]),
    help="Start date for timeseries data (e.g. '2022-01-01T00:00:00-06:00').",
)
@click.option(
    "--end",
    "end",
    default=datetime.now().astimezone().replace(microsecond=0),
    type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S %Z"]),
    help="End date for timeseries data (e.g. '2022-01-31T00:00:00-0600').",
)
@requires(reqs.cwms)
@validate_cda_targets
def load_timeseries_data(
    source_cda: str,
    source_office: str,
    target_cda: str,
    target_api_key: Optional[str],
    verbose: int,
    ts_id: Optional[str],
    dry_run: bool,
    ts_group: Optional[str],
    ts_group_category_id: Optional[str],
    ts_group_category_office_id: Optional[str],
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
):
    """Validate the CLI options and delegate the copy to ``_load_timeseries_data``.

    Exactly one of ``--ts-id`` (a single ID or a comma-delimited list) or
    ``--ts-group`` must be supplied. ``ts_id`` is split on commas and blank
    entries are dropped before being forwarded as ``ts_ids``.

    Raises:
        click.UsageError: if ``--ts-id`` contains no non-empty ID, if
            ``--ts-group`` is blank, or if the two options are not supplied
            mutually exclusively.
    """
    ts_ids = None
    # Compare against None rather than truthiness: an explicitly passed empty
    # string (--ts-id "") must be rejected here, otherwise it would satisfy the
    # exclusivity check below and the command would silently copy nothing.
    if ts_id is not None:
        ts_ids = [item.strip() for item in ts_id.split(",") if item.strip()]
        if not ts_ids:
            raise click.UsageError(
                "--ts-id must contain at least one non-empty timeseries ID."
            )
    # Same reasoning for a blank group pattern: it would pass the exclusivity
    # check but be skipped by the loader's truthiness test.
    if ts_group is not None and not ts_group.strip():
        raise click.UsageError("--ts-group must not be empty.")
    if (ts_id is None) == (ts_group is None):
        raise click.UsageError("Exactly one of --ts-id or --ts-group must be provided.")

    # Imported lazily, after option validation, as in the original command.
    from cwmscli.load.timeseries.timeseries_data import _load_timeseries_data

    _load_timeseries_data(
        source_cda=source_cda,
        source_office=source_office,
        target_cda=target_cda,
        target_api_key=target_api_key,
        verbose=verbose,
        dry_run=dry_run,
        ts_ids=ts_ids,
        ts_group=ts_group,
        ts_group_category_id=ts_group_category_id,
        ts_group_category_office_id=ts_group_category_office_id,
        begin=begin,
        end=end,
    )
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# cwmscli/load/timeseries/timeseries_data.py
2+
import logging
3+
from datetime import datetime
4+
from typing import Optional
5+
6+
import click
7+
8+
9+
def _extract_timeseries_groups(payload) -> list[dict]:
10+
if payload is None:
11+
return []
12+
if isinstance(payload, list):
13+
return [item for item in payload if isinstance(item, dict)]
14+
return []
15+
16+
17+
def _group_id(group: dict) -> str:
18+
value = group.get("id")
19+
return str(value) if value else "<unknown>"
20+
21+
22+
def _category_id(group: dict) -> Optional[str]:
23+
category = group.get("time-series-category")
24+
if isinstance(category, dict) and category.get("id"):
25+
return str(category["id"])
26+
return None
27+
28+
29+
def _assigned_timeseries(group: dict) -> list[dict]:
30+
assigned = group.get("assigned-time-series", [])
31+
if isinstance(assigned, list):
32+
return [item for item in assigned if isinstance(item, dict)]
33+
return []
34+
35+
36+
def _load_timeseries_data(
    source_cda: str,
    source_office: str,
    target_cda: str,
    target_api_key: Optional[str],
    verbose: int,
    dry_run: bool,
    ts_ids: Optional[list[str]] = None,
    ts_group: Optional[str] = None,
    ts_group_category_id: Optional[str] = None,
    ts_group_category_office_id: Optional[str] = None,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
):
    """Copy timeseries values from a source CDA into a target CDA.

    The timeseries to copy are given either explicitly (``ts_ids``) or via a
    timeseries-group pattern (``ts_group``, optionally narrowed by the
    category arguments). For a group, only assigned timeseries whose
    ``office-id`` equals ``source_office`` are copied, de-duplicated in
    first-seen order.

    Args:
        source_cda: API root of the CDA to read from.
        source_office: Office ID used for both the read and the store.
        target_cda: API root of the CDA to write to.
        target_api_key: API key used when initialising the target session.
        verbose: When truthy, progress messages are echoed.
        dry_run: When true, data is fetched but nothing is stored.
        ts_ids: Explicit timeseries IDs to copy.
        ts_group: Timeseries group pattern to resolve into IDs.
        ts_group_category_id: Category pattern passed to the group lookup.
        ts_group_category_office_id: Category office passed to the group lookup.
        begin: Start of the time window passed to the fetch.
        end: End of the time window passed to the fetch.

    Raises:
        click.ClickException: if no group matches ``ts_group``, if the matched
            groups contain no timeseries for ``source_office``, or if one or
            more copy batches failed.
    """
    import cwms

    def copy_timeseries_for_office(
        current_ts_ids: list[str], current_office: str
    ) -> None:
        """Fetch one batch from the source CDA and store it in the target CDA."""
        # The store step below re-initialises the cwms session against the
        # target CDA. Re-select the source before every read so a later batch
        # is never fetched from the target by accident.
        cwms.init_session(api_root=source_cda, api_key=None)
        ts_data = cwms.get_multi_timeseries_df(
            ts_ids=current_ts_ids,
            office_id=current_office,
            melted=True,
            begin=begin,
            end=end,
        )
        if dry_run:
            click.echo("Dry run enabled. No changes will be made.")
            # Lazy %-formatting: the frame is only rendered if DEBUG is enabled.
            logging.debug(
                "Would store %s for %s(%s)", ts_data, current_ts_ids, current_office
            )
            return
        if ts_data.empty:
            click.echo(
                f"No data returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}."
            )
            return
        # Rows without a value are dropped before storing.
        ts_data = ts_data.dropna(subset=["value"]).copy()
        if ts_data.empty:
            click.echo(
                f"No non-null values returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}."
            )
            return
        cwms.init_session(api_root=target_cda, api_key=target_api_key)
        cwms.store_multi_timeseries_df(
            data=ts_data,
            office_id=current_office,
            store_rule="REPLACE_ALL",
            override_protection=False,
        )

    if verbose:
        click.echo(
            f"Loading timeseries data from source CDA '{source_cda}' (office '{source_office}') "
            f"to target CDA '{target_cda}'."
        )
    cwms.init_session(api_root=source_cda, api_key=None)
    ts_id_groups: list[tuple[str, list[str]]] = []

    if ts_ids:
        ts_id_groups.append((source_office, ts_ids))

    if ts_group:
        ts_group_data = cwms.get_timeseries_groups(
            office_id=source_office,
            include_assigned=True,
            timeseries_category_like=ts_group_category_id,
            timeseries_group_like=ts_group,
            category_office_id=ts_group_category_office_id,
        )
        groups = _extract_timeseries_groups(ts_group_data.json)
        if not groups:
            raise click.ClickException(
                f"No timeseries groups matched '{ts_group}' for office '{source_office}'."
            )

        logging.info(
            f"Matched {len(groups)} timeseries group(s) for pattern {ts_group}."
        )
        logging.info(f"Storing TSID from begin: {begin} to end: {end}")

        matched_groups: list[tuple[str, Optional[str], list[str]]] = []
        combined_ts_ids: list[str] = []

        for group in groups:
            group_name = _group_id(group)
            category_name = _category_id(group)
            # Keep only assignments that belong to the requested office.
            office_ts_ids = [
                str(item["timeseries-id"])
                for item in _assigned_timeseries(group)
                if item.get("office-id") == source_office and item.get("timeseries-id")
            ]
            matched_groups.append((group_name, category_name, office_ts_ids))
            combined_ts_ids.extend(office_ts_ids)

        click.echo(
            f"Matched {len(matched_groups)} timeseries group(s) for office '{source_office}':"
        )
        for group_name, category_name, office_ts_ids in matched_groups:
            category_text = f" (category: {category_name})" if category_name else ""
            click.echo(
                f"  - {group_name}{category_text}: {len(office_ts_ids)} timeseries for office {source_office}"
            )

        # dict.fromkeys de-duplicates while preserving first-seen order.
        deduped_ts_ids = list(dict.fromkeys(combined_ts_ids))
        if not deduped_ts_ids:
            raise click.ClickException(
                f"No assigned timeseries in the matched group set belong to office '{source_office}'."
            )
        ts_id_groups.append((source_office, deduped_ts_ids))

    errors = 0
    for current_office, current_ts_ids in ts_id_groups:
        try:
            copy_timeseries_for_office(current_ts_ids, current_office)
        except Exception as e:
            # Best effort: report the failure and keep going with the remaining
            # batches; the overall failure is surfaced after the loop.
            errors += 1
            click.echo(
                f"Error storing timeseries ({', '.join(current_ts_ids)}) data: {e}",
                err=True,
            )

    if errors:
        # Same failure contract as the location loader: a non-zero exit instead
        # of a misleading "completed" message.
        raise click.ClickException(f"Completed with {errors} error(s).")

    if verbose:
        click.echo("Timeseries data copy operation completed.")

cwmscli/load/timeseries/timeseries_ids.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
# cwmscli/load/timeseries_ids.py
2-
import logging
3-
from turtle import pd
42
from typing import Optional
53

64
import click
@@ -30,8 +28,6 @@ def load_timeseries_ids(
3028
ts_ids = cwms.get_timeseries_identifiers(
3129
office_id=source_office, timeseries_id_regex=timeseries_id_regex
3230
).df
33-
34-
cwms.init_session(api_root=target_cda, api_key=target_api_key)
3531
# only grab time_ids for locations that are in the target database
3632
locations = cwms.get_locations_catalog(office_id=source_office)
3733
ts_ids[["location-id", "param", "type", "int", "dur", "ver"]] = ts_ids[
@@ -43,6 +39,7 @@ def load_timeseries_ids(
4339
if verbose:
4440
logger.info("Found %s timeseries IDs to copy.", len(ts_lo_ids))
4541

42+
cwms.init_session(api_root=target_cda, api_key=target_api_key)
4643
errors = 0
4744
for i, row in ts_lo_ids.iterrows():
4845
ts_id = row["time-series-id"]

cwmscli/utils/deps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def requires(*requirements):
4040
{
4141
"module": "cwms",
4242
"package": "cwms-python",
43-
"version": "1.0.3",
43+
"version": "1.0.7",
4444
"desc": "CWMS REST API Python client",
4545
"link": "https://github.com/hydrologicengineeringcenter/cwms-python"
4646
},

0 commit comments

Comments
 (0)