Skip to content

Commit ac64d4e

Browse files
EliEli
authored and committed
First cut rationalize_time_partitions.py that can take yaml spec.
Allows file to be used twice (A-B-A) when partitioning in splicable order. Allows argument to be a Path object, coerced immediately to string. Allow file updates based on new data on either end of series, not just during common period. CLI for reconcile. Should move this into main file -- it is a bit bulky. New cli for update repo and rationalize time partition. Added rationalize yaml file to config system. Added project info for new entry points.
1 parent a35d524 commit ac64d4e

File tree

10 files changed

+1207
-21
lines changed

10 files changed

+1207
-21
lines changed

dms_datastore/__main__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
from dms_datastore.merge_files import merge_files_cli
2626
from dms_datastore.dropbox_data import dropbox_cli
2727
from dms_datastore.coarsen_file import coarsen_ts_cli
28+
from dms_datastore.update_repo import update_repo
29+
from dms_datastore.update_flagged_data import update_flagged_data
30+
from dms_datastore.rationalize_time_partitions import rationalize_time_partitions_cli
2831

2932

3033
@click.group(help="DMS CLI tools for data processing and extraction.")
@@ -60,6 +63,9 @@ def cli():
6063
cli.add_command(data_cache_cli, "data_cache")
6164
cli.add_command(merge_files_cli, "merge_files")
6265
cli.add_command(coarsen_ts_cli, "coarsen")
66+
cli.add_command(update_flagged_data, "update_flagged_data")
67+
cli.add_command(update_repo,"update_repo")
68+
cli.add_command(rationalize_time_partitions_cli, "rationalize_time_partitions")
6369

6470
if __name__ == "__main__":
6571
cli()

dms_datastore/_reconcile_cli.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# dms_datastore/cli/_reconcile_cli.py
2+
from __future__ import annotations
3+
4+
import csv
5+
from dataclasses import asdict, is_dataclass
6+
from pathlib import Path
7+
from typing import Iterable, Optional
8+
9+
10+
def echo_actions_text(actions: Iterable[object]) -> None:
    """
    Print a one-line, space-separated summary of each action.

    Action objects are expected to expose the attributes:
    series_id, shard, action, reason, staged_path, repo_path
    (missing attributes are rendered as None).
    """
    all_actions = list(actions)
    if not all_actions:
        print("no actions (repo already up to date for inspected shards)")
        return

    for act in all_actions:
        staged_path = getattr(act, "staged_path", None)
        repo_path = getattr(act, "repo_path", None)

        fields = [
            str(getattr(act, "action", None)),
            f"series_id={getattr(act, 'series_id', None)}",
            f"shard={getattr(act, 'shard', None)}",
            f"reason={getattr(act, 'reason', None)}",
        ]
        # Paths are optional; only show them when present (truthy).
        for label, value in (("staged", staged_path), ("repo", repo_path)):
            if value:
                fields.append(f"{label}={value}")

        print(" ".join(fields))
41+
42+
43+
def write_actions_csv(actions: Iterable[object], out_csv: str) -> None:
    """
    Write reconcile actions to a CSV file.

    Parameters
    ----------
    actions : Iterable[object]
        Action records. Dataclass *instances* are serialized with
        :func:`dataclasses.asdict`; any other object is read via
        ``getattr`` with a ``None`` fallback for each column.
    out_csv : str
        Destination path. Parent directories are created as needed.

    Notes
    -----
    Columns are stable and explicit to support downstream parsing and
    fixtures.
    """
    out_path = Path(out_csv)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    fieldnames = ["series_id", "shard", "action", "reason", "staged_path", "repo_path"]

    with out_path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for a in actions:
            # is_dataclass() is True for dataclass *classes* as well, but
            # asdict() only accepts instances -- guard against a stray class
            # object so it falls through to the getattr path instead of
            # raising TypeError.
            if is_dataclass(a) and not isinstance(a, type):
                row = asdict(a)
            else:
                row = {
                    "series_id": getattr(a, "series_id", None),
                    "shard": getattr(a, "shard", None),
                    "action": getattr(a, "action", None),
                    "reason": getattr(a, "reason", None),
                    "staged_path": getattr(a, "staged_path", None),
                    "repo_path": getattr(a, "repo_path", None),
                }

            # enforce stable column ordering (and drop any extra fields
            # a dataclass might carry beyond the canonical schema)
            w.writerow({k: row.get(k, None) for k in fieldnames})
72+
73+
74+
def resolve_plan_flag(apply: bool, plan: bool) -> bool:
    """
    Decide whether the CLI runs in plan (dry-run) mode.

    Plan mode is the default; ``--apply`` turns it off, and an explicit
    ``--plan`` always wins (the caller is expected not to pass both).
    """
    # Truth table collapses to: plan set -> True; else apply set -> False;
    # else True (default).
    return bool(plan or not apply)
85+
86+
87+
def maybe_fail_if_changes(actions: list[object], fail_if_changes: bool) -> None:
    """
    Optionally abort with a distinctive exit code when changes are pending.

    Exit code convention:
        - 0: no actions (or apply mode)
        - 2: actions exist and fail_if_changes requested
    """
    if not fail_if_changes:
        return
    if actions:
        raise SystemExit(2)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# YAML-driven time-partition rationalization for instrument files.
2+
#
3+
# Goal:
4+
# For selected pools of files (matched by a glob pattern), enforce a canonical
5+
# non-overlapping partition of time by slicing specific "include" files into
6+
# left-closed/right-open windows [start_k, start_{k+1}) and deleting "omit" files.
7+
#
8+
# Notes:
9+
# - All matching is done by exact basename equality within the pool.
10+
# - Time windows are left-closed/right-open to avoid duplicates at boundaries.
11+
# - This mode is intended to run *before* general "superset deletes subset" logic.
12+
13+
rationalize:
14+
- pattern: "des_mrz@upper*_ec_*.csv"
15+
# pattern defines the pool: all files matched by this glob form one candidate set.
16+
# If this pattern matches, the rule applies; if it matches nothing, that's an error.
17+
18+
include:
19+
# include is ordered and defines the canonical partitioning plan.
20+
#
21+
# Each entry has:
22+
# fname: exact basename of a file in the pool (may include ${LAST})
23+
# start: ISO date (YYYY-MM-DD) or ${START}
24+
#
25+
# start values must be strictly increasing once resolved.
26+
27+
- fname: "des_mrz@upper_40_ec_1983_1999.csv"
28+
start: "${START}"
29+
# ${START} means "from the beginning of this file" (no lower time bound).
30+
# Window becomes: [-inf, next_start)
31+
32+
- fname: "des_mrz@upper_40_ec_2000_2007.csv"
33+
start: "2000-01-01"
34+
# Window becomes: [2000-01-01, next_start)
35+
36+
- fname: "des_mrz@upper_40_ec_2004_2019.csv"
37+
start: "2004-09-21"
38+
# Example of intentionally re-partitioning an overlapping file:
39+
# you keep only from 2004-09-21 onward, and exclude the boundary at next_start.
40+
41+
- fname: "des_mrz@upper_40_ec_2007_2009.csv"
42+
start: "2007-10-01"
43+
# Example of intentionally re-partitioning an overlapping file:
44+
# you keep only from 2007-10-01 onward, and exclude the boundary at next_start.
45+
46+
47+
- fname: "des_mrz@upper_40_ec_2004_2019.csv"
48+
start: "2008-08-20"
49+
# Example of a repeated file
50+
51+
52+
- fname: "des_mrz@upper_40_ec_2020_${LAST}.csv"
53+
start: "2020-01-01"
54+
# ${LAST} is resolved from the pool's max eyear (parsed from filenames),
55+
# substituted into fname, and must then match exactly one pool file.
56+
# Last window becomes: [2020-01-01, +inf)
57+
58+
omit:
59+
# omit is optional but, if present, acts as a completeness assertion:
60+
# include + omit must cover the entire pool (disjoint union).
61+
#
62+
# omit items are either explicit basenames or special tokens.
63+
64+
#- "des_mrz@upper_40_ec_2007_2009.csv"
65+
#- "${SUPERSEDED}"
66+
# ${SUPERSEDED} expands to all pool files that are strict subsets (by year span)
67+
# of another pool file using the existing inclusive-year superset rule:
68+
# B supersedes A if B.syear <= A.syear and B.eyear >= A.eyear
69+
#
70+
# IMPORTANT: If any file would end up in both include and omit after expansion,
71+
# that is an error. Be explicit instead.

dms_datastore/config_data/dstore_config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ sublocations: station_subloc.csv
99
file_deletion_list: non_15_min_files_checked.txt
1010
compare_excepts_formatted: compare_excepts_formatted.txt
1111

12+
des_time_partition: des_rationalize_cfg.yaml
13+
14+
1215
repo: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
1316
screened: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
1417
processed: "//cnrastore-bdo/Modeling_Data/repo/continuous/processed"

0 commit comments

Comments
 (0)