Skip to content

Commit 63a13ba

Browse files
thodson-usgsclaude
andcommitted
feat(wateruse): fetch fanned-out locations concurrently (no backoff needed)
A multi-value `state`/`county`/`huc` selector now fans out over a `ThreadPoolExecutor` instead of a serial loop. Concurrency is capped by a module-level `MAX_CONCURRENT_REQUESTS` (default 4; set to 1 for serial) — kept in this module rather than honoring the OGC engine's `API_USGS_CONCURRENT`, so wateruse stays decoupled from the engine. The locations are independent single requests over the synchronous `_get`, so the thread pool needs no shared state; `pool.map` preserves input order and re-raises the first failure. Stress-tested against the live NWDC at concurrency 1/2/4/8/16 over 16 distinct locations: all 200s, zero rate-limit/connection errors, and the rate budget depletes one token per request regardless of concurrency — so no request backoff/retry is required. End-to-end results are concurrency-invariant (byte-identical at conc 1/4/8) with a ~3.6x speedup at the default of 4. The single-location common path skips the pool entirely. Tests route each location to its own mocked response so the fan-out assertions are deterministic under thread races, and cover both the concurrent and serial (cap=1) paths. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
1 parent c3c8365 commit 63a13ba

2 files changed

Lines changed: 66 additions & 19 deletions

File tree

dataretrieval/wateruse.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io
4343
import logging
4444
from collections.abc import Iterable
45+
from concurrent.futures import ThreadPoolExecutor
4546
from typing import Any
4647

4748
import httpx
@@ -75,6 +76,13 @@
7576
#: Temporal resolutions: monthly, annual calendar year, annual water year.
7677
TIME_RESOLUTIONS = ("monthly", "annualcy", "annualwy")
7778

79+
#: Maximum locations fetched concurrently when a list of state/county/huc
80+
#: selectors is fanned out (one request per location). Kept conservative
81+
#: because this module intentionally carries no request backoff/retry; the
82+
#: NWDC tolerates this level of concurrency without rate-limit errors (verified
83+
#: by stress test). Set ``wateruse.MAX_CONCURRENT_REQUESTS = 1`` for serial.
84+
MAX_CONCURRENT_REQUESTS = 4
85+
7886
# Page responses carry the HUC12 identifier in this column; it must stay a
7987
# string so leading zeros (e.g. "010900020502") survive the round trip.
8088
_HUC12_COLUMN = "huc12_id"
@@ -104,8 +112,9 @@ def get_wateruse(
104112
frame.
105113
106114
Each selector also accepts a list of values. The NWDC queries one area per
107-
request, so a list is fanned out into one request per value and the results
108-
are concatenated — convenient, but proportionally slower for many areas.
115+
request, so a list is fanned out into one request per value — up to
116+
:data:`MAX_CONCURRENT_REQUESTS` in parallel — and the results are
117+
concatenated in the order given.
109118
110119
Parameters
111120
----------
@@ -209,17 +218,25 @@ def get_wateruse(
209218
# The NWDC queries one location per request, so fan a multi-value selector
210219
# out into a request per location and concatenate the results.
211220
locations = _resolve_locations(state, county, huc)
212-
frame, first_response = _fetch_all_pages(
213-
{**base_params, "location": locations[0]}, ssl_check=ssl_check
214-
)
215-
frames = [frame]
216-
for location in locations[1:]:
217-
frame, _ = _fetch_all_pages(
221+
222+
def _fetch(location: str) -> tuple[pd.DataFrame, httpx.Response]:
223+
return _fetch_all_pages(
218224
{**base_params, "location": location}, ssl_check=ssl_check
219225
)
220-
frames.append(frame)
221-
df = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0]
222-
return df, BaseMetadata(first_response)
226+
227+
if len(locations) == 1:
228+
# Common case: no pool, and no extra concat copy of the whole result.
229+
frame, response = _fetch(locations[0])
230+
return frame, BaseMetadata(response)
231+
232+
# Fan out concurrently (bounded), preserving input order. The locations are
233+
# independent single requests, so a thread pool over the synchronous fetch
234+
# needs no shared state or backoff; ``pool.map`` re-raises the first failure.
235+
workers = min(len(locations), max(1, MAX_CONCURRENT_REQUESTS))
236+
with ThreadPoolExecutor(max_workers=workers) as pool:
237+
results = list(pool.map(_fetch, locations))
238+
df = pd.concat([frame for frame, _ in results], ignore_index=True)
239+
return df, BaseMetadata(results[0][1])
223240

224241

225242
# Valid HUC code lengths (digits) → the hydrologic-unit level they query.

tests/wateruse_test.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,19 +220,49 @@ def test_state_selector_builds_location_query(httpx_mock):
220220
assert qs["location"] == ["stateCd:RI"]
221221

222222

223-
def test_multiple_states_fan_out_into_separate_requests(httpx_mock):
224-
"""A list selector issues one request per location and concatenates them."""
225-
httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_P1) # first state
226-
httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_P2) # second state
223+
def test_multiple_states_fan_out_preserves_input_order(httpx_mock):
224+
"""A list selector fans out one request per location and concatenates the
225+
results in the order given — even though the requests run concurrently and
226+
may reach the server out of order. Each location is routed to its own
227+
response so attribution is deterministic regardless of arrival order."""
228+
httpx_mock.add_response(
229+
method="GET", url=re.compile(r".*location=stateCd%3ARI.*"), text=_CSV_P1
230+
)
231+
httpx_mock.add_response(
232+
method="GET", url=re.compile(r".*location=stateCd%3AWI.*"), text=_CSV_P2
233+
)
227234

228235
df, _ = get_wateruse(model="wu-public-supply-wd", state=["RI", "Wisconsin"])
229236

230-
# _CSV_P1 (2 rows) + _CSV_P2 (1 row), one request per state.
231-
assert len(df) == 3
237+
# RI's rows (_CSV_P1) precede WI's (_CSV_P2) regardless of which request the
238+
# thread pool dispatched first.
239+
assert df["huc12_id"].tolist() == [
240+
"010900020502",
241+
"010900020503",
242+
"010900020504",
243+
]
232244
reqs = httpx_mock.get_requests()
233245
assert len(reqs) == 2
234-
locations = [parse_qs(urlsplit(str(r.url)).query)["location"][0] for r in reqs]
235-
assert locations == ["stateCd:RI", "stateCd:WI"]
246+
assert {parse_qs(urlsplit(str(r.url)).query)["location"][0] for r in reqs} == {
247+
"stateCd:RI",
248+
"stateCd:WI",
249+
}
250+
251+
252+
def test_fan_out_is_serial_when_concurrency_is_one(httpx_mock, monkeypatch):
253+
"""``MAX_CONCURRENT_REQUESTS = 1`` still fans out correctly (serial path)."""
254+
monkeypatch.setattr(wateruse, "MAX_CONCURRENT_REQUESTS", 1)
255+
httpx_mock.add_response(
256+
method="GET", url=re.compile(r".*location=stateCd%3ARI.*"), text=_CSV_P1
257+
)
258+
httpx_mock.add_response(
259+
method="GET", url=re.compile(r".*location=stateCd%3AWI.*"), text=_CSV_P2
260+
)
261+
262+
df, _ = get_wateruse(model="wu-public-supply-wd", state=["RI", "WI"])
263+
264+
assert len(df) == 3
265+
assert len(httpx_mock.get_requests()) == 2
236266

237267

238268
# --- _resolve_locations unit tests (no HTTP) -------------------------------

0 commit comments

Comments
 (0)