-
Notifications
You must be signed in to change notification settings - Fork 2
feat: Timeseries Loader #137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e402aec
b7d0a8d
6c198fb
cd2efc1
4fb960f
cead361
8410b4c
3e555d7
02e10f2
9d46619
e6c667b
ef3aa84
3c09697
6b16efb
3fdeb5f
aeff8f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| # cwmscli/load/timeseries/timeseries_data.py | ||
| import logging | ||
| from datetime import datetime | ||
| from typing import Optional | ||
|
|
||
| import click | ||
|
|
||
|
|
||
| def _extract_timeseries_groups(payload) -> list[dict]: | ||
| if payload is None: | ||
| return [] | ||
| if isinstance(payload, list): | ||
| return [item for item in payload if isinstance(item, dict)] | ||
| return [] | ||
|
|
||
|
|
||
| def _group_id(group: dict) -> str: | ||
| value = group.get("id") | ||
| return str(value) if value else "<unknown>" | ||
|
|
||
|
|
||
| def _category_id(group: dict) -> Optional[str]: | ||
| category = group.get("time-series-category") | ||
| if isinstance(category, dict) and category.get("id"): | ||
| return str(category["id"]) | ||
| return None | ||
|
|
||
|
|
||
| def _assigned_timeseries(group: dict) -> list[dict]: | ||
| assigned = group.get("assigned-time-series", []) | ||
| if isinstance(assigned, list): | ||
| return [item for item in assigned if isinstance(item, dict)] | ||
| return [] | ||
|
|
||
|
|
||
| def _load_timeseries_data( | ||
| source_cda: str, | ||
| source_office: str, | ||
| target_cda: str, | ||
| target_api_key: Optional[str], | ||
| verbose: int, | ||
| dry_run: bool, | ||
| ts_ids: Optional[list[str]] = None, | ||
| ts_group: Optional[str] = None, | ||
| ts_group_category_id: Optional[str] = None, | ||
| ts_group_category_office_id: Optional[str] = None, | ||
| begin: Optional[datetime] = None, | ||
| end: Optional[datetime] = None, | ||
| ): | ||
| import cwms | ||
|
|
||
| def copy_timeseries_for_office( | ||
| current_ts_ids: list[str], current_office: str | ||
| ) -> None: | ||
| ts_data = cwms.get_multi_timeseries_df( | ||
| ts_ids=current_ts_ids, | ||
| office_id=current_office, | ||
| melted=True, | ||
| begin=begin, | ||
| end=end, | ||
| ) | ||
| if dry_run: | ||
| click.echo("Dry run enabled. No changes will be made.") | ||
| logging.debug( | ||
| f"Would store {ts_data} for {current_ts_ids}({current_office})" | ||
| ) | ||
| return | ||
| if ts_data.empty: | ||
| click.echo( | ||
| f"No data returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}." | ||
| ) | ||
| return | ||
| ts_data = ts_data.dropna(subset=["value"]).copy() | ||
| if ts_data.empty: | ||
| click.echo( | ||
| f"No non-null values returned for timeseries ({', '.join(current_ts_ids)}) in office {current_office}." | ||
| ) | ||
| return | ||
| cwms.init_session(api_root=target_cda, api_key=target_api_key) | ||
| cwms.store_multi_timeseries_df( | ||
| data=ts_data, | ||
| office_id=current_office, | ||
| store_rule="REPLACE_ALL", | ||
| override_protection=False, | ||
| ) | ||
|
|
||
| if verbose: | ||
| click.echo( | ||
| f"Loading timeseries data from source CDA '{source_cda}' (office '{source_office}') " | ||
| f"to target CDA '{target_cda}'." | ||
| ) | ||
| cwms.init_session(api_root=source_cda, api_key=None) | ||
| ts_id_groups: list[tuple[str, list[str]]] = [] | ||
|
|
||
| if ts_ids: | ||
| ts_id_groups.append((source_office, ts_ids)) | ||
|
|
||
| if ts_group: | ||
| ts_group_data = cwms.get_timeseries_groups( | ||
| office_id=source_office, | ||
| include_assigned=True, | ||
| timeseries_category_like=ts_group_category_id, | ||
| timeseries_group_like=ts_group, | ||
| category_office_id=ts_group_category_office_id, | ||
| ) | ||
| groups = _extract_timeseries_groups(ts_group_data.json) | ||
| if not groups: | ||
| raise click.ClickException( | ||
| f"No timeseries groups matched '{ts_group}' for office '{source_office}'." | ||
| ) | ||
|
|
||
| logging.info( | ||
| f"Matched {len(groups)} timeseries group(s) for pattern {ts_group}." | ||
| ) | ||
| logging.info(f"Storing TSID from begin: {begin} to end: {end}") | ||
|
|
||
| matched_groups: list[tuple[str, Optional[str], list[str]]] = [] | ||
| combined_ts_ids: list[str] = [] | ||
|
|
||
| for group in groups: | ||
| group_name = _group_id(group) | ||
| category_name = _category_id(group) | ||
| office_ts_ids = [ | ||
| str(item["timeseries-id"]) | ||
| for item in _assigned_timeseries(group) | ||
| if item.get("office-id") == source_office and item.get("timeseries-id") | ||
| ] | ||
| matched_groups.append((group_name, category_name, office_ts_ids)) | ||
| combined_ts_ids.extend(office_ts_ids) | ||
|
|
||
| click.echo( | ||
| f"Matched {len(matched_groups)} timeseries group(s) for office '{source_office}':" | ||
| ) | ||
| for group_name, category_name, office_ts_ids in matched_groups: | ||
| category_text = f" (category: {category_name})" if category_name else "" | ||
| click.echo( | ||
| f" - {group_name}{category_text}: {len(office_ts_ids)} timeseries for office {source_office}" | ||
| ) | ||
|
|
||
| deduped_ts_ids = list(dict.fromkeys(combined_ts_ids)) | ||
| if not deduped_ts_ids: | ||
| raise click.ClickException( | ||
| f"No assigned timeseries in the matched group set belong to office '{source_office}'." | ||
| ) | ||
| ts_id_groups.append((source_office, deduped_ts_ids)) | ||
|
|
||
| for current_office, current_ts_ids in ts_id_groups: | ||
| try: | ||
| copy_timeseries_for_office(current_ts_ids, current_office) | ||
| except Exception as e: | ||
| click.echo( | ||
| f"Error storing timeseries ({', '.join(current_ts_ids)}) data: {e}", | ||
| err=True, | ||
| ) | ||
|
|
||
| if verbose: | ||
| click.echo("Timeseries data copy operation completed.") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,7 +40,7 @@ def requires(*requirements): | |
| { | ||
| "module": "cwms", | ||
| "package": "cwms-python", | ||
| "version": "1.0.3", | ||
| "version": "1.0.7", | ||
| "desc": "CWMS REST API Python client", | ||
| "link": "https://github.com/hydrologicengineeringcenter/cwms-python" | ||
| }, | ||
|
Comment on lines
40
to
46
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think maybe we should use the get_timeseries_groups function. That can use time_series_group_like and then that is all you have to input it doesn't need the category ID and category office those could be optional. The user should also be required to enter the office_id and only be able to get data from a single office. I don't think this should be used to copy data from multiple offices at one time. For example someone should not use it to copy the DMZ INCLUDE list and copy data from all offices into the database. A) it would take to long and B) offices should control their own data.
we might need a check of hey this pulls up multiple time series groups show then and tell them to enter the info for the correct one. or maybe allow multiple.... and only run on the ones that provide timeseries IDs for their office. Maybe in the dry run list the groups that will be copied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a timeseries group to test:
https://DEV/cwms-data/timeseries/group/Web?office=MVP&category-id=MVP%20Dissemination&_cb=1774300401397