Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cwmscli/load/location/location_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ def load_locations(

if errors:
raise click.ClickException(f"Completed with {errors} error(s).")
if verbose:
logger.info("Done.")

click.echo("Done.")
91 changes: 91 additions & 0 deletions cwmscli/load/timeseries/timeseries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
from typing import Optional

import click
Expand Down Expand Up @@ -57,3 +58,93 @@ def load_timeseries_ids_all(
timeseries_id_regex=timeseries_id_regex,
dry_run=dry_run,
)


@timeseries.command(
"data",
help="Copy timeseries data (by timeseries ID or timeseries group) into a target CDA from a source CDA.",
)
@shared_source_target_options
@click.option(
"--ts-id",
"ts_id",
default=None,
type=str,
help="Timeseries ID to copy, or a comma-delimited list of IDs.",
)
@click.option(
"--ts-group",
"ts_group",
default=None,
type=str,
help="ID of the timeseries group to copy (e.g. 'GroupID.*').",
)
@click.option(
"--ts-group-category-id",
"ts_group_category_id",
default=None,
type=str,
help="Optional category filter when matching timeseries groups.",
)
@click.option(
"--ts-group-category-office-id",
"ts_group_category_office_id",
default=None,
type=str,
help="Optional category office filter when matching timeseries groups.",
)
@click.option(
"--begin",
"begin",
default=(datetime.now().astimezone() - timedelta(days=1)).replace(microsecond=0),
type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S %Z"]),
help="Start date for timeseries data (e.g. '2022-01-01T00:00:00-06:00').",
)
@click.option(
"--end",
"end",
default=datetime.now().astimezone().replace(microsecond=0),
type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S %Z"]),
help="End date for timeseries data (e.g. '2022-01-31T00:00:00-0600').",
)
@requires(reqs.cwms)
@validate_cda_targets
def load_timeseries_data(
source_cda: str,
source_office: str,
target_cda: str,
target_api_key: Optional[str],
verbose: int,
ts_id: Optional[str],
dry_run: bool,
ts_group: Optional[str],
ts_group_category_id: Optional[str],
ts_group_category_office_id: Optional[str],
begin: Optional[datetime] = None,
end: Optional[datetime] = None,
):
ts_ids = None
if ts_id:
ts_ids = [item.strip() for item in ts_id.split(",") if item.strip()]
if not ts_ids:
raise click.UsageError(
"--ts-id must contain at least one non-empty timeseries ID."
)
if (ts_id is None) == (ts_group is None):
raise click.UsageError("Exactly one of --ts-id or --ts-group must be provided.")
from cwmscli.load.timeseries.timeseries_data import _load_timeseries_data

_load_timeseries_data(
source_cda=source_cda,
source_office=source_office,
target_cda=target_cda,
target_api_key=target_api_key,
verbose=verbose,
dry_run=dry_run,
ts_ids=ts_ids,
ts_group=ts_group,
ts_group_category_id=ts_group_category_id,
ts_group_category_office_id=ts_group_category_office_id,
begin=begin,
end=end,
)
157 changes: 157 additions & 0 deletions cwmscli/load/timeseries/timeseries_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# cwmscli/load/timeseries/timeseries_data.py
import logging
from datetime import datetime
from typing import Optional

import click


def _extract_timeseries_groups(payload) -> list[dict]:
if payload is None:
return []
if isinstance(payload, list):
return [item for item in payload if isinstance(item, dict)]
return []


def _group_id(group: dict) -> str:
value = group.get("id")
return str(value) if value else "<unknown>"


def _category_id(group: dict) -> Optional[str]:
category = group.get("time-series-category")
if isinstance(category, dict) and category.get("id"):
return str(category["id"])
return None


def _assigned_timeseries(group: dict) -> list[dict]:
assigned = group.get("assigned-time-series", [])
if isinstance(assigned, list):
return [item for item in assigned if isinstance(item, dict)]
return []


def _load_timeseries_data(
source_cda: str,
source_office: str,
target_cda: str,
target_api_key: Optional[str],
verbose: int,
dry_run: bool,
ts_ids: Optional[list[str]] = None,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe we should use the get_timeseries_groups function. That can use time_series_group_like and then that is all you have to input it doesn't need the category ID and category office those could be optional. The user should also be required to enter the office_id and only be able to get data from a single office. I don't think this should be used to copy data from multiple offices at one time. For example someone should not use it to copy the DMZ INCLUDE list and copy data from all offices into the database. A) it would take to long and B) offices should control their own data.

we might need a check of hey this pulls up multiple time series groups show then and tell them to enter the info for the correct one. or maybe allow multiple.... and only run on the ones that provide timeseries IDs for their office. Maybe in the dry run list the groups that will be copied.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ts_group: Optional[str] = None,
ts_group_category_id: Optional[str] = None,
ts_group_category_office_id: Optional[str] = None,
begin: Optional[datetime] = None,
end: Optional[datetime] = None,
):
import cwms

def copy_timeseries_for_office(
current_ts_ids: list[str], current_office: str
) -> None:
ts_data = cwms.get_multi_timeseries_df(
ts_ids=current_ts_ids,
office_id=current_office,
melted=True,
begin=begin,
end=end,
)
if dry_run:
click.echo("Dry run enabled. No changes will be made.")
logging.debug(
f"Would store {ts_data} for {current_ts_ids}({current_office})"
)
return
if ts_data.empty:
click.echo(
f"No data returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}."
)
return
ts_data = ts_data.dropna(subset=["value"]).copy()
if ts_data.empty:
click.echo(
f"No non-null values returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}."
)
return
cwms.init_session(api_root=target_cda, api_key=target_api_key)
cwms.store_multi_timeseries_df(
data=ts_data,
office_id=current_office,
store_rule="REPLACE_ALL",
override_protection=False,
)

if verbose:
click.echo(
f"Loading timeseries data from source CDA '{source_cda}' (office '{source_office}') "
f"to target CDA '{target_cda}'."
)
cwms.init_session(api_root=source_cda, api_key=None)
ts_id_groups: list[tuple[str, list[str]]] = []

if ts_ids:
ts_id_groups.append((source_office, ts_ids))

if ts_group:
ts_group_data = cwms.get_timeseries_groups(
office_id=source_office,
include_assigned=True,
timeseries_category_like=ts_group_category_id,
timeseries_group_like=ts_group,
category_office_id=ts_group_category_office_id,
)
groups = _extract_timeseries_groups(ts_group_data.json)
if not groups:
raise click.ClickException(
f"No timeseries groups matched '{ts_group}' for office '{source_office}'."
)

logging.info(
f"Matched {len(groups)} timeseries group(s) for pattern {ts_group}."
)
logging.info(f"Storing TSID from begin: {begin} to end: {end}")

matched_groups: list[tuple[str, Optional[str], list[str]]] = []
combined_ts_ids: list[str] = []

for group in groups:
group_name = _group_id(group)
category_name = _category_id(group)
office_ts_ids = [
str(item["timeseries-id"])
for item in _assigned_timeseries(group)
if item.get("office-id") == source_office and item.get("timeseries-id")
]
matched_groups.append((group_name, category_name, office_ts_ids))
combined_ts_ids.extend(office_ts_ids)

click.echo(
f"Matched {len(matched_groups)} timeseries group(s) for office '{source_office}':"
)
for group_name, category_name, office_ts_ids in matched_groups:
category_text = f" (category: {category_name})" if category_name else ""
click.echo(
f" - {group_name}{category_text}: {len(office_ts_ids)} timeseries for office {source_office}"
)

deduped_ts_ids = list(dict.fromkeys(combined_ts_ids))
if not deduped_ts_ids:
raise click.ClickException(
f"No assigned timeseries in the matched group set belong to office '{source_office}'."
)
ts_id_groups.append((source_office, deduped_ts_ids))

for current_office, current_ts_ids in ts_id_groups:
try:
copy_timeseries_for_office(current_ts_ids, current_office)
except Exception as e:
click.echo(
f"Error storing timeseries ({', '.join(current_ts_ids)}) data: {e}",
err=True,
)

if verbose:
click.echo("Timeseries data copy operation completed.")
5 changes: 1 addition & 4 deletions cwmscli/load/timeseries/timeseries_ids.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# cwmscli/load/timeseries_ids.py
import logging
from turtle import pd
from typing import Optional

import click
Expand Down Expand Up @@ -30,8 +28,6 @@ def load_timeseries_ids(
ts_ids = cwms.get_timeseries_identifiers(
office_id=source_office, timeseries_id_regex=timeseries_id_regex
).df

cwms.init_session(api_root=target_cda, api_key=target_api_key)
# only grab time_ids for locations that are in the target database
locations = cwms.get_locations_catalog(office_id=source_office)
ts_ids[["location-id", "param", "type", "int", "dur", "ver"]] = ts_ids[
Expand All @@ -43,6 +39,7 @@ def load_timeseries_ids(
if verbose:
logger.info("Found %s timeseries IDs to copy.", len(ts_lo_ids))

cwms.init_session(api_root=target_cda, api_key=target_api_key)
errors = 0
for i, row in ts_lo_ids.iterrows():
ts_id = row["time-series-id"]
Expand Down
2 changes: 1 addition & 1 deletion cwmscli/utils/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def requires(*requirements):
{
"module": "cwms",
"package": "cwms-python",
"version": "1.0.3",
"version": "1.0.7",
"desc": "CWMS REST API Python client",
"link": "https://github.com/hydrologicengineeringcenter/cwms-python"
},
Comment on lines 40 to 46
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions bumping the required cwms-python version, but this change only updates the example in the requires() docstring. The actual minimum version enforced by the CLI still comes from cwmscli/requirements.py (currently 0.8.0), and pyproject.toml also allows older versions. If cwms-python>=1.0.1 is truly required, update the real requirement source(s) (and keep this example in sync).

Copilot uses AI. Check for mistakes.
Expand Down
Loading
Loading