Skip to content

Commit a1c79e0

Browse files
Eli
authored and committed
Bug fixes and extended examples.
1 parent e227c86 commit a1c79e0

File tree

4 files changed

+254
-95
lines changed

4 files changed

+254
-95
lines changed

dms_datastore/auto_screen.py

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import numpy as np
2424
import seaborn as sns
2525
from shapely.geometry import Point
26+
import logging
27+
from pathlib import Path
28+
logger = logging.getLogger(__name__)
29+
2630

2731
# Todo: left join regions to station dbase and make this a local rather than global
2832
region_checkers = {}
@@ -40,7 +44,7 @@ def screener(
4044
plot_dest=None, # directory or 'interactive' or None for no plots
4145
):
4246
"""Performs yaml-specified screening protocol on time series"""
43-
print("screener", station_id, subloc, param)
47+
logger.info(f"screening: station_id: {station_id}, subloc: {subloc}, param: {param}")
4448
# name = protocol['name']
4549
steps = protocol["steps"]
4650
full = None
@@ -49,7 +53,7 @@ def screener(
4953
for step in steps:
5054
method_name = step["method"]
5155
label = step["label"] if "label" in step else method_name
52-
print("Performing:", label)
56+
logger.debug("Performing step:", label)
5357
method = globals()[method_name]
5458

5559
args = step["args"]
@@ -60,9 +64,7 @@ def screener(
6064
args["subloc"] = subloc
6165
if args[key] == "param" and args[key] == "param":
6266
args["param"] = param
63-
print(station_id, subloc, param)
64-
print("step:")
65-
print(step)
67+
6668
if len(ts_process.columns) > 1:
6769
if "value" in ts_process.columns:
6870
ts_process = ts_process.value.to_frame()
@@ -84,22 +86,22 @@ def screener(
8486
full.columns = [label]
8587
else:
8688
full[label] = anomaly
87-
print("step complete")
89+
logger.debug("step complete")
8890

8991
if do_plot:
90-
print("plotting")
92+
logger.debug("plotting")
9193
plot_anomalies(
9294
ts_process, full, plot_label, gap_fill_final=3, plot_dest=plot_dest
9395
)
94-
print("plotting complete")
96+
logger.debug("plotting complete")
9597
# This uses nullable integer so we can leave blank
9698
ts["user_flag"] = full.any(axis=1).astype(pd.Int64Dtype()) # This is nullable
9799
ts["user_flag"] = ts["user_flag"].mask(ts["user_flag"] == 0, other=pd.NA)
98100
if return_anomaly:
99101
return ts, full
100102
else:
101103
return ts
102-
print("time series screen complete")
104+
logger.debug("time series screen complete")
103105

104106

105107
def plot_anomalies(
@@ -127,7 +129,7 @@ def plot_anomalies(
127129
s=20 + 20 * ((nstep - i) / 2),
128130
)
129131
except:
130-
print(f"Problem time series: {subts}")
132+
logger.debug(f"Problem time series: {subts}")
131133
raise ValueError("Time series could not be plotted")
132134
ax0.legend()
133135
if plot_label is None:
@@ -154,7 +156,7 @@ def filter_inventory_(inventory, stations, params):
154156
if params is not None:
155157
if isinstance(params, str):
156158
params = [params]
157-
print(f"params: {params}")
159+
logger.debug(f"params: {params}")
158160
inventory = inventory.loc[
159161
inventory.index.get_level_values("param").isin(params), :
160162
]
@@ -222,7 +224,7 @@ def auto_screen(
222224
if subloc is None:
223225
subloc = "default"
224226
if np.random.uniform() < 0.0: # 0.95:
225-
print(f"Randomly rejecting: {station_id} {subloc} {param}")
227+
logger.debug(f"Randomly rejecting: {station_id} {subloc} {param}")
226228
continue
227229
filename = str(row.filename)
228230
station_info = station_db.loc[station_id, :]
@@ -235,18 +237,18 @@ def auto_screen(
235237
fetcher = custom_fetcher(agency)
236238
# these may be lists
237239
try:
238-
# print(f"fetching {fpath},{station_id},{param}")
240+
# logger.debug(f"fetching {fpath},{station_id},{param}")
239241
meta_ts = fetcher(fpath, station_id, param, subloc=subloc)
240242
except:
241-
print("Read failed for ", fpath, station_id, param, subloc)
243+
logger.warning("Read failed for ", fpath, station_id, param, subloc)
242244
meta_ts = None
243245

244246
if meta_ts is None:
245-
print(f"No data found for {station_id} {subloc} {param}")
247+
logger.debug(f"No data found for {station_id} {subloc} {param}")
246248
failed_read.append((station_id, subloc, param))
247-
print("Cumulative fails:")
249+
logger.debug("Cumulative fails:")
248250
for fr in failed_read:
249-
print(fr)
251+
logger.debug(fr)
250252
continue
251253
metas, ts = meta_ts
252254
meta = metas[0]
@@ -279,9 +281,9 @@ def auto_screen(
279281
else:
280282
output_fname = f"{agency}_{station_id}_{row.agency_id}_{param}.csv"
281283
output_fpath = os.path.join(dest, output_fname)
282-
print("start write")
284+
logger.debug("start write")
283285
write_ts_csv(screened, output_fpath, meta, chunk_years=True)
284-
print("end write")
286+
logger.debug("end write")
285287

286288

287289
def update_steps(proto, x):
@@ -301,7 +303,7 @@ def update_steps(proto, x):
301303
continue # don't include
302304
newsteps.append(step)
303305
if len(omissions) > 0:
304-
print(
306+
logger.debug(
305307
"Omissions listed but not found in inherited specification: ", omissions
306308
)
307309
finalsteps = []
@@ -325,7 +327,7 @@ def load_config(config_file):
325327
try:
326328
screen_config = yaml.safe_load(stream)
327329
except yaml.YAMLError as exc:
328-
print(exc)
330+
logger.debug(exc)
329331
raise
330332
return screen_config
331333

@@ -352,13 +354,13 @@ def context_config(screen_config, station_id, subloc, param):
352354
station_info = station_dbase()
353355

354356
region_file = screen_config["regions"]["region_file"]
355-
print("Region file: ", region_file)
357+
logger.debug("Region file: ", region_file)
356358

357359
if not (os.path.exists(region_file)):
358360
region_file = os.path.join(screen_config["config_dir"], region_file)
359361

360362
# Search for applicable region
361-
print("station_id: ", station_id, " subloc: ", subloc, " param: ", param)
363+
logger.debug("station_id: ", station_id, " subloc: ", subloc, " param: ", param)
362364
x = station_info.loc[station_id, "x"]
363365
y = station_info.loc[station_id, "y"]
364366
region = spatial_config(region_file, x, y)
@@ -382,20 +384,20 @@ def context_config(screen_config, station_id, subloc, param):
382384
region_config = config["regions"][region_name]
383385
if param in region_config["params"]:
384386
update_region = region_config["params"][param]
385-
print("region var\n", proto, "\n", update_region)
387+
logger.debug("region var\n", proto, "\n", update_region)
386388
proto = update_steps(proto, update_region)
387-
print("region var 2\n", proto, "\nafter\n", update_region)
389+
logger.debug("region var 2\n", proto, "\nafter\n", update_region)
388390

389391
# first priority: match station and variable
390392
station_config = None
391393
if station_id in config["stations"]:
392-
print("Found station")
394+
logger.debug("Found station")
393395
station_config = config["stations"][station_id]
394396
if param in station_config["params"]:
395397
update_station = station_config["params"][param]
396-
print("station var\n", proto, "\nthen\n", update_station)
398+
logger.debug("station var\n", proto, "\nthen\n", update_station)
397399
proto = update_steps(proto, update_station)
398-
print("station var 2\n", proto, "\nafter\n", update_station)
400+
logger.debug("station var 2\n", proto, "\nafter\n", update_station)
399401

400402
return proto
401403

@@ -559,8 +561,25 @@ def test_single(fname): # not maintained
559561
default=None,
560562
help="Station id for starting or restarting the screening process.",
561563
)
562-
def auto_screen_cli(config, fpath, dest, stations, params, plot_dest, start_station):
564+
@click.option("--logdir", type=click.Path(path_type=Path), default="logs")
565+
@click.option("--debug", is_flag=True)
566+
@click.option("--quiet", is_flag=True)
567+
@click.help_option("-h", "--help")
568+
def auto_screen_cli(config, fpath, dest, stations, params, plot_dest, start_station,
569+
logdir=None, debug=False, quiet=False):
563570
"""Auto-screen individual files or whole repos."""
571+
level, console = resolve_loglevel(
572+
debug=debug,
573+
quiet=quiet,
574+
)
575+
configure_logging(
576+
package_name="dms_datastore",
577+
level=level,
578+
console=console,
579+
logdir=logdir,
580+
logfile_prefix="auto_screen"
581+
)
582+
564583
repo = fpath
565584
stations_list = list(stations) if stations else None
566585
params_list = list(params) if params else None

dms_datastore/reconcile_data.py

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
import numpy as np
4545
import pandas as pd
46-
46+
from vtools import ts_merge
4747
from dms_datastore.inventory import to_wildcard
4848
from dms_datastore.read_ts import original_header
4949

@@ -408,8 +408,8 @@ def _write_preserving_header(
408408
# -----------------------------
409409

410410

411-
def _list_csv_files(d: str) -> List[str]:
412-
return sorted(glob(os.path.join(d, "*.csv")))
411+
def _list_csv_files(d: str, pattern: str = "*.csv") -> List[str]:
412+
return sorted(glob(os.path.join(d, pattern)))
413413

414414

415415
def _index_by_series_and_shard(
@@ -443,60 +443,6 @@ def _index_by_series_and_shard(
443443
return out
444444

445445

446-
# -----------------------------
447-
# Core merge logic (values)
448-
# -----------------------------
449-
450-
451-
def _merge_prefer(
452-
repo: pd.DataFrame,
453-
staged: pd.DataFrame,
454-
*,
455-
prefer: str,
456-
) -> pd.DataFrame:
457-
""" Merge two DataFrames by time index, selecting overlap by preference.
458-
Parameters
459-
----------
460-
repo, staged : pandas.DataFrame
461-
DataFrames representing the existing repo data and incoming staged data.
462-
Columns must match exactly.
463-
prefer : {'staged', 'repo'}
464-
Which side's values win on timestamps present in both.
465-
466-
Returns
467-
-------
468-
merged : pandas.DataFrame
469-
DataFrame on the union index with preferred values applied.
470-
471-
Raises
472-
------
473-
ValueError
474-
If columns do not match or ``prefer`` is invalid.
475-
476-
Notes
477-
-----
478-
This is the generic value reconciliation used for formatted/processed tiers
479-
during splice operations.
480-
"""
481-
482-
if list(repo.columns) != list(staged.columns):
483-
raise ValueError(
484-
f"Column mismatch: repo={list(repo.columns)} staged={list(staged.columns)}"
485-
)
486-
idx = repo.index.union(staged.index)
487-
r = repo.reindex(idx)
488-
s = staged.reindex(idx)
489-
if prefer == "staged":
490-
out = r.copy()
491-
out.loc[s.index, :] = out.loc[s.index, :].where(s.isna(), s)
492-
return out
493-
if prefer == "repo":
494-
out = s.copy()
495-
out.loc[r.index, :] = out.loc[r.index, :].where(r.isna(), r)
496-
return out
497-
raise ValueError("prefer must be 'staged' or 'repo'")
498-
499-
500446
# -----------------------------
501447
# Flag merge logic (screened)
502448
# -----------------------------
@@ -703,6 +649,7 @@ def update_repo(
703649
staged_dir: str,
704650
repo_dir: str,
705651
*,
652+
pattern: str = "*.csv",
706653
prefer: str = "staged",
707654
remove_source: bool = False,
708655
now: Optional[pd.Timestamp] = None,
@@ -774,9 +721,11 @@ def update_repo(
774721
if now is None:
775722
now = pd.Timestamp.now()
776723
this_year = int(now.year)
724+
if prefer not in ["repo", "staged"]:
725+
raise ValueError("prefer must be 'repo' or 'staged'")
777726

778-
staged_files = _list_csv_files(staged_dir)
779-
repo_files = _list_csv_files(repo_dir)
727+
staged_files = _list_csv_files(staged_dir, pattern=pattern)
728+
repo_files = _list_csv_files(repo_dir, pattern="*") # match all to detect deletions
780729
staged_map = _index_by_series_and_shard(staged_files, remove_source=remove_source)
781730
repo_map = _index_by_series_and_shard(repo_files, remove_source=remove_source)
782731

@@ -902,7 +851,12 @@ def update_repo(
902851
head = original_header(a.repo_path)
903852
sdf = _read_csv_timeseries(a.staged_path)
904853
rdf = _read_csv_timeseries(a.repo_path)
905-
merged = _merge_prefer(rdf, sdf, prefer=prefer)
854+
if prefer == "repo":
855+
merged = ts_merge([rdf, sdf], strict_priority=True)
856+
elif prefer == "staged":
857+
merged = ts_merge([sdf, rdf], strict_priority=True)
858+
else:
859+
raise ValueError("prefer must be 'repo' or 'staged'")
906860
_write_preserving_header(df=merged, dest_path=a.repo_path, header_text=head)
907861
else:
908862
raise ValueError(f"Unknown action {a.action}")

0 commit comments

Comments
 (0)