Commit 4daf771

thodson-usgs and claude authored
feat(wateruse): add water-use module for the USGS NWDC API (DOI-USGS#328)
Add `dataretrieval.wateruse` for USGS National Water Availability Assessment Data Companion (NWDC) water-use estimates, modeled on a HUC12 grid and queryable by state, county, or hydrologic unit. This is the modern replacement for the defunct legacy NWIS water-use service (`nwis.get_water_use` now points callers here).

    from dataretrieval import wateruse

    df, md = wateruse.get_wateruse(
        model="wu-public-supply-wd",
        variable=["pswdtot", "pswdgw", "pswdsw"],
        state="RI",
        start_date="2020-01",
        time_resolution="monthly",
    )

The NWDC is a plain CSV REST service, not an OGC API Features collection, so the module supplies the NWDC-specific pieces (CSV parsing, the RFC 8288 Link-header pagination cursor, the `{detail}` error envelope, and state/county/huc location builders) but reuses the OGC engine's generic transport rather than re-implementing it: the shared pager (`_paginate`), the Jupyter-safe anyio sync bridge (`_run_sync`), response/frame aggregation, and `_default_headers`.

It keeps the package conventions where they fit: a `(DataFrame, BaseMetadata)` return, the typed `DataRetrievalError` taxonomy (surfacing the NWDC `detail`), `API_USGS_PAT` token support, idiomatic snake_case params, and `state` / `county` / `huc` selectors that each accept a value or a list (a list fans out one concurrent request per location). Large areas paginate transparently. A `FutureWarning` flags the module as experimental, since the NWDC service is new and still changing.

Extracting the reusable engine seams also de-duplicated the engine itself (~-66 LOC, behavior-preserving): `planning._merge_response` now backs both pagination and fan-out aggregation; a generic `utils.Ambient[T]` contextvar-with-scope helper collapses the per-call ambients; and `x-ratelimit-remaining` now reports the lowest value any concurrent sub-request saw (the quota actually left after a fan-out), fixing a latent inaccuracy in the OGC chunker too.

Includes offline pytest-httpx coverage, a reference page, a README example, and a demo notebook.

Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
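
The commit message mentions an RFC 8288 Link-header pagination cursor. The module's own parser is not shown in this excerpt, so the following is only a minimal sketch of the idea, assuming the service advertises the next page through a `Link` header entry with `rel="next"`. The names `next_link` and `walk_pages`, and the callable `fetch`, are hypothetical and not taken from `dataretrieval`.

```python
import re

# Hypothetical helper, not the module's actual code. Matches one
# "<target>; param=value; ..." entry of an RFC 8288 Link header.
# Simplification: parameter values containing "," or ";" are not handled.
_LINK_ENTRY = re.compile(r'<([^>]*)>\s*((?:;\s*[^;,]+)*)')


def next_link(link_header):
    """Return the rel="next" target of a Link header, or None if absent."""
    if not link_header:
        return None
    for target, params in _LINK_ENTRY.findall(link_header):
        for param in params.split(";"):
            name, _, value = param.strip().partition("=")
            rels = value.strip('"').lower().split()
            if name.lower() == "rel" and "next" in rels:
                return target
    return None


def walk_pages(fetch, url):
    """Yield page bodies, following rel="next" until no cursor remains.

    `fetch` is an assumed callable returning (body, link_header) for a URL.
    """
    while url is not None:
        body, link_header = fetch(url)
        yield body
        url = next_link(link_header)
```

A caller would concatenate the yielded CSV pages into one frame. Following the cursor until it disappears is what lets large areas paginate without the user seeing it.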
1 parent 5b79b2d commit 4daf771

18 files changed

Lines changed: 1395 additions & 263 deletions

AGENTS.md

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 - Exclude `.claude/worktrees/` from searches and edits; it contains stale worktrees that pollute results.

 ## Example Notebooks
-- `demos/*.ipynb` — top-level Water Data tour: `USGS_WaterData_Introduction_Examples.ipynb` is the entry point; `_ContinuousData_`, `_DailyStatistics_`, `_DiscreteSamples_`, `_ReferenceLists_` cover individual collections; `WaterData_demo.ipynb`, `peak_streamflow_trends.ipynb`, and `R Python Vignette equivalents.ipynb` are standalone walkthroughs.
+- `demos/*.ipynb` — top-level Water Data tour: `USGS_WaterData_Introduction_Examples.ipynb` is the entry point; `_ContinuousData_`, `_DailyStatistics_`, `_DiscreteSamples_`, `_ReferenceLists_` cover individual collections; `WaterData_demo.ipynb`, `peak_streamflow_trends.ipynb`, `USGS_WaterUse_Examples.ipynb` (NWDC water-use data via `wateruse.get_wateruse`), and `R Python Vignette equivalents.ipynb` are standalone walkthroughs.
 - `demos/hydroshare/*.ipynb` — per-service HydroShare examples (NLDI, NWIS WaterUse, and Water Data DailyValues / GroundwaterLevels / Measurements / ParameterCodes / Peaks / Ratings / Samples / SiteInfo / SiteInventory / Statistics / UnitValues). Mirror these when adding examples for a new collection.
 - `demos/nwqn_data_pull/` — non-notebook example: a lithops/Docker batch pipeline (`retrieve_nwqn_samples.py`, `retrieve_nwqn_streamflow.py`) with its own `README.md`.
 - Any `Untitled*.ipynb`, `*_test.ipynb`, or notebooks not listed here are untracked local scratch; ignore them.

README.md

Lines changed: 33 additions & 1 deletion
@@ -133,7 +133,7 @@ sites, metadata = ngwmn.get_sites(state='Wisconsin')

 print(f"Found {len(sites)} NGWMN sites in Wisconsin")

-# Pull water levels from the first twenty sites over a time window.
+# Pull water levels from the first twenty sites over a time window.
 water_levels, metadata = ngwmn.get_water_level(
     monitoring_location_id=sites['monitoring_location_id'][:20],
     datetime=['2022-01-01', '2024-01-01']
@@ -192,6 +192,31 @@ flowlines = nldi.get_flowlines(
 print(f"Found {len(flowlines)} upstream tributaries within 50km")
 ```

+### Water Use (NWDC)
+
+Retrieve modeled water-use estimates from the National Water Availability
+Assessment Data Companion:
+
+```python
+from dataretrieval import wateruse
+
+# Monthly public-supply withdrawals for Rhode Island, split into
+# groundwater and surface-water sources (returns a DataFrame and metadata).
+df, metadata = wateruse.get_wateruse(
+    model='wu-public-supply-wd',
+    variable=['pswdtot', 'pswdgw', 'pswdsw'],
+    state='RI',  # name/postal/FIPS; pass a list to fan out over several areas
+    start_date='2020-01',
+    time_resolution='monthly',
+)
+
+print(f"Retrieved {len(df)} records across {df['huc12_id'].nunique()} watersheds")
+
+# Aggregate the HUC12 grid to a statewide monthly total (million gallons/day)
+statewide = df.groupby('year_month')['pswdtot_mgd'].sum()
+print(statewide.head())
+```
+
 ## Available Data Services

 ### Modern USGS Water Data APIs (Recommended) — `dataretrieval.waterdata`
@@ -232,6 +257,13 @@ print(f"Found {len(flowlines)} upstream tributaries within 50km")
 - `get_features`: Find monitoring sites, dams, and other features along the network
 - `get_features_by_data_source`: Features from a specific data source

+### Water Use (NWDC)
+- **Public supply**: Modeled public-supply withdrawals and consumptive use
+- **Irrigation**: Modeled irrigation withdrawals and consumptive use
+- **Thermoelectric**: Modeled thermoelectric-power water use
+- **HUC12 estimates**: National coverage on a 12-digit hydrologic-unit grid,
+  summarizable to counties, states, or coarser hydrologic units
+
 ## More Examples

 Explore additional examples in the
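
The README comment says that passing a list to `state` fans out over several areas, and the commit message describes this as one concurrent request per location. The sketch below illustrates that pattern only. It uses plain `asyncio` with a semaphore instead of the anyio bridge the package reuses, and `fan_out`, `as_list`, and `fake_fetch` are hypothetical names standing in for code not shown in this diff.

```python
import asyncio


def as_list(value):
    """Accept one selector or a list of them, as `state` is described to do."""
    return list(value) if isinstance(value, (list, tuple)) else [value]


async def fan_out(fetch_one, locations, max_concurrent=4):
    """Run fetch_one(location) per location, at most max_concurrent at once."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(location):
        async with gate:
            return await fetch_one(location)

    # gather preserves input order, so results line up with `locations`.
    return await asyncio.gather(*(guarded(loc) for loc in locations))


async def fake_fetch(state):
    # Stand-in for one HTTP request; returns made-up rows tagged by location.
    await asyncio.sleep(0)
    return [{"state": state, "rows": 1}]


results = asyncio.run(fan_out(fake_fetch, as_list(["RI", "CT"])))
rows = [row for page in results for row in page]  # flatten per-location pages
```

Capping concurrency with a semaphore keeps a long list of locations from opening an unbounded number of connections at once.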

dataretrieval/__init__.py

Lines changed: 4 additions & 1 deletion
@@ -11,7 +11,8 @@
 df, meta = nwis.get_dv(sites="05427718")

 Available service modules: ``waterdata``, ``wqp`` (Water Quality Portal),
-``nldi``, ``streamstats``, and the deprecated ``nwis``.
+``wateruse`` (NWDC water-use data), ``nldi``, ``streamstats``, and the
+deprecated ``nwis``.

 ``nldi`` requires geopandas (``pip install dataretrieval[nldi]``) and is
 imported on demand: ``from dataretrieval import nldi``.
@@ -62,6 +63,7 @@
     streamstats,
     utils,
     waterdata,
+    wateruse,
     wqp,
 )

@@ -72,6 +74,7 @@
     "streamstats",
     "utils",
     "waterdata",
+    "wateruse",
     "wqp",
     # error taxonomy (canonical home: ``dataretrieval.exceptions``), re-exported
     # so callers can ``except dataretrieval.DataRetrievalError``

dataretrieval/nwis.py

Lines changed: 11 additions & 2 deletions
@@ -741,8 +741,17 @@ def get_pmcodes(**kwargs: Any) -> NoReturn:


 def get_water_use(**kwargs: Any) -> NoReturn:
-    """Defunct: no current replacement."""
-    raise NameError("`nwis.get_water_use` is defunct.")
+    """Defunct: use ``dataretrieval.wateruse.get_wateruse`` instead.
+
+    The legacy NWIS water-use service has been retired. Modeled water-use
+    estimates are now served by the National Water Availability Assessment Data
+    Companion (NWDC); retrieve them with
+    :func:`dataretrieval.wateruse.get_wateruse`.
+    """
+    raise NameError(
+        "`nwis.get_water_use` is defunct; use "
+        "`dataretrieval.wateruse.get_wateruse` instead."
+    )


 @_deprecated

dataretrieval/ogc/chunking.py

Lines changed: 10 additions & 41 deletions
@@ -69,15 +69,14 @@
 import functools
 import os
 from collections.abc import Callable, Iterator
-from contextlib import contextmanager
-from contextvars import ContextVar, copy_context
+from contextvars import copy_context
 from typing import Any, cast

 import httpx
 import pandas as pd
 from anyio.from_thread import start_blocking_portal

-from dataretrieval.utils import HTTPX_DEFAULTS
+from dataretrieval.utils import HTTPX_DEFAULTS, Ambient

 from . import progress as _progress
 from .interruptions import (
@@ -106,9 +105,6 @@
 _OGC_URL_BYTE_LIMIT = 8000


-# Response header USGS uses to advertise remaining hourly quota.
-_QUOTA_HEADER = "x-ratelimit-remaining"
-
 # Fan-out concurrency cap, read at call time (not import) so test
 # ``monkeypatch.setenv`` applies. Value grammar in :func:`_read_concurrency_env`;
 # the concurrency model is in the module docstring.
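
The commit message states that `x-ratelimit-remaining` now reports the lowest value any concurrent sub-request saw. The code that does this is not part of this excerpt, so the following is a minimal sketch of the rule, assuming each response exposes its headers as a mapping. The function name `lowest_remaining` is hypothetical.

```python
# Header name taken from the `_QUOTA_HEADER` constant in the hunk above.
QUOTA_HEADER = "x-ratelimit-remaining"


def lowest_remaining(header_sets):
    """Return the smallest parseable quota value across responses, or None."""
    seen = []
    for headers in header_sets:
        raw = headers.get(QUOTA_HEADER)
        try:
            seen.append(int(raw))
        except (TypeError, ValueError):
            continue  # header missing or not an integer
    return min(seen) if seen else None
```

Concurrent responses can arrive out of order, so the header on whichever response happens to finish last is not necessarily the quota left. Taking the minimum avoids depending on arrival order.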
@@ -152,38 +148,11 @@ def _read_concurrency_env() -> int | None:
     return value


-# Shared per-call ``httpx.AsyncClient``, published via :func:`_publish`
-# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``)
-# reuse the same connection pool across every sub-request. ``None``
-# outside a chunked call — paginated helpers then open their own
-# short-lived client.
-_chunked_client: ContextVar[httpx.AsyncClient | None] = ContextVar(
-    "_chunked_client", default=None
-)
-
-
-@contextmanager
-def _publish(client: httpx.AsyncClient) -> Iterator[None]:
-    """
-    Publish ``client`` on the ``_chunked_client`` ContextVar so the
-    paginated-loop helpers can borrow it via :func:`get_active_client`
-    for the duration of the ``with`` block.
-
-    Parameters
-    ----------
-    client : httpx.AsyncClient
-        The client to publish.
-
-    Yields
-    ------
-    None
-        Yields once, for the duration of the bind.
-    """
-    token = _chunked_client.set(client)
-    try:
-        yield
-    finally:
-        _chunked_client.reset(token)
+# Shared per-call ``httpx.AsyncClient``, scoped via ``with _chunked_client(c):``
+# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``) reuse
+# the same connection pool across every sub-request. ``None`` outside a chunked
+# call — paginated helpers then open their own short-lived client.
+_chunked_client: Ambient[httpx.AsyncClient | None] = Ambient("_chunked_client", None)


 def get_active_client() -> httpx.AsyncClient | None:
@@ -197,8 +166,8 @@ def get_active_client() -> httpx.AsyncClient | None:
     Returns
     -------
     httpx.AsyncClient or None
-        The client published via :func:`_publish` if currently inside a
-        :class:`ChunkedCall` run; ``None`` otherwise.
+        The client scoped via ``with _chunked_client(...)`` if currently inside
+        a :class:`ChunkedCall` run; ``None`` otherwise.
     """
     return _chunked_client.get()

@@ -541,7 +510,7 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
         )

         async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
-            with _publish(client):
+            with _chunked_client(client):
                 reporter = _progress.current()
                 if reporter is not None:
                     reporter.set_chunks(self.plan.total)