Skip to content

Commit 094ca18

Browse files
EliEli
authored andcommitted
Distinguish better between agency_id and lookup id
Distinguish between source site id and agency_id for cdec Eliminate hard-wired conception of series_id based on stations database
1 parent 5a212da commit 094ca18

9 files changed

Lines changed: 217 additions & 183 deletions

dms_datastore/config_data/dstore_config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ repos:
7979

8080
# Data identity: uniquely identifies a logical time series independent of provider
8181
# Multiple files (different sources) may map to the same data_key
82-
data_key: [station_id, subloc, param, shard]
82+
data_key: [station_id, subloc, param]
8383

8484

8585
screened:
@@ -109,7 +109,7 @@ repos:
109109
- "{processor}_{station_id@subloc}_{param}_{year}.csv"
110110
- "{processor}_{station_id@subloc}_{param}.csv"
111111
file_key: [processor, station_id, subloc, param, modifier, shard]
112-
data_key: [station_id, subloc, param, modifier, shard]
112+
data_key: [station_id, subloc, param, modifier]
113113

114114
structures:
115115
root: "//cnrastore-bdo/Modeling_Data/repo/structures"

dms_datastore/download_cdec.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import zipfile
1313
import os
1414
import string
15+
from pathlib import Path
1516
import datetime as dt
1617
import time
1718
import numpy as np
@@ -39,7 +40,7 @@ def download_station_data(
3940
):
4041
station = row.station_id
4142
try:
42-
cdec_id = row.cdec_id.lower()
43+
cdec_id = row.src_site_id.upper()
4344
except Exception:
4445
cdec_id = station
4546

@@ -266,7 +267,8 @@ def download_cdec(
266267
df = attach_agency_id(
267268
df,
268269
repo_name="formatted",
269-
agency_id_col="cdec_id",
270+
agency_id_col="agency_id",
271+
src_site_id_col="cdec_id",
270272
on_missing="drop",
271273
)
272274
vlookup = dstore_config.config_file("variable_mappings")
@@ -329,6 +331,9 @@ def download_cdec(
329331
help="Frequency code(s): E (event), H (hourly), D (daily), M (monthly). "
330332
"Default is E,H. Multiple values allowed as comma-separated list (e.g., E,H,D)."
331333
)
334+
@click.option("--logdir", type=click.Path(path_type=Path), default=None)
335+
@click.option("--debug", is_flag=True)
336+
@click.option("--quiet", is_flag=True)
332337
@click.argument("stationfile", nargs=-1)
333338
def download_cdec_cli(
334339
dest_dir,
@@ -341,9 +346,25 @@ def download_cdec_cli(
341346
stationfile,
342347
overwrite,
343348
freq,
349+
logdir,
350+
debug,
351+
quiet,
344352
):
345353
"""CLI for downloading CDEC water data."""
346-
354+
level, console = resolve_loglevel(
355+
debug=debug,
356+
quiet=quiet,
357+
)
358+
logger.debug(f"Logging level set to {logging.getLevelName(level)} and console={console}")
359+
360+
configure_logging(
361+
package_name="dms_datastore",
362+
level=level,
363+
console=not quiet,
364+
logdir=logdir,
365+
logfile_prefix="download_ncro"
366+
)
367+
logger.debug("Starting NCRO download")
347368
download_cdec(
348369
dest_dir,
349370
id_col,

dms_datastore/dstore_config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ def repo_registry(repo=None, repo_cfg=None):
426426

427427
db = db.set_index(site_key, drop=False)
428428
db.index = db.index.astype(str)
429+
db.index.name = "site_id"
429430
return db
430431

431432

@@ -590,6 +591,17 @@ def registry_df(registry_name):
590591
db["agency_id"].astype(str).str.replace("'", "", regex=True).str.strip()
591592
)
592593

594+
if "agency_id" in db.columns:
595+
db["agency_id"] = (
596+
db["agency_id"].astype(str).str.replace("'", "", regex=True).str.strip()
597+
)
598+
599+
missing_agency = db["agency_id"].isna() | (db["agency_id"] == "") | (db["agency_id"].str.lower() == "nan")
600+
if missing_agency.any():
601+
bad_rows = db.loc[missing_agency]
602+
raise ValueError(
603+
f"Registry {registry_name} has missing agency_id values in {len(bad_rows)} row(s)"
604+
)
593605
_registry_cache[registry_name] = db
594606
return db
595607

dms_datastore/inventory.py

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -58,36 +58,35 @@ def _inventory_files(root):
5858
def _parse_inventory_meta(allfiles, repo_cfg=None):
5959
return [interpret_fname(fname, repo_cfg=repo_cfg) for fname in allfiles]
6060

61+
6162
def series_id_from_meta(meta, repo_cfg=None, remove_provider=False):
6263
"""
6364
Construct a stable logical series identifier from parsed metadata.
6465
6566
Parameters
6667
----------
67-
meta : dict
68-
Parsed filename metadata
68+
meta : dict-like
69+
Parsed filename metadata or row-like object.
6970
repo_cfg : dict or None
70-
Repo configuration (defines provider_key, site_key)
71+
Repo configuration. When provided, identity is driven by repo_cfg["data_key"].
72+
When omitted, a legacy station-style fallback is used.
7173
remove_provider : bool
72-
If True, omit provider from identity
74+
If True, omit provider from identity.
7375
7476
Returns
7577
-------
7678
str
7779
series_id
7880
"""
79-
80-
# --- resolve keys ---
8181
if repo_cfg is None:
82-
provider_key = "agency" # legacy fallback
83-
site_key = "station_id"
82+
provider_key = "agency"
83+
data_key = ["station_id", "subloc", "param"]
8484
else:
8585
provider_key = repo_cfg["provider_key"]
86-
site_key = repo_cfg["site_key"]
86+
data_key = repo_cfg["data_key"] # let missing data_key fail naturally for now
8787

8888
parts = []
8989

90-
# --- provider ---
9190
if not remove_provider:
9291
provider_val = meta.get(provider_key)
9392
if provider_val is None:
@@ -96,29 +95,19 @@ def series_id_from_meta(meta, repo_cfg=None, remove_provider=False):
9695
)
9796
parts.append(str(provider_val))
9897

99-
# --- site ---
100-
site_val = meta.get(site_key)
101-
if site_val is None:
102-
raise ValueError(
103-
f"Missing site field {site_key!r} in metadata: {meta}"
104-
)
105-
parts.append(str(site_val))
106-
107-
# --- subloc ---
108-
subloc = meta.get("subloc")
109-
if subloc not in (None, "default"):
110-
parts.append(str(subloc))
111-
112-
# --- param ---
113-
param = meta.get("param")
114-
if param is None:
115-
raise ValueError(f"Missing param in metadata: {meta}")
116-
parts.append(str(param))
117-
118-
# --- modifier ---
119-
modifier = meta.get("modifier")
120-
if modifier is not None:
121-
parts.append(str(modifier))
98+
for key in data_key:
99+
val = meta.get(key)
100+
101+
# suppress conventional empty sublocation/modifier noise
102+
if key in ("subloc", "modifier") and val in (None, "", "default", "none"):
103+
continue
104+
105+
if val is None:
106+
raise ValueError(
107+
f"Missing data_key field {key!r} in metadata: {meta}"
108+
)
109+
110+
parts.append(str(val))
122111

123112
return "|".join(parts)
124113

dms_datastore/populate_repo.py

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -289,47 +289,70 @@ def revise_filename_syear_eyear(pat, force=True, outfile="rename.txt"):
289289
logger.info(b)
290290
logger.info(f"Renaming complete for pattern: {pat}")
291291

292-
293292
def populate_repo(
294293
agency, param, dest, start, end, overwrite=False, ignore_existing=None
295294
):
296295
"""Populate repository for the given agency/source and parameter."""
297296
maximize_subloc = False
298297

299-
slookup = dstore_config.config_file("station_dbase")
300298
if "ncro" in agency:
301299
vlookup = mapping_df
302300
agency = "ncro"
303301
else:
304302
vlookup = dstore_config.config_file("variable_mappings")
305303

306-
subloclookup = dstore_config.config_file("sublocations")
307-
df = pd.read_csv(slookup, sep=",", comment="#", header=0, dtype={"agency_id": str})
304+
# Use repo-aware registry access instead of reading station_dbase CSV directly.
305+
slookup = dstore_config.repo_registry("formatted").copy()
306+
308307
filter_agency = "dwr_ncro" if agency == "ncro" else agency
309-
df = df.loc[df.agency.str.lower() == filter_agency, :]
310-
df["agency_id"] = df["agency_id"].str.replace("'", "", regex=True)
308+
slookup = slookup.loc[slookup.agency.str.lower() == filter_agency, :]
309+
name_lookup = (
310+
slookup.loc[:, ["station_id", "name"]]
311+
.drop_duplicates(subset=["station_id"])
312+
)
311313

312-
dfsub = read_station_subloc(subloclookup)
313-
df = merge_station_subloc(df, dfsub, default_z=-0.5)
314314

315-
df = df.reset_index()
315+
dfsub = read_station_subloc(dstore_config.config_file("sublocations"))
316+
slookup = merge_station_subloc(slookup, dfsub, default_z=-0.5)
316317

317318
if ignore_existing is not None:
318-
df = df[~df["station_id"].isin(ignore_existing)]
319+
slookup = slookup[~slookup["station_id"].isin(ignore_existing)]
319320

320321
dest_dir = dest
321322
source = "cdec" if agency in ["dwr", "usbr"] else agency
322-
agency_id_col = "cdec_id" if source == "cdec" else "agency_id"
323+
agency_id_col = "agency_id"
324+
src_site_id_col = "cdec_id" if source == "cdec" else None
323325

324-
df = df[["station_id", "subloc"]]
326+
# Preserve only the station display name, and only outside the standard
327+
# request-building pipeline.
328+
slookup = slookup.reset_index()
329+
df_req = slookup.loc[:, ["station_id", "subloc"]]
325330

326331
stationlist = normalize_station_request(
327-
stationframe=df,
332+
stationframe=df_req,
328333
param=param,
329334
default_subloc="default",
330335
)
331-
stationlist = attach_agency_id(stationlist, repo_name="formatted", agency_id_col=agency_id_col)
336+
stationlist = attach_agency_id(
337+
stationlist,
338+
repo_name="formatted",
339+
agency_id_col=agency_id_col,
340+
src_site_id_col=src_site_id_col,
341+
on_missing="drop" if src_site_id_col is not None else "raise",
342+
)
332343
stationlist = attach_src_var_id(stationlist, vlookup, source=source)
344+
345+
stationlist = stationlist.merge(
346+
name_lookup,
347+
on="station_id",
348+
how="left",
349+
validate="many_to_one",
350+
)
351+
352+
if stationlist["name"].isna().any():
353+
missing = stationlist.loc[stationlist["name"].isna(), "station_id"].tolist()
354+
raise ValueError(f"Missing station name for station_id(s): {missing}")
355+
333356
if maximize_subloc:
334357
stationlist["subloc"] = "default"
335358
if param not in ["flow", "elev"]:
@@ -386,7 +409,8 @@ def supplement_ncro_with_cdec(df, dest, start, overwrite=False, ignore_existing=
386409
df = df[~df["station_id"].isin(ignore_existing)]
387410

388411
source = "cdec"
389-
agency_id_col = "cdec_id"
412+
agency_id_col = "agency_id"
413+
src_site_id_col = "cdec_id"
390414

391415
stationlist = normalize_station_request(
392416
stationframe=df,
@@ -397,6 +421,7 @@ def supplement_ncro_with_cdec(df, dest, start, overwrite=False, ignore_existing=
397421
stationlist,
398422
repo_name="formatted",
399423
agency_id_col=agency_id_col,
424+
src_site_id_col=src_site_id_col,
400425
on_missing="drop",
401426
)
402427

0 commit comments

Comments
 (0)