diff --git a/cwmscli/load/location/location_ids.py b/cwmscli/load/location/location_ids.py index 59ac9ff..e8f45b4 100644 --- a/cwmscli/load/location/location_ids.py +++ b/cwmscli/load/location/location_ids.py @@ -100,5 +100,5 @@ def load_locations( if errors: raise click.ClickException(f"Completed with {errors} error(s).") - if verbose: - logger.info("Done.") + + click.echo("Done.") diff --git a/cwmscli/load/timeseries/timeseries.py b/cwmscli/load/timeseries/timeseries.py index b6ede26..2c325dd 100644 --- a/cwmscli/load/timeseries/timeseries.py +++ b/cwmscli/load/timeseries/timeseries.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta from typing import Optional import click @@ -57,3 +58,93 @@ def load_timeseries_ids_all( timeseries_id_regex=timeseries_id_regex, dry_run=dry_run, ) + + +@timeseries.command( + "data", + help="Copy timeseries data (by timeseries ID or timeseries group) into a target CDA from a source CDA.", +) +@shared_source_target_options +@click.option( + "--ts-id", + "ts_id", + default=None, + type=str, + help="Timeseries ID to copy, or a comma-delimited list of IDs.", +) +@click.option( + "--ts-group", + "ts_group", + default=None, + type=str, + help="ID of the timeseries group to copy (e.g. 'GroupID.*').", +) +@click.option( + "--ts-group-category-id", + "ts_group_category_id", + default=None, + type=str, + help="Optional category filter when matching timeseries groups.", +) +@click.option( + "--ts-group-category-office-id", + "ts_group_category_office_id", + default=None, + type=str, + help="Optional category office filter when matching timeseries groups.", +) +@click.option( + "--begin", + "begin", + default=(datetime.now().astimezone() - timedelta(days=1)).replace(microsecond=0), + type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S %Z"]), + help="Start date for timeseries data (e.g. '2022-01-01T00:00:00-06:00').", +) +@click.option( + "--end", + "end", + default=datetime.now().astimezone().replace(microsecond=0), + type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S %Z"]), + help="End date for timeseries data (e.g. '2022-01-31T00:00:00-0600').", +) +@requires(reqs.cwms) +@validate_cda_targets +def load_timeseries_data( + source_cda: str, + source_office: str, + target_cda: str, + target_api_key: Optional[str], + verbose: int, + ts_id: Optional[str], + dry_run: bool, + ts_group: Optional[str], + ts_group_category_id: Optional[str], + ts_group_category_office_id: Optional[str], + begin: Optional[datetime] = None, + end: Optional[datetime] = None, +): + ts_ids = None + if ts_id: + ts_ids = [item.strip() for item in ts_id.split(",") if item.strip()] + if not ts_ids: + raise click.UsageError( + "--ts-id must contain at least one non-empty timeseries ID." + ) + if (ts_id is None) == (ts_group is None): + raise click.UsageError("Exactly one of --ts-id or --ts-group must be provided.") + from cwmscli.load.timeseries.timeseries_data import _load_timeseries_data + + _load_timeseries_data( + source_cda=source_cda, + source_office=source_office, + target_cda=target_cda, + target_api_key=target_api_key, + verbose=verbose, + dry_run=dry_run, + ts_ids=ts_ids, + ts_group=ts_group, + ts_group_category_id=ts_group_category_id, + ts_group_category_office_id=ts_group_category_office_id, + begin=begin, + end=end, + ) diff --git a/cwmscli/load/timeseries/timeseries_data.py b/cwmscli/load/timeseries/timeseries_data.py new file mode 100644 index 0000000..93f4e9a --- /dev/null +++ b/cwmscli/load/timeseries/timeseries_data.py @@ -0,0 +1,157 @@ +# cwmscli/load/timeseries/timeseries_data.py +import logging +from datetime import datetime +from typing import Optional + +import click + + +def _extract_timeseries_groups(payload) -> list[dict]: + if payload is None: + return [] + if isinstance(payload, list): + return [item for item in payload if isinstance(item, dict)] + return [] + + +def _group_id(group: dict) -> str: + value = group.get("id") + return str(value) if value else "" + + +def _category_id(group: dict) -> Optional[str]: + category = group.get("time-series-category") + if isinstance(category, dict) and category.get("id"): + return str(category["id"]) + return None + + +def _assigned_timeseries(group: dict) -> list[dict]: + assigned = group.get("assigned-time-series", []) + if isinstance(assigned, list): + return [item for item in assigned if isinstance(item, dict)] + return [] + + +def _load_timeseries_data( + source_cda: str, + source_office: str, + target_cda: str, + target_api_key: Optional[str], + verbose: int, + dry_run: bool, + ts_ids: Optional[list[str]] = None, + ts_group: Optional[str] = None, + ts_group_category_id: Optional[str] = None, + ts_group_category_office_id: Optional[str] = None, + begin: Optional[datetime] = None, + end: Optional[datetime] = None, +): + import cwms + + def copy_timeseries_for_office( + current_ts_ids: list[str], current_office: str + ) -> None: + ts_data = cwms.get_multi_timeseries_df( + ts_ids=current_ts_ids, + office_id=current_office, + melted=True, + begin=begin, + end=end, + ) + if dry_run: + click.echo("Dry run enabled. No changes will be made.") + logging.debug( + f"Would store {ts_data} for {current_ts_ids}({current_office})" + ) + return + if ts_data.empty: + click.echo( + f"No data returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}." + ) + return + ts_data = ts_data.dropna(subset=["value"]).copy() + if ts_data.empty: + click.echo( + f"No non-null values returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}." + ) + return + cwms.init_session(api_root=target_cda, api_key=target_api_key) + cwms.store_multi_timeseries_df( + data=ts_data, + office_id=current_office, + store_rule="REPLACE_ALL", + override_protection=False, + ) + + if verbose: + click.echo( + f"Loading timeseries data from source CDA '{source_cda}' (office '{source_office}') " + f"to target CDA '{target_cda}'." + ) + cwms.init_session(api_root=source_cda, api_key=None) + ts_id_groups: list[tuple[str, list[str]]] = [] + + if ts_ids: + ts_id_groups.append((source_office, ts_ids)) + + if ts_group: + ts_group_data = cwms.get_timeseries_groups( + office_id=source_office, + include_assigned=True, + timeseries_category_like=ts_group_category_id, + timeseries_group_like=ts_group, + category_office_id=ts_group_category_office_id, + ) + groups = _extract_timeseries_groups(ts_group_data.json) + if not groups: + raise click.ClickException( + f"No timeseries groups matched '{ts_group}' for office '{source_office}'." + ) + + logging.info( + f"Matched {len(groups)} timeseries group(s) for pattern {ts_group}." + ) + logging.info(f"Storing TSID from begin: {begin} to end: {end}") + + matched_groups: list[tuple[str, Optional[str], list[str]]] = [] + combined_ts_ids: list[str] = [] + + for group in groups: + group_name = _group_id(group) + category_name = _category_id(group) + office_ts_ids = [ + str(item["timeseries-id"]) + for item in _assigned_timeseries(group) + if item.get("office-id") == source_office and item.get("timeseries-id") + ] + matched_groups.append((group_name, category_name, office_ts_ids)) + combined_ts_ids.extend(office_ts_ids) + + click.echo( + f"Matched {len(matched_groups)} timeseries group(s) for office '{source_office}':" + ) + for group_name, category_name, office_ts_ids in matched_groups: + category_text = f" (category: {category_name})" if category_name else "" + click.echo( + f" - {group_name}{category_text}: {len(office_ts_ids)} timeseries for office {source_office}" + ) + + deduped_ts_ids = list(dict.fromkeys(combined_ts_ids)) + if not deduped_ts_ids: + raise click.ClickException( + f"No assigned timeseries in the matched group set belong to office '{source_office}'." + ) + ts_id_groups.append((source_office, deduped_ts_ids)) + + for current_office, current_ts_ids in ts_id_groups: + try: + copy_timeseries_for_office(current_ts_ids, current_office) + except Exception as e: + click.echo( + f"Error storing timeseries ({', '.join(current_ts_ids)}) data: {e}", + err=True, + ) + + if verbose: + click.echo("Timeseries data copy operation completed.") diff --git a/cwmscli/load/timeseries/timeseries_ids.py b/cwmscli/load/timeseries/timeseries_ids.py index 2b27b68..90fb355 100644 --- a/cwmscli/load/timeseries/timeseries_ids.py +++ b/cwmscli/load/timeseries/timeseries_ids.py @@ -1,6 +1,4 @@ # cwmscli/load/timeseries_ids.py -import logging -from turtle import pd from typing import Optional import click @@ -30,8 +28,6 @@ def load_timeseries_ids( ts_ids = cwms.get_timeseries_identifiers( office_id=source_office, timeseries_id_regex=timeseries_id_regex ).df - - cwms.init_session(api_root=target_cda, api_key=target_api_key) # only grab time_ids for locations that are in the target database locations = cwms.get_locations_catalog(office_id=source_office) ts_ids[["location-id", "param", "type", "int", "dur", "ver"]] = ts_ids[ @@ -43,6 +39,7 @@ def load_timeseries_ids( if verbose: logger.info("Found %s timeseries IDs to copy.", len(ts_lo_ids)) + cwms.init_session(api_root=target_cda, api_key=target_api_key) errors = 0 for i, row in ts_lo_ids.iterrows(): ts_id = row["time-series-id"] diff --git a/cwmscli/utils/deps.py b/cwmscli/utils/deps.py index 67850db..eb4c2b8 100644 --- a/cwmscli/utils/deps.py +++ b/cwmscli/utils/deps.py @@ -40,7 +40,7 @@ def requires(*requirements): { "module": "cwms", "package": "cwms-python", - "version": "1.0.3", + "version": "1.0.7", "desc": "CWMS REST API Python client", "link": "https://github.com/hydrologicengineeringcenter/cwms-python" }, diff --git a/tests/load/test_timeseries_data.py b/tests/load/test_timeseries_data.py new file mode 100644 index 0000000..21d194c --- /dev/null +++ b/tests/load/test_timeseries_data.py @@ -0,0 +1,187 @@ +from types import SimpleNamespace + +import pandas as pd +import pytest +from click import ClickException +from click.testing import CliRunner + +from cwmscli.__main__ import cli +from cwmscli.load.timeseries.timeseries_data import _load_timeseries_data + + +def test_load_timeseries_data_command_allows_group_without_category_filters( + monkeypatch, +): + calls = [] + + def fake_load(**kwargs): + calls.append(kwargs) + + monkeypatch.setattr( + "cwmscli.load.timeseries.timeseries_data._load_timeseries_data", + fake_load, + ) + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "load", + "timeseries", + "data", + "--source-cda", + "https://cwms-data.usace.army.mil/cwms-data/", + "--source-office", + "MVP", + "--target-cda", + "http://localhost:8082/cwms-data/", + "--ts-group", + "Include.*", + "--dry-run", + ], + ) + + assert result.exit_code == 0, result.output + assert len(calls) == 1 + assert calls[0]["ts_group"] == "Include.*" + assert calls[0]["ts_group_category_id"] is None + assert calls[0]["ts_group_category_office_id"] is None + + +def test_load_timeseries_data_group_filters_to_single_source_office( + monkeypatch, capsys +): + captured = { + "get_timeseries_groups": [], + "get_multi_timeseries_df": [], + "store_multi_timeseries_df": [], + "init_session": [], + } + + def fake_init_session(api_root, api_key=None): + captured["init_session"].append((api_root, api_key)) + + def fake_get_timeseries_groups(**kwargs): + captured["get_timeseries_groups"].append(kwargs) + return SimpleNamespace( + json=[ + { + "id": "MVP Include", + "time-series-category": {"id": "MVP Dissemination"}, + "assigned-time-series": [ + {"office-id": "MVP", "timeseries-id": "A.Flow.Inst.1Hour.0"}, + {"office-id": "SWT", "timeseries-id": "B.Flow.Inst.1Hour.0"}, + ], + }, + { + "id": "MVP Include Secondary", + "time-series-category": {"id": "MVP Dissemination"}, + "assigned-time-series": [ + {"office-id": "MVP", "timeseries-id": "A.Flow.Inst.1Hour.0"}, + {"office-id": "MVP", "timeseries-id": "C.Stage.Inst.1Hour.0"}, + ], + }, + ] + ) + + def fake_get_multi_timeseries_df(**kwargs): + captured["get_multi_timeseries_df"].append(kwargs) + return pd.DataFrame( + [ + { + "date-time": "2024-01-01T00:00:00Z", + "timeseries-id": kwargs["ts_ids"][0], + "value": 1.0, + } + ] + ) + + def fake_store_multi_timeseries_df(**kwargs): + captured["store_multi_timeseries_df"].append(kwargs) + + fake_cwms = SimpleNamespace( + init_session=fake_init_session, + get_timeseries_groups=fake_get_timeseries_groups, + get_multi_timeseries_df=fake_get_multi_timeseries_df, + store_multi_timeseries_df=fake_store_multi_timeseries_df, + ) + monkeypatch.setitem(__import__("sys").modules, "cwms", fake_cwms) + + _load_timeseries_data( + source_cda="https://cwms-data.usace.army.mil/cwms-data/", + source_office="MVP", + target_cda="http://localhost:8082/cwms-data/", + target_api_key=None, + verbose=0, + dry_run=True, + ts_group="Include.*", + ts_group_category_id="MVP Dissemination", + ) + + assert captured["get_timeseries_groups"] == [ + { + "office_id": "MVP", + "include_assigned": True, + "timeseries_category_like": "MVP Dissemination", + "timeseries_group_like": "Include.*", + "category_office_id": None, + } + ] + assert captured["get_multi_timeseries_df"] == [ + { + "ts_ids": ["A.Flow.Inst.1Hour.0", "C.Stage.Inst.1Hour.0"], + "office_id": "MVP", + "melted": True, + "begin": None, + "end": None, + } + ] + assert captured["store_multi_timeseries_df"] == [] + + output = capsys.readouterr().out + assert "Matched 2 timeseries group(s) for office 'MVP'" in output + assert ( + "MVP Include (category: MVP Dissemination): 1 timeseries for office MVP" + in output + ) + assert ( + "MVP Include Secondary (category: MVP Dissemination): 2 timeseries for office MVP" + in output + ) + assert "SWT" not in str(captured["get_multi_timeseries_df"]) + + +def test_load_timeseries_data_group_raises_when_no_members_belong_to_source_office( + monkeypatch, +): + def fake_init_session(api_root, api_key=None): + return None + + def fake_get_timeseries_groups(**kwargs): + return SimpleNamespace( + json=[ + { + "id": "Cross Office", + "assigned-time-series": [ + {"office-id": "SWT", "timeseries-id": "B.Flow.Inst.1Hour.0"} + ], + } + ] + ) + + fake_cwms = SimpleNamespace( + init_session=fake_init_session, + get_timeseries_groups=fake_get_timeseries_groups, + ) + monkeypatch.setitem(__import__("sys").modules, "cwms", fake_cwms) + + with pytest.raises(ClickException, match="No assigned timeseries.*office 'MVP'"): + _load_timeseries_data( + source_cda="https://cwms-data.usace.army.mil/cwms-data/", + source_office="MVP", + target_cda="http://localhost:8082/cwms-data/", + target_api_key=None, + verbose=0, + dry_run=True, + ts_group="Include.*", + )