Skip to content

Commit 7684a1b

Browse files
EliEli
authored andcommitted
Fixed station_info after column name change.
Reduced verbosity of downloaders Typically one of two possible codes for a variable fails. Now this is not reported. Created daily repo, adjusted dropbox recipes for structures and daily data.
1 parent 013a696 commit 7684a1b

13 files changed

Lines changed: 1383 additions & 463 deletions

dms_datastore/config_data/dstore_config.yaml

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ repo_base: "//cnrastore-bdo/Modeling_Data/repo"
1313
registries:
1414
continuous: station_dbase.csv
1515
processed: processed_series.csv
16-
structures: structures.csv
16+
structures: structures_registry.csv
17+
daily: station_dbase.csv
1718

1819
repos:
1920
formatted:
@@ -72,6 +73,7 @@ repos:
7273
data_key: [station_id, subloc, param, modifier, shard]
7374

7475

76+
7577
structures:
7678
root: "//cnrastore-bdo/Modeling_Data/repo/structures"
7779
registry: structures
@@ -83,5 +85,27 @@ repos:
8385
- "{processor}_{structure_id@subloc}_{param@modifier}_{year}.csv"
8486

8587

88+
daily_formatted:
89+
root: "//cnrastore-bdo/Modeling_Data/repo/daily/formatted"
90+
registry: continuous # aliases the station data base that provides supplementary metadata like geolocation and agency
91+
site_key: station_id # registry and template use this term for the site's identifier (was "key")
92+
provider_key: source # provider is the agency mainly involved in delivering the data.
93+
# Generic for "source" (datamart) "agency" (collecting agency) and
94+
# # "processor" (group synthesizing, manipulating or filling data)
95+
provider_resolution_mode: registry_column # way of differentiating data that cover
96+
# the same logical data set from two providers
97+
provider_resolution_column: agency # consult the registry column
98+
provider_resolution_order:
99+
bbid: ["cdec"] # ncro is prioritized over cdec
100+
usgs: ["usgs"]
101+
dwr: ["dwr", "cdec"]
102+
ebmud: ["usgs", "ebmud", "cdec"]
103+
filename_templates:
104+
- "{source}_{station_id@subloc}_{agency_id}_{param}.csv"
105+
file_key: [source, station_id, subloc, param] # sufficient key to identify a file/wildcard group.
106+
# shard is conceptual
107+
data_key: [station_id, subloc, param] # sufficient to describe a stream/series of data
108+
109+
86110
screen_config_v20230126: screen_config_v20230126.yaml
87111
screen_config: screen_config_v20230126.yaml

dms_datastore/config_data/station_dbase.csv

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,4 +297,7 @@ rrcs,rrcs,Railroad Cut South,37.9395028,-121.5564612,626846.1491,4200085.169,,,,
297297
x2,x2,Daily X2 Calculation,37.83330,-122.41670,626846.1491,4200085.169,,cx2,,dayflow,,x,,Calculated X2
298298
ndoi,ndoi,Net Delta Outflow Index,37.83330,-122.41670,626846.1491,4200085.169,,cx2,,dayflow,,x,,
299299
odtw,B9537700,Old River Downstream of Tracy Wildlife,37.798111,-121.468861,634801.65,4184519.2,,B9537700,,dwr_ncro,,,x,Post-MSS moved from OAAD
300-
god2,11455740,Suisun Slough Near Benicia,38.12379722,-122.0813861,580516.9,4219949.17,,,,usgs,,,,DWR measures WQ at GOD
300+
god2,11455740,Suisun Slough Near Benicia,38.12379722,-122.0813861,580516.9,4219949.17,,,,usgs,,,,DWR measures WQ at GOD
301+
bbi,bbi,Byron Bethany Irrigation District Diversion,37.8298,-121.5574,626951.948,4187911.381,,,BBI,bbid,,,,
302+
ccw,ccw,CCWD Middle R PP on Victoria Canal,37.866540,-121.543790,628094.0,4192006.7,,,CCW,ccwd,,,,
303+

dms_datastore/download_cdec.py

Lines changed: 102 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,21 @@
3535

3636

3737
def download_station_data(
38-
row, dest_dir, start, end, endfile, param, overwrite, freq, failures, skips
38+
row, dest_dir, start, end, endfile, param, overwrite, freq
3939
):
40-
# Extract station information
4140
station = row.station_id
4241
try:
4342
cdec_id = row.cdec_id.lower()
44-
except:
43+
except Exception:
4544
cdec_id = station
4645

4746
agency_id = row.agency_id
4847
p = row.param
4948
z = row.src_var_id
5049
subloc = row.subloc
51-
yearname = (
52-
f"{start.year}_{endfile}" # if start.year != end.year else f"{start.year}"
53-
)
50+
semantic_key = (station, p)
51+
52+
yearname = f"{start.year}_{endfile}"
5453

5554
if subloc == "default":
5655
path = os.path.join(
@@ -61,55 +60,75 @@ def download_station_data(
6160
dest_dir, f"cdec_{station}@{subloc}_{agency_id}_{p}_{yearname}.csv"
6261
).lower()
6362

63+
result = {
64+
"station": station,
65+
"paramname": p,
66+
"param_code": z,
67+
"semantic_key": semantic_key,
68+
"path": path,
69+
"found": False,
70+
"skipped": False,
71+
"reason": None,
72+
"durations_tried": ["E", "H", "D", "M"] if freq is None else [freq],
73+
"sensor_codes_tried": [z],
74+
}
75+
6476
if os.path.exists(path) and overwrite is False:
65-
logger.info("Skipping existing station because file exists: %s" % path)
66-
skips.append(path)
67-
return
77+
logger.info("Skipping existing station because file exists: %s", path)
78+
result["skipped"] = True
79+
result["reason"] = "exists"
80+
return result
81+
6882
stime = start.strftime("%m-%d-%Y")
6983
etime = end if end == "Now" else end.strftime("%m-%d-%Y")
84+
85+
logger.debug(f"Downloading station {station} parameter {p} sensor code {z}")
86+
7087
found = False
71-
logger.info(f"Downloading station {station} parameter {p} sensor code {z}")
72-
zz = [z]
73-
for code in zz:
88+
for code in [z]:
7489
dur_codes = ["E", "H", "D", "M"] if freq is None else [freq]
7590
for dur in dur_codes:
76-
station_query = f"http://{cdec_base_url}/dynamicapp/req/CSVDataServletPST?Stations={cdec_id}&SensorNums={code}&dur_code={dur}&Start={stime}&End={etime}"
91+
station_query = (
92+
f"http://{cdec_base_url}/dynamicapp/req/CSVDataServletPST"
93+
f"?Stations={cdec_id}&SensorNums={code}&dur_code={dur}"
94+
f"&Start={stime}&End={etime}"
95+
)
96+
7797
maxattempt = 5
78-
response = None
98+
station_html = ""
7999
for iattempt in range(maxattempt):
80100
try:
81101
response = requests.get(station_query)
82102
station_html = response.text.replace("\r", "")
83103
break
84-
except:
104+
except Exception:
85105
time.sleep(1)
86106
station_html = ""
87-
found = False
88107

89108
if (station_html.startswith("Title") and len(station_html) > 16) or (
90109
station_html.startswith("STATION_ID") and len(station_html) > 90
91110
):
92-
found = True
93111
with open(path, "w") as f:
94112
f.write(station_html)
95-
logger.debug("Found, duration code: %s" % dur)
113+
logger.debug("Found, duration code: %s", dur)
114+
found = True
115+
result["found"] = True
116+
result["reason"] = "success"
117+
result["duration_found"] = dur
96118
break
119+
97120
if found:
98121
break
122+
99123
if not found:
100-
failures.append((station, p))
101-
logger.info(f"No data found for durations {dur_codes}, sensor codes {zz}")
124+
result["reason"] = "no_data"
102125

126+
return result
103127

104128
def cdec_download(
105-
stations, dest_dir, start, end=None, param=None, overwrite=False, freq=None
129+
stations, dest_dir, start, end=None, param=None, overwrite=False, freq=None, max_workers=6
106130
):
107-
"""Download robot for CDEC
108-
Requires a list of stations, destination directory and start/end date
109-
These dates are passed on to CDEC ... actual return dates can be
110-
slightly different
111-
"""
112-
131+
"""Download robot for CDEC."""
113132
if end is None:
114133
end = dt.datetime.now()
115134
endfile = 9999
@@ -118,12 +137,7 @@ def cdec_download(
118137

119138
if not os.path.exists(dest_dir):
120139
os.mkdir(dest_dir)
121-
failures = []
122-
skips = []
123140

124-
# This is a small hardwired section to cull ec values
125-
# from the wrong sublocation/program
126-
# CDEC uses a different variable code for each
127141
bottom_codes = {"92", "102"}
128142

129143
stations = stations.copy()
@@ -140,47 +154,66 @@ def cdec_download(
140154
& ~stations.src_var_id.isin(bottom_codes)
141155
)
142156
stations = stations.loc[~subloc_inconsist, :]
143-
for index, row in stations.iterrows():
144-
download_station_data(
145-
row, dest_dir, start, end, endfile, param, overwrite, freq, failures, skips
146-
)
147-
# # Use ThreadPoolExecutor
148-
# with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
149-
# # Schedule the download tasks and handle them asynchronously
150-
# futures = []
151-
# for index, row in stations.iterrows():
152-
# future = executor.submit(
153-
# download_station_data,
154-
# row,
155-
# dest_dir,
156-
# start,
157-
# end,
158-
# endfile,
159-
# param,
160-
# overwrite,
161-
# freq,
162-
# failures,
163-
# skips,
164-
# )
165-
# futures.append(future)
166-
167-
# # Optionally, handle the results of the tasks
168-
# for future in concurrent.futures.as_completed(futures):
169-
# try:
170-
# future.result() # This line can be used to handle results or exceptions from the tasks
171-
# except Exception as e:
172-
# logger.error(f"Exception occurred during download: {e}")
173-
174-
if len(failures) == 0:
175-
logger.debug("No failed station variable combinations")
176-
else:
177-
logger.debug("Failed query stations: ")
178-
for failure in failures:
179-
logger.info(failure)
180157

158+
results = []
159+
160+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
161+
futures = []
162+
for _, row in stations.iterrows():
163+
futures.append(
164+
executor.submit(
165+
download_station_data,
166+
row,
167+
dest_dir,
168+
start,
169+
end,
170+
endfile,
171+
param,
172+
overwrite,
173+
freq,
174+
)
175+
)
176+
177+
for future in concurrent.futures.as_completed(futures):
178+
try:
179+
results.append(future.result())
180+
except Exception as e:
181+
logger.error(f"Exception occurred during download: {e}")
182+
183+
grouped = {}
184+
for result in results:
185+
if result is None:
186+
continue
187+
key = result["semantic_key"]
188+
grouped.setdefault(key, []).append(result)
189+
190+
final_failures = []
191+
skips = []
192+
193+
for key, group in grouped.items():
194+
if any(item.get("found") for item in group):
195+
success_codes = [str(item["param_code"]) for item in group if item.get("found")]
196+
logger.debug(f"Semantic success for {key} via code(s): {', '.join(success_codes)}")
197+
continue
198+
199+
if all(item.get("skipped") for item in group):
200+
skips.extend([item["path"] for item in group if item.get("path")])
201+
continue
202+
203+
final_failures.append(key)
181204

205+
durs = sorted({dur for item in group for dur in item.get("durations_tried", [])})
206+
codes = sorted({str(item["param_code"]) for item in group})
207+
logger.info(f"No data found for station={key[0]} param={key[1]} durations={durs}, sensor codes={codes}")
182208

209+
if len(final_failures) == 0:
210+
logger.debug("No failed station variable combinations")
211+
else:
212+
logger.info("Failed query stations:")
213+
for failure in final_failures:
214+
logger.info(failure)
183215

216+
return results
184217

185218
def download_cdec(
186219
dest_dir,

0 commit comments

Comments
 (0)