Commit 3cc7c95

thodson-usgs and claude committed
feat(waterdata): Migrate to httpx and add async parallel chunker

Replaces ``requests`` with ``httpx`` package-wide and adds an async
parallel branch to the multi-value chunker, governed by the new
``API_USGS_CONCURRENT`` environment variable. Parallel mode is the
default; set ``API_USGS_CONCURRENT=1`` to force the legacy sequential
path.

Benchmarked on a 52,753-site / 10-state ``get_daily`` query: 12.69s
parallel vs 67.33s sequential (5.3x speedup), identical row counts,
comparable quota burn.

Why httpx: the parallel fan-out runs on a single shared
``httpx.AsyncClient`` so all sub-requests amortize one TCP+TLS
handshake, which is impossible under the requests stack without a
thread pool.

Both modes share one ``_walk_pages_steps`` generator (pagination state
machine) driven by thin sync/async loops; future retry/backoff lands in
one place via the ``("wait", seconds)`` yield pattern the drivers can
translate to ``time.sleep`` / ``await asyncio.sleep``.

Production

- New ``httpx`` dependency, dropped ``requests``; ``pytest-httpx``
  replaces ``requests-mock`` in test extras.
- Three httpx behavior diffs handled defensively: ``InvalidURL``
  (URLs > 64 KB rejected client-side) via ``_safe_request_bytes`` /
  ``_safe_canonical_url``; ``Response.elapsed`` only populated on close
  via ``_safe_elapsed``; ``Response.url`` is a read-only property
  wrapped via ``_set_response_url``.
- ``BaseMetadata.url`` coerced to ``str`` to preserve the string-typed
  contract. ``BaseMetadata.header`` is now ``httpx.Headers`` (see
  backwards-compat note below).
- Chunker decorator gains ``walk_pages_async=`` and honors
  ``API_USGS_CONCURRENT``. Silently falls back to sequential (with INFO
  log) when ``walk_pages_async`` isn't wired or when args trip the inner
  filter chunker; this preserves workloads that worked before
  parallel-by-default.
- One shared ``httpx.Client`` / ``AsyncClient`` per chunked call via
  ``ContextVar``. Eliminates per-sub-request TCP+TLS handshakes in both
  modes.
- Bug fix surfaced by the migration: ``filters.py`` was calling
  ``len(probe.url)``, which fails on ``httpx.URL``. Fixed.
- Unified pagination state machine: ``_walk_pages``,
  ``_walk_pages_async``, and ``get_stats_data`` all route through
  ``_walk_pages_steps`` parameterized on ``page_data_fn`` and
  ``next_req_fn``. ~80 lines of duplicated pagination logic collapsed
  into one place; same source of truth across two pagination styles
  (OGC link-header and stats next-token).

Tests

- All 8 test files migrated from ``requests-mock`` to ``pytest-httpx``.
- ``tests/conftest.py`` (new) centralizes ``mock_request``,
  ``assert_url_equivalent``, ``assert_mock_header`` (previously
  triplicated across three test files).
- New tests for parallel-mode contract, env-var parsing (explicit-low /
  malformed / unset), client contextvar publishing, and the ``_safe_*``
  defensive helpers.
- 194 mocked tests pass; live-API tests unaffected by the migration
  (one pre-existing USGS column-drift failure unrelated to this PR).

Backwards-compat

- ``BaseMetadata.header`` type:
  ``requests.structures.CaseInsensitiveDict`` → ``httpx.Headers``. Both
  behave like case-insensitive dicts for reads, but ``httpx.Headers``
  carries auto-added entries (``host``, ``content-length``) so
  ``md.header == {"key": "val"}`` literal equality breaks. Use
  ``md.header.get(...)``.
- ``API_USGS_CONCURRENT`` now controls a feature that defaults on.
  Workloads that combined long multi-value lists with chunkable CQL
  filters (rare) automatically fall back to sequential; no caller
  changes are needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
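
The commit message describes one pagination generator driven by thin sync and async loops, with a ``("wait", seconds)`` step reserved for future retry/backoff. The real ``_walk_pages_steps`` signature is not shown in this excerpt, so the following is only a minimal sketch of that pattern under stated assumptions: ``walk_pages_steps``, ``drive_sync``, ``drive_async``, the ``"fetch"`` step, and the dict-shaped fake responses are all hypothetical stand-ins, not the package's API.

```python
import asyncio
import time

# Hypothetical in-memory "server": each request key maps to one page.
FAKE_SERVER = {
    "page1": {"data": [1, 2], "next": "page2"},
    "page2": {"data": [3], "next": None},
}


def walk_pages_steps(first_request):
    """Pagination state machine that performs no I/O itself.

    Yields ("fetch", request) or ("wait", seconds) and expects the driver
    to send the fetched response back in. Returns the collected pages.
    """
    pages = []
    request = first_request
    while request is not None:
        response = yield ("fetch", request)
        if response.get("retry_after") is not None:
            # Ask the driver to pause, then retry the same request.
            yield ("wait", response["retry_after"])
            continue
        pages.append(response["data"])
        request = response.get("next")
    return pages


def drive_sync(steps, fetch):
    """Blocking driver: translates "wait" into time.sleep."""
    result = None
    try:
        while True:
            kind, arg = steps.send(result)
            if kind == "fetch":
                result = fetch(arg)
            else:  # "wait"
                time.sleep(arg)
                result = None
    except StopIteration as done:
        return done.value


async def drive_async(steps, fetch_async):
    """Async driver: translates "wait" into await asyncio.sleep."""
    result = None
    try:
        while True:
            kind, arg = steps.send(result)
            if kind == "fetch":
                result = await fetch_async(arg)
            else:  # "wait"
                await asyncio.sleep(arg)
                result = None
    except StopIteration as done:
        return done.value


def make_fake_fetch():
    """Fake fetcher that throttles "page2" once to exercise the wait step."""
    throttled = {"page2"}

    def fetch(request):
        if request in throttled:
            throttled.discard(request)
            return {"retry_after": 0.01}
        return FAKE_SERVER[request]

    return fetch


def make_fake_fetch_async():
    fetch = make_fake_fetch()

    async def fetch_async(request):
        return fetch(request)

    return fetch_async


sync_pages = drive_sync(walk_pages_steps("page1"), make_fake_fetch())
async_pages = asyncio.run(
    drive_async(walk_pages_steps("page1"), make_fake_fetch_async())
)
```

Because the generator never sleeps or performs a request itself, both drivers walk the same sequence of steps, and any retry policy added to the generator would apply to both modes.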
1 parent 46335b6 commit 3cc7c95

20 files changed

Lines changed: 1255 additions & 564 deletions

dataretrieval/nadp.py

Lines changed: 4 additions & 2 deletions
@@ -34,7 +34,9 @@
 import warnings
 import zipfile
 
-import requests
+import httpx
+
+from dataretrieval.utils import HTTPX_DEFAULTS
 
 _DEPRECATION_MESSAGE = (
     "The `nadp` module is deprecated and will be removed from `dataretrieval` "
@@ -213,7 +215,7 @@ def get_zip(url, filename):
     """
     _warn_deprecated()
 
-    req = requests.get(url + filename)
+    req = httpx.get(url + filename, **HTTPX_DEFAULTS)
     req.raise_for_status()
 
     # z = zipfile.ZipFile(io.BytesIO(req.content))

dataretrieval/nwis.py

Lines changed: 8 additions & 10 deletions
@@ -11,8 +11,8 @@
 import warnings
 from json import JSONDecodeError
 
+import httpx
 import pandas as pd
-import requests
 
 from dataretrieval.rdb import read_rdb
 from dataretrieval.utils import BaseMetadata
@@ -110,7 +110,7 @@ def wrapper(*args, **kwargs):
     return wrapper
 
 
-def _parse_json_or_raise(response: requests.Response) -> pd.DataFrame:
+def _parse_json_or_raise(response: httpx.Response) -> pd.DataFrame:
     """Parse a JSON NWIS response, raising a helpful error on HTML responses."""
     try:
         return _read_json(response.json())
@@ -364,9 +364,7 @@ def get_stats(
 
 
 @_deprecated
-def query_waterdata(
-    service: str, ssl_check: bool = True, **kwargs
-) -> requests.models.Response:
+def query_waterdata(service: str, ssl_check: bool = True, **kwargs) -> httpx.Response:
     """
     Queries waterdata.
 
@@ -382,7 +380,7 @@ def query_waterdata(
 
     Returns
     -------
-    request: ``requests.models.Response``
+    request: ``httpx.Response``
         The response object from the API request to the web service
     """
     major_params = ["site_no", "state_cd"]
@@ -412,7 +410,7 @@ def query_waterdata(
 @_deprecated
 def query_waterservices(
     service: str, ssl_check: bool = True, **kwargs
-) -> requests.models.Response:
+) -> httpx.Response:
     """
     Queries waterservices.usgs.gov
 
@@ -451,7 +449,7 @@ def query_waterservices(
 
     Returns
     -------
-    request: ``requests.models.Response``
+    request: ``httpx.Response``
         The response object from the API request to the web service
 
     """
@@ -1123,7 +1121,7 @@ class NWIS_Metadata(BaseMetadata):
         Response url
     query_time: datetme.timedelta
         Response elapsed time
-    header: requests.structures.CaseInsensitiveDict
+    header: httpx.Headers
         Response headers
     comments: str | None
         Metadata comments, if any
@@ -1143,7 +1141,7 @@ def __init__(self, response, **parameters) -> None:
         Parameters
         ----------
         response: Response
-            Response object from requests module
+            Response object from httpx module
         parameters: unpacked dictionary
             Unpacked dictionary of the parameters supplied in the request
 

dataretrieval/streamstats.py

Lines changed: 5 additions & 3 deletions
@@ -7,7 +7,9 @@
 
 import json
 
-import requests
+import httpx
+
+from dataretrieval.utils import HTTPX_DEFAULTS
 
 
 def download_workspace(workspaceID, format=""):
@@ -32,7 +34,7 @@ def download_workspace(workspaceID, format=""):
     payload = {"workspaceID": workspaceID, "format": format}
     url = "https://streamstats.usgs.gov/streamstatsservices/download"
 
-    r = requests.get(url, params=payload)
+    r = httpx.get(url, params=payload, **HTTPX_DEFAULTS)
 
     r.raise_for_status()
     return r
@@ -125,7 +127,7 @@ def get_watershed(
     }
     url = "https://streamstats.usgs.gov/streamstatsservices/watershed.geojson"
 
-    r = requests.get(url, params=payload)
+    r = httpx.get(url, params=payload, **HTTPX_DEFAULTS)
 
     r.raise_for_status()
 

dataretrieval/utils.py

Lines changed: 47 additions & 25 deletions
@@ -5,12 +5,18 @@
 import warnings
 from collections.abc import Iterable
 
+import httpx
 import pandas as pd
-import requests
 
 import dataretrieval
 from dataretrieval.codes import tz
 
+# Shared kwargs for every ``httpx`` call site in the package. ``requests``
+# defaulted to following redirects on GET; ``httpx`` does not. ``timeout=None``
+# preserves the no-timeout behavior the chunker docs promise (caller-driven
+# timeout policy via wrappers).
+HTTPX_DEFAULTS = {"follow_redirects": True, "timeout": None}
+
 
 def to_str(listlike, delimiter=","):
     """Translates list-like objects into strings.
@@ -205,7 +211,7 @@ class BaseMetadata:
         Response url
     query_time: datetme.timedelta
         Response elapsed time
-    header: requests.structures.CaseInsensitiveDict
+    header: httpx.Headers
         Response headers
 
     """
@@ -216,7 +222,7 @@ def __init__(self, response) -> None:
         Parameters
         ----------
         response: Response
-            Response object from requests module
+            Response object from httpx module
 
         Returns
         -------
@@ -225,8 +231,8 @@ def __init__(self, response) -> None:
 
         """
 
-        # These are built from the API response
-        self.url = response.url
+        # Coerce httpx.URL -> str: BaseMetadata.url has always been str.
+        self.url = str(response.url)
         self.query_time = response.elapsed
         self.header = response.headers
         self.comment = None
@@ -254,18 +260,29 @@ def __repr__(self) -> str:
         return f"{type(self).__name__}(url={self.url})"
 
 
+_URL_TOO_LONG_EXAMPLE = """
+    # n is the number of chunks to divide the query into \n
+    split_list = np.array_split(site_list, n)
+    data_list = [] # list to store chunk results in \n
+    # loop through chunks and make requests \n
+    for site_list in split_list: \n
+        data = nwis.get_record(sites=site_list, service='dv', \n
+                               start=start, end=end) \n
+        data_list.append(data) # append results to list"""
+
+
 def query(url, payload, delimiter=",", ssl_check=True):
     """Send a query.
 
-    Wrapper for requests.get that handles errors, converts listed
+    Wrapper for httpx.get that handles errors, converts listed
     query parameters to comma separated strings, and returns response.
 
     Parameters
     ----------
     url: string
         URL to query
     payload: dict
-        query parameters passed to ``requests.get``
+        query parameters passed to ``httpx.get``
     delimiter: string
         delimiter to use with lists
     ssl_check: bool
@@ -275,19 +292,34 @@ def query(url, payload, delimiter=",", ssl_check=True):
     Returns
     -------
     string: query response
-        The response from the API query ``requests.get`` function call.
+        The response from the API query ``httpx.get`` function call.
     """
 
     for key, value in payload.items():
         payload[key] = to_str(value, delimiter)
-    # for index in range(len(payload)):
-    #    key, value = payload[index]
-    #    payload[index] = (key, to_str(value))
+    # httpx serializes None params as ``foo=``; USGS rejects with 400.
+    # Drop them. (``to_str`` returns None for non-iterable scalars like bools.)
+    payload = {k: v for k, v in payload.items() if v is not None}
 
     # define the user agent for the query
     user_agent = {"user-agent": f"python-dataretrieval/{dataretrieval.__version__}"}
 
-    response = requests.get(url, params=payload, headers=user_agent, verify=ssl_check)
+    try:
+        response = httpx.get(
+            url,
+            params=payload,
+            headers=user_agent,
+            verify=ssl_check,
+            **HTTPX_DEFAULTS,
+        )
+    except httpx.InvalidURL as exc:
+        # httpx rejects oversize URLs client-side; mirror the 414 branch
+        # so caller handling is unchanged.
+        raise ValueError(
+            "Request URL too long. Modify your query to use fewer sites. "
+            f"httpx rejected the URL client-side: {exc}. Pseudo-code "
+            f"example of how to split your query: \n {_URL_TOO_LONG_EXAMPLE}"
+        ) from exc
 
     if response.status_code == 400:
         raise ValueError(
@@ -299,24 +331,14 @@ def query(url, payload, delimiter=",", ssl_check=True):
             + f"URL: {response.url}"
         )
     elif response.status_code == 414:
-        _reason = response.reason
-        _example = """
-            # n is the number of chunks to divide the query into \n
-            split_list = np.array_split(site_list, n)
-            data_list = [] # list to store chunk results in \n
-            # loop through chunks and make requests \n
-            for site_list in split_list: \n
-                data = nwis.get_record(sites=site_list, service='dv', \n
-                                       start=start, end=end) \n
-                data_list.append(data) # append results to list"""
         raise ValueError(
             "Request URL too long. Modify your query to use fewer sites. "
-            + f"API response reason: {_reason}. Pseudo-code example of how to "
-            + f"split your query: \n {_example}"
+            f"API response reason: {response.reason_phrase}. Pseudo-code "
+            f"example of how to split your query: \n {_URL_TOO_LONG_EXAMPLE}"
         )
     elif response.status_code in [500, 502, 503]:
         raise ValueError(
-            f"Service Unavailable: {response.status_code} {response.reason}. "
+            f"Service Unavailable: {response.status_code} {response.reason_phrase}. "
             + f"The service at {response.url} may be down or experiencing issues."
         )
 
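
The pseudo-code kept in ``_URL_TOO_LONG_EXAMPLE`` relies on ``np.array_split`` and ``nwis.get_record``. A minimal stdlib-only sketch of the same splitting idea is shown here for illustration. ``split_into_chunks``, ``fetch_in_chunks``, and ``fake_fetch`` are hypothetical names that are not part of the package, and ``fake_fetch`` only stands in for a real request such as ``nwis.get_record``.

```python
from typing import Callable, List, Sequence


def split_into_chunks(items: Sequence[str], n: int) -> List[List[str]]:
    """Split items into n consecutive, nearly equal chunks.

    The first len(items) % n chunks receive one extra item, which is the
    same sizing rule numpy.array_split uses.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    size, extra = divmod(len(items), n)
    chunks = []
    start = 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        chunks.append(list(items[start:end]))
        start = end
    return chunks


def fetch_in_chunks(
    site_list: Sequence[str],
    n: int,
    fetch: Callable[[List[str]], dict],
) -> List[dict]:
    """Call fetch once per non-empty chunk and collect the results in order."""
    results = []
    for chunk in split_into_chunks(site_list, n):
        if chunk:  # skip empty chunks when n exceeds the number of sites
            results.append(fetch(chunk))
    return results


def fake_fetch(sites: List[str]) -> dict:
    """Stand-in for a real request; only reports how many sites it received."""
    return {"n_sites": len(sites)}


sites = [f"{i:08d}" for i in range(10)]
results = fetch_in_chunks(sites, 3, fake_fetch)
chunk_sizes = [r["n_sites"] for r in results]
```

With ten sites and ``n = 3`` the chunks hold four, three, and three sites, so each request URL carries at most four site identifiers.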

dataretrieval/waterdata/api.py

Lines changed: 31 additions & 15 deletions
@@ -13,11 +13,15 @@
 from typing import get_args
 from urllib.parse import quote
 
+import httpx
 import pandas as pd
-import requests
-from requests.models import PreparedRequest
 
-from dataretrieval.utils import BaseMetadata, _attach_datetime_columns, to_str
+from dataretrieval.utils import (
+    HTTPX_DEFAULTS,
+    BaseMetadata,
+    _attach_datetime_columns,
+    to_str,
+)
 from dataretrieval.waterdata.filters import FILTER_LANG
 from dataretrieval.waterdata.types import (
     CODE_SERVICES,
@@ -188,7 +192,11 @@ def get_daily(
         :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking,
         and the lexicographic-comparison pitfall.
     convert_type : boolean, optional
-        If True, converts columns to appropriate types.
+        If True, converts columns to appropriate types. Long multi-value
+        inputs are transparently chunked across sub-requests that fan
+        out concurrently by default — see
+        :mod:`dataretrieval.waterdata.chunking` for the
+        ``API_USGS_CONCURRENT`` tuning knob.
 
     Returns
     -------
@@ -2110,7 +2118,11 @@ def get_codes(code_service: CODE_SERVICES) -> pd.DataFrame:
 
     url = f"{SAMPLES_URL}/codeservice/{code_service}?mimeType=application%2Fjson"
 
-    response = requests.get(url, headers=_default_headers())
+    response = httpx.get(
+        url,
+        headers=_default_headers(),
+        **HTTPX_DEFAULTS,
+    )
 
     response.raise_for_status()
 
@@ -2336,12 +2348,14 @@ def get_samples(
 
     url = f"{SAMPLES_URL}/{service}/{profile}"
 
-    req = PreparedRequest()
-    req.prepare_url(url, params=params)
-    logger.info("Request: %s", req.url)
+    logger.info("Request: %s", httpx.URL(url).copy_merge_params(params))
 
-    response = requests.get(
-        url, params=params, verify=ssl_check, headers=_default_headers()
+    response = httpx.get(
+        url,
+        params=params,
+        verify=ssl_check,
+        headers=_default_headers(),
+        **HTTPX_DEFAULTS,
     )
 
     response.raise_for_status()
@@ -2408,12 +2422,14 @@ def get_samples_summary(
     url = f"{SAMPLES_URL}/summary/{quote(monitoringLocationIdentifier, safe='')}"
     params = {"mimeType": "text/csv"}
 
-    req = PreparedRequest()
-    req.prepare_url(url, params=params)
-    logger.info("Request: %s", req.url)
+    logger.info("Request: %s", httpx.URL(url).copy_merge_params(params))
 
-    response = requests.get(
-        url, params=params, verify=ssl_check, headers=_default_headers()
+    response = httpx.get(
+        url,
+        params=params,
+        verify=ssl_check,
+        headers=_default_headers(),
+        **HTTPX_DEFAULTS,
     )
 
     response.raise_for_status()
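
The ``get_daily`` docstring above points to ``API_USGS_CONCURRENT`` as the tuning knob, and the commit message lists tests for explicit-low, malformed, and unset values. The parser itself is not part of this excerpt, so the sketch below only illustrates one way such parsing could behave. ``read_concurrency``, ``use_parallel``, and ``ASSUMED_DEFAULT_CONCURRENCY`` are hypothetical, the default of 4 is a placeholder rather than the package's value, and the fallback rules for malformed or non-positive input are assumptions.

```python
import os
from typing import Mapping, Optional

# Placeholder only: the package's real default is not shown in this diff.
ASSUMED_DEFAULT_CONCURRENCY = 4


def read_concurrency(env: Optional[Mapping[str, str]] = None) -> int:
    """Return the requested number of concurrent sub-requests.

    Assumptions in this sketch: an unset, empty, or malformed value falls
    back to the placeholder default, and values below 1 are clamped to 1.
    """
    env = os.environ if env is None else env
    raw = env.get("API_USGS_CONCURRENT")
    if raw is None or not raw.strip():
        return ASSUMED_DEFAULT_CONCURRENCY
    try:
        value = int(raw.strip())
    except ValueError:
        return ASSUMED_DEFAULT_CONCURRENCY
    return max(1, value)


def use_parallel(env: Optional[Mapping[str, str]] = None) -> bool:
    """A value of 1 selects the sequential path, as the commit message states."""
    return read_concurrency(env) > 1


unset_value = read_concurrency({})
sequential = use_parallel({"API_USGS_CONCURRENT": "1"})
malformed_value = read_concurrency({"API_USGS_CONCURRENT": "abc"})
```

Passing a plain mapping instead of reading ``os.environ`` directly keeps the function easy to exercise in tests without mutating the process environment.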
