Skip to content

Commit 4b4babc

Browse files
EliEli
authored andcommitted
Fix cdec_id lookup when prepping downloads.
1 parent 260e4e3 commit 4b4babc

4 files changed

Lines changed: 237 additions & 47 deletions

File tree

dms_datastore/config_data/dstore_config.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ repos:
4040
# (typically first slot) and drives globbing and merge order
4141
provider_key: source
4242

43-
# Strategy for resolving multiple providers for the same data_key if allwed. Possibilities:
44-
# assume_unique: only one provider allowed (no resolution needed)
43+
# Strategy for resolving multiple providers for the same data_key if allwed.
44+
# Possibilities:
45+
# assume_unique: only one provider allowed in a repo (no resolution needed). See screened.
4546
# registry_column: use registry metadata to determine preferred providers
4647
provider_resolution_mode: registry_column
4748

dms_datastore/download_cdec.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,12 @@ def download_cdec(
263263
)
264264

265265
df = attach_subloc(df, default_subloc="default")
266-
df = attach_agency_id(df, repo_name="formatted", agency_id_col="agency_id")
266+
df = attach_agency_id(
267+
df,
268+
repo_name="formatted",
269+
agency_id_col="cdec_id",
270+
on_missing="drop",
271+
)
267272
vlookup = dstore_config.config_file("variable_mappings")
268273
df = attach_src_var_id(df, vlookup, source="cdec")
269274

dms_datastore/populate_repo.py

Lines changed: 143 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,107 @@
1-
#!/usr/bin/env python
2-
# -*- coding: utf-8 -*-
3-
41
"""
5-
Scripts to populate raw/incoming with populate() obtaining des, usgs, noaa, usgs, usbr
6-
usgs: files may have two series
7-
des: naive download will produce files from different instruments with time overlaps
8-
the script run rationalize_time_partitions for des
9-
10-
ncro: typically done with download_ncro which is a period of record downloader
11-
ncro is not realtime run populate2 to get the update for ncro
12-
run revise_time to correct start and end times.
13-
14-
What are steps to update just realtime
15-
16-
Need to add something for the daily stations and for O&M (Clifton Court, Banks)
2+
Raw download and staging utilities for external data providers.
3+
4+
This module orchestrates acquisition of raw time series files from multiple
5+
providers and writes them into a staging/raw directory using a raw naming
6+
convention. It does not apply repository merge or provider-resolution policy;
7+
those steps occur later in formatting and repo update workflows.
8+
9+
Overview
10+
--------
11+
The module supports two closely related workflows.
12+
13+
1. General population of raw data
14+
- ``populate_main()`` is the top-level orchestration entry point.
15+
- ``populate()`` loops over agencies and variables.
16+
- ``populate_repo()`` prepares a station request for one agency/parameter
17+
pair and dispatches to the appropriate downloader.
18+
19+
2. NCRO realtime supplementation
20+
- ``list_ncro_stations()`` scans existing NCRO files in a destination
21+
directory and extracts the logical station/parameter pairs already present.
22+
- ``supplement_ncro_with_cdec()`` uses that list to request corresponding CDEC data,
23+
intended to supplement NCRO's non-realtime coverage with a realtime feed.
24+
- ``populate_ncro_realtime()`` is a convenience wrapper around this path.
25+
26+
Raw naming
27+
----------
28+
Files handled by this module use a raw/downloader naming profile rather than
29+
repo-configured naming semantics. The profile is used only for parsing and
30+
renaming downloader outputs:
31+
32+
- ``{agency}_{station_id@subloc}_{agency_id}_{param}_{syear}_{eyear}.csv``
33+
- ``{agency}_{station_id@subloc}_{agency_id}_{param}_{year}.csv``
34+
35+
This naming is represented by ``RAW_NAMING`` and is intentionally separate from
36+
repo templates used by formatted/screened/processed repositories.
37+
38+
Download sequence
39+
-----------------
40+
A typical end-to-end raw population sequence is:
41+
42+
1. Build a station request from configured registries and variable mappings.
43+
2. Download raw files with the provider-specific downloader.
44+
3. Revise filename year fields to match the actual years present in the data.
45+
4. For DES, rationalize overlapping or inconsistent time partitions.
46+
5. For NCRO, optionally supplement with corresponding CDEC realtime files.
47+
48+
The NCRO supplementation step depends on the existing NCRO files already present
49+
in the destination directory, because those files are used to determine which
50+
station/parameter combinations should be mirrored from CDEC.
51+
52+
Station request preparation
53+
---------------------------
54+
Before calling a downloader, the module prepares a station-request DataFrame
55+
using helper functions such as:
56+
57+
- ``normalize_station_request()``
58+
- ``attach_agency_id()``
59+
- ``attach_src_var_id()``
60+
61+
The resulting request typically contains station identity, sublocation,
62+
parameter, agency/provider-specific identifier, and source-variable code needed
63+
by the target downloader.
64+
65+
Functions
66+
---------
67+
populate_main(dest, agencies=None, varlist=None, partial_update=False)
68+
Main orchestration entry point. Runs downloads, handles failures, triggers
69+
NCRO realtime supplementation when applicable, and performs selected
70+
post-download cleanup steps.
71+
72+
populate(dest, all_agencies=None, varlist=None, partial_update=False)
73+
Agency/variable loop used by ``populate_main()``.
74+
75+
populate_repo(agency, param, dest, start, end, overwrite=False, ...)
76+
Prepare a request for a single agency and parameter and invoke the matching
77+
downloader.
78+
79+
list_ncro_stations(dest)
80+
Inspect existing ``ncro_*.csv`` files in a destination directory and return
81+
a DataFrame describing the station/parameter combinations present.
82+
83+
supplement_ncro_with_cdec(df, dest, start, overwrite=False, ...)
84+
Request CDEC data corresponding to the station/parameter combinations
85+
identified from NCRO files.
86+
87+
populate_ncro_realtime(dest, realtime_start=...)
88+
Convenience wrapper that derives the NCRO station list from ``dest`` and
89+
passes it to ``supplement_ncro_with_cdec()``.
90+
91+
revise_filename_syears(...)
92+
revise_filename_syear_eyear(...)
93+
Adjust year fields in raw filenames to reflect the actual valid-data span.
94+
95+
Notes
96+
-----
97+
This module operates at the raw-file staging layer. It is expected that later
98+
steps such as reformatting, screening, and repo reconciliation will impose the
99+
stricter repository semantics defined elsewhere in the package.
100+
101+
Because this module sits between external downloaders and downstream repo logic,
102+
identifier handling is especially important. In particular, the NCRO-to-CDEC
103+
supplementation path is sensitive to how station identifiers and provider-
104+
specific IDs are prepared before download.
17105
"""
18106

19107
import glob
@@ -281,26 +369,48 @@ def list_ncro_stations(dest):
281369
logger.info(x)
282370
raise ValueError(f"Unable to parse station and parameter from name {x}")
283371

284-
return pd.DataFrame(
285-
data=stationlist, columns=["id", "param", "agency", "agency_id_from_file"]
372+
df = pd.DataFrame(
373+
data=stationlist, columns=["station_id", "param", "agency", "agency_id_from_file"]
286374
)
375+
df = df.drop_duplicates(subset=["station_id", "param"])
376+
return df
287377

288378

289-
def populate_repo2(df, dest, start, overwrite=False, ignore_existing=None):
379+
def supplement_ncro_with_cdec(df, dest, start, overwrite=False, ignore_existing=None):
290380
"""Currently used by ncro realtime."""
291381
vlookup = dstore_config.config_file("variable_mappings")
292-
df["station_id"] = df["id"].str.replace("'", "")
382+
df["station_id"] = df["station_id"].str.replace("'", "")
293383
df["subloc"] = "default"
294384

295385
if ignore_existing is not None:
296-
df = df[~df["id"].isin(ignore_existing)]
386+
df = df[~df["station_id"].isin(ignore_existing)]
297387

298388
source = "cdec"
299-
agency_id_col = "agency_id_from_file"
300-
stationlist = normalize_station_request(stationframe=df, default_subloc="default")
301-
stationlist = attach_agency_id(stationlist, repo_name="formatted", agency_id_col=agency_id_col)
389+
agency_id_col = "cdec_id"
390+
391+
stationlist = normalize_station_request(
392+
stationframe=df,
393+
default_subloc="default",
394+
)
395+
396+
stationlist = attach_agency_id(
397+
stationlist,
398+
repo_name="formatted",
399+
agency_id_col=agency_id_col,
400+
on_missing="drop",
401+
)
402+
403+
if stationlist.empty:
404+
logger.warning(
405+
"No NCRO stations remain for CDEC supplementation after dropping rows "
406+
"with missing %s",
407+
agency_id_col,
408+
)
409+
return
410+
302411
stationlist = attach_src_var_id(stationlist, vlookup, source=source)
303412
end = None
413+
logger.debug("NCRO CDEC supplementation request:\n%s", stationlist)
304414
downloaders["cdec"](stationlist, dest, start, end, overwrite)
305415

306416

@@ -413,18 +523,11 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False):
413523
logger.info(agent)
414524

415525

416-
def purge(dest):
417-
if purge:
418-
for pat in ["*.csv", "*.rdb"]:
419-
allfiles = glob.glob(os.path.join(dest, pat))
420-
for fname in allfiles:
421-
os.remove(fname)
422-
423526

424527
def populate_ncro_realtime(dest, realtime_start=pd.Timestamp(2021, 1, 1)):
425528
end = None
426529
ncrodf = list_ncro_stations(dest)
427-
populate_repo2(ncrodf, dest, realtime_start, overwrite=True)
530+
supplement_ncro_with_cdec(ncrodf, dest, realtime_start, overwrite=True)
428531

429532

430533

@@ -452,8 +555,12 @@ def populate_main(dest, agencies=None, varlist=None, partial_update=False):
452555
all_agencies = ["usgs", "dwr_des", "usbr", "noaa", "dwr_ncro", "dwr"]
453556
else:
454557
all_agencies = agencies
455-
do_ncro = ("ncro" in all_agencies) or ("dwr_ncro" in all_agencies)
456-
do_des = ("des" in all_agencies) or ("dwr_des" in all_agencies)
558+
559+
# Normalize agency names: convert "ncro" to "dwr_ncro" and "des" to "dwr_des"
560+
all_agencies = ["dwr_ncro" if ag == "ncro" else "dwr_des" if ag == "des" else ag for ag in all_agencies]
561+
562+
do_ncro = "dwr_ncro" in all_agencies
563+
do_des = "dwr_des" in all_agencies
457564

458565
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
459566
future_to_agency = {
@@ -499,7 +606,9 @@ def populate_debug_ncro_rename(dest, agencies=None, varlist=None):
499606
all_agencies = ["usgs", "dwr_des", "usbr", "noaa", "dwr_ncro", "dwr"]
500607
else:
501608
all_agencies = agencies
502-
do_ncro = ("ncro" in all_agencies) or ("dwr_ncro" in all_agencies)
609+
# Normalize agency names: convert "ncro" to "dwr_ncro" and "des" to "dwr_des"
610+
all_agencies = ["dwr_ncro" if ag == "ncro" else "dwr_des" if ag == "des" else ag for ag in all_agencies]
611+
do_ncro = "dwr_ncro" in all_agencies
503612
if do_ncro:
504613
revise_filename_syear_eyear(os.path.join(dest, f"ncro_*.csv"))
505614
revise_filename_syear_eyear(os.path.join(dest, f"cdec_*.csv"))

0 commit comments

Comments
 (0)