Skip to content

Commit 35f6e02

Browse files
thodson-usgsclaude
andcommitted
perf(waterdata.xarray): build datasets from the lean hash-free frame
The wrappers previously forced include_hash=True purely to keep a join key, which materialized the per-record UUID column (a unique ~36-char string per row -- e.g. continuous_id measured at ~155 KB / 1825 unique values on a 20-day continuous pull) only to discard it. Drop that. The wrappers now fetch the underlying getter's default, hash-free output, so the heavy UUID column is never requested or built. Every CF attribute is derived from columns already present: units from unit_of_measure, cell_methods from statistic_id (new _STATISTIC_CELL_METHOD map), standard_name/usgs_parameter_code from parameter_code. Only the human-readable parameter name still needs the metadata endpoint, now looked up (and cached) keyed by parameter_code rather than a series hash. time and monitoring_location_id are the coordinates; the converter slims to just the columns it pivots before copying. Without the hash key the rare two-series-per-(site, parameter, statistic) collision can no longer be split (the values are inherently ambiguous without it), so it now keeps the first value and warns instead of switching the instance dimension. Tests updated accordingly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7372c31 commit 35f6e02

2 files changed

Lines changed: 118 additions & 135 deletions

File tree

dataretrieval/waterdata/xarray.py

Lines changed: 94 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
* dataset-level attributes carry ``Conventions``, provenance, and the
1818
request URL.
1919
20-
The wrappers call the underlying getter with ``include_hash=True`` so the
21-
join key survives, perform the metadata lookup, then drop the opaque hash
22-
columns from the user-facing result.
20+
The wrappers call the underlying getter with its default, hash-free output
21+
-- so the large per-record UUID column is never fetched or materialized --
22+
and derive the CF attributes directly from the surviving columns
23+
(``unit_of_measure`` -> ``units``, ``statistic_id`` -> ``cell_methods``,
24+
``parameter_code`` -> ``standard_name``). Only the human-readable parameter
25+
name comes from a small, cached metadata lookup keyed by ``parameter_code``.
2326
2427
This module requires the optional ``xarray`` dependency::
2528
@@ -79,15 +82,16 @@
7982
"%": "percent",
8083
}
8184

82-
# computation_identifier -> the operator in a CF ``cell_methods`` string.
83-
_CELL_METHOD = {
84-
"Mean": "mean",
85-
"Sum": "sum",
86-
"Maximum": "maximum",
87-
"Max At Event Time": "maximum",
88-
"Minimum": "minimum",
89-
"Median": "median",
90-
"Instantaneous": "point",
85+
# USGS statistic_id -> the operator in a CF ``cell_methods`` string. Read
86+
# straight from the values frame, so no metadata round-trip is needed to
87+
# classify the aggregation.
88+
_STATISTIC_CELL_METHOD = {
89+
"00001": "maximum",
90+
"00002": "minimum",
91+
"00003": "mean",
92+
"00006": "sum",
93+
"00008": "median",
94+
"00011": "point", # instantaneous
9195
}
9296

9397
# USGS 5-digit parameter code -> CF standard_name. Deliberately conservative;
@@ -109,62 +113,43 @@
109113
_TS_META_CACHE: dict[str, dict[str, dict]] = {}
110114
_FIELD_META_CACHE: dict[str, dict[str, dict]] = {}
111115

112-
_TS_DESCRIPTORS = (
113-
"parameter_code",
114-
"parameter_name",
115-
"parameter_description",
116-
"unit_of_measure",
117-
"statistic_id",
118-
"computation_period_identifier",
119-
"computation_identifier",
120-
)
121-
_FIELD_DESCRIPTORS = ("parameter_code", "parameter_name", "parameter_description")
116+
# Only the human-readable name is sourced from the metadata endpoint; units,
117+
# statistic, and parameter code all come from the values frame itself.
118+
_NAME_DESCRIPTORS = ("parameter_name", "parameter_description")
122119

123120

124-
def _lookup(site_ids, cache, getter, id_col, descriptors):
125-
"""Fetch and cache metadata descriptors keyed by the series id column.
121+
def _lookup(site_ids, cache, getter):
122+
"""Fetch and cache ``{parameter_code: {name descriptors}}`` for sites.
126123
127-
Returns a flat ``{series_id: {descriptor: value}}`` dict covering every
128-
requested site. One network call per not-yet-cached batch of sites; the
129-
cache is keyed by site so repeated getter calls reuse it.
124+
Keyed by ``parameter_code`` (stable and 1:1 with the parameter name), so
125+
the lookup needs no hash id. One metadata call per not-yet-cached batch
126+
of sites; the cache is keyed by site so repeated getter calls reuse it.
130127
"""
131128
sites = sorted({str(s) for s in site_ids if _pd.notna(s)})
132129
todo = [s for s in sites if s not in cache]
133130
if todo:
134-
meta, _ = getter(monitoring_location_id=todo, include_hash=True)
131+
meta, _ = getter(monitoring_location_id=todo)
135132
for s in todo:
136133
cache[s] = {}
137-
if not meta.empty:
138-
cols = [c for c in descriptors if c in meta.columns]
134+
if not meta.empty and "parameter_code" in meta.columns:
135+
cols = [c for c in _NAME_DESCRIPTORS if c in meta.columns]
139136
for _, row in meta.iterrows():
140137
site = row.get("monitoring_location_id")
141-
sid = row.get(id_col)
142-
if site in cache and _pd.notna(sid):
143-
cache[site][sid] = {c: row.get(c) for c in cols}
138+
pcode = row.get("parameter_code")
139+
if site in cache and _pd.notna(pcode):
140+
cache[site][str(pcode)] = {c: row.get(c) for c in cols}
144141
out: dict[str, dict] = {}
145142
for s in sites:
146143
out.update(cache.get(s, {}))
147144
return out
148145

149146

150147
def _timeseries_metadata(site_ids):
151-
return _lookup(
152-
site_ids,
153-
_TS_META_CACHE,
154-
_api.get_time_series_metadata,
155-
"time_series_id",
156-
_TS_DESCRIPTORS,
157-
)
148+
return _lookup(site_ids, _TS_META_CACHE, _api.get_time_series_metadata)
158149

159150

160151
def _field_metadata(site_ids):
161-
return _lookup(
162-
site_ids,
163-
_FIELD_META_CACHE,
164-
_api.get_field_measurements_metadata,
165-
"field_series_id",
166-
_FIELD_DESCRIPTORS,
167-
)
152+
return _lookup(site_ids, _FIELD_META_CACHE, _api.get_field_measurements_metadata)
168153

169154

170155
# --- helpers ---------------------------------------------------------------
@@ -182,20 +167,26 @@ def _first(series):
182167
return nonnull.iloc[0] if len(nonnull) else None
183168

184169

185-
def _var_attrs(desc, group, *, default_cell_method, pcode, ancillary, name):
186-
"""Build the CF attribute dict for one data variable."""
170+
def _var_attrs(desc, *, unit, pcode, stat, default_cell_method, ancillary, name):
171+
"""Build the CF attribute dict for one data variable.
172+
173+
``unit``, ``pcode`` and ``stat`` are read from the values frame; ``desc``
174+
supplies only the human-readable name from the metadata lookup.
175+
"""
187176
attrs: dict[str, str] = {}
188177
long_name = desc.get("parameter_description") or desc.get("parameter_name")
189178
if long_name and _pd.notna(long_name):
190179
attrs["long_name"] = str(long_name)
191180

192-
unit = desc.get("unit_of_measure")
193-
if (unit is None or _pd.isna(unit)) and "unit_of_measure" in group:
194-
unit = _first(group["unit_of_measure"])
195181
if unit is not None and _pd.notna(unit):
196182
attrs["units"] = _UDUNITS.get(str(unit), str(unit))
197183

198-
op = _CELL_METHOD.get(desc.get("computation_identifier"), default_cell_method)
184+
op = (
185+
_STATISTIC_CELL_METHOD.get(str(stat))
186+
if stat is not None and _pd.notna(stat)
187+
else None
188+
)
189+
op = op or default_cell_method
199190
if op:
200191
attrs["cell_methods"] = f"time: {op}"
201192

@@ -205,9 +196,6 @@ def _var_attrs(desc, group, *, default_cell_method, pcode, ancillary, name):
205196
attrs["standard_name"] = sn
206197
attrs["usgs_parameter_code"] = str(pcode)
207198

208-
stat = desc.get("statistic_id")
209-
if (stat is None or _pd.isna(stat)) and "statistic_id" in group:
210-
stat = _first(group["statistic_id"])
211199
if stat is not None and _pd.notna(stat):
212200
attrs["usgs_statistic_id"] = str(stat)
213201

@@ -254,69 +242,77 @@ def _point_coords(df, inst):
254242
return lon, lat
255243

256244

245+
_INSTANCE = "monitoring_location_id"
246+
247+
257248
def _build_timeseries(
258249
df,
259250
base_meta,
260251
*,
261252
service,
262253
series_meta,
263-
key_col="time_series_id",
264254
group_cols=("parameter_code", "statistic_id"),
265255
default_cell_method=None,
266256
):
267-
"""Long values frame -> CF timeSeries Dataset (one var per parameter)."""
257+
"""Hash-free values frame -> CF timeSeries Dataset (one var per parameter).
258+
259+
The frame carries no hash columns (the wrappers fetch the default lean
260+
output); every CF attribute is derived from ``parameter_code`` /
261+
``statistic_id`` / ``unit_of_measure`` already present, plus the
262+
human-readable name from ``series_meta`` keyed by ``parameter_code``.
263+
``time`` and ``monitoring_location_id`` become the coordinates.
264+
"""
268265
if df is None or len(df) == 0:
269266
return _empty_dataset(service, base_meta)
270267

271-
df = df.copy()
272-
# Normalize to naive-UTC so xarray can store datetime64 (it has no tz dtype).
273-
times = _pd.to_datetime(df["time"], errors="coerce", utc=True)
274-
df["time"] = times.dt.tz_localize(None)
275-
df["value"] = _pd.to_numeric(df["value"], errors="coerce")
276268
group_cols = [c for c in group_cols if c in df.columns]
277-
278-
# Instance (DSG) dimension: the site, unless two series collide on
279-
# (site, parameter, time) -- then fall back to the unambiguous series id.
280-
inst = "monitoring_location_id"
281-
if key_col in df.columns and df.duplicated(group_cols + [inst, "time"]).any():
282-
inst = key_col
283-
_warnings.warn(
284-
"multiple time series share a (site, parameter); using "
285-
f"'{key_col}' as the instance dimension instead of "
286-
"'monitoring_location_id'.",
287-
stacklevel=3,
288-
)
269+
ancillary = [c for c in _ANCILLARY if c in df.columns]
270+
has_unit = "unit_of_measure" in df.columns
271+
# Slim to just the columns we convert, so the heavy frame (and any columns
272+
# we ignore) is not copied wholesale.
273+
cols = [_INSTANCE, "time", "value", *group_cols, *ancillary]
274+
if has_unit:
275+
cols.append("unit_of_measure")
276+
work = df.loc[:, list(dict.fromkeys(cols))].copy()
277+
# Normalize to naive-UTC so xarray can store datetime64 (it has no tz dtype).
278+
work["time"] = _pd.to_datetime(
279+
work["time"], errors="coerce", utc=True
280+
).dt.tz_localize(None)
281+
work["value"] = _pd.to_numeric(work["value"], errors="coerce")
289282

290283
datasets, used = [], set()
291-
for _, group in df.groupby(group_cols, dropna=False):
292-
sid = _first(group[key_col]) if key_col in group else None
293-
desc = series_meta.get(sid, {}) if sid is not None else {}
294-
pcode = desc.get("parameter_code") or (
295-
_first(group["parameter_code"]) if "parameter_code" in group else None
296-
)
284+
for _, group in work.groupby(group_cols, dropna=False):
285+
pcode = _first(group["parameter_code"]) if "parameter_code" in group else None
286+
stat = _first(group["statistic_id"]) if "statistic_id" in group else None
287+
unit = _first(group["unit_of_measure"]) if has_unit else None
288+
desc = series_meta.get(str(pcode), {}) if pcode is not None else {}
297289

298290
name = _slug(desc.get("parameter_name") or pcode)
299-
if name in used: # disambiguate same-parameter, different-statistic vars
300-
comp = desc.get("computation_identifier") or _first(
301-
group.get("statistic_id", _pd.Series(dtype=object))
302-
)
303-
name = f"{name}_{_slug(comp)}" if comp else name
291+
if name in used: # same parameter, different statistic -> distinct var
292+
op = _STATISTIC_CELL_METHOD.get(str(stat)) or (str(stat) if stat else None)
293+
name = f"{name}_{_slug(op)}" if op else name
304294
while name in used:
305295
name += "_x"
306296
used.add(name)
307297

308-
ancillary = [c for c in _ANCILLARY if c in group.columns]
309-
sub = group.set_index([inst, "time"])[["value", *ancillary]]
298+
sub = group.set_index([_INSTANCE, "time"])[["value", *ancillary]]
310299
if not sub.index.is_unique:
300+
_warnings.warn(
301+
f"'{name}' has multiple values per (site, time) -- two series "
302+
"share this (site, parameter, statistic); keeping the first. "
303+
"Filter the query to separate them.",
304+
stacklevel=3,
305+
)
311306
sub = sub[~sub.index.duplicated(keep="first")]
312307
ds_g = sub.to_xarray().rename(
313308
{"value": name, **{c: f"{name}_{c}" for c in ancillary}}
314309
)
315310
ds_g[name].attrs = _var_attrs(
316311
desc,
317-
group,
318-
default_cell_method=default_cell_method,
312+
unit=unit,
319313
pcode=pcode,
314+
stat=stat,
315+
default_cell_method=default_cell_method,
320316
ancillary=ancillary,
321317
name=name,
322318
)
@@ -327,26 +323,20 @@ def _build_timeseries(
327323
ds = _xr.merge(datasets, combine_attrs="drop_conflicts", join="outer")
328324
ds.attrs = _dataset_attrs(service, base_meta)
329325
ds["time"].attrs.setdefault("standard_name", "time")
330-
if inst in ds.coords:
331-
ds[inst].attrs.setdefault("cf_role", "timeseries_id")
326+
if _INSTANCE in ds.coords:
327+
ds[_INSTANCE].attrs.setdefault("cf_role", "timeseries_id")
332328

333-
coords = _point_coords(df, inst)
329+
coords = _point_coords(df, _INSTANCE)
334330
if coords is not None:
335331
lon, lat = coords
336-
order = list(ds[inst].values)
332+
order = list(ds[_INSTANCE].values)
337333
ds = ds.assign_coords(
338-
longitude=(inst, [lon.get(k) for k in order]),
339-
latitude=(inst, [lat.get(k) for k in order]),
334+
longitude=(_INSTANCE, [lon.get(k) for k in order]),
335+
latitude=(_INSTANCE, [lat.get(k) for k in order]),
340336
)
341337
ds["longitude"].attrs = {"standard_name": "longitude", "units": "degrees_east"}
342338
ds["latitude"].attrs = {"standard_name": "latitude", "units": "degrees_north"}
343339

344-
# Surface the human-readable site as a coordinate when the id is the dim.
345-
if inst == key_col and "monitoring_location_id" in df.columns:
346-
sites = df.drop_duplicates(key_col).set_index(key_col)["monitoring_location_id"]
347-
ds = ds.assign_coords(
348-
monitoring_location_id=(inst, sites.reindex(ds[inst].values).values)
349-
)
350340
return ds
351341

352342

@@ -390,7 +380,8 @@ def _xr_doc(func):
390380
def _timeseries_wrapper(func, *, service, default_cell_method=None):
391381
@_wraps(func)
392382
def wrapper(*args, **kwargs):
393-
kwargs.setdefault("include_hash", True)
383+
# Default (hash-free) fetch: the per-record UUID is never requested or
384+
# materialized; CF attributes come from the surviving columns.
394385
df, base_meta = func(*args, **kwargs)
395386
return _build_timeseries(
396387
df,
@@ -407,14 +398,12 @@ def wrapper(*args, **kwargs):
407398
def _field_wrapper(func, *, service):
408399
@_wraps(func)
409400
def wrapper(*args, **kwargs):
410-
kwargs.setdefault("include_hash", True)
411401
df, base_meta = func(*args, **kwargs)
412402
return _build_timeseries(
413403
df,
414404
base_meta,
415405
service=service,
416406
series_meta=_field_metadata(_sites(df)),
417-
key_col="field_measurements_series_id",
418407
group_cols=("parameter_code",),
419408
default_cell_method="point",
420409
)
@@ -426,7 +415,6 @@ def wrapper(*args, **kwargs):
426415
def _stats_wrapper(func, *, service):
427416
@_wraps(func)
428417
def wrapper(*args, **kwargs):
429-
kwargs.setdefault("include_hash", True)
430418
df, base_meta = func(*args, **kwargs)
431419
return _build_stats(df, base_meta, service)
432420

0 commit comments

Comments
 (0)