Commit 1c676b8

thodson-usgs and claude committed
fix(waterdata): gate the chunked fan-out with a semaphore, not the pool

ChunkedCall._run dispatched every pending sub-request into one asyncio.gather and relied on the shared httpx.AsyncClient connection pool as the only concurrency throttle (max_connections sized from API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, from HTTPX_DEFAULTS):

- Time a sub-request spends queued waiting for a pooled connection counts against that timeout, and a queued waiter's clock only resets when some response completes and httpcore reassigns the freed connection. So whenever every pooled connection stays busy past the timeout with no completion in between (a batch of large, slowly-streaming pages), the queued tail of the fan-out fails with httpx.PoolTimeout. Being a TransportError, it burns the per-sub-request retry budget and ultimately surfaces as a bogus *resumable* ServiceInterrupted telling the user to wait for the upstream to recover, for a failure the server never saw.
- Pool queueing also thunders: httpcore assigns a freed connection to every queued waiter, which all wake and race for it, the losers re-queuing. That is O(pending) wakeups per page completion under a large fan-out.

Gate each fetch attempt with an asyncio.Semaphore sized from API_USGS_CONCURRENT instead; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool, so no transport clock runs while they wait and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt, so a sub-request sleeping off a retry backoff doesn't hold one. "unbounded" degenerates to a semaphore sized at the plan total, so there is a single gated code path.

Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics.

Tests:

- in-flight high-water-mark probe (parametrized capped/unbounded): the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code.
- real-localhost-server end-to-end test: mock transports bypass the pool, so this drives the chunker's shared client against a slow server past a scaled-down pool timeout; it reproduces the spurious resumable ServiceInterrupted on the pre-fix code and completes on this branch.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
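
The gating pattern the commit message describes can be sketched in isolation. This is a minimal illustration, not the code from `chunking.py`: `run_gated` and the fake `fetch` are hypothetical names, and a short sleep stands in for the HTTP round-trip. It assumes only the standard-library `asyncio` API.

```python
import asyncio
from typing import Optional


async def run_gated(n_tasks: int, cap: Optional[int]) -> int:
    """Gather n_tasks fake fetches behind a semaphore.

    Returns the high-water mark of fetches that were in flight at once.
    Hypothetical helper for illustration only.
    """
    # cap=None ("unbounded") becomes a semaphore sized at the task total,
    # which can never block, so there is a single gated code path.
    semaphore = asyncio.Semaphore(n_tasks if cap is None else cap)
    in_flight = {"now": 0, "max": 0}

    async def fetch(i: int) -> int:
        # Stand-in for one sub-request; records simultaneous invocations.
        in_flight["now"] += 1
        in_flight["max"] = max(in_flight["max"], in_flight["now"])
        await asyncio.sleep(0.01)
        in_flight["now"] -= 1
        return i

    async def fetch_gated(i: int) -> int:
        # Excess tasks park here, before they would touch any connection pool.
        async with semaphore:
            return await fetch(i)

    # Every task is dispatched up front; the semaphore is the only throttle.
    results = await asyncio.gather(*(fetch_gated(i) for i in range(n_tasks)))
    assert results == list(range(n_tasks))  # gather preserves input order
    return in_flight["max"]
```

Calling `asyncio.run(run_gated(8, 2))` keeps at most two fake fetches in flight, while `asyncio.run(run_gated(8, None))` lets all eight overlap, which mirrors the capped and unbounded cases of the high-water-mark test.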
1 parent ff8f535 commit 1c676b8

3 files changed

Lines changed: 206 additions & 42 deletions

dataretrieval/waterdata/chunking.py

Lines changed: 76 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -10,17 +10,18 @@
1010
single-step plan — ``ChunkedCall`` has one code path either way.
1111
1212
Concurrency: ``multi_value_chunked`` fans every pending sub-request out
13-
under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``;
14-
concurrency is bounded purely by the client's connection pool
15-
(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``), so
16-
the pool throttles. ``API_USGS_CONCURRENT`` resolves
17-
``N``: an integer N > 1 caps connections at N; ``1`` pins a single
18-
connection (one request at a time); the literal ``unbounded`` removes
19-
the cap (``N=None``). The default (16) is the server-friendly sweet
20-
spot; higher values can trip USGS burst-protection 5xx in practice. The
21-
fan-out runs in a short-lived worker thread (an ``anyio`` blocking
22-
portal), so it works whether or not the caller is already inside an
23-
event loop (Jupyter / IPython / async apps).
13+
under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``. An
14+
``asyncio.Semaphore`` — not the client's connection pool, which is
15+
merely sized to match — caps the sub-requests in flight at ``N``; see
16+
:meth:`ChunkedCall._run` for why the gate must be the semaphore rather
17+
than the pool. ``API_USGS_CONCURRENT`` resolves ``N``: an integer N > 1
18+
allows N sub-requests in flight; ``1`` pins sequential dispatch (one
19+
request at a time); the literal ``unbounded`` lifts the cap. The
20+
default (16) is the server-friendly sweet spot; higher values can trip
21+
USGS burst-protection 5xx in practice. The fan-out runs in a
22+
short-lived worker thread (an ``anyio`` blocking portal), so it works
23+
whether or not the caller is already inside an event loop (Jupyter /
24+
IPython / async apps).
2425
2526
Retries: each sub-request is retried on a transient failure (429,
2627
5xx, connect/read timeout) with exponential backoff + full jitter,
@@ -130,10 +131,10 @@ def _read_concurrency_env() -> int | None:
130131
Returns
131132
-------
132133
int or None
133-
``1`` for a single connection; an integer >1 for bounded
134-
concurrency; ``None`` to disable the per-call cap entirely
135-
(``unbounded`` keyword). Unset → default of
136-
``_CONCURRENCY_DEFAULT``.
134+
``1`` for sequential dispatch (one sub-request at a time); an
135+
integer >1 for bounded concurrency; ``None`` to disable the
136+
per-call cap entirely (``unbounded`` keyword). Unset → default
137+
of ``_CONCURRENCY_DEFAULT``.
137138
"""
138139
raw = os.environ.get(_CONCURRENCY_ENV)
139140
if raw is None:
@@ -1307,9 +1308,9 @@ class ChunkedCall:
13071308
:class:`httpx.AsyncClient`, applies the failure-precedence rules, and
13081309
combines; :meth:`resume` drives it through an ``anyio`` blocking
13091310
portal so it works whether or not the caller is already inside an
1310-
event loop. Concurrency is bounded purely by the client's connection
1311-
pool, so a single connection (``API_USGS_CONCURRENT=1``) is just a
1312-
degenerate gather.
1311+
event loop. Concurrency is bounded by a per-run ``asyncio.Semaphore``
1312+
(see :meth:`_run`), so sequential dispatch
1313+
(``API_USGS_CONCURRENT=1``) is just a degenerate gather.
13131314
13141315
A ``ChunkedCall`` is created internally when a :class:`ChunkPlan`
13151316
executes; callers reach it via :attr:`ChunkInterrupted.call` on
@@ -1551,20 +1552,41 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
15511552
``.call``; ``exc.call.resume()`` then re-issues only the unfinished
15521553
indices through this same runner.
15531554
1554-
Concurrency is bounded by the client's connection pool —
1555-
``httpx.Limits(max_connections=N, max_keepalive_connections=N)``
1556-
where ``N = max_concurrent`` (``None`` for unbounded). The gather
1557-
dispatches *every* pending sub-request and the pool throttles, so
1558-
``N=1`` is just a single-connection gather (one request at a time)
1559-
and ``total <= 1`` is just a one-element gather.
1555+
The gather dispatches *every* pending sub-request, but an
1556+
``asyncio.Semaphore`` gates the fetches so at most
1557+
``N = max_concurrent`` are in flight (``None`` lifts the cap);
1558+
``N=1`` is just a sequential gather (one request at a time) and
1559+
``total <= 1`` is just a one-element gather. The client's
1560+
connection pool is sized to match
1561+
(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``)
1562+
so in-flight sub-requests reuse keepalive connections. The
1563+
semaphore must be the throttle, not the pool, for two reasons.
1564+
First, time spent queued on connection acquisition counts
1565+
against the client's pool-acquire timeout (from
1566+
``HTTPX_DEFAULTS``); a queued waiter's clock only resets when
1567+
some response completes and httpcore reassigns the freed
1568+
connection. So whenever every pooled connection stays busy past
1569+
the timeout with no completion in between — a batch of large,
1570+
slowly-streaming pages is enough — the queued tail of the
1571+
fan-out trips spurious ``httpx.PoolTimeout``: the retry budget
1572+
burns on a client-side artifact and what surfaces is a bogus
1573+
resumable ``ServiceInterrupted``. Second, pool queueing
1574+
thunders: httpcore assigns a freed connection to *every* queued
1575+
waiter, which all wake and race for it, the losers bouncing
1576+
back into a fresh wait — O(pending) wakeups per page completion
1577+
under a large fan-out. Parked on the semaphore instead, a
1578+
sub-request hasn't touched the pool yet: no transport clock
1579+
runs, releases wake one waiter each, and the pool timeout
1580+
reverts to its protective role (a genuinely wedged connection
1581+
checkout).
15601582
The shared client is published on :data:`_chunked_client` so
15611583
the paginated-loop helpers reuse its connection pool.
15621584
15631585
Parameters
15641586
----------
15651587
max_concurrent : int or None
1566-
Maximum simultaneous connections (the pool cap). ``None``
1567-
disables the cap.
1588+
Maximum sub-requests in flight (the semaphore value, and the
1589+
connection-pool size). ``None`` lifts the cap entirely.
15681590
15691591
Returns
15701592
-------
@@ -1583,25 +1605,45 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
15831605
holding the sparse completed sub-requests; ``.call.resume()``
15841606
re-issues the unfinished ones.
15851607
"""
1586-
# ``httpx.Limits()`` defaults to ``max_connections=100`` — at higher
1587-
# concurrency the pool would silently bottleneck the fan-out behind
1588-
# that cap. Set it to the resolved concurrency so the pool *is* the
1589-
# throttle (``None`` for truly unbounded).
1608+
# The semaphore is the throttle; the pool is merely sized to match
1609+
# it (the ``httpx.Limits()`` defaults — ``max_connections=100``,
1610+
# keepalive 20 — would bottleneck or churn connections under a
1611+
# different cap). See the method docstring for why the gate can't
1612+
# be the pool itself. ``unbounded`` (``max_concurrent=None``) is a
1613+
# degenerate cap at the plan total — a semaphore that can never
1614+
# block — so gated is the only code path.
15901615
limits = httpx.Limits(
15911616
max_connections=max_concurrent, max_keepalive_connections=max_concurrent
15921617
)
1618+
semaphore = asyncio.Semaphore(
1619+
self.plan.total if max_concurrent is None else max_concurrent
1620+
)
15931621

15941622
async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
15951623
with _publish(client):
15961624
reporter = _progress.current()
15971625
if reporter is not None:
15981626
reporter.set_chunks(self.plan.total)
15991627

1628+
async def fetch_gated(
1629+
args: dict[str, Any],
1630+
) -> tuple[pd.DataFrame, httpx.Response]:
1631+
"""One fetch attempt under the concurrency gate.
1632+
1633+
The slot is held for the attempt's full duration —
1634+
every page of a paginated sub-request — but acquired
1635+
per *attempt* (this is what ``_retry`` re-invokes), so
1636+
a sub-request sleeping off a retry backoff isn't
1637+
holding a slot while it isn't touching the server.
1638+
"""
1639+
async with semaphore:
1640+
return await self.fetch(args)
1641+
16001642
async def track(
16011643
index: int, args: dict[str, Any]
16021644
) -> tuple[pd.DataFrame, httpx.Response]:
16031645
"""One sub-request (with retry) + result-store + progress tick."""
1604-
result = await _retry(lambda: self.fetch(args), self.retry_policy)
1646+
result = await _retry(lambda: fetch_gated(args), self.retry_policy)
16051647
self._chunks[index] = result
16061648
if reporter is not None:
16071649
# Chunks finish out of order under gather, so tick the
@@ -1610,7 +1652,7 @@ async def track(
16101652
return result
16111653

16121654
# Dispatch every pending sub-request concurrently; the
1613-
# connection pool (``limits``) is the only throttle.
1655+
# semaphore (via ``fetch_gated``) is the only throttle.
16141656
# ``return_exceptions`` keeps completed pairs after a sibling
16151657
# fails, so partial state stays recoverable via :meth:`resume`.
16161658
# Failure precedence, in order:
@@ -1706,8 +1748,8 @@ def wrapper(
17061748
limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit
17071749
plan = ChunkPlan(args, build_request, limit)
17081750
retry_policy = RetryPolicy.from_env()
1709-
# The connection-pool cap is resolved inside ``resume()`` from
1710-
# ``API_USGS_CONCURRENT``; ``1`` is a single-connection gather,
1751+
# The concurrency cap is resolved inside ``resume()`` from
1752+
# ``API_USGS_CONCURRENT``; ``1`` is a sequential gather,
17111753
# ``total <= 1`` a one-element gather — no special branch.
17121754
return plan.execute(fetch, retry_policy, finalize)
17131755
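
The `fetch_gated` docstring above says the slot is acquired per attempt, so a sub-request sleeping off a retry backoff does not hold one. A minimal sketch of that effect follows, under stated assumptions: `demo`, `retry`, and the fake `fetch` are hypothetical stand-ins (the real `_retry` and `RetryPolicy` are not reproduced here), and the backoff is a fixed sleep rather than exponential backoff with jitter.

```python
import asyncio


async def demo() -> list:
    """Return the completion order of two sub-requests sharing one slot."""
    semaphore = asyncio.Semaphore(1)  # one slot: sequential dispatch
    finished = []
    failed_once = False

    async def fetch(name: str) -> str:
        # Hypothetical fetch: "a" fails its first attempt with a transient error.
        nonlocal failed_once
        if name == "a" and not failed_once:
            failed_once = True
            raise ConnectionError("transient")
        await asyncio.sleep(0.01)
        return name

    async def fetch_gated(name: str) -> str:
        # The slot covers one attempt only; it is released on failure too.
        async with semaphore:
            return await fetch(name)

    async def retry(attempt, retries: int = 1, backoff: float = 0.1):
        # Simplified stand-in for a retry helper with a fixed backoff.
        for n in range(retries + 1):
            try:
                return await attempt()
            except ConnectionError:
                if n == retries:
                    raise
                await asyncio.sleep(backoff)  # no slot is held during this sleep

    async def track(name: str) -> str:
        result = await retry(lambda: fetch_gated(name))
        finished.append(name)
        return result

    await asyncio.gather(track("a"), track("b"))
    return finished
```

In this sketch "b" finishes while "a" is still backing off, so the completion order is `["b", "a"]`. Had the semaphore wrapped the whole retry loop instead of a single attempt, "b" would have had to wait for "a" to finish first.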

dataretrieval/waterdata/utils.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1551,7 +1551,7 @@ async def _fetch_once(
15511551
and iterates the cartesian product. With no chunkable inputs the
15521552
decorator passes args through unchanged. The decorator gathers every
15531553
sub-request over one shared :class:`httpx.AsyncClient` (concurrency
1554-
bounded by the connection pool, sized from ``API_USGS_CONCURRENT``)
1554+
bounded by a semaphore, sized from ``API_USGS_CONCURRENT``)
15551555
and returns a *synchronous* wrapper, so ``get_ogc_data`` keeps calling
15561556
``_fetch_once(args, finalize=...)`` synchronously. The return shape is
15571557
``(frame, response)``.

tests/waterdata_chunking_test.py

Lines changed: 129 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,10 @@
1818
import asyncio
1919
import concurrent.futures
2020
import datetime
21+
import http.server
2122
import sys
23+
import threading
24+
import time
2225
import warnings
2326
from unittest import mock
2427
from urllib.parse import quote_plus
@@ -32,6 +35,7 @@
3235
pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True)
3336

3437
from dataretrieval.exceptions import DataRetrievalError
38+
from dataretrieval.utils import HTTPX_DEFAULTS
3539
from dataretrieval.waterdata import chunking as _chunking
3640
from dataretrieval.waterdata import utils as _utils
3741
from dataretrieval.waterdata.chunking import (
@@ -56,6 +60,7 @@
5660
_retry,
5761
_retryable,
5862
_safe_request_bytes,
63+
get_active_client,
5964
multi_value_chunked,
6065
)
6166
from dataretrieval.waterdata.utils import _DATE_RANGE_PARAMS, _construct_api_requests
@@ -1314,13 +1319,15 @@ def test_iter_sub_args_passthrough_yields_a_copy():
13141319
# --- async fan-out path ----------------------------------------------------
13151320
#
13161321
# Every sub-request is gathered over one ``httpx.AsyncClient`` and
1317-
# concurrency is bounded purely by that client's connection pool, sized
1318-
# from ``API_USGS_CONCURRENT``. The conftest's ``_pin_chunker_env``
1319-
# autouse pins ``API_USGS_CONCURRENT=1`` (a single connection) for the
1320-
# whole suite; each test below raises it so the gather can dispatch
1321-
# sub-requests under a wider pool. The decorated async fetcher is the
1322-
# SAME one used on both first-run and resume. No real ``httpx.AsyncClient``
1323-
# round-trip occurs (the fakes return mock data), even though
1322+
# concurrency is bounded by an ``asyncio.Semaphore`` sized from
1323+
# ``API_USGS_CONCURRENT`` (the client's connection pool is sized to
1324+
# match, but the semaphore is the throttle — see ``ChunkedCall._run``).
1325+
# The conftest's ``_pin_chunker_env`` autouse pins
1326+
# ``API_USGS_CONCURRENT=1`` (sequential dispatch) for the whole suite;
1327+
# each test below raises it so the gather can dispatch sub-requests
1328+
# under a wider cap. The decorated async fetcher is the SAME one used on
1329+
# both first-run and resume. No real ``httpx.AsyncClient`` round-trip
1330+
# occurs (the fakes return mock data), even though
13241331
# :meth:`ChunkedCall._run` opens one for pool management.
13251332

13261333

@@ -1489,6 +1496,121 @@ async def fetch(args):
14891496
assert len(df) == len(calls)
14901497

14911498

1499+
# Eight 20-char sites against ``url_limit=240`` (base 200): any two atoms
1500+
# joined overflow the 40-byte budget, so the planner lands on eight
1501+
# singleton sub-requests — enough fan-out to observe the concurrency gate.
1502+
_EIGHT_SINGLETON_SITES = [f"S{i}" * 10 for i in range(8)]
1503+
1504+
1505+
def _concurrency_probe(in_flight):
1506+
"""An async fetch that records the high-water mark of simultaneous
1507+
invocations in ``in_flight`` (keys ``now``/``max``). The ``sleep(0)``
1508+
yields to the loop while "in flight", so overlap is observable."""
1509+
1510+
async def fetch_async(args):
1511+
in_flight["now"] += 1
1512+
in_flight["max"] = max(in_flight["max"], in_flight["now"])
1513+
await asyncio.sleep(0)
1514+
in_flight["now"] -= 1
1515+
return pd.DataFrame({"id": [_atom_id(args)]}), _ok_response()
1516+
1517+
return fetch_async
1518+
1519+
1520+
@pytest.mark.parametrize(
1521+
("cap", "expected_high_water"),
1522+
[
1523+
pytest.param(2, 2, id="capped"),
1524+
pytest.param("unbounded", len(_EIGHT_SINGLETON_SITES), id="unbounded"),
1525+
],
1526+
)
1527+
def test_fan_out_in_flight_high_water_mark_is_the_cap(
1528+
monkeypatch, cap, expected_high_water
1529+
):
1530+
"""The fetch-level high-water mark of simultaneous sub-requests IS the
1531+
``API_USGS_CONCURRENT`` cap — genuine parallelism up to it, never past
1532+
it — and ``unbounded`` degenerates to every sub-request at once.
1533+
1534+
Regression: the cap used to be enforced only by the shared client's
1535+
connection-pool size, so sub-requests beyond it queued on connection
1536+
*acquisition*, subject to the client's pool-acquire timeout and
1537+
httpcore's thundering-herd reassignment (see ``ChunkedCall._run``).
1538+
The semaphore parks excess sub-requests before they touch the pool.
1539+
"""
1540+
in_flight = {"now": 0, "max": 0}
1541+
fetch = _async_chunked_fetch(
1542+
monkeypatch, _concurrency_probe(in_flight), max_concurrent=cap
1543+
)
1544+
1545+
df, _ = fetch({"sites": list(_EIGHT_SINGLETON_SITES)})
1546+
1547+
assert len(df) == len(_EIGHT_SINGLETON_SITES) # all sub-requests completed
1548+
assert in_flight["max"] == expected_high_water
1549+
1550+
1551+
def test_fan_out_outlives_pool_timeout_on_real_transport(monkeypatch):
1552+
"""End-to-end regression for the pool-timeout starvation bug: the
1553+
fan-out must survive every pooled connection staying busy past the
1554+
client's pool-acquire timeout (the stall mechanism is documented on
1555+
``ChunkedCall._run``; at production scale think a batch of large,
1556+
slowly-streaming pages).
1557+
1558+
Sub-requests here send real HTTP to a slow localhost server through
1559+
the chunker's shared client — fakes can't catch this, since
1560+
``MockTransport`` bypasses the connection pool. With the pool as the
1561+
only throttle, 2 connections busy for 0.35 s each and the 0.2 s pool
1562+
timeout pinned below, the 2 queued sub-requests sat out the full
1563+
timeout with no completion to reset their clocks →
1564+
``httpx.PoolTimeout`` → (retries exhausted, ``API_USGS_RETRIES=0``)
1565+
a spurious resumable ``ServiceInterrupted``. Gated by the semaphore,
1566+
queued sub-requests never touch the pool and the call completes.
1567+
"""
1568+
1569+
class _SlowHandler(http.server.BaseHTTPRequestHandler):
1570+
protocol_version = "HTTP/1.1" # keepalive, so pooled connections reuse
1571+
1572+
def do_GET(self):
1573+
time.sleep(0.35) # hold the connection busy past the pool timeout
1574+
body = b'{"ok": true}'
1575+
self.send_response(200)
1576+
self.send_header("Content-Length", str(len(body)))
1577+
self.end_headers()
1578+
self.wfile.write(body)
1579+
1580+
def log_message(self, *args): # keep pytest output clean
1581+
pass
1582+
1583+
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), _SlowHandler)
1584+
thread = threading.Thread(target=server.serve_forever, daemon=True)
1585+
thread.start()
1586+
# Everything past the thread start is in the try, so any setup failure
1587+
# (monkeypatch, decorator construction) still tears the server down.
1588+
try:
1589+
url = f"http://127.0.0.1:{server.server_address[1]}/"
1590+
1591+
# Scale the production pool timeout (see ``HTTPX_DEFAULTS``) down to
1592+
# 0.2 s so the pre-semaphore failure mode reproduces in test time.
1593+
monkeypatch.setitem(
1594+
HTTPX_DEFAULTS, "timeout", httpx.Timeout(5.0, connect=1.0, pool=0.2)
1595+
)
1596+
1597+
async def fetch_async(args):
1598+
client = get_active_client()
1599+
assert client is not None, "sub-request must use the shared client"
1600+
resp = await client.get(url)
1601+
assert resp.status_code == 200
1602+
return pd.DataFrame({"id": [_atom_id(args)]}), resp
1603+
1604+
sites = _EIGHT_SINGLETON_SITES[:4] # 2 in flight + 2 queued, 2 waves
1605+
fetch = _async_chunked_fetch(monkeypatch, fetch_async, max_concurrent=2)
1606+
df, _ = fetch({"sites": sites})
1607+
assert len(df) == len(sites)
1608+
finally:
1609+
server.shutdown()
1610+
server.server_close()
1611+
thread.join(timeout=5)
1612+
1613+
14921614
def test_async_fan_out_runs_inside_running_event_loop(monkeypatch):
14931615
"""The parallel fan-out works even when the caller is already inside a
14941616
running event loop (Jupyter / async apps): the anyio blocking portal
