Skip to content

Commit 7f9540d

Browse files
EliEli
authored andcommitted
Add process_elev script and changed site_key to a hardwired name station_id.
This simplifies away site_id.
1 parent 5a6f01d commit 7f9540d

19 files changed

Lines changed: 377 additions & 111 deletions

dms_datastore/__main__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from dms_datastore.rationalize_time_partitions import rationalize_time_partitions_cli
3131
from dms_datastore.spot_check import spot_check_cli
3232
from dms_datastore.populate_daily import populate_daily_cli
33+
from dms_datastore.processed.process_elev_data import process_elev_cli
3334

3435
@click.group(help="DMS CLI tools for data processing and extraction.")
3536
@click.help_option("-h", "--help") # Add the help option at the group level
@@ -69,6 +70,7 @@ def cli():
6970
cli.add_command(rationalize_time_partitions_cli, "rationalize_time_partitions")
7071
cli.add_command(populate_daily_cli, "populate_daily")
7172
cli.add_command(spot_check_cli, "spot_check")
73+
cli.add_command(process_elev_cli, "process_elev")
7274

7375

7476
if __name__ == "__main__":

dms_datastore/config_data/dstore_config.yaml

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,25 @@ variable_mappings: variable_mappings.csv
33
variable_definitions: variables.csv
44
sublocations: station_subloc.csv
55
ncro_inventory: ncro_inventory_full.csv
6-
76
daily_data_manifest: daily_data_manifest.csv
87
daily_dropbox_spec: daily_dropbox.yaml
9-
108
file_deletion_list: non_15_min_files_checked.txt
119
compare_excepts_formatted: compare_excepts_formatted.txt
1210
des_rationalize_time_spec: des_rationalize_cfg.yaml
13-
1411
repo_base: "//cnrastore-bdo/Modeling_Data/repo"
15-
16-
default_repo: screened
17-
1812
spot_check_spec: spot_check_spec.yaml
1913

20-
# registry files available to repos
14+
# Repository info
15+
default_repo: screened
2116

2217

18+
# aliases for registry files available to repos in their "registry"
19+
# these will be looked for in the dbase_config directory and are expected to be csv files.
20+
# The registry files provide supplementary metadata that can be used for provider resolution,
21+
# geographical and other purposes.
2322
registries:
2423
continuous: station_dbase.csv
25-
processed: processed_series.csv
24+
processed_synthetic: processed_registry.csv
2625
structures: structures_registry.csv
2726
daily: station_dbase.csv
2827

@@ -31,15 +30,10 @@ repos:
3130
formatted:
3231
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/formatted"
3332

34-
# Registry providing supplementary metadata (e.g., agency, location)
35-
registry: continuous
36-
37-
# Primary identifier for a site (station, structure, etc.). This will be a key for the registry
38-
# and is always part of filename templates, usually in the second slot
39-
# Examples: structure_id or station_id
40-
site_key: station_id
41-
42-
# Depending on settings for the repo, multiple sources (e.g., ncro, cdec) may supply the same series
33+
# Registry providing supplementary metadata (e.g., agency, location) for stations
34+
registry: continuous # this is an alias set up above for station_dbase.csv.
35+
36+
# Depending on settings for the repo, multiple sources (e.g., ncro, cdec) may supply the same time series
4337
# For instance Bacon Island conductance is observed by NCRO but distributed both by NCRO (QA/QC'd data on Water Data Library)
4438
# and on CDEC (realtime, not QA/QC). This field, in this case {source} appears in the filename template
4539
# (typically first slot) and drives globbing and merge order
@@ -88,7 +82,6 @@ repos:
8882
screened:
8983
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
9084
registry: continuous
91-
site_key: station_id
9285
# screened data is assumed to be grouped according to the original agency/program that manages the station.
9386
# so whereas in formatted you might have both "ncro" and "cdec" sources for a station, here they will be unified under ncro.
9487
provider_key: agency
@@ -102,8 +95,8 @@ repos:
10295

10396
processed:
10497
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/processed"
105-
registry: processed
106-
site_key: station_id
98+
registry: [continuous, processed_synthetic]
99+
107100
provider_key: processor
108101
provider_resolution_mode: assume_unique # This may have to be adjusted for cases like SF harmonic tides where there is a noaa and dwr version
109102
filename_templates:
@@ -113,23 +106,21 @@ repos:
113106
- "{processor}_{station_id@subloc}_{param}_{year}.csv"
114107
- "{processor}_{station_id@subloc}_{param}.csv"
115108
file_key: [processor, station_id, subloc, param, modifier, shard]
116-
data_key: [station_id, subloc, param, modifier]
109+
data_key: [processor, subloc, param, modifier]
117110

118111
structures_formatted:
119112
root: "//cnrastore-bdo/Modeling_Data/repo/structures/formatted"
120113
registry: structures
121-
site_key: structure_id
122114
provider_key: processor
123115
provider_resolution_mode: assume_unique
124116
filename_templates:
125-
- "{processor}_{structure_id@subloc}_{param@modifier}.csv"
126-
- "{processor}_{structure_id@subloc}_{param@modifier}_{year}.csv"
117+
- "{processor}_{station_id@subloc}_{param@modifier}.csv"
118+
- "{processor}_{station_id@subloc}_{param@modifier}_{year}.csv"
127119

128120

129121
daily_formatted:
130122
root: "//cnrastore-bdo/Modeling_Data/repo/daily/formatted"
131123
registry: continuous # aliases the station data base that provides supplementary metadata like geolocation and agency
132-
site_key: station_id # registry and template use this term for the site's identifier (was "key")
133124
provider_key: source # provider is the agency mainly involved in delivering the data.
134125
# Generic for "source" (datamart) "agency" (collecting agency) and
135126
# # "processor" (group synthesizing, manipulating or filling data)

dms_datastore/dropbox_data.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def _resolve_metadata_value(field_name, raw_value, inferred_meta, registry_row):
175175

176176
def _registry_row_for_metadata(name, merged, repo_name):
177177
rcfg = repo_config(repo_name)
178-
site_key = rcfg["site_key"]
178+
site_key = "station_id"
179179
slookup = repo_registry(repo_name)
180180

181181
if site_key in merged and merged[site_key] not in (None, "", "infer_from_filename"):
@@ -197,7 +197,7 @@ def _registry_row_for_metadata(name, merged, repo_name):
197197

198198
def populate_meta(fpath, listing, repo_name, meta_out=None):
199199
rcfg = repo_config(repo_name)
200-
site_key = rcfg["site_key"]
200+
site_key = "station_id"
201201

202202
meta = dict(listing.get("metadata", {}) or {})
203203
name = listing.get("name", "<unnamed>")
@@ -330,7 +330,7 @@ def _maybe_rename_value_column(ts, splice_args):
330330

331331
def _check_metadata(meta, repo_name):
332332
rcfg = repo_config(repo_name)
333-
site_key = rcfg["site_key"]
333+
site_key = "station_id"
334334

335335
required = [
336336
site_key,

dms_datastore/dstore_config.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -402,31 +402,65 @@ def repo_registry(repo=None, repo_cfg=None):
402402
In contrast, :func:`registry_df` returns a raw table without
403403
any guarantees about identity or uniqueness.
404404
"""
405+
site_key = "station_id"
405406
cfg = coerce_repo_config(repo=repo, repo_cfg=repo_cfg)
406-
registry_name = cfg.get("registry")
407-
site_key = cfg["site_key"]
407+
registry_names = cfg.get("registry")
408408

409-
if registry_name is None:
409+
if registry_names is None:
410410
raise ValueError(f"Repo {cfg['name']!r} has no named registry")
411411

412-
db = registry_df(registry_name).copy()
412+
if isinstance(registry_names, str):
413+
registry_names = [registry_names]
413414

414-
if site_key not in db.columns:
415-
raise ValueError(
416-
f"Registry {registry_name} is missing key column {site_key}"
415+
dfs = []
416+
for rname in registry_names:
417+
sub = registry_df(rname).copy()
418+
if site_key not in sub.columns:
419+
raise ValueError(
420+
f"Registry {rname} is missing key column {site_key}"
421+
)
422+
sub[site_key] = (
423+
sub[site_key].astype(str).str.replace("'", "", regex=True).str.strip()
417424
)
425+
dfs.append(sub)
426+
427+
# Merge on common columns
428+
if len(dfs) == 1:
429+
db = dfs[0]
430+
else:
431+
common_cols = set(dfs[0].columns)
432+
for sub in dfs[1:]:
433+
common_cols &= set(sub.columns)
434+
common_cols = sorted(common_cols)
435+
if site_key not in common_cols:
436+
raise ValueError(
437+
f"Column {site_key!r} not in common columns across registries"
438+
)
418439

419-
db[site_key] = (
420-
db[site_key].astype(str).str.replace("'", "", regex=True).str.strip()
421-
)
440+
# Check for key overlap across sub-registries
441+
all_keys = []
442+
for i, sub in enumerate(dfs):
443+
keys = set(sub[site_key].values)
444+
overlap = keys & set(all_keys)
445+
if overlap:
446+
raise ValueError(
447+
f"Key overlap across registries: {sorted(overlap)}"
448+
)
449+
all_keys.extend(sub[site_key].values)
450+
451+
dfs = [sub[common_cols] for sub in dfs]
452+
db = pd.concat(dfs, ignore_index=True)
422453

423454
dup = db[site_key].duplicated()
424455
if dup.any():
425-
raise ValueError(f"Registry {registry_name} has duplicate {site_key} keys")
456+
duped = db.loc[dup, site_key].tolist()
457+
raise ValueError(
458+
f"Registry has duplicate {site_key} keys: {duped}"
459+
)
426460

427461
db = db.set_index(site_key, drop=False)
428462
db.index = db.index.astype(str)
429-
db.index.name = "site_id"
463+
db.index.name = None
430464
return db
431465

432466

@@ -487,7 +521,7 @@ def repo_config(repo_name):
487521

488522
spec = dict(repos[repo_name])
489523

490-
required = ["site_key", "provider_key", "provider_resolution_mode", "filename_templates"]
524+
required = ["provider_key", "provider_resolution_mode", "filename_templates"]
491525
missing = [k for k in required if k not in spec]
492526
if missing:
493527
raise ValueError(f"Repo {repo_name!r} missing required keys: {missing}")

dms_datastore/filename.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def naming_spec(repo=None, repo_cfg=None, templates=None):
1212
if repo_cfg is not None:
1313
return {
1414
"filename_templates": list(repo_cfg.get("filename_templates", [])),
15-
"site_key": repo_cfg["site_key"],
15+
"site_key": "station_id",
1616
"provider_key": repo_cfg.get("provider_key"),
1717
"name": repo_cfg.get("name"),
1818
}
@@ -22,7 +22,7 @@ def naming_spec(repo=None, repo_cfg=None, templates=None):
2222
rcfg = dstore_config.repo_config(repo)
2323
return {
2424
"filename_templates": list(rcfg.get("filename_templates", [])),
25-
"site_key": rcfg["site_key"],
25+
"site_key": "station_id",
2626
"provider_key": rcfg.get("provider_key"),
2727
"name": rcfg.get("name"),
2828
}
@@ -128,7 +128,7 @@ def build_repo_globs(
128128
if not templates:
129129
raise ValueError(f"Repo {repo_cfg.get('name')!r} has no filename_templates")
130130

131-
site_key = repo_cfg["site_key"]
131+
site_key = "station_id"
132132
provider_key = repo_cfg.get("provider_key")
133133

134134
if providers is None:
@@ -264,7 +264,7 @@ def _template_regex_from_template(template, *, key_column="station_id"):
264264
def _interpret_fname_template(fname, repo_cfg):
265265
fname = os.path.split(fname)[1]
266266
templates = repo_cfg.get("filename_templates", [])
267-
site_key = repo_cfg.get("site_key", "station_id")
267+
site_key = "station_id"
268268
if not templates:
269269
raise ValueError(
270270
f"Repo {repo_cfg.get('name')!r} has no filename_templates for template parse"

dms_datastore/inventory.py

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def repo_data_inventory(repo=None, *, repo_cfg=None, in_path=None, registry=None
123123
root = in_path if in_path is not None else repo_cfg["root"]
124124
registry = repo_registry(repo_cfg=repo_cfg)
125125

126-
site_key = repo_cfg["site_key"]
126+
site_key = "station_id"
127127
provider_key = repo_cfg["provider_key"]
128128

129129
allfiles = _inventory_files(root)
@@ -190,23 +190,12 @@ def repo_data_inventory(repo=None, *, repo_cfg=None, in_path=None, registry=None
190190

191191
grouped[site_key] = grouped[site_key].astype(str).str.strip()
192192

193-
if registry.index.name != site_key:
194-
if site_key not in registry.columns:
195-
raise ValueError(
196-
f"Registry missing join key column {site_key!r}; "
197-
f"columns are {registry.columns.tolist()}"
198-
)
199-
registry = registry.copy()
200-
registry[site_key] = registry[site_key].astype(str).str.strip()
201-
registry = registry.set_index(site_key, drop=False)
202-
else:
203-
registry = registry.copy()
204-
registry.index = registry.index.astype(str)
205-
if site_key in registry.columns:
206-
registry[site_key] = registry[site_key].astype(str).str.strip()
193+
# Registry is already indexed by station_id from repo_registry()
194+
# Drop the station_id column before join to avoid pandas ambiguity
195+
reg_for_join = registry.drop(columns=[site_key], errors="ignore")
207196

208197
metastat = grouped.join(
209-
registry,
198+
reg_for_join,
210199
on=site_key,
211200
rsuffix="_registry",
212201
how="left",
@@ -251,7 +240,7 @@ def repo_file_inventory(repo=None, *, repo_cfg=None, in_path=None):
251240
root = in_path if in_path is not None else repo_cfg["root"]
252241
registry = repo_registry(repo_cfg=repo_cfg)
253242

254-
site_key = repo_cfg["site_key"]
243+
site_key = "station_id"
255244

256245
allfiles = _inventory_files(root)
257246
allmeta = _parse_inventory_meta(allfiles, repo_cfg=repo_cfg)
@@ -309,8 +298,11 @@ def repo_file_inventory(repo=None, *, repo_cfg=None, in_path=None):
309298
f"Cannot join registry: grouped inventory missing site key {site_key!r}"
310299
)
311300

301+
# Drop the station_id column before join to avoid pandas ambiguity
302+
reg_for_join = registry.drop(columns=[site_key], errors="ignore")
303+
312304
metastat = grouped.join(
313-
registry,
305+
reg_for_join,
314306
on=site_key,
315307
rsuffix="_registry",
316308
how="left",

dms_datastore/process_station_variable.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,8 @@ def attach_agency_id(
282282
)
283283

284284
repo_cfg = dstore_config.repo_config(repo_name)
285-
registry_name = repo_cfg["registry"]
286-
site_key = repo_cfg["site_key"]
287-
registry = dstore_config.registry_df(registry_name).copy()
285+
site_key = "station_id"
286+
registry = dstore_config.repo_registry(repo_cfg=repo_cfg)
288287

289288
if site_key not in registry.columns:
290289
raise ValueError(f"Registry site key column {site_key!r} not found")
@@ -298,7 +297,7 @@ def attach_agency_id(
298297
raise ValueError(f"Requested registry column(s) not found: {missing_cols}")
299298

300299
lookup = registry[[site_key] + required_cols].copy()
301-
lookup = lookup.rename(columns={site_key: "station_id"})
300+
lookup = lookup.reset_index(drop=True)
302301

303302
merged = df.merge(lookup, on="station_id", how="left")
304303

dms_datastore/processed/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)