Skip to content

Commit 3e56b62

Browse files
thodson-usgsclaude
andauthored
fix(waterdata): gate the chunked fan-out with a semaphore, not the pool (#322)
ChunkedCall._run dispatched every pending sub-request into one asyncio.gather and relied on the shared httpx.AsyncClient connection pool as the only concurrency throttle (max_connections sized from API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, from HTTPX_DEFAULTS): a sub-request that can't get a connection waits in httpx's pool queue, and that wait is bounded by the pool-acquire timeout. So whenever every pooled connection stays busy past that window with none freeing — a batch of large, slowly-streaming pages is enough — the still-queued tail of the fan-out times out with httpx.PoolTimeout. Being a TransportError it burns the per-sub-request retry budget and ultimately surfaces as a bogus *resumable* ServiceInterrupted, telling the user to wait for an upstream that never saw the request. Gate each fetch attempt with an asyncio.Semaphore sized from API_USGS_CONCURRENT instead; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool, so no transport clock runs while they wait and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt, so a sub-request sleeping off a retry backoff doesn't hold one. "unbounded" degenerates to a semaphore sized at the plan total, so there is a single gated code path. Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics. Tests: - in-flight high-water-mark probe (parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code. - real-localhost-server end-to-end test — mock transports bypass the pool, so this drives the chunker's shared client against a slow server past a scaled-down pool timeout; reproduces the spurious resumable ServiceInterrupted on the pre-fix code and completes on this branch. Also raise the default API_USGS_CONCURRENT from 16 to 32 and correct the concurrency rationale: N caps how many of a chunked query's sub-requests are in flight at once (a client-side connection/latency knob), but does not affect the API rate limit -- a chunked call issues the same number of sub-requests regardless of N. Live testing showed the API serves 300 simultaneous requests without 5xx; heavy use is rate-limited by request volume (HTTP 429), mitigated by an API_USGS_PAT token. Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent 4b7464f commit 3e56b62

4 files changed

Lines changed: 205 additions & 44 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 74 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,23 @@
1010
single-step plan — ``ChunkedCall`` has one code path either way.
1111
1212
Concurrency: ``multi_value_chunked`` fans every pending sub-request out
13-
under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``;
14-
concurrency is bounded purely by the client's connection pool
15-
(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``), so
16-
the pool throttles. ``API_USGS_CONCURRENT`` resolves
17-
``N``: an integer N > 1 caps connections at N; ``1`` pins a single
18-
connection (one request at a time); the literal ``unbounded`` removes
19-
the cap (``N=None``). The default (16) is the server-friendly sweet
20-
spot; higher values can trip USGS burst-protection 5xx in practice. The
21-
fan-out runs in a short-lived worker thread (an ``anyio`` blocking
22-
portal), so it works whether or not the caller is already inside an
23-
event loop (Jupyter / IPython / async apps).
13+
under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``. An
14+
``asyncio.Semaphore`` — not the client's connection pool, which is
15+
merely sized to match — caps the sub-requests in flight at ``N``; see
16+
:meth:`ChunkedCall._run` for why the gate must be the semaphore rather
17+
than the pool. ``API_USGS_CONCURRENT`` resolves ``N``: an integer N > 1
18+
allows N sub-requests in flight; ``1`` forces sequential dispatch (one
19+
request at a time); the literal ``unbounded`` lifts the cap. ``N``
20+
bounds only how many of a chunked query's sub-requests are in flight at
21+
once — a client-side trade-off between open connections and fan-out
22+
latency. It does not affect the API rate limit: a chunked call issues
23+
the same number of sub-requests regardless of ``N``, so ``N`` changes
24+
their timing, not the total request volume. The USGS API rate-limits by
25+
volume over time (HTTP 429), not by simultaneity; set ``API_USGS_PAT``
26+
to raise that quota. The default of 32 is a conservative cap that keeps
27+
connection use modest. The fan-out runs in a short-lived worker thread
28+
(an ``anyio`` blocking portal), so it works whether or not the caller is
29+
already inside an event loop (Jupyter / IPython / async apps).
2430
2531
Retries: each sub-request is retried on a transient failure (429,
2632
5xx, connect/read timeout) with exponential backoff + full jitter,
@@ -119,7 +125,7 @@
119125
# ``monkeypatch.setenv`` applies. Value grammar in :func:`_read_concurrency_env`;
120126
# the concurrency model is in the module docstring.
121127
_CONCURRENCY_ENV = "API_USGS_CONCURRENT"
122-
_CONCURRENCY_DEFAULT = 16
128+
_CONCURRENCY_DEFAULT = 32
123129
_CONCURRENCY_UNBOUNDED = "unbounded"
124130

125131

@@ -130,10 +136,10 @@ def _read_concurrency_env() -> int | None:
130136
Returns
131137
-------
132138
int or None
133-
``1`` for a single connection; an integer >1 for bounded
134-
concurrency; ``None`` to disable the per-call cap entirely
135-
(``unbounded`` keyword). Unset → default of
136-
``_CONCURRENCY_DEFAULT``.
139+
``1`` for sequential dispatch (one sub-request at a time); an
140+
integer >1 for bounded concurrency; ``None`` to disable the
141+
per-call cap entirely (``unbounded`` keyword). Unset → default
142+
of ``_CONCURRENCY_DEFAULT``.
137143
"""
138144
raw = os.environ.get(_CONCURRENCY_ENV)
139145
if raw is None:
@@ -1307,9 +1313,9 @@ class ChunkedCall:
13071313
:class:`httpx.AsyncClient`, applies the failure-precedence rules, and
13081314
combines; :meth:`resume` drives it through an ``anyio`` blocking
13091315
portal so it works whether or not the caller is already inside an
1310-
event loop. Concurrency is bounded purely by the client's connection
1311-
pool, so a single connection (``API_USGS_CONCURRENT=1``) is just a
1312-
degenerate gather.
1316+
event loop. Concurrency is bounded by a per-run ``asyncio.Semaphore``
1317+
(see :meth:`_run`), so sequential dispatch
1318+
(``API_USGS_CONCURRENT=1``) is just a degenerate gather.
13131319
13141320
A ``ChunkedCall`` is created internally when a :class:`ChunkPlan`
13151321
executes; callers reach it via :attr:`ChunkInterrupted.call` on
@@ -1551,20 +1557,33 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
15511557
``.call``; ``exc.call.resume()`` then re-issues only the unfinished
15521558
indices through this same runner.
15531559
1554-
Concurrency is bounded by the client's connection pool —
1555-
``httpx.Limits(max_connections=N, max_keepalive_connections=N)``
1556-
where ``N = max_concurrent`` (``None`` for unbounded). The gather
1557-
dispatches *every* pending sub-request and the pool throttles, so
1558-
``N=1`` is just a single-connection gather (one request at a time)
1559-
and ``total <= 1`` is just a one-element gather.
1560+
The gather dispatches *every* pending sub-request at once, but an
1561+
``asyncio.Semaphore`` caps the number of concurrent fetches at
1562+
``N = max_concurrent`` — ``None`` lifts the cap, ``N=1`` runs them
1563+
one at a time. The connection pool is sized to the same ``N``
1564+
(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``)
1565+
so the in-flight fetches reuse keepalive connections.
1566+
1567+
The semaphore, not the pool, is deliberately the throttle. If the
1568+
pool throttled instead, the excess sub-requests would queue
1569+
*inside* httpx waiting for a connection, and that wait counts
1570+
against the pool-acquire timeout (60 s, from ``HTTPX_DEFAULTS``).
1571+
A batch of slow pages that keeps every connection busy past that
1572+
window would then trip ``httpx.PoolTimeout`` on the queued tail —
1573+
a purely client-side failure that consumes the retry budget and
1574+
surfaces as a spurious resumable ``ServiceInterrupted``. Holding
1575+
sub-requests at the semaphore keeps them out of the pool until a
1576+
slot frees, so the pool timeout only fires for a genuinely stuck
1577+
connection.
1578+
15601579
The shared client is published on :data:`_chunked_client` so
15611580
the paginated-loop helpers reuse its connection pool.
15621581
15631582
Parameters
15641583
----------
15651584
max_concurrent : int or None
1566-
Maximum simultaneous connections (the pool cap). ``None``
1567-
disables the cap.
1585+
Maximum sub-requests in flight (the semaphore value, and the
1586+
connection-pool size). ``None`` lifts the cap entirely.
15681587
15691588
Returns
15701589
-------
@@ -1583,25 +1602,45 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
15831602
holding the sparse completed sub-requests; ``.call.resume()``
15841603
re-issues the unfinished ones.
15851604
"""
1586-
# ``httpx.Limits()`` defaults to ``max_connections=100`` — at higher
1587-
# concurrency the pool would silently bottleneck the fan-out behind
1588-
# that cap. Set it to the resolved concurrency so the pool *is* the
1589-
# throttle (``None`` for truly unbounded).
1605+
# The semaphore is the throttle; the pool is merely sized to match
1606+
# it. Left at httpx's default client limits (``max_connections=100``,
1607+
# keepalive 20) the pool would bottleneck a wider cap or churn
1608+
# connections by keeping too few alive. See the method docstring for
1609+
# why the gate can't be the pool itself. ``unbounded``
1610+
# (``max_concurrent=None``) is a degenerate cap at the plan total — a
1611+
# semaphore that can never block — so gated is the only code path.
15901612
limits = httpx.Limits(
15911613
max_connections=max_concurrent, max_keepalive_connections=max_concurrent
15921614
)
1615+
semaphore = asyncio.Semaphore(
1616+
self.plan.total if max_concurrent is None else max_concurrent
1617+
)
15931618

15941619
async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
15951620
with _publish(client):
15961621
reporter = _progress.current()
15971622
if reporter is not None:
15981623
reporter.set_chunks(self.plan.total)
15991624

1625+
async def fetch_gated(
1626+
args: dict[str, Any],
1627+
) -> tuple[pd.DataFrame, httpx.Response]:
1628+
"""One fetch attempt under the concurrency gate.
1629+
1630+
The slot is held for the attempt's full duration —
1631+
every page of a paginated sub-request — but acquired
1632+
per *attempt* (this is what ``_retry`` re-invokes), so
1633+
a sub-request sleeping off a retry backoff isn't
1634+
holding a slot while it isn't touching the server.
1635+
"""
1636+
async with semaphore:
1637+
return await self.fetch(args)
1638+
16001639
async def track(
16011640
index: int, args: dict[str, Any]
16021641
) -> tuple[pd.DataFrame, httpx.Response]:
16031642
"""One sub-request (with retry) + result-store + progress tick."""
1604-
result = await _retry(lambda: self.fetch(args), self.retry_policy)
1643+
result = await _retry(lambda: fetch_gated(args), self.retry_policy)
16051644
self._chunks[index] = result
16061645
if reporter is not None:
16071646
# Chunks finish out of order under gather, so tick the
@@ -1610,7 +1649,7 @@ async def track(
16101649
return result
16111650

16121651
# Dispatch every pending sub-request concurrently; the
1613-
# connection pool (``limits``) is the only throttle.
1652+
# semaphore (via ``fetch_gated``) is the only throttle.
16141653
# ``return_exceptions`` keeps completed pairs after a sibling
16151654
# fails, so partial state stays recoverable via :meth:`resume`.
16161655
# Failure precedence, in order:
@@ -1706,8 +1745,8 @@ def wrapper(
17061745
limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
17071746
plan = ChunkPlan(args, build_request, limit)
17081747
retry_policy = RetryPolicy.from_env()
1709-
# The connection-pool cap is resolved inside ``resume()`` from
1710-
# ``API_USGS_CONCURRENT``; ``1`` is a single-connection gather,
1748+
# The concurrency cap is resolved inside ``resume()`` from
1749+
# ``API_USGS_CONCURRENT``; ``1`` is a sequential gather,
17111750
# ``total <= 1`` a one-element gather — no special branch.
17121751
return plan.execute(fetch, retry_policy, finalize)
17131752

dataretrieval/waterdata/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1551,7 +1551,7 @@ async def _fetch_once(
15511551
and iterates the cartesian product. With no chunkable inputs the
15521552
decorator passes args through unchanged. The decorator gathers every
15531553
sub-request over one shared :class:`httpx.AsyncClient` (concurrency
1554-
bounded by the connection pool, sized from ``API_USGS_CONCURRENT``)
1554+
bounded by a semaphore, sized from ``API_USGS_CONCURRENT``)
15551555
and returns a *synchronous* wrapper, so ``get_ogc_data`` keeps calling
15561556
``_fetch_once(args, finalize=...)`` synchronously. The return shape is
15571557
``(frame, response)``.

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def non_mocked_hosts() -> list[str]:
3838
def _pin_chunker_env(monkeypatch):
3939
"""Pin every test to one connection and no retries.
4040
41-
Production defaults ``API_USGS_CONCURRENT`` to 16 and
41+
Production defaults ``API_USGS_CONCURRENT`` to 32 and
4242
``API_USGS_RETRIES`` to 4. Pinning ``API_USGS_CONCURRENT=1`` keeps
4343
sub-request dispatch deterministic for the mocked suite, and
4444
``API_USGS_RETRIES=0`` makes a single transient surface immediately

tests/waterdata_chunking_test.py

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import asyncio
1919
import concurrent.futures
2020
import datetime
21+
import http.server
2122
import sys
23+
import threading
24+
import time
2225
import warnings
2326
from unittest import mock
2427
from urllib.parse import quote_plus
@@ -32,6 +35,7 @@
3235
pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True)
3336

3437
from dataretrieval.exceptions import DataRetrievalError
38+
from dataretrieval.utils import HTTPX_DEFAULTS
3539
from dataretrieval.waterdata import chunking as _chunking
3640
from dataretrieval.waterdata import utils as _utils
3741
from dataretrieval.waterdata.chunking import (
@@ -56,6 +60,7 @@
5660
_retry,
5761
_retryable,
5862
_safe_request_bytes,
63+
get_active_client,
5964
multi_value_chunked,
6065
)
6166
from dataretrieval.waterdata.utils import _DATE_RANGE_PARAMS, _construct_api_requests
@@ -1314,13 +1319,15 @@ def test_iter_sub_args_passthrough_yields_a_copy():
13141319
# --- async fan-out path ----------------------------------------------------
13151320
#
13161321
# Every sub-request is gathered over one ``httpx.AsyncClient`` and
1317-
# concurrency is bounded purely by that client's connection pool, sized
1318-
# from ``API_USGS_CONCURRENT``. The conftest's ``_pin_chunker_env``
1319-
# autouse pins ``API_USGS_CONCURRENT=1`` (a single connection) for the
1320-
# whole suite; each test below raises it so the gather can dispatch
1321-
# sub-requests under a wider pool. The decorated async fetcher is the
1322-
# SAME one used on both first-run and resume. No real ``httpx.AsyncClient``
1323-
# round-trip occurs (the fakes return mock data), even though
1322+
# concurrency is bounded by an ``asyncio.Semaphore`` sized from
1323+
# ``API_USGS_CONCURRENT`` (the client's connection pool is sized to
1324+
# match, but the semaphore is the throttle — see ``ChunkedCall._run``).
1325+
# The conftest's ``_pin_chunker_env`` autouse pins
1326+
# ``API_USGS_CONCURRENT=1`` (sequential dispatch) for the whole suite;
1327+
# each test below raises it so the gather can dispatch sub-requests
1328+
# under a wider cap. The decorated async fetcher is the SAME one used on
1329+
# both first-run and resume. No real ``httpx.AsyncClient`` round-trip
1330+
# occurs (the fakes return mock data), even though
13241331
# :meth:`ChunkedCall._run` opens one for pool management.
13251332

13261333

@@ -1489,6 +1496,121 @@ async def fetch(args):
14891496
assert len(df) == len(calls)
14901497

14911498

1499+
# Eight 20-char sites against ``url_limit=240`` (base 200): any two atoms
1500+
# joined overflow the 40-byte budget, so the planner lands on eight
1501+
# singleton sub-requests — enough fan-out to observe the concurrency gate.
1502+
_EIGHT_SINGLETON_SITES = [f"S{i}" * 10 for i in range(8)]
1503+
1504+
1505+
def _concurrency_probe(in_flight):
1506+
"""An async fetch that records the high-water mark of simultaneous
1507+
invocations in ``in_flight`` (keys ``now``/``max``). The ``sleep(0)``
1508+
yields to the loop while "in flight", so overlap is observable."""
1509+
1510+
async def fetch_async(args):
1511+
in_flight["now"] += 1
1512+
in_flight["max"] = max(in_flight["max"], in_flight["now"])
1513+
await asyncio.sleep(0)
1514+
in_flight["now"] -= 1
1515+
return pd.DataFrame({"id": [_atom_id(args)]}), _ok_response()
1516+
1517+
return fetch_async
1518+
1519+
1520+
@pytest.mark.parametrize(
1521+
("cap", "expected_high_water"),
1522+
[
1523+
pytest.param(2, 2, id="capped"),
1524+
pytest.param("unbounded", len(_EIGHT_SINGLETON_SITES), id="unbounded"),
1525+
],
1526+
)
1527+
def test_fan_out_in_flight_high_water_mark_is_the_cap(
1528+
monkeypatch, cap, expected_high_water
1529+
):
1530+
"""The fetch-level high-water mark of simultaneous sub-requests IS the
1531+
``API_USGS_CONCURRENT`` cap — genuine parallelism up to it, never past
1532+
it — and ``unbounded`` degenerates to every sub-request at once.
1533+
1534+
Regression: the cap used to be enforced only by the shared client's
1535+
connection-pool size, so sub-requests beyond it queued on connection
1536+
*acquisition*, subject to the client's pool-acquire timeout (see
1537+
``ChunkedCall._run``). The semaphore parks excess sub-requests before
1538+
they touch the pool.
1539+
"""
1540+
in_flight = {"now": 0, "max": 0}
1541+
fetch = _async_chunked_fetch(
1542+
monkeypatch, _concurrency_probe(in_flight), max_concurrent=cap
1543+
)
1544+
1545+
df, _ = fetch({"sites": list(_EIGHT_SINGLETON_SITES)})
1546+
1547+
assert len(df) == len(_EIGHT_SINGLETON_SITES) # all sub-requests completed
1548+
assert in_flight["max"] == expected_high_water
1549+
1550+
1551+
def test_fan_out_outlives_pool_timeout_on_real_transport(monkeypatch):
1552+
"""End-to-end regression for the pool-timeout starvation bug: the
1553+
fan-out must survive every pooled connection staying busy past the
1554+
client's pool-acquire timeout (the stall mechanism is documented on
1555+
``ChunkedCall._run``; at production scale think a batch of large,
1556+
slowly-streaming pages).
1557+
1558+
Sub-requests here send real HTTP to a slow localhost server through
1559+
the chunker's shared client — fakes can't catch this, since
1560+
``MockTransport`` bypasses the connection pool. With the pool as the
1561+
only throttle, 2 connections busy for 0.35 s each and the 0.2 s pool
1562+
timeout pinned below, the 2 queued sub-requests sat out the full
1563+
timeout with no completion to reset their clocks →
1564+
``httpx.PoolTimeout`` → (retries exhausted, ``API_USGS_RETRIES=0``)
1565+
a spurious resumable ``ServiceInterrupted``. Gated by the semaphore,
1566+
queued sub-requests never touch the pool and the call completes.
1567+
"""
1568+
1569+
class _SlowHandler(http.server.BaseHTTPRequestHandler):
1570+
protocol_version = "HTTP/1.1" # keepalive, so pooled connections reuse
1571+
1572+
def do_GET(self):
1573+
time.sleep(0.35) # hold the connection busy past the pool timeout
1574+
body = b'{"ok": true}'
1575+
self.send_response(200)
1576+
self.send_header("Content-Length", str(len(body)))
1577+
self.end_headers()
1578+
self.wfile.write(body)
1579+
1580+
def log_message(self, *args): # keep pytest output clean
1581+
pass
1582+
1583+
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), _SlowHandler)
1584+
thread = threading.Thread(target=server.serve_forever, daemon=True)
1585+
thread.start()
1586+
# Everything past the thread start is in the try, so any setup failure
1587+
# (monkeypatch, decorator construction) still tears the server down.
1588+
try:
1589+
url = f"http://127.0.0.1:{server.server_address[1]}/"
1590+
1591+
# Scale the production pool timeout (see ``HTTPX_DEFAULTS``) down to
1592+
# 0.2 s so the pre-semaphore failure mode reproduces in test time.
1593+
monkeypatch.setitem(
1594+
HTTPX_DEFAULTS, "timeout", httpx.Timeout(5.0, connect=1.0, pool=0.2)
1595+
)
1596+
1597+
async def fetch_async(args):
1598+
client = get_active_client()
1599+
assert client is not None, "sub-request must use the shared client"
1600+
resp = await client.get(url)
1601+
assert resp.status_code == 200
1602+
return pd.DataFrame({"id": [_atom_id(args)]}), resp
1603+
1604+
sites = _EIGHT_SINGLETON_SITES[:4] # 2 in flight + 2 queued, 2 waves
1605+
fetch = _async_chunked_fetch(monkeypatch, fetch_async, max_concurrent=2)
1606+
df, _ = fetch({"sites": sites})
1607+
assert len(df) == len(sites)
1608+
finally:
1609+
server.shutdown()
1610+
server.server_close()
1611+
thread.join(timeout=5)
1612+
1613+
14921614
def test_async_fan_out_runs_inside_running_event_loop(monkeypatch):
14931615
"""The parallel fan-out works even when the caller is already inside a
14941616
running event loop (Jupyter / async apps): the anyio blocking portal

0 commit comments

Comments
 (0)