Skip to content

Commit a81ac4f

Browse files
Eli
authored and committed
Fix usgs_multi for cases with multiple contributors to sublocations. Removed extra help option on download_nwis.
1 parent 79c0e34 commit a81ac4f

File tree

3 files changed

+103
-59
lines changed

3 files changed

+103
-59
lines changed

dms_datastore/download_nwis.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,6 @@ def download_nwis(dest_dir, start, end, param, stations, overwrite, stationfile)
463463
@click.option("--logdir", type=click.Path(path_type=Path), default="logs")
464464
@click.option("--debug", is_flag=True)
465465
@click.option("--quiet", is_flag=True)
466-
@click.help_option("-h", "--help")
467466
@click.argument("stationfile", nargs=-1)
468467
def download_nwis_cli(dest_dir, start, end, param, stations, overwrite, stationfile,logdir, debug, quiet):
469468
"""CLI for downloading NWIS (National Water Information System)."""

dms_datastore/read_multi.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,9 @@ def ts_multifile(
327327
if filter_date(metafname, start, end):
328328
continue
329329
ts = read_ts(tsfile, force_regular=force_regular)
330+
print("tsfiles")
331+
print(tsfile)
332+
print("columns:",ts.columns)
330333
dup_mask = ts.index.duplicated(keep=False)
331334
if dup_mask.any():
332335
dup_index = ts.index[dup_mask]

dms_datastore/usgs_multi.py

Lines changed: 100 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323

2424
def _quarantine_file(fname, quarantine_dir="quarantine"):
2525
if not os.path.exists(quarantine_dir):
26-
os.makedirs("quarantine")
27-
shutil.copy(fname, "quarantine")
26+
os.makedirs(quarantine_dir)
27+
shutil.copy(fname, quarantine_dir)
28+
2829

2930

3031
def usgs_scan_series_json(fname):
@@ -278,69 +279,110 @@ def process_multivariate_usgs(repo="formatted", data_path=None, pat=None, rescan
278279
)
279280
vertical_non = [0, 0] # for counting how many subloc are vertical or not
280281

281-
# first process all known sublocations that are meant to be kept intact,
282-
# dropping them as processed
283-
# then if one left it is default and if many use the average
282+
# Partition every present source column into semantic sublocation groups,
283+
# then reduce each group to a single univariate "value" series.
284+
grouped_cols = {}
285+
284286
for index, row in subdf.iterrows():
285-
asubloc = row.asubloc[:]
286-
logger.debug(f"Isolating sublocation {asubloc[:]}")
287-
if asubloc[:] in ["lower", "upper", "upward", "vertical"]:
288-
# write out each sublocation as individual file
289-
selector = (
290-
"value"
291-
if len(ts.columns) == 1 and ts.columns[0] == "value"
292-
else f"{row.ts_id}_value"
287+
ts_id = str(row.ts_id)
288+
asubloc = str(row.asubloc)
289+
290+
selector = (
291+
"value"
292+
if len(ts.columns) == 1 and ts.columns[0] == "value"
293+
else f"{ts_id}_value"
294+
)
295+
296+
if selector not in ts.columns:
297+
logger.debug(f"Selector failed: {selector} columns: {ts.columns}")
298+
continue
299+
300+
# Keep existing mapped/lookup semantics from the scan table.
301+
# Only normalize empty/unknown labels to default here.
302+
bucket = str(asubloc).strip().lower()
303+
if bucket in ["", "nan", "none"]:
304+
bucket = "default"
305+
306+
grouped_cols.setdefault(bucket, []).append((selector, row))
307+
308+
written_any = False
309+
310+
for bucket, members in grouped_cols.items():
311+
cols = [col for col, _ in members if col in ts.columns]
312+
if not cols:
313+
continue
314+
315+
# Collapse this bucket to a single univariate series.
316+
if len(cols) == 1:
317+
out = ts[[cols[0]]].copy()
318+
else:
319+
out = ts[cols].mean(axis=1, skipna=True).to_frame()
320+
321+
out.columns = ["value"]
322+
323+
# Skip empty outputs
324+
if not out["value"].notna().any():
325+
logger.debug(
326+
f"Grouped output for {station_id} {param} bucket {bucket} is all-NA; skipping"
293327
)
328+
continue
329+
330+
meta_out = dict(original_header)
331+
332+
ts_ids = [str(r.ts_id) for _, r in members]
333+
var_ids = [str(r.var_id) for _, r in members]
294334

295-
try:
296-
univariate = ts[selector]
297-
except:
298-
logger.debug(f"Selector failed: {selector} columns: {ts.columns}")
299-
continue
300-
301-
if univariate.first_valid_index() is None:
302-
ts = ts.drop([selector], axis=1)
303-
# empty for the file
304-
continue
305-
original_header["agency_ts_id"] = row.ts_id
306-
original_header["agency_var_id"] = row.var_id
307-
original_header["sublocation"] = asubloc
308-
original_header["subloc_comment"] = (
309-
"multivariate file separated, mention of other series omitted in this file may appear in original header"
335+
if len(ts_ids) == 1:
336+
meta_out["agency_ts_id"] = ts_ids[0]
337+
else:
338+
meta_out["agency_ts_id"] = ts_ids
339+
340+
if len(var_ids) == 1:
341+
meta_out["agency_var_id"] = var_ids[0]
342+
else:
343+
meta_out["agency_var_id"] = var_ids
344+
345+
meta_out["sublocation"] = bucket
346+
347+
if len(cols) > 1:
348+
meta_out["subloc_comment"] = (
349+
f"value averages {len(cols)} source series assigned to sublocation {bucket}"
310350
)
311-
meta["subloc"] = asubloc
312-
newfname = newfname = meta_to_filename(meta, repo="formatted")
313-
work_dir, newfname_f = os.path.split(newfname)
314-
newfpath = os.path.join(tmpdir, newfname_f)
315-
univariate.columns = ["value"]
316-
univariate.name = "value"
317-
logger.debug(f"Writing to {newfpath}")
318-
write_ts_csv(univariate, newfpath, original_header, chunk_years=True)
319-
vertical_non[0] = vertical_non[0] + 1
320-
ts = ts.drop([selector], axis=1)
321-
322-
ncol = len(ts.columns)
323-
if ncol == 0:
324-
# No columns were left. Delete the original file as its contents have been parsed to other files
325-
logger.debug(f"All columns recognized for {fn}")
351+
else:
352+
meta_out["subloc_comment"] = (
353+
"multivariate file separated into sublocation outputs"
354+
)
355+
356+
meta_out["source_columns"] = cols
357+
358+
meta_file = dict(meta)
359+
if bucket == "default":
360+
meta_file.pop("subloc", None)
361+
else:
362+
meta_file["subloc"] = bucket
363+
364+
newfname = meta_to_filename(meta_file, repo="formatted")
365+
work_dir, newfname_f = os.path.split(newfname)
366+
newfpath = os.path.join(tmpdir, newfname_f)
367+
368+
logger.debug(
369+
f"Writing grouped output for {station_id} {param} bucket {bucket} "
370+
f"from columns {cols} to {newfpath}"
371+
)
372+
write_ts_csv(out, newfpath, meta_out, chunk_years=True)
373+
written_any = True
374+
375+
if written_any:
376+
logger.debug(
377+
f"Processed multivariate file {fn} into grouped outputs; marking original for deletion"
378+
)
326379
set_of_deletions.add(fn)
327380
else:
328-
if ncol == 1:
329-
logger.debug(f"One column left for {fn}, renaming and documenting")
330-
ts.columns = ["value"]
331-
else:
381+
logger.warning(
382+
f"Quarantining {fn} in usgs_multi: no non-empty grouped outputs could be formed"
383+
)
384+
_quarantine_file(fn)
332385

333-
logger.debug(
334-
f"Several sublocations for columns, averaging {fn} and labeling as value"
335-
)
336-
# Multivariate not collapsed, but we will add a 'value' column that aggregates and note this in metadata
337-
ts["value"] = ts.mean(axis=1)
338-
original_header["subloc_comment"] = "value averages sublocations"
339-
original_header["agency_ts_id"] = subdf.ts_id.tolist()
340-
if ts.first_valid_index() is None:
341-
continue # No more good data. bail
342-
fpath_write = os.path.join(tmpdir, filepart)
343-
write_ts_csv(ts, fpath_write, metadata=original_header, chunk_years=True)
344386
for fdname in set_of_deletions:
345387
logger.debug(f"Removing {fdname}")
346388
os.remove(fdname)

0 commit comments

Comments (0)