Skip to content

Commit 42ca33c

Browse files
EliEli
authored and committed
Simplified config, better implementation and testing of filename templates, inventory and read_ts_repo. Format uses config based template.
1 parent c657cda commit 42ca33c

17 files changed

Lines changed: 946 additions & 550 deletions

dms_datastore/auto_screen.py

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from dms_datastore.dstore_config import *
1919
from dms_datastore.inventory import *
2020
from dms_datastore.write_ts import *
21+
from dms_datastore.filename import meta_to_filename
2122
from schimpy.station import *
2223
import geopandas as gpd
2324
import numpy as np
@@ -148,22 +149,20 @@ def plot_anomalies(
148149
plt.close(fig)
149150

150151

152+
153+
151154
def filter_inventory_(inventory, stations, params):
152155
if stations is not None:
153156
if isinstance(stations, str):
154157
stations = [stations]
155-
inventory = inventory.loc[
156-
inventory.index.get_level_values("station_id").isin(stations), :
157-
]
158+
inventory = inventory.loc[inventory["station_id"].isin(stations), :]
159+
158160
if params is not None:
159161
if isinstance(params, str):
160162
params = [params]
161-
logger.debug(f"params: {params}")
162-
inventory = inventory.loc[
163-
inventory.index.get_level_values("param").isin(params), :
164-
]
165-
return inventory
163+
inventory = inventory.loc[inventory["param"].isin(params), :]
166164

165+
return inventory
167166

168167
def auto_screen(
169168
fpath="formatted",
@@ -208,29 +207,29 @@ def auto_screen(
208207

209208
active = start_station is None
210209
station_db = station_dbase()
211-
inventory = repo_data_inventory(fpath)
210+
211+
source_repo = "formatted"
212+
actual_fpath = fpath if fpath is not None else repo_root(source_repo)
213+
inventory = repo_data_inventory(repo="formatted",in_path=actual_fpath) # repo is the config repo, in_path is the data storage location
212214
inventory = filter_inventory_(inventory, stations, params)
213215
failed_read = []
214216

215217
for index, row in inventory.iterrows():
216-
station_id = index[0]
217-
if not active:
218-
if station_id == start_station:
219-
active = True
220-
else:
221-
continue
222-
subloc = index[1]
223-
if type(subloc) == float:
218+
station_id = row["station_id"]
219+
subloc = row["subloc"]
220+
param = row["param"]
221+
222+
if pd.isna(subloc) or subloc is None:
224223
subloc = "default"
225-
param = index[2]
224+
226225
if subloc is None:
227226
subloc = "default"
228227
if np.random.uniform() < 0.0: # 0.95:
229228
logger.debug(f"Randomly rejecting: {station_id} {subloc} {param}")
230229
continue
231-
filename = str(row.filename)
230+
232231
station_info = station_db.loc[station_id, :]
233-
agency = row.agency_dbase
232+
agency = row["agency_registry"] if "agency_registry" in row.index else row["agency"]
234233
if agency.startswith("dwr_"):
235234
agency = agency[4:] # todo: need to take care of des_ vs dwr_des etc
236235

@@ -240,9 +239,11 @@ def auto_screen(
240239
# these may be lists
241240
try:
242241
# logger.debug(f"fetching {fpath},{station_id},{param}")
243-
meta_ts = fetcher(fpath, station_id, param, subloc=subloc)
244-
except:
245-
logger.warning(f"Read failed for {fpath}, {station_id}, {param}, {subloc}")
242+
meta_ts = fetcher(source_repo, station_id, param, subloc=subloc, data_path=actual_fpath)
243+
except Exception as e:
244+
logger.warning(f"Read failed for {actual_fpath}, {station_id}, {param}, {subloc}, storage loc = {actual_fpath}")
245+
logger.exception(e)
246+
print(e)
246247
meta_ts = None
247248

248249
if meta_ts is None:
@@ -276,12 +277,23 @@ def auto_screen(
276277
if "value" in screened.columns:
277278
screened = screened[["value", "user_flag"]]
278279
meta["screen"] = proto
279-
if subloc_actual and subloc_actual != "default":
280-
output_fname = (
281-
f"{agency}_{station_id}@{subloc_actual}_{row.agency_id}_{param}.csv"
282-
)
283-
else:
284-
output_fname = f"{agency}_{station_id}_{row.agency_id}_{param}.csv"
280+
281+
# Build output filename using configured naming spec for screened repo
282+
output_meta = {
283+
"agency": agency,
284+
"station_id": station_id,
285+
"subloc": subloc_actual if subloc_actual != "default" else None,
286+
"param": param,
287+
"agency_id": row.agency_id,
288+
}
289+
# Add year info if available from metadata
290+
if "year" in meta:
291+
output_meta["year"] = meta["year"]
292+
elif "syear" in meta and "eyear" in meta:
293+
output_meta["syear"] = meta["syear"]
294+
output_meta["eyear"] = meta["eyear"]
295+
296+
output_fname = meta_to_filename(output_meta, repo="screened",include_shard=False)
285297
output_fpath = os.path.join(dest, output_fname)
286298
logger.debug("start write")
287299
write_ts_csv(screened, output_fpath, meta, chunk_years=True)
@@ -479,24 +491,26 @@ def spatial_config(configfile, x, y):
479491
return checker.region_info(x, y)
480492

481493

482-
def ncro_fetcher(repo_path, station_id, param, subloc):
483-
"""Reads NCRO data, correctly folding together NCRO and CDEC by priority.
484-
Celsius is converted to Fahrenheit
485-
"""
494+
def ncro_fetcher(repo, station_id, param, subloc, data_path=None):
486495
return read_ts_repo(
487496
station_id,
488497
param,
489498
subloc=subloc,
490499
src_priority=["ncro", "cdec"],
491-
repo=repo_path,
500+
repo=repo,
501+
data_path=data_path,
492502
meta=True,
493503
)
494504

495-
496-
def general_fetcher(repo_path, station_id, param, subloc):
497-
"""Fetches from a well behaved and standard repo"""
498-
return read_ts_repo(station_id, param, subloc=subloc, repo=repo_path, meta=True)
499-
505+
def general_fetcher(repo, station_id, param, subloc, data_path=None):
506+
return read_ts_repo(
507+
station_id,
508+
param,
509+
subloc=subloc,
510+
repo=repo,
511+
data_path=data_path,
512+
meta=True,
513+
)
500514

501515
def custom_fetcher(agency):
502516
if agency in ["ncro", "dwr_ncro"]:

dms_datastore/config_data/des_rationalize_cfg.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,5 @@ rationalize:
6868
# B supersedes A if B.syear <= A.syear and B.eyear >= A.eyear
6969
#
7070
# IMPORTANT: If any file would end up in both include and omit after expansion,
71-
# that is an error. Be explicit instead.
71+
# that is an error. Be explicit instead.
72+

dms_datastore/config_data/dstore_config.yaml

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,28 +47,25 @@ repos:
4747
search:
4848
use_source_slot: true
4949
shard_style: auto
50-
parse:
51-
style: legacy
50+
5251

5352
screened:
5453
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
5554
registry: continuous
56-
key_column: id
55+
key_column: station_id
5756
source_priority_mode: by_registry_column
5857
source_priority_column: agency
5958
filename_templates:
60-
- "{source}_{key@subloc}_{agency_id}_{param}_{syear}_{eyear}.csv"
61-
- "{source}_{key@subloc}_{agency_id}_{param}_{year}.csv"
59+
- "{agency}_{key@subloc}_{agency_id}_{param}_{year}.csv"
6260
search:
6361
use_source_slot: true
6462
shard_style: auto
65-
parse:
66-
style: legacy
63+
6764

6865
processed:
6966
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/processed"
7067
registry: processed
71-
key_column: id
68+
key_column: station_id
7269
source_priority_mode: repo_default
7370
source_priority_group: processed_default
7471
filename_templates:
@@ -79,13 +76,11 @@ repos:
7976
search:
8077
use_source_slot: true
8178
shard_style: auto
82-
parse:
83-
style: template
8479

8580
structures:
8681
root: "//cnrastore-bdo/Modeling_Data/repo/structures"
8782
registry: structures
88-
key_column: id
83+
key_column: structure_id
8984
source_priority_mode: repo_default
9085
source_priority_group: structures_default
9186
filename_templates:

dms_datastore/download_ncro.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -690,16 +690,9 @@ def test():
690690
param_lookup=vlookup,
691691
source="ncro",
692692
)
693-
print(df)
694693
ncro_download(df, destdir, stime, etime, overwrite=overwrite)
695694

696695

697-
def test_read():
698-
fname = "ncro_old_b95380_temp_2015_2024.csv"
699-
fname = "ncro_orm_b95370_cla_*.csv"
700-
ts = read_ts.read_ts(fname)
701-
print(ts)
702-
703696

704697
@click.command(
705698
help=(

dms_datastore/dstore_config.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ def repo_root(repo=None, repo_cfg=None):
7373

7474
def repo_registry(repo=None, repo_cfg=None):
7575
cfg = coerce_repo_config(repo=repo, repo_cfg=repo_cfg)
76-
print(cfg)
7776
registry_name = cfg.get("registry")
7877
key_column = cfg.get("key_column", "id")
7978
if registry_name is None:
@@ -179,8 +178,6 @@ def repo_config(repo_name):
179178
)
180179
spec.setdefault("key_column", "id")
181180
spec.setdefault("source_priority_mode", "none")
182-
spec.setdefault("parse", {"style": "legacy"})
183-
spec.setdefault("search", {"use_source_slot": True, "shard_style": "auto"})
184181

185182
templates = spec.get("filename_templates", [])
186183
if not templates:
@@ -232,7 +229,7 @@ def registry_df(registry_name, key_column="id"):
232229
)
233230

234231
db[key_column] = db[key_column].astype(str).str.replace("'", "", regex=True).str.strip()
235-
print(db)
232+
236233

237234
if "agency_id" in db.columns:
238235
db["agency_id"] = db["agency_id"].astype(str).str.replace("'", "", regex=True).str.strip()
@@ -244,6 +241,8 @@ def registry_df(registry_name, key_column="id"):
244241
raise ValueError(f"Registry {registry_name} has duplicate {key_column} keys")
245242

246243
db = db.set_index(key_column, drop=False)
244+
# Ensure index is string type for consistent merging
245+
db.index = db.index.astype(str)
247246
return db
248247

249248

0 commit comments

Comments
 (0)