Skip to content

Commit b65c400

Browse files
EliEli
authored and committed
Merge branch 'generalize_repo'
2 parents 6cd8f23 + 0f09843 commit b65c400

31 files changed

+3208
-936
lines changed

dms_datastore/auto_screen.py

Lines changed: 53 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,9 @@
1616
from vtools.data.gap import *
1717
from dms_datastore.read_multi import *
1818
from dms_datastore.dstore_config import *
19-
from dms_datastore.filename import interpret_fname
2019
from dms_datastore.inventory import *
2120
from dms_datastore.write_ts import *
21+
from dms_datastore.filename import meta_to_filename
2222
from schimpy.station import *
2323
import geopandas as gpd
2424
import numpy as np
@@ -149,22 +149,20 @@ def plot_anomalies(
149149
plt.close(fig)
150150

151151

152+
153+
152154
def filter_inventory_(inventory, stations, params):
153155
if stations is not None:
154156
if isinstance(stations, str):
155157
stations = [stations]
156-
inventory = inventory.loc[
157-
inventory.index.get_level_values("station_id").isin(stations), :
158-
]
158+
inventory = inventory.loc[inventory["station_id"].isin(stations), :]
159+
159160
if params is not None:
160161
if isinstance(params, str):
161162
params = [params]
162-
logger.debug(f"params: {params}")
163-
inventory = inventory.loc[
164-
inventory.index.get_level_values("param").isin(params), :
165-
]
166-
return inventory
163+
inventory = inventory.loc[inventory["param"].isin(params), :]
167164

165+
return inventory
168166

169167
def auto_screen(
170168
fpath="formatted",
@@ -209,29 +207,29 @@ def auto_screen(
209207

210208
active = start_station is None
211209
station_db = station_dbase()
212-
inventory = repo_data_inventory(fpath)
210+
211+
source_repo = "formatted"
212+
actual_fpath = fpath if fpath is not None else repo_root(source_repo)
213+
inventory = repo_data_inventory(repo="formatted",in_path=actual_fpath) # repo is the config repo, in_path is the data storage location
213214
inventory = filter_inventory_(inventory, stations, params)
214215
failed_read = []
215216

216217
for index, row in inventory.iterrows():
217-
station_id = index[0]
218-
if not active:
219-
if station_id == start_station:
220-
active = True
221-
else:
222-
continue
223-
subloc = index[1]
224-
if type(subloc) == float:
218+
station_id = row["station_id"]
219+
subloc = row["subloc"]
220+
param = row["param"]
221+
222+
if pd.isna(subloc) or subloc is None:
225223
subloc = "default"
226-
param = index[2]
224+
227225
if subloc is None:
228226
subloc = "default"
229227
if np.random.uniform() < 0.0: # 0.95:
230228
logger.debug(f"Randomly rejecting: {station_id} {subloc} {param}")
231229
continue
232-
filename = str(row.filename)
230+
233231
station_info = station_db.loc[station_id, :]
234-
agency = row.agency_dbase
232+
agency = row["agency_registry"] if "agency_registry" in row.index else row["agency"]
235233
if agency.startswith("dwr_"):
236234
agency = agency[4:] # todo: need to take care of des_ vs dwr_des etc
237235

@@ -241,9 +239,11 @@ def auto_screen(
241239
# these may be lists
242240
try:
243241
# logger.debug(f"fetching {fpath},{station_id},{param}")
244-
meta_ts = fetcher(fpath, station_id, param, subloc=subloc)
245-
except:
246-
logger.warning(f"Read failed for {fpath}, {station_id}, {param}, {subloc}")
242+
meta_ts = fetcher(source_repo, station_id, param, subloc=subloc, data_path=actual_fpath)
243+
except Exception as e:
244+
logger.warning(f"Read failed for {actual_fpath}, {station_id}, {param}, {subloc}, storage loc = {actual_fpath}")
245+
logger.exception(e)
246+
print(e)
247247
meta_ts = None
248248

249249
if meta_ts is None:
@@ -277,12 +277,23 @@ def auto_screen(
277277
if "value" in screened.columns:
278278
screened = screened[["value", "user_flag"]]
279279
meta["screen"] = proto
280-
if subloc_actual and subloc_actual != "default":
281-
output_fname = (
282-
f"{agency}_{station_id}@{subloc_actual}_{row.agency_id}_{param}.csv"
283-
)
284-
else:
285-
output_fname = f"{agency}_{station_id}_{row.agency_id}_{param}.csv"
280+
281+
# Build output filename using configured naming spec for screened repo
282+
output_meta = {
283+
"agency": agency,
284+
"station_id": station_id,
285+
"subloc": subloc_actual if subloc_actual != "default" else None,
286+
"param": param,
287+
"agency_id": row.agency_id,
288+
}
289+
# Add year info if available from metadata
290+
if "year" in meta:
291+
output_meta["year"] = meta["year"]
292+
elif "syear" in meta and "eyear" in meta:
293+
output_meta["syear"] = meta["syear"]
294+
output_meta["eyear"] = meta["eyear"]
295+
296+
output_fname = meta_to_filename(output_meta, repo="screened",include_shard=False)
286297
output_fpath = os.path.join(dest, output_fname)
287298
logger.debug("start write")
288299
write_ts_csv(screened, output_fpath, meta, chunk_years=True)
@@ -480,24 +491,26 @@ def spatial_config(configfile, x, y):
480491
return checker.region_info(x, y)
481492

482493

483-
def ncro_fetcher(repo_path, station_id, param, subloc):
484-
"""Reads NCRO data, correctly folding together NCRO and CDEC by priority.
485-
Celsius is converted to Farenheit
486-
"""
494+
def ncro_fetcher(repo, station_id, param, subloc, data_path=None):
487495
return read_ts_repo(
488496
station_id,
489497
param,
490498
subloc=subloc,
491499
src_priority=["ncro", "cdec"],
492-
repo=repo_path,
500+
repo=repo,
501+
data_path=data_path,
493502
meta=True,
494503
)
495504

496-
497-
def general_fetcher(repo_path, station_id, param, subloc):
498-
"""Fetches from a well behaved and standard repo"""
499-
return read_ts_repo(station_id, param, subloc=subloc, repo=repo_path, meta=True)
500-
505+
def general_fetcher(repo, station_id, param, subloc, data_path=None):
506+
return read_ts_repo(
507+
station_id,
508+
param,
509+
subloc=subloc,
510+
repo=repo,
511+
data_path=data_path,
512+
meta=True,
513+
)
501514

502515
def custom_fetcher(agency):
503516
if agency in ["ncro", "dwr_ncro"]:

dms_datastore/config_data/des_rationalize_cfg.yaml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,4 +68,5 @@ rationalize:
6868
# B supersedes A if B.syear <= A.syear and B.eyear >= A.eyear
6969
#
7070
# IMPORTANT: If any file would end up in both include and omit after expansion,
71-
# that is an error. Be explicit instead.
71+
# that is an error. Be explicit instead.
72+
Lines changed: 82 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,32 +1,96 @@
1-
2-
3-
4-
station_dbase : station_dbase.csv
1+
station_dbase: station_dbase.csv
52
variable_mappings: variable_mappings.csv
63
variable_definitions: variables.csv
74
sublocations: station_subloc.csv
85

96
file_deletion_list: non_15_min_files_checked.txt
107
compare_excepts_formatted: compare_excepts_formatted.txt
11-
128
des_rationalize_time_spec: des_rationalize_cfg.yaml
139

1410
repo_base: "//cnrastore-bdo/Modeling_Data/repo"
11+
12+
# legacy aliases kept for compatibility
1513
repo: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
1614
screened: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
1715
processed: "//cnrastore-bdo/Modeling_Data/repo/continuous/processed"
1816

19-
source_priority:
20-
ncro: ['ncro','cdec']
21-
dwr_ncro: ['ncro']
22-
des: ['des']
23-
dwr_des: ['des']
24-
usgs: ['usgs']
25-
noaa: ['noaa']
26-
usbr: ['usbr','cdec']
27-
dwr_om: ['cdec']
28-
dwr: ['dwr','cdec']
29-
ebmud: ['usgs','ebmud','cdec']
30-
17+
# global priorities can remain as named groups
18+
source_priority_groups:
19+
ncro: ["ncro", "cdec"]
20+
dwr_ncro: ["ncro"]
21+
des: ["des"]
22+
dwr_des: ["des"]
23+
usgs: ["usgs"]
24+
noaa: ["noaa"]
25+
usbr: ["usbr", "cdec"]
26+
dwr_om: ["cdec"]
27+
dwr: ["dwr", "cdec"]
28+
ebmud: ["usgs", "ebmud", "cdec"]
29+
processed_default: ["dms", "noaa", "usgs", "cdec"]
30+
structures_default: ["dms"]
31+
32+
# registry files available to repos
33+
registries:
34+
continuous: station_dbase.csv
35+
processed: processed_series.csv
36+
structures: structures.csv
37+
38+
repos:
39+
formatted:
40+
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/formatted"
41+
registry: continuous
42+
key_column: station_id
43+
source_priority_mode: by_registry_column
44+
source_priority_column: agency
45+
filename_templates:
46+
- "{source}_{key@subloc}_{agency_id}_{param}_{year}.csv"
47+
search:
48+
use_source_slot: true
49+
shard_style: auto
50+
51+
52+
screened:
53+
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/screened"
54+
registry: continuous
55+
key_column: station_id
56+
source_priority_mode: by_registry_column
57+
source_priority_column: agency
58+
filename_templates:
59+
- "{agency}_{key@subloc}_{agency_id}_{param}_{year}.csv"
60+
search:
61+
use_source_slot: true
62+
shard_style: auto
63+
64+
65+
processed:
66+
root: "//cnrastore-bdo/Modeling_Data/repo/continuous/processed"
67+
registry: processed
68+
key_column: station_id
69+
source_priority_mode: repo_default
70+
source_priority_group: processed_default
71+
filename_templates:
72+
- "{source}_{key@subloc}_{param@modifier}.csv"
73+
- "{source}_{key@subloc}_{param@modifier}_{year}.csv"
74+
- "{source}_{key@subloc}_{param}_{year}.csv"
75+
- "{source}_{key@subloc}_{param}.csv"
76+
search:
77+
use_source_slot: true
78+
shard_style: auto
79+
80+
structures:
81+
root: "//cnrastore-bdo/Modeling_Data/repo/structures"
82+
registry: structures
83+
key_column: structure_id
84+
source_priority_mode: repo_default
85+
source_priority_group: structures_default
86+
filename_templates:
87+
- "{source}_{key}_{param}.csv"
88+
- "{source}_{key}_{param}_{year}.csv"
89+
search:
90+
use_source_slot: true
91+
shard_style: auto
92+
parse:
93+
style: template
94+
3195
screen_config_v20230126: screen_config_v20230126.yaml
32-
screen_config: screen_config_v20230126.yaml
96+
screen_config: screen_config_v20230126.yaml

dms_datastore/config_data/station_dbase.csv

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
id,agency_id,name,lat,lon,x,y,d1641_id,wdl_id,cdec_id,agency,stage,flow,quality,notes
1+
station_id,agency_id,name,lat,lon,x,y,d1641_id,wdl_id,cdec_id,agency,stage,flow,quality,notes
22
anh2,B95020,San Joaquin at Antioch,38.01784713,-121.802883,605075.1,4208474.7,D12A,B95020,ANH,dwr_ncro,x,,,USBR and DWR have equipment within a 30x30 building (in a 10x10 room) located on a pier that is the City of Antioch's pumping station
33
ben,B94175,Mokelumne River at Benson's Ferry,38.2555,-121.44,636491.3,4235314.6,,B94175,BEN,dwr_ncro,x,,,
44
ccs,CCS,Cache Slough,38.29713655,-121.7477601,609500.3,4239525.4,,,CCS,usbr,,,x,Old Vallejo Pumping Plant

dms_datastore/config_data/station_subloc.csv

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
# subloc is the "sublocation" which can differentiate anything you want
33
# that the agency reports as a station, but typically is used for multiple sensor depths
44
# z is instrument elevation in meters NAVD
5-
id,subloc,z,comment
5+
station_id,subloc,z,comment
66
mal,upper,-1.0,
77
mal,lower,-9.4,
88
mrz,upper,-1.0,

0 commit comments

Comments
 (0)