Skip to content

Commit c713659

Browse files
thodson-usgsclaude
andcommitted
fix(waterdata): gate the chunked fan-out with a semaphore, not the pool
ChunkedCall._run dispatched every pending sub-request into one asyncio.gather and relied on the shared httpx.AsyncClient connection pool as the only concurrency throttle (max_connections sized from API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, from HTTPX_DEFAULTS): - Time a sub-request spends queued waiting for a pooled connection counts against that timeout, and a queued waiter's clock only resets when some response completes and httpcore reassigns the freed connection. So whenever every pooled connection stays busy past the timeout with no completion in between (a batch of large, slowly-streaming pages), the queued tail of the fan-out fails with httpx.PoolTimeout. Being a TransportError it burns the per-sub-request retry budget and ultimately surfaces as a bogus *resumable* ServiceInterrupted telling the user to wait for the upstream to recover — for a failure the server never saw. - Pool queueing also thunders: httpcore assigns a freed connection to every queued waiter, which all wake and race for it, the losers re-queuing — O(pending) wakeups per page completion under a large fan-out. Gate each fetch attempt with an asyncio.Semaphore sized from API_USGS_CONCURRENT instead; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool, so no transport clock runs while they wait and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt, so a sub-request sleeping off a retry backoff doesn't hold one. "unbounded" degenerates to a semaphore sized at the plan total, so there is a single gated code path. Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics. Tests: - in-flight high-water-mark probe (parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code. - real-localhost-server end-to-end test — mock transports bypass the pool, so this drives the chunker's shared client against a slow server past a scaled-down pool timeout; reproduces the spurious resumable ServiceInterrupted on the pre-fix code and completes on this branch. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent ff8f535 commit c713659

3 files changed

Lines changed: 199 additions & 42 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,18 @@
1010
single-step plan — ``ChunkedCall`` has one code path either way.
1111
1212
Concurrency: ``multi_value_chunked`` fans every pending sub-request out
13-
under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``;
14-
concurrency is bounded purely by the client's connection pool
15-
(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``), so
16-
the pool throttles. ``API_USGS_CONCURRENT`` resolves
17-
``N``: an integer N > 1 caps connections at N; ``1`` pins a single
18-
connection (one request at a time); the literal ``unbounded`` removes
19-
the cap (``N=None``). The default (16) is the server-friendly sweet
20-
spot; higher values can trip USGS burst-protection 5xx in practice. The
21-
fan-out runs in a short-lived worker thread (an ``anyio`` blocking
22-
portal), so it works whether or not the caller is already inside an
23-
event loop (Jupyter / IPython / async apps).
13+
under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``. An
14+
``asyncio.Semaphore`` — not the client's connection pool, which is
15+
merely sized to match — caps the sub-requests in flight at ``N``; see
16+
:meth:`ChunkedCall._run` for why the gate must be the semaphore rather
17+
than the pool. ``API_USGS_CONCURRENT`` resolves ``N``: an integer N > 1
18+
allows N sub-requests in flight; ``1`` pins sequential dispatch (one
19+
request at a time); the literal ``unbounded`` lifts the cap. The
20+
default (16) is the server-friendly sweet spot; higher values can trip
21+
USGS burst-protection 5xx in practice. The fan-out runs in a
22+
short-lived worker thread (an ``anyio`` blocking portal), so it works
23+
whether or not the caller is already inside an event loop (Jupyter /
24+
IPython / async apps).
2425
2526
Retries: each sub-request is retried on a transient failure (429,
2627
5xx, connect/read timeout) with exponential backoff + full jitter,
@@ -130,10 +131,10 @@ def _read_concurrency_env() -> int | None:
130131
Returns
131132
-------
132133
int or None
133-
``1`` for a single connection; an integer >1 for bounded
134-
concurrency; ``None`` to disable the per-call cap entirely
135-
(``unbounded`` keyword). Unset → default of
136-
``_CONCURRENCY_DEFAULT``.
134+
``1`` for sequential dispatch (one sub-request at a time); an
135+
integer >1 for bounded concurrency; ``None`` to disable the
136+
per-call cap entirely (``unbounded`` keyword). Unset → default
137+
of ``_CONCURRENCY_DEFAULT``.
137138
"""
138139
raw = os.environ.get(_CONCURRENCY_ENV)
139140
if raw is None:
@@ -1307,9 +1308,9 @@ class ChunkedCall:
13071308
:class:`httpx.AsyncClient`, applies the failure-precedence rules, and
13081309
combines; :meth:`resume` drives it through an ``anyio`` blocking
13091310
portal so it works whether or not the caller is already inside an
1310-
event loop. Concurrency is bounded purely by the client's connection
1311-
pool, so a single connection (``API_USGS_CONCURRENT=1``) is just a
1312-
degenerate gather.
1311+
event loop. Concurrency is bounded by a per-run ``asyncio.Semaphore``
1312+
(see :meth:`_run`), so sequential dispatch
1313+
(``API_USGS_CONCURRENT=1``) is just a degenerate gather.
13131314
13141315
A ``ChunkedCall`` is created internally when a :class:`ChunkPlan`
13151316
executes; callers reach it via :attr:`ChunkInterrupted.call` on
@@ -1551,20 +1552,34 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
15511552
``.call``; ``exc.call.resume()`` then re-issues only the unfinished
15521553
indices through this same runner.
15531554
1554-
Concurrency is bounded by the client's connection pool —
1555-
``httpx.Limits(max_connections=N, max_keepalive_connections=N)``
1556-
where ``N = max_concurrent`` (``None`` for unbounded). The gather
1557-
dispatches *every* pending sub-request and the pool throttles, so
1558-
``N=1`` is just a single-connection gather (one request at a time)
1559-
and ``total <= 1`` is just a one-element gather.
1555+
The gather dispatches *every* pending sub-request at once, but an
1556+
``asyncio.Semaphore`` caps how many fetch concurrently at
1557+
``N = max_concurrent`` — ``None`` lifts the cap, ``N=1`` runs them
1558+
one at a time. The connection pool is sized to the same ``N``
1559+
(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``)
1560+
so the in-flight fetches reuse keepalive connections.
1561+
1562+
The semaphore, not the pool, is deliberately the throttle. If the
1563+
pool throttled instead, the excess sub-requests would queue
1564+
*inside* httpx waiting for a connection, and that wait counts
1565+
against the pool-acquire timeout (60 s, from ``HTTPX_DEFAULTS``).
1566+
A batch of slow pages that keeps every connection busy past that
1567+
window would then trip ``httpx.PoolTimeout`` on the queued tail —
1568+
a purely client-side failure that burns the retry budget and
1569+
surfaces as a bogus resumable ``ServiceInterrupted``. Parking
1570+
sub-requests on the semaphore keeps them out of the pool until a
1571+
slot frees, so the pool timeout only fires for a genuinely stuck
1572+
connection. (It also sidesteps a thundering herd: a freed
1573+
connection wakes every pool waiter at once, not just one.)
1574+
15601575
The shared client is published on :data:`_chunked_client` so
15611576
the paginated-loop helpers reuse its connection pool.
15621577
15631578
Parameters
15641579
----------
15651580
max_concurrent : int or None
1566-
Maximum simultaneous connections (the pool cap). ``None``
1567-
disables the cap.
1581+
Maximum sub-requests in flight (the semaphore value, and the
1582+
connection-pool size). ``None`` lifts the cap entirely.
15681583
15691584
Returns
15701585
-------
@@ -1583,25 +1598,45 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
15831598
holding the sparse completed sub-requests; ``.call.resume()``
15841599
re-issues the unfinished ones.
15851600
"""
1586-
# ``httpx.Limits()`` defaults to ``max_connections=100`` — at higher
1587-
# concurrency the pool would silently bottleneck the fan-out behind
1588-
# that cap. Set it to the resolved concurrency so the pool *is* the
1589-
# throttle (``None`` for truly unbounded).
1601+
# The semaphore is the throttle; the pool is merely sized to match
1602+
# it (the ``httpx.Limits()`` defaults — ``max_connections=100``,
1603+
# keepalive 20 — would bottleneck or churn connections under a
1604+
# different cap). See the method docstring for why the gate can't
1605+
# be the pool itself. ``unbounded`` (``max_concurrent=None``) is a
1606+
# degenerate cap at the plan total — a semaphore that can never
1607+
# block — so gated is the only code path.
15901608
limits = httpx.Limits(
15911609
max_connections=max_concurrent, max_keepalive_connections=max_concurrent
15921610
)
1611+
semaphore = asyncio.Semaphore(
1612+
self.plan.total if max_concurrent is None else max_concurrent
1613+
)
15931614

15941615
async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
15951616
with _publish(client):
15961617
reporter = _progress.current()
15971618
if reporter is not None:
15981619
reporter.set_chunks(self.plan.total)
15991620

1621+
async def fetch_gated(
1622+
args: dict[str, Any],
1623+
) -> tuple[pd.DataFrame, httpx.Response]:
1624+
"""One fetch attempt under the concurrency gate.
1625+
1626+
The slot is held for the attempt's full duration —
1627+
every page of a paginated sub-request — but acquired
1628+
per *attempt* (this is what ``_retry`` re-invokes), so
1629+
a sub-request sleeping off a retry backoff isn't
1630+
holding a slot while it isn't touching the server.
1631+
"""
1632+
async with semaphore:
1633+
return await self.fetch(args)
1634+
16001635
async def track(
16011636
index: int, args: dict[str, Any]
16021637
) -> tuple[pd.DataFrame, httpx.Response]:
16031638
"""One sub-request (with retry) + result-store + progress tick."""
1604-
result = await _retry(lambda: self.fetch(args), self.retry_policy)
1639+
result = await _retry(lambda: fetch_gated(args), self.retry_policy)
16051640
self._chunks[index] = result
16061641
if reporter is not None:
16071642
# Chunks finish out of order under gather, so tick the
@@ -1610,7 +1645,7 @@ async def track(
16101645
return result
16111646

16121647
# Dispatch every pending sub-request concurrently; the
1613-
# connection pool (``limits``) is the only throttle.
1648+
# semaphore (via ``fetch_gated``) is the only throttle.
16141649
# ``return_exceptions`` keeps completed pairs after a sibling
16151650
# fails, so partial state stays recoverable via :meth:`resume`.
16161651
# Failure precedence, in order:
@@ -1706,8 +1741,8 @@ def wrapper(
17061741
limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
17071742
plan = ChunkPlan(args, build_request, limit)
17081743
retry_policy = RetryPolicy.from_env()
1709-
# The connection-pool cap is resolved inside ``resume()`` from
1710-
# ``API_USGS_CONCURRENT``; ``1`` is a single-connection gather,
1744+
# The concurrency cap is resolved inside ``resume()`` from
1745+
# ``API_USGS_CONCURRENT``; ``1`` is a sequential gather,
17111746
# ``total <= 1`` a one-element gather — no special branch.
17121747
return plan.execute(fetch, retry_policy, finalize)
17131748

dataretrieval/waterdata/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1551,7 +1551,7 @@ async def _fetch_once(
15511551
and iterates the cartesian product. With no chunkable inputs the
15521552
decorator passes args through unchanged. The decorator gathers every
15531553
sub-request over one shared :class:`httpx.AsyncClient` (concurrency
1554-
bounded by the connection pool, sized from ``API_USGS_CONCURRENT``)
1554+
bounded by a semaphore, sized from ``API_USGS_CONCURRENT``)
15551555
and returns a *synchronous* wrapper, so ``get_ogc_data`` keeps calling
15561556
``_fetch_once(args, finalize=...)`` synchronously. The return shape is
15571557
``(frame, response)``.

tests/waterdata_chunking_test.py

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import asyncio
1919
import concurrent.futures
2020
import datetime
21+
import http.server
2122
import sys
23+
import threading
24+
import time
2225
import warnings
2326
from unittest import mock
2427
from urllib.parse import quote_plus
@@ -32,6 +35,7 @@
3235
pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True)
3336

3437
from dataretrieval.exceptions import DataRetrievalError
38+
from dataretrieval.utils import HTTPX_DEFAULTS
3539
from dataretrieval.waterdata import chunking as _chunking
3640
from dataretrieval.waterdata import utils as _utils
3741
from dataretrieval.waterdata.chunking import (
@@ -56,6 +60,7 @@
5660
_retry,
5761
_retryable,
5862
_safe_request_bytes,
63+
get_active_client,
5964
multi_value_chunked,
6065
)
6166
from dataretrieval.waterdata.utils import _DATE_RANGE_PARAMS, _construct_api_requests
@@ -1314,13 +1319,15 @@ def test_iter_sub_args_passthrough_yields_a_copy():
13141319
# --- async fan-out path ----------------------------------------------------
13151320
#
13161321
# Every sub-request is gathered over one ``httpx.AsyncClient`` and
1317-
# concurrency is bounded purely by that client's connection pool, sized
1318-
# from ``API_USGS_CONCURRENT``. The conftest's ``_pin_chunker_env``
1319-
# autouse pins ``API_USGS_CONCURRENT=1`` (a single connection) for the
1320-
# whole suite; each test below raises it so the gather can dispatch
1321-
# sub-requests under a wider pool. The decorated async fetcher is the
1322-
# SAME one used on both first-run and resume. No real ``httpx.AsyncClient``
1323-
# round-trip occurs (the fakes return mock data), even though
1322+
# concurrency is bounded by an ``asyncio.Semaphore`` sized from
1323+
# ``API_USGS_CONCURRENT`` (the client's connection pool is sized to
1324+
# match, but the semaphore is the throttle — see ``ChunkedCall._run``).
1325+
# The conftest's ``_pin_chunker_env`` autouse pins
1326+
# ``API_USGS_CONCURRENT=1`` (sequential dispatch) for the whole suite;
1327+
# each test below raises it so the gather can dispatch sub-requests
1328+
# under a wider cap. The decorated async fetcher is the SAME one used on
1329+
# both first-run and resume. No real ``httpx.AsyncClient`` round-trip
1330+
# occurs (the fakes return mock data), even though
13241331
# :meth:`ChunkedCall._run` opens one for pool management.
13251332

13261333

@@ -1489,6 +1496,121 @@ async def fetch(args):
14891496
assert len(df) == len(calls)
14901497

14911498

1499+
# Eight 20-char sites against ``url_limit=240`` (base 200): any two atoms
1500+
# joined overflow the 40-byte budget, so the planner lands on eight
1501+
# singleton sub-requests — enough fan-out to observe the concurrency gate.
1502+
_EIGHT_SINGLETON_SITES = [f"S{i}" * 10 for i in range(8)]
1503+
1504+
1505+
def _concurrency_probe(in_flight):
1506+
"""An async fetch that records the high-water mark of simultaneous
1507+
invocations in ``in_flight`` (keys ``now``/``max``). The ``sleep(0)``
1508+
yields to the loop while "in flight", so overlap is observable."""
1509+
1510+
async def fetch_async(args):
1511+
in_flight["now"] += 1
1512+
in_flight["max"] = max(in_flight["max"], in_flight["now"])
1513+
await asyncio.sleep(0)
1514+
in_flight["now"] -= 1
1515+
return pd.DataFrame({"id": [_atom_id(args)]}), _ok_response()
1516+
1517+
return fetch_async
1518+
1519+
1520+
@pytest.mark.parametrize(
1521+
("cap", "expected_high_water"),
1522+
[
1523+
pytest.param(2, 2, id="capped"),
1524+
pytest.param("unbounded", len(_EIGHT_SINGLETON_SITES), id="unbounded"),
1525+
],
1526+
)
1527+
def test_fan_out_in_flight_high_water_mark_is_the_cap(
1528+
monkeypatch, cap, expected_high_water
1529+
):
1530+
"""The fetch-level high-water mark of simultaneous sub-requests IS the
1531+
``API_USGS_CONCURRENT`` cap — genuine parallelism up to it, never past
1532+
it — and ``unbounded`` degenerates to every sub-request at once.
1533+
1534+
Regression: the cap used to be enforced only by the shared client's
1535+
connection-pool size, so sub-requests beyond it queued on connection
1536+
*acquisition*, subject to the client's pool-acquire timeout and
1537+
httpcore's thundering-herd reassignment (see ``ChunkedCall._run``).
1538+
The semaphore parks excess sub-requests before they touch the pool.
1539+
"""
1540+
in_flight = {"now": 0, "max": 0}
1541+
fetch = _async_chunked_fetch(
1542+
monkeypatch, _concurrency_probe(in_flight), max_concurrent=cap
1543+
)
1544+
1545+
df, _ = fetch({"sites": list(_EIGHT_SINGLETON_SITES)})
1546+
1547+
assert len(df) == len(_EIGHT_SINGLETON_SITES) # all sub-requests completed
1548+
assert in_flight["max"] == expected_high_water
1549+
1550+
1551+
def test_fan_out_outlives_pool_timeout_on_real_transport(monkeypatch):
1552+
"""End-to-end regression for the pool-timeout starvation bug: the
1553+
fan-out must survive every pooled connection staying busy past the
1554+
client's pool-acquire timeout (the stall mechanism is documented on
1555+
``ChunkedCall._run``; at production scale think a batch of large,
1556+
slowly-streaming pages).
1557+
1558+
Sub-requests here send real HTTP to a slow localhost server through
1559+
the chunker's shared client — fakes can't catch this, since
1560+
``MockTransport`` bypasses the connection pool. With the pool as the
1561+
only throttle, 2 connections busy for 0.35 s each and the 0.2 s pool
1562+
timeout pinned below, the 2 queued sub-requests sat out the full
1563+
timeout with no completion to reset their clocks →
1564+
``httpx.PoolTimeout`` → (retries exhausted, ``API_USGS_RETRIES=0``)
1565+
a spurious resumable ``ServiceInterrupted``. Gated by the semaphore,
1566+
queued sub-requests never touch the pool and the call completes.
1567+
"""
1568+
1569+
class _SlowHandler(http.server.BaseHTTPRequestHandler):
1570+
protocol_version = "HTTP/1.1" # keepalive, so pooled connections reuse
1571+
1572+
def do_GET(self):
1573+
time.sleep(0.35) # hold the connection busy past the pool timeout
1574+
body = b'{"ok": true}'
1575+
self.send_response(200)
1576+
self.send_header("Content-Length", str(len(body)))
1577+
self.end_headers()
1578+
self.wfile.write(body)
1579+
1580+
def log_message(self, *args): # keep pytest output clean
1581+
pass
1582+
1583+
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), _SlowHandler)
1584+
thread = threading.Thread(target=server.serve_forever, daemon=True)
1585+
thread.start()
1586+
# Everything past the thread start is in the try, so any setup failure
1587+
# (monkeypatch, decorator construction) still tears the server down.
1588+
try:
1589+
url = f"http://127.0.0.1:{server.server_address[1]}/"
1590+
1591+
# Scale the production pool timeout (see ``HTTPX_DEFAULTS``) down to
1592+
# 0.2 s so the pre-semaphore failure mode reproduces in test time.
1593+
monkeypatch.setitem(
1594+
HTTPX_DEFAULTS, "timeout", httpx.Timeout(5.0, connect=1.0, pool=0.2)
1595+
)
1596+
1597+
async def fetch_async(args):
1598+
client = get_active_client()
1599+
assert client is not None, "sub-request must use the shared client"
1600+
resp = await client.get(url)
1601+
assert resp.status_code == 200
1602+
return pd.DataFrame({"id": [_atom_id(args)]}), resp
1603+
1604+
sites = _EIGHT_SINGLETON_SITES[:4] # 2 in flight + 2 queued, 2 waves
1605+
fetch = _async_chunked_fetch(monkeypatch, fetch_async, max_concurrent=2)
1606+
df, _ = fetch({"sites": sites})
1607+
assert len(df) == len(sites)
1608+
finally:
1609+
server.shutdown()
1610+
server.server_close()
1611+
thread.join(timeout=5)
1612+
1613+
14921614
def test_async_fan_out_runs_inside_running_event_loop(monkeypatch):
14931615
"""The parallel fan-out works even when the caller is already inside a
14941616
running event loop (Jupyter / async apps): the anyio blocking portal

0 commit comments

Comments
 (0)