Skip to content

Commit 4e0a5c4

Browse files
Eli
authored and committed
Changed --fpath arg back to original name. Added progress messages, changed temp directory handling
1 parent 87e95b2 commit 4e0a5c4

File tree

1 file changed

+116
-109
lines changed

1 file changed

+116
-109
lines changed

dms_datastore/usgs_multi.py

Lines changed: 116 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,24 @@ def usgs_multivariate(pat, outfile):
123123
("c24", "287159", "upward"),
124124
]
125125

126+
logger.info("Start scanning phase looking for multivariate entries")
126127
with open(outfile, "w", encoding="utf-8") as out:
127128
files = glob.glob(pat)
129+
nfiles = len(files)
128130
data = []
129-
for fname in files:
131+
for i,fname in enumerate(files,start=1):
130132
meta = interpret_fname(fname, repo="formatted")
131133
try:
132134
ts = read_ts(fname, nrows=4000)
133135
except:
134136
logger.warning(f"Failed to read file with read_ts(): {fname}")
135137
continue
136138

139+
if i == 1 or i % 500 == 0 or i == nfiles:
140+
logger.info(
141+
f"USGS scan progress: {i}/{nfiles} files, currently on {fname} ")
142+
143+
137144
multi_cols = ts.shape[1] > 1
138145
subloc_df = sublocation_df()
139146

@@ -180,7 +187,7 @@ def usgs_multivariate(pat, outfile):
180187
asubloc = "mid"
181188

182189
if random_check and not known_multi and asubloc != "default":
183-
logger.error(
190+
logger.warning(
184191
f"Sublocation labeling was detected during spot check in station {station_id} param {param} but no listing in subloc table"
185192
)
186193

@@ -230,131 +237,131 @@ def process_multivariate_usgs(repo="formatted", data_path=None, pat=None, rescan
230237
logger.info("Entering process_multivariate_usgs")
231238
actual_fpath = data_path if data_path is not None else repo_root(repo)
232239
# todo: straighten out fpath and pat stuff
233-
tempfile.tempdir = "."
234-
tmpdir = tempfile.TemporaryDirectory()
235-
236-
if pat is None:
237-
pat = os.path.join(actual_fpath, "usgs*.csv")
238-
else:
239-
pat = os.path.join(actual_fpath, pat)
240-
241-
242-
# This recreates or reuses list of multivariate files. Being multivariate is something that has
243-
# to be assessed over the full period of record
244-
if rescan:
245-
df = usgs_multivariate(pat, "usgs_subloc_meta_new.csv")
246-
else:
247-
df = pd.read_csv("usgs_subloc_meta.csv", header=0, dtype=str)
248-
df.reset_index()
249-
df.index.name = "id"
250-
filenames = glob.glob(pat)
251-
set_of_deletions = set()
252-
253-
for fn in filenames:
254-
direct, filepart = os.path.split(fn)
255-
meta = interpret_fname(filepart, repo="formatted")
256-
station_id = meta["station_id"]
257-
param = meta["param"]
258-
logger.info(f"Working on {fn}, {station_id}, {param}")
259-
subdf = df.loc[(df.station_id == station_id) & (df.param == param), :]
260-
if subdf.empty:
261-
logger.debug("No entry in table indicating multivariate content, skipping")
262-
continue
263-
# if len(subdf) == 1:
264-
# logger.info("Dataset with only one sublocation not expected for station_id {station_id}")
265-
266-
original_header = read_yaml_header(fn)
267-
268-
ts = read_ts(fn)
269-
logger.debug(
270-
f"Number of sublocation metadata entries for {station_id} {param} = {len(subdf)}"
271-
)
272-
vertical_non = [0, 0] # for counting how many subloc are vertical or not
273-
274-
# first process all known sublocations that are meant to be kept intact,
275-
# dropping them as processed
276-
# then if one left it is default and if many use the average
277-
for index, row in subdf.iterrows():
278-
asubloc = row.asubloc[:]
279-
logger.debug(f"Isolating sublocation {asubloc[:]}")
280-
if asubloc[:] in ["lower", "upper", "upward", "vertical"]:
281-
# write out each sublocation as individual file
282-
selector = (
283-
"value"
284-
if len(ts.columns) == 1 and ts.columns[0] == "value"
285-
else f"{row.ts_id}_value"
286-
)
240+
with tempfile.TemporaryDirectory() as tmpdir:
241+
if pat is None:
242+
pat = os.path.join(actual_fpath, "usgs*.csv")
243+
else:
244+
pat = os.path.join(actual_fpath, pat)
287245

288-
try:
289-
univariate = ts[selector]
290-
except:
291-
logger.warning(f"Selector failed: {selector} columns: {ts.columns}")
292-
continue
246+
# This recreates or reuses list of multivariate files. Being multivariate is something that has
247+
# to be assessed over the full period of record
248+
if rescan:
249+
df = usgs_multivariate(pat, "usgs_subloc_meta_new.csv")
250+
else:
251+
df = pd.read_csv("usgs_subloc_meta.csv", header=0, dtype=str)
252+
df.reset_index()
253+
df.index.name = "id"
254+
filenames = glob.glob(pat)
255+
set_of_deletions = set()
256+
257+
logger.info("Begin usgs_multi consolidation and separation phase")
258+
for i,fn in enumerate(filenames,start=1):
259+
direct, filepart = os.path.split(fn)
260+
261+
262+
meta = interpret_fname(filepart, repo="formatted")
263+
station_id = meta["station_id"]
264+
param = meta["param"]
265+
logger.info(f"Working on {fn}, {station_id}, {param}")
266+
subdf = df.loc[(df.station_id == station_id) & (df.param == param), :]
267+
if subdf.empty:
268+
logger.debug("No entry in table indicating multivariate content, skipping")
269+
continue
270+
# if len(subdf) == 1:
271+
# logger.info("Dataset with only one sublocation not expected for station_id {station_id}")
272+
273+
original_header = read_yaml_header(fn)
274+
275+
ts = read_ts(fn)
276+
logger.debug(
277+
f"Number of sublocation metadata entries for {station_id} {param} = {len(subdf)}"
278+
)
279+
vertical_non = [0, 0] # for counting how many subloc are vertical or not
280+
281+
# first process all known sublocations that are meant to be kept intact,
282+
# dropping them as processed
283+
# then if one left it is default and if many use the average
284+
for index, row in subdf.iterrows():
285+
asubloc = row.asubloc[:]
286+
logger.debug(f"Isolating sublocation {asubloc[:]}")
287+
if asubloc[:] in ["lower", "upper", "upward", "vertical"]:
288+
# write out each sublocation as individual file
289+
selector = (
290+
"value"
291+
if len(ts.columns) == 1 and ts.columns[0] == "value"
292+
else f"{row.ts_id}_value"
293+
)
293294

294-
if univariate.first_valid_index() is None:
295+
try:
296+
univariate = ts[selector]
297+
except:
298+
logger.debug(f"Selector failed: {selector} columns: {ts.columns}")
299+
continue
300+
301+
if univariate.first_valid_index() is None:
302+
ts = ts.drop([selector], axis=1)
303+
# empty for the file
304+
continue
305+
original_header["agency_ts_id"] = row.ts_id
306+
original_header["agency_var_id"] = row.var_id
307+
original_header["sublocation"] = asubloc
308+
original_header["subloc_comment"] = (
309+
"multivariate file separated, mention of other series omitted in this file may appear in original header"
310+
)
311+
meta["subloc"] = asubloc
312+
newfname = newfname = meta_to_filename(meta, repo="formatted")
313+
work_dir, newfname_f = os.path.split(newfname)
314+
newfpath = os.path.join(tmpdir.name, newfname_f) ## todo: hardwire
315+
univariate.columns = ["value"]
316+
univariate.name = "value"
317+
logger.debug(f"Writing to {newfpath}")
318+
write_ts_csv(univariate, newfpath, original_header, chunk_years=True)
319+
vertical_non[0] = vertical_non[0] + 1
295320
ts = ts.drop([selector], axis=1)
296-
# empty for the file
297-
continue
298-
original_header["agency_ts_id"] = row.ts_id
299-
original_header["agency_var_id"] = row.var_id
300-
original_header["sublocation"] = asubloc
301-
original_header["subloc_comment"] = (
302-
"multivariate file separated, mention of other series omitted in this file may appear in original header"
303-
)
304-
meta["subloc"] = asubloc
305-
newfname = newfname = meta_to_filename(meta, repo="formatted")
306-
work_dir, newfname_f = os.path.split(newfname)
307-
newfpath = os.path.join(tmpdir.name, newfname_f) ## todo: hardwire
308-
univariate.columns = ["value"]
309-
univariate.name = "value"
310-
logger.debug(f"Writing to {newfpath}")
311-
write_ts_csv(univariate, newfpath, original_header, chunk_years=True)
312-
vertical_non[0] = vertical_non[0] + 1
313-
ts = ts.drop([selector], axis=1)
314-
315-
ncol = len(ts.columns)
316-
if ncol == 0:
317-
# No columns were left. Delete the original file as its contents have been parsed to other files
318-
logger.debug(f"All columns recognized for {fn}")
319-
set_of_deletions.add(fn)
320-
else:
321-
if ncol == 1:
322-
logger.debug(f"One column left for {fn}, renaming and documenting")
323-
ts.columns = ["value"]
321+
322+
ncol = len(ts.columns)
323+
if ncol == 0:
324+
# No columns were left. Delete the original file as its contents have been parsed to other files
325+
logger.debug(f"All columns recognized for {fn}")
326+
set_of_deletions.add(fn)
324327
else:
328+
if ncol == 1:
329+
logger.debug(f"One column left for {fn}, renaming and documenting")
330+
ts.columns = ["value"]
331+
else:
332+
333+
logger.debug(
334+
f"Several sublocations for columns, averaging {fn} and labeling as value"
335+
)
336+
# Multivariate not collapsed, but we will add a 'value' column that aggregates and note this in metadata
337+
ts["value"] = ts.mean(axis=1)
338+
original_header["subloc_comment"] = "value averages sublocations"
339+
original_header["agency_ts_id"] = subdf.ts_id.tolist()
340+
if ts.first_valid_index() is None:
341+
continue # No more good data. bail
342+
fpath_write = os.path.join(tmpdir.name, filepart)
343+
write_ts_csv(ts, fpath_write, metadata=original_header, chunk_years=True)
344+
for fdname in set_of_deletions:
345+
logger.debug(f"Removing {fdname}")
346+
os.remove(fdname)
347+
shutil.copytree(tmpdir.name, actual_fpath, dirs_exist_ok=True)
325348

326-
logger.debug(
327-
f"Several sublocations for columns, averaging {fn} and labeling as value"
328-
)
329-
# Multivariate not collapsed, but we will add a 'value' column that aggregates and note this in metadata
330-
ts["value"] = ts.mean(axis=1)
331-
original_header["subloc_comment"] = "value averages sublocations"
332-
original_header["agency_ts_id"] = subdf.ts_id.tolist()
333-
if ts.first_valid_index() is None:
334-
continue # No more good data. bail
335-
fpath_write = os.path.join(tmpdir.name, filepart)
336-
write_ts_csv(ts, fpath_write, metadata=original_header, chunk_years=True)
337-
for fdname in set_of_deletions:
338-
logger.debug(f"Removing {fdname}")
339-
os.remove(fdname)
340-
shutil.copytree(tmpdir.name, actual_fpath, dirs_exist_ok=True)
341-
del tmpdir
342349
logger.info("Exiting process_multivariate_usgs")
343350

344351

345352
@click.command()
346353
@click.option("--pat", default="usgs*.csv", help="Pattern of files to process")
347354
@click.option("--repo", default="formatted", help="Configured repo name for naming/parse rules.")
348355
@click.option(
349-
"--data-path",
356+
"--fpath",
350357
default=None,
351358
help="Directory containing the files. Defaults to the configured root of --repo.",
352359
)
353360
@click.option("--logdir", type=click.Path(path_type=Path), default=None)
354361
@click.option("--debug", is_flag=True)
355362
@click.option("--quiet", is_flag=True)
356363
@click.help_option("-h", "--help")
357-
def usgs_multi_cli(pat, repo, data_path, logdir=None, debug=False, quiet=False):
364+
def usgs_multi_cli(pat, repo, fpath, logdir=None, debug=False, quiet=False):
358365
"""CLI for processing multivariate USGS files."""
359366
# recatalogs the unique series. If false an old catalog will be used, which is useful
360367
# for sequential debugging.
@@ -371,7 +378,7 @@ def usgs_multi_cli(pat, repo, data_path, logdir=None, debug=False, quiet=False):
371378
logdir=logdir,
372379
logfile_prefix="usgs_multi"
373380
)
374-
process_multivariate_usgs(repo=repo, data_path=data_path, pat=pat, rescan=True)
381+
process_multivariate_usgs(repo=repo, data_path=fpath, pat=pat, rescan=True)
375382

376383

377384
if __name__ == "__main__":

0 commit comments

Comments (0)