|
| 1 | +"""USGS rating-curve retrieval via the Water Data STAC catalog. |
| 2 | +
|
| 3 | +Wraps ``https://api.waterdata.usgs.gov/stac/v0/search`` and the per-feature |
| 4 | +RDB downloads that follow. The STAC endpoint hosts standard NWIS rating |
| 5 | +files (``exsa``, ``base``, ``corr``) for active streamgages — see the |
| 6 | +service overview at https://api.waterdata.usgs.gov/docs/stac/ and the |
| 7 | +WDFN announcement at https://waterdata.usgs.gov/blog/wdfn-rating-curves/. |
| 8 | +
|
| 9 | +This is the discrete analogue to the OGC waterdata getters; it lives in |
| 10 | +its own module because the transport layer (STAC search + RDB download) |
| 11 | +differs from the OGC collections used by the rest of the package. |
| 12 | +
|
| 13 | +The R analogue is ``read_waterdata_ratings`` in |
| 14 | +https://github.com/DOI-USGS/dataRetrieval/. |
| 15 | +""" |
| 16 | + |
| 17 | +from __future__ import annotations |
| 18 | + |
| 19 | +import logging |
| 20 | +import os |
| 21 | +import tempfile |
| 22 | +from typing import Any |
| 23 | + |
| 24 | +import pandas as pd |
| 25 | +import requests |
| 26 | + |
| 27 | +from dataretrieval.nwis import _read_rdb |
| 28 | + |
| 29 | +from .utils import BASE_URL, _default_headers, _format_api_dates |
| 30 | + |
| 31 | +logger = logging.getLogger(__name__) |
| 32 | + |
| 33 | +STAC_URL = f"{BASE_URL}/stac/v0" |
| 34 | +_VALID_FILE_TYPES: tuple[str, ...] = ("exsa", "base", "corr") |
| 35 | + |
| 36 | + |
| 37 | +def _build_filter( |
| 38 | + monitoring_location_id: str | list[str] | None, |
| 39 | + file_type: str | None, |
| 40 | +) -> str | None: |
| 41 | + """Compose the CQL filter sent to STAC ``/search``. |
| 42 | +
|
| 43 | + Mirrors R's logic: only pin ``file_type`` when a single value was given, |
| 44 | + so a multi-type request returns every matching site and the file-type |
| 45 | + filtering happens client-side from the per-feature URLs. |
| 46 | + """ |
| 47 | + parts: list[str] = [] |
| 48 | + if monitoring_location_id is not None: |
| 49 | + ids = ( |
| 50 | + [monitoring_location_id] |
| 51 | + if isinstance(monitoring_location_id, str) |
| 52 | + else list(monitoring_location_id) |
| 53 | + ) |
| 54 | + joined = "', '".join(ids) |
| 55 | + parts.append(f"monitoring_location_id IN ('{joined}')") |
| 56 | + if file_type is not None: |
| 57 | + parts.append(f"file_type = '{file_type}'") |
| 58 | + return " AND ".join(parts) if parts else None |
| 59 | + |
| 60 | + |
| 61 | +def _search( |
| 62 | + filter_str: str | None, |
| 63 | + datetime_str: str | None, |
| 64 | + bbox: list[float] | None, |
| 65 | + limit: int, |
| 66 | + ssl_check: bool, |
| 67 | +) -> list[dict[str, Any]]: |
| 68 | + """Run a single STAC ``/search`` request and return its features.""" |
| 69 | + params: dict[str, Any] = {"limit": limit} |
| 70 | + if filter_str is not None: |
| 71 | + params["filter"] = filter_str |
| 72 | + if datetime_str is not None: |
| 73 | + params["datetime"] = datetime_str |
| 74 | + if bbox is not None: |
| 75 | + params["bbox"] = ",".join(str(b) for b in bbox) |
| 76 | + |
| 77 | + response = requests.get( |
| 78 | + f"{STAC_URL}/search", |
| 79 | + params=params, |
| 80 | + headers=_default_headers(), |
| 81 | + verify=ssl_check, |
| 82 | + ) |
| 83 | + response.raise_for_status() |
| 84 | + return response.json().get("features", []) |
| 85 | + |
| 86 | + |
| 87 | +def _download_and_parse( |
| 88 | + feature: dict[str, Any], |
| 89 | + file_path: str, |
| 90 | + ssl_check: bool, |
| 91 | +) -> pd.DataFrame: |
| 92 | + """Fetch the feature's data asset, write it to ``file_path``, parse RDB.""" |
| 93 | + url = feature["assets"]["data"]["href"] |
| 94 | + fid = feature["id"] |
| 95 | + target = os.path.join(file_path, fid) |
| 96 | + |
| 97 | + response = requests.get(url, headers=_default_headers(), verify=ssl_check) |
| 98 | + response.raise_for_status() |
| 99 | + |
| 100 | + with open(target, "w") as f: |
| 101 | + f.write(response.text) |
| 102 | + |
| 103 | + return _read_rdb(response.text) |
| 104 | + |
| 105 | + |
| 106 | +def get_ratings( |
| 107 | + monitoring_location_id: str | list[str] | None = None, |
| 108 | + file_type: str | list[str] = "exsa", |
| 109 | + file_path: str | None = None, |
| 110 | + datetime: str | list[str] | None = None, |
| 111 | + bbox: list[float] | None = None, |
| 112 | + limit: int = 10000, |
| 113 | + download_and_parse: bool = True, |
| 114 | + ssl_check: bool = True, |
| 115 | +) -> dict[str, pd.DataFrame] | list[dict[str, Any]]: |
| 116 | + """Get USGS stage-discharge rating curves from the Water Data STAC catalog. |
| 117 | +
|
| 118 | + Returns the current rating tables for one or more active USGS streamgages. |
| 119 | + The catalog hosts three file types: |
| 120 | +
|
| 121 | + - ``"exsa"`` — expanded shift-adjusted rating (default). Adds a ``SHIFT`` |
| 122 | + column to ``"base"`` indicating the current shift for each ``INDEP``. |
| 123 | + - ``"base"`` — three columns: ``INDEP`` (typically gage height, ft); |
| 124 | + ``DEP`` (typically discharge, ft^3/s); ``STOR`` ("``*``" marks fixed |
| 125 | + points of the rating). |
| 126 | + - ``"corr"`` — three columns: ``INDEP``; ``CORR`` (correction for that |
| 127 | + value); ``CORRINDEP`` (corrected INDEP). |
| 128 | +
|
| 129 | + See https://api.waterdata.usgs.gov/docs/stac/ for the upstream service |
| 130 | + docs and https://waterdata.usgs.gov/blog/wdfn-rating-curves/ for the |
| 131 | + background announcement. The R analogue is ``read_waterdata_ratings`` |
| 132 | + in https://github.com/DOI-USGS/dataRetrieval/. |
| 133 | +
|
| 134 | + Parameters |
| 135 | + ---------- |
| 136 | + monitoring_location_id : string or list of strings, optional |
| 137 | + One or more identifiers in ``AGENCY-ID`` form (e.g. |
| 138 | + ``"USGS-01104475"``). If omitted, the spatial / temporal filters |
| 139 | + determine the result set. |
| 140 | + file_type : string or list of strings, default ``"exsa"`` |
| 141 | + Which rating file(s) to request. One or more of ``"exsa"``, |
| 142 | + ``"base"``, ``"corr"``. |
| 143 | + file_path : string, optional |
| 144 | + Directory the downloaded RDB files are written to. Defaults to a |
| 145 | + per-call temporary directory created via :func:`tempfile.mkdtemp`. |
| 146 | + datetime : string or list of strings, optional |
| 147 | + STAC ``datetime`` filter — a single date / datetime, or an |
| 148 | + interval (``"start/end"``, optionally half-bounded with ``..``). |
| 149 | + ISO 8601 *durations* (``"P1M"``, ``"PT36H"``, …) are **not** |
| 150 | + supported by the rating-curve service; passing one raises |
| 151 | + ``ValueError``. |
| 152 | + bbox : list of numbers, optional |
| 153 | + Only features whose geometry intersects the bounding box are |
| 154 | + selected. Format: ``[xmin, ymin, xmax, ymax]`` in CRS 4326 |
| 155 | + (longitude / latitude, west-south-east-north). |
| 156 | + limit : int, default 10000 |
| 157 | + Page size for the STAC ``/search`` request (capped at 10000). |
| 158 | + download_and_parse : bool, default ``True`` |
| 159 | + If ``True``, download every matching RDB file and parse it into a |
| 160 | + ``DataFrame``. If ``False``, return the raw list of STAC feature |
| 161 | + dicts so the caller can inspect what's available before pulling |
| 162 | + bytes. |
| 163 | + ssl_check : bool, default ``True`` |
| 164 | + Verify the server's SSL certificate. |
| 165 | +
|
| 166 | + Returns |
| 167 | + ------- |
| 168 | + dict[str, pandas.DataFrame] or list[dict] |
| 169 | + When ``download_and_parse=True`` (the default), a dict keyed by |
| 170 | + feature ID (e.g. ``"USGS-01104475.exsa.rdb"``) mapping to a parsed |
| 171 | + ``DataFrame``. When ``download_and_parse=False``, the raw list of |
| 172 | + STAC feature dicts as returned by the search endpoint. |
| 173 | +
|
| 174 | + Raises |
| 175 | + ------ |
| 176 | + ValueError |
| 177 | + For an unrecognized ``file_type`` value or an ISO 8601 duration in |
| 178 | + ``datetime``. |
| 179 | +
|
| 180 | + Examples |
| 181 | + -------- |
| 182 | + .. code:: |
| 183 | +
|
| 184 | + >>> # Default exsa ratings for two sites |
| 185 | + >>> ratings = dataretrieval.waterdata.get_ratings( |
| 186 | + ... monitoring_location_id=["USGS-01104475", "USGS-01104460"], |
| 187 | + ... file_type="exsa", |
| 188 | + ... ) |
| 189 | + >>> ratings["USGS-01104475.exsa.rdb"].head() |
| 190 | +
|
| 191 | + >>> # Both exsa and corr files for the same two sites |
| 192 | + >>> ratings = dataretrieval.waterdata.get_ratings( |
| 193 | + ... monitoring_location_id=["USGS-01104475", "USGS-01104460"], |
| 194 | + ... file_type=["exsa", "corr"], |
| 195 | + ... ) |
| 196 | +
|
| 197 | + >>> # Bounding-box query, listing what's available without downloading |
| 198 | + >>> features = dataretrieval.waterdata.get_ratings( |
| 199 | + ... bbox=[-95.0, 40.0, -92.0, 42.0], |
| 200 | + ... download_and_parse=False, |
| 201 | + ... ) |
| 202 | +
|
| 203 | + >>> # Restrict to features modified since seven days ago (no durations) |
| 204 | + >>> features = dataretrieval.waterdata.get_ratings( |
| 205 | + ... bbox=[-95.0, 40.0, -92.0, 42.0], |
| 206 | + ... datetime=["2026-04-29", ".."], |
| 207 | + ... download_and_parse=False, |
| 208 | + ... ) |
| 209 | +
|
| 210 | + """ |
| 211 | + file_types = [file_type] if isinstance(file_type, str) else list(file_type) |
| 212 | + invalid = [ft for ft in file_types if ft not in _VALID_FILE_TYPES] |
| 213 | + if invalid: |
| 214 | + raise ValueError( |
| 215 | + f"Invalid file_type {invalid!r}. Valid options: {list(_VALID_FILE_TYPES)}." |
| 216 | + ) |
| 217 | + |
| 218 | + if datetime is not None: |
| 219 | + # The rating-curve STAC service rejects ISO 8601 durations; surface a |
| 220 | + # clear error rather than letting the server return a confusing 4xx. |
| 221 | + dt_values = datetime if isinstance(datetime, list) else [datetime] |
| 222 | + if any(v is not None and "P" in str(v).upper() for v in dt_values): |
| 223 | + raise ValueError( |
| 224 | + "ISO 8601 durations (e.g. 'P7D') are not supported in " |
| 225 | + "`datetime` for the rating-curve service. Provide a date or " |
| 226 | + "interval instead." |
| 227 | + ) |
| 228 | + datetime_str = _format_api_dates(datetime, date=False) |
| 229 | + else: |
| 230 | + datetime_str = None |
| 231 | + |
| 232 | + # Mirror R: only pin file_type in the server-side filter when one type |
| 233 | + # is requested. With multiple types, fetch all and filter URLs locally. |
| 234 | + server_file_type = file_types[0] if len(file_types) == 1 else None |
| 235 | + filter_str = _build_filter(monitoring_location_id, server_file_type) |
| 236 | + |
| 237 | + features = _search(filter_str, datetime_str, bbox, limit, ssl_check) |
| 238 | + |
| 239 | + if not download_and_parse: |
| 240 | + return features |
| 241 | + |
| 242 | + if file_path is None: |
| 243 | + file_path = tempfile.mkdtemp(prefix="dataretrieval-ratings-") |
| 244 | + os.makedirs(file_path, exist_ok=True) |
| 245 | + |
| 246 | + out: dict[str, pd.DataFrame] = {} |
| 247 | + for feature in features: |
| 248 | + url = feature.get("assets", {}).get("data", {}).get("href", "") |
| 249 | + # Skip features whose file type wasn't requested (only relevant when |
| 250 | + # `file_type` is a list — single-type requests are already filtered |
| 251 | + # server-side). |
| 252 | + if not any(ft in url for ft in file_types): |
| 253 | + continue |
| 254 | + fid = feature["id"] |
| 255 | + try: |
| 256 | + out[fid] = _download_and_parse(feature, file_path, ssl_check) |
| 257 | + except Exception as e: |
| 258 | + logger.warning("Failed to download / parse %s: %s", fid, e) |
| 259 | + |
| 260 | + return out |
0 commit comments