Skip to content

Commit 4f1c04c

Browse files
thodson-usgsclaude
andcommitted
Add waterdata.get_nearest_continuous helper
For each target timestamp, fetch the nearest continuous observation in a single round trip. Builds a CQL OR-chain of per-target bracketed windows, pipes it through ``get_continuous`` (which already auto-chunks long filters across multiple sub-requests), then selects the single observation closest to each target client-side. Exists because the Water Data API matches a single-instant ``time=`` parameter exactly (10:30:31 returns zero rows on a 15-minute gauge), does not implement ``sortby`` for arbitrary queryables, and does not expose a ``T_NEAREST`` CQL function. The narrow-window + client-side reduction is the one pattern that works today for multi-target nearest lookups in one API call. Tie handling is configurable via ``on_tie``: - "first" (default): keep the earlier observation - "last": keep the later observation - "mean": average numeric columns; set ``time`` to the target Default ``window="7min30s"`` matches the 15-minute gauge cadence so most targets' windows contain exactly one observation. Users with irregular-cadence gauges or known data gaps can widen to "15min" or "30min" at the cost of more bytes per response. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9ca75f0 commit 4f1c04c

3 files changed

Lines changed: 442 additions & 1 deletion

File tree

dataretrieval/waterdata/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
get_latest_continuous,
2020
get_latest_daily,
2121
get_monitoring_locations,
22+
get_nearest_continuous,
2223
get_reference_table,
2324
get_samples,
2425
get_stats_date_range,
@@ -47,6 +48,7 @@
4748
"get_latest_continuous",
4849
"get_latest_daily",
4950
"get_monitoring_locations",
51+
"get_nearest_continuous",
5052
"get_reference_table",
5153
"get_samples",
5254
"get_stats_date_range",

dataretrieval/waterdata/api.py

Lines changed: 194 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import json
1010
import logging
1111
from io import StringIO
12-
from typing import get_args
12+
from typing import Literal, get_args
1313

1414
import pandas as pd
1515
import requests
@@ -440,6 +440,199 @@ def get_continuous(
440440
return get_ogc_data(args, output_id, service)
441441

442442

443+
def get_nearest_continuous(
444+
targets,
445+
monitoring_location_id: str | list[str] | None = None,
446+
parameter_code: str | list[str] | None = None,
447+
*,
448+
window: str | pd.Timedelta = "7min30s",
449+
on_tie: Literal["first", "last", "mean"] = "first",
450+
**kwargs,
451+
) -> tuple[pd.DataFrame, BaseMetadata]:
452+
"""For each target timestamp, return the nearest continuous observation.
453+
454+
Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause
455+
per target, joins them as a top-level CQL ``OR`` filter, and lets
456+
``get_continuous`` (with its auto-chunking) fetch every observation
457+
that falls in any window. Then, per ``(monitoring_location_id, target)``
458+
pair, picks the single observation with the smallest ``|time - target|``.
459+
460+
The USGS continuous endpoint matches ``time`` parameters exactly rather
461+
than fuzzily, and it does not implement ``sortby`` for arbitrary fields;
462+
this function is the single-round-trip way to ask "what reading is
463+
nearest this timestamp?" for many timestamps at once.
464+
465+
Parameters
466+
----------
467+
targets : list-like of datetime-convertible
468+
Target timestamps. Naive datetimes are treated as UTC. Accepts a
469+
list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``,
470+
or anything ``pandas.to_datetime`` consumes.
471+
monitoring_location_id : string or list of strings, optional
472+
Forwarded to ``get_continuous``.
473+
parameter_code : string or list of strings, optional
474+
Forwarded to ``get_continuous``.
475+
window : string or ``pandas.Timedelta``, default ``"7min30s"``
476+
Half-window around each target. Must be small enough that every
477+
target's window captures roughly one observation at the service
478+
cadence. The 7min30s default matches a 15-minute continuous gauge;
479+
use a larger value (e.g. ``"15min"``) when the gauge cadence is
480+
longer or you need more resilience to data gaps.
481+
on_tie : {"first", "last", "mean"}, default ``"first"``
482+
How to resolve ties when two observations are exactly equidistant
483+
from a target (which happens when the target falls at the midpoint
484+
between grid points — e.g. target ``10:22:30`` for a 15-minute
485+
gauge).
486+
487+
- ``"first"``: keep the earlier observation.
488+
- ``"last"``: keep the later observation.
489+
- ``"mean"``: average numeric columns; set the ``time`` column to
490+
the target, since no real observation exists at the midpoint.
491+
492+
**kwargs
493+
Additional keyword arguments forwarded to ``get_continuous``
494+
(e.g. ``statistic_id``, ``approval_status``, ``properties``).
495+
Passing ``time``, ``filter``, or ``filter_lang`` raises
496+
``TypeError`` — this function builds those itself.
497+
498+
Returns
499+
-------
500+
df : ``pandas.DataFrame``
501+
One row per ``(target, monitoring_location_id)`` combination that
502+
had at least one observation in its window. Rows are augmented
503+
with a ``target_time`` column indicating which target they
504+
correspond to. Targets with no observations in their window are
505+
silently dropped.
506+
md : :class:`~dataretrieval.utils.BaseMetadata`
507+
Metadata from the underlying ``get_continuous`` call.
508+
509+
Notes
510+
-----
511+
*Window sizing and ties.* When ``window`` is exactly half the service
512+
cadence, most targets' windows contain a single observation and
513+
``on_tie`` is moot. Ties arise only when a target sits exactly at the
514+
window edge — rare in practice but possible. Setting ``window`` to a
515+
full cadence (or larger) guarantees at least one observation per
516+
target in steady state at the cost of more bytes per response.
517+
518+
*Why windowed CQL rather than sort+limit.* The API's advertised
519+
``sortby`` parameter would make this a one-liner per target (``filter``
520+
by ``time <= t`` and ``limit 1``), but it is per-query — you would need
521+
one HTTP round-trip per target. The CQL ``OR``-chain approach folds
522+
all N targets into one request (auto-chunked when the URL is long).
523+
524+
Examples
525+
--------
526+
.. code::
527+
528+
>>> import pandas as pd
529+
>>> from dataretrieval import waterdata
530+
531+
>>> # Pair three off-grid timestamps with nearby observations
532+
>>> targets = pd.to_datetime(
533+
... [
534+
... "2023-06-15T10:30:31Z",
535+
... "2023-06-15T14:07:12Z",
536+
... "2023-06-16T03:45:19Z",
537+
... ]
538+
... )
539+
>>> df, md = waterdata.get_nearest_continuous(
540+
... targets,
541+
... monitoring_location_id="USGS-02238500",
542+
... parameter_code="00060",
543+
... )
544+
545+
>>> # Widen the window for an irregular-cadence gauge
546+
>>> df, md = waterdata.get_nearest_continuous(
547+
... targets,
548+
... monitoring_location_id="USGS-02238500",
549+
... parameter_code="00060",
550+
... window="30min",
551+
... on_tie="mean",
552+
... )
553+
"""
554+
for forbidden in ("time", "filter", "filter_lang"):
555+
if forbidden in kwargs:
556+
raise TypeError(
557+
f"get_nearest_continuous constructs its own {forbidden!r}; "
558+
"do not pass it directly"
559+
)
560+
if on_tie not in ("first", "last", "mean"):
561+
raise ValueError(f"on_tie must be 'first', 'last', or 'mean'; got {on_tie!r}")
562+
563+
targets = pd.to_datetime(list(targets), utc=True)
564+
window_td = pd.Timedelta(window)
565+
566+
if len(targets) == 0:
567+
# Nothing to ask about — return an empty frame shaped like a real
568+
# ``get_continuous`` response (via a trivially-empty time range).
569+
df, md = get_continuous(
570+
monitoring_location_id=monitoring_location_id,
571+
parameter_code=parameter_code,
572+
time="1900-01-01T00:00:00Z/1900-01-01T00:00:00Z",
573+
**kwargs,
574+
)
575+
return df.iloc[0:0], md
576+
577+
filter_expr = " OR ".join(
578+
f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' "
579+
f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')"
580+
for t in targets
581+
)
582+
583+
df, md = get_continuous(
584+
monitoring_location_id=monitoring_location_id,
585+
parameter_code=parameter_code,
586+
filter=filter_expr,
587+
filter_lang="cql-text",
588+
**kwargs,
589+
)
590+
591+
if df.empty:
592+
return df, md
593+
594+
df = df.copy()
595+
df["time"] = pd.to_datetime(df["time"], utc=True)
596+
597+
if "monitoring_location_id" in df.columns:
598+
site_groups = list(df.groupby("monitoring_location_id", sort=False))
599+
else:
600+
site_groups = [(None, df)]
601+
602+
selected = []
603+
for _, site_df in site_groups:
604+
for target in targets:
605+
mask = (site_df["time"] >= target - window_td) & (
606+
site_df["time"] <= target + window_td
607+
)
608+
window_df = site_df[mask]
609+
if window_df.empty:
610+
continue
611+
deltas = (window_df["time"] - target).abs()
612+
candidates = window_df[deltas == deltas.min()].sort_values("time")
613+
614+
if len(candidates) == 1 or on_tie == "first":
615+
row = candidates.iloc[0].copy()
616+
elif on_tie == "last":
617+
row = candidates.iloc[-1].copy()
618+
else: # "mean"
619+
row = candidates.iloc[0].copy()
620+
for col in candidates.select_dtypes("number").columns:
621+
row[col] = candidates[col].mean()
622+
row["time"] = target
623+
624+
row["target_time"] = target
625+
selected.append(row)
626+
627+
if not selected:
628+
empty = df.iloc[0:0].copy()
629+
empty["target_time"] = pd.Series(dtype="datetime64[ns, UTC]")
630+
return empty, md
631+
632+
result = pd.DataFrame(selected).reset_index(drop=True)
633+
return result, md
634+
635+
443636
def get_monitoring_locations(
444637
monitoring_location_id: list[str] | None = None,
445638
agency_code: list[str] | None = None,

0 commit comments

Comments
 (0)