Commit 8a3885b

thodson-usgs and claude committed
refactor(waterdata): single gated path and one canonical pool-timeout rationale
Post-review cleanup of the semaphore gate:

- Collapse the None-vs-Semaphore fork: "unbounded" is now a degenerate
  cap at the plan total (a gate that can never block), so the gated
  fetch is the only code path — matching the chunker's
  no-special-branch style elsewhere.
- Keep the full why-not-the-pool rationale only on ChunkedCall._run;
  the module docstring, inline comment, and test docstrings now point
  there instead of retelling it, and prose no longer hardcodes the 60 s
  pool-timeout literal that belongs to HTTPX_DEFAULTS.
- Merge the two concurrency-probe tests into one parametrized test and
  trim the e2e stall repro from 6 sub-requests x 0.7 s to 4 x 0.5 s
  (~1 s saved per suite run, same >=2x timeout margin).

Both regression cases still fail against the pre-fix chunker (verified
by running them with HEAD~1's chunking.py).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
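The first bullet can be illustrated with a minimal, self-contained sketch. It is not the chunker's code: `run_gated`, `high_water`, and the dummy `fetch` are hypothetical names, and only the `asyncio.Semaphore(total if max_concurrent is None else max_concurrent)` shape mirrors the commit.

```python
import asyncio
from typing import Optional


async def run_gated(total: int, max_concurrent: Optional[int]) -> int:
    """Run `total` dummy sub-requests behind one gate; return the high-water mark."""
    # None ("unbounded") becomes a cap at the plan total: a gate that can
    # never block, so there is no separate ungated branch.
    semaphore = asyncio.Semaphore(total if max_concurrent is None else max_concurrent)
    in_flight = {"now": 0, "max": 0}

    async def fetch(i: int) -> int:
        # Hypothetical stand-in for a real sub-request.
        in_flight["now"] += 1
        in_flight["max"] = max(in_flight["max"], in_flight["now"])
        await asyncio.sleep(0.01)
        in_flight["now"] -= 1
        return i

    async def fetch_gated(i: int) -> int:
        async with semaphore:  # the only code path, capped or not
            return await fetch(i)

    results = await asyncio.gather(*(fetch_gated(i) for i in range(total)))
    assert results == list(range(total))  # every sub-request completed, in order
    return in_flight["max"]


def high_water(total: int, max_concurrent: Optional[int]) -> int:
    """Synchronous wrapper, for callers outside an event loop."""
    return asyncio.run(run_gated(total, max_concurrent))
```

Calling `high_water(8, 2)` and `high_water(8, None)` exercises the capped and unbounded cases through the same `fetch_gated` body.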
1 parent bd536c6 commit 8a3885b

2 files changed

Lines changed: 58 additions & 70 deletions

dataretrieval/waterdata/chunking.py

Lines changed: 23 additions & 28 deletions
@@ -10,24 +10,18 @@
 single-step plan — ``ChunkedCall`` has one code path either way.

 Concurrency: ``multi_value_chunked`` fans every pending sub-request out
-under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``; an
-``asyncio.Semaphore`` caps the sub-requests in flight at ``N``, and the
-client's connection pool is sized to match
-(``httpx.Limits(max_connections=N, max_keepalive_connections=N)``) so
-the in-flight sub-requests reuse keepalive connections. The semaphore —
-not the pool — is the throttle: excess sub-requests park on the
-semaphore instead of queueing on connection acquisition, whose wait
-counts against the client's pool-acquire timeout (``HTTPX_DEFAULTS``)
-and would otherwise expire the queued tail of the fan-out whenever
-every connection stays busy past it (see :meth:`ChunkedCall._run`).
-``API_USGS_CONCURRENT`` resolves ``N``: an
-integer N > 1 allows N sub-requests in flight; ``1`` pins sequential
-dispatch (one request at a time); the literal ``unbounded`` removes
-the cap (``N=None``). The default (16) is the server-friendly sweet
-spot; higher values can trip USGS burst-protection 5xx in practice. The
-fan-out runs in a short-lived worker thread (an ``anyio`` blocking
-portal), so it works whether or not the caller is already inside an
-event loop (Jupyter / IPython / async apps).
+under one ``asyncio.gather`` sharing a single ``httpx.AsyncClient``. An
+``asyncio.Semaphore`` — not the client's connection pool, which is
+merely sized to match — caps the sub-requests in flight at ``N``; see
+:meth:`ChunkedCall._run` for why the gate must be the semaphore rather
+than the pool. ``API_USGS_CONCURRENT`` resolves ``N``: an integer N > 1
+allows N sub-requests in flight; ``1`` pins sequential dispatch (one
+request at a time); the literal ``unbounded`` lifts the cap. The
+default (16) is the server-friendly sweet spot; higher values can trip
+USGS burst-protection 5xx in practice. The fan-out runs in a
+short-lived worker thread (an ``anyio`` blocking portal), so it works
+whether or not the caller is already inside an event loop (Jupyter /
+IPython / async apps).

 Retries: each sub-request is retried on a transient failure (429,
 5xx, connect/read timeout) with exponential backoff + full jitter,
@@ -1560,15 +1554,16 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:

         The gather dispatches *every* pending sub-request, but an
         ``asyncio.Semaphore`` gates the fetches so at most
-        ``N = max_concurrent`` are in flight (``None`` for unbounded);
+        ``N = max_concurrent`` are in flight (``None`` degenerates to a
+        cap at the plan total — a gate that can never block);
         ``N=1`` is just a sequential gather (one request at a time) and
         ``total <= 1`` is just a one-element gather. The client's
         connection pool is sized to match
         (``httpx.Limits(max_connections=N, max_keepalive_connections=N)``)
         so in-flight sub-requests reuse keepalive connections. The
         semaphore must be the throttle, not the pool, for two reasons.
         First, time spent queued on connection acquisition counts
-        against the client's pool-acquire timeout (60 s via
+        against the client's pool-acquire timeout (from
         ``HTTPX_DEFAULTS``); a queued waiter's clock only resets when
         some response completes and httpcore reassigns the freed
         connection. So whenever every pooled connection stays busy past
@@ -1592,7 +1587,8 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
         ----------
         max_concurrent : int or None
             Maximum sub-requests in flight (the semaphore value, and the
-            connection-pool size). ``None`` disables the cap.
+            connection-pool size). ``None`` lifts the cap: the gate
+            degenerates to the plan total and the pool goes unbounded.

         Returns
         -------
@@ -1614,14 +1610,15 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
         # The semaphore is the throttle; the pool is merely sized to match
         # it (the ``httpx.Limits()`` defaults — ``max_connections=100``,
         # keepalive 20 — would bottleneck or churn connections under a
-        # different cap). Why the gate can't be the pool itself is in the
-        # method docstring: pool-acquire waits count against the client's
-        # pool timeout, semaphore waits don't.
+        # different cap). See the method docstring for why the gate can't
+        # be the pool itself. ``unbounded`` (``max_concurrent=None``) is a
+        # degenerate cap at the plan total — a semaphore that can never
+        # block — so gated is the only code path.
         limits = httpx.Limits(
             max_connections=max_concurrent, max_keepalive_connections=max_concurrent
         )
-        semaphore = (
-            None if max_concurrent is None else asyncio.Semaphore(max_concurrent)
+        semaphore = asyncio.Semaphore(
+            self.plan.total if max_concurrent is None else max_concurrent
         )

         async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
@@ -1641,8 +1638,6 @@ async def fetch_gated(
                 a sub-request sleeping off a retry backoff isn't
                 holding a slot while it isn't touching the server.
                 """
-                if semaphore is None:
-                    return await self.fetch(args)
                 async with semaphore:
                     return await self.fetch(args)
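The why-not-the-pool argument in the `_run` docstring can be sketched with a toy model that uses only `asyncio`. Everything here is an assumption for illustration: the "pool" is a plain semaphore acquired under `asyncio.wait_for`, standing in for httpx's pool-acquire timeout, and `HOLD`, `POOL_TIMEOUT`, `request`, and `fan_out` are hypothetical names. Real httpcore resets a waiter's clock when a connection is reassigned; this model has a single fixed timeout, which behaves the same in the stall case where no response completes in time.

```python
import asyncio

HOLD = 0.2           # seconds each request keeps its "connection" busy (assumed)
POOL_TIMEOUT = 0.05  # scaled-down stand-in for the pool-acquire timeout (assumed)


async def request(pool: asyncio.Semaphore) -> str:
    """Acquire a pooled slot under a timeout, hold it, then release it."""
    await asyncio.wait_for(pool.acquire(), POOL_TIMEOUT)  # raises on timeout
    try:
        await asyncio.sleep(HOLD)
    finally:
        pool.release()
    return "ok"


async def fan_out(total: int, n: int, gated: bool) -> list:
    pool = asyncio.Semaphore(n)  # toy connection pool with n slots
    gate = asyncio.Semaphore(n)  # the throttle; waiting on it carries no timeout

    async def one() -> str:
        try:
            if gated:
                async with gate:  # excess requests park here, off the pool
                    return await request(pool)
            return await request(pool)  # pool is the only throttle
        except asyncio.TimeoutError:
            return "pool-timeout"

    return await asyncio.gather(*(one() for _ in range(total)))
```

With `fan_out(4, 2, gated=False)` the two queued requests wait on the pool past `POOL_TIMEOUT` and fail; with `gated=True` they wait on the gate instead and reach the pool only once a slot is free.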

tests/waterdata_chunking_test.py

Lines changed: 35 additions & 42 deletions
@@ -1517,67 +1517,60 @@ async def fetch_async(args):
     return fetch_async


-def test_fan_out_in_flight_sub_requests_never_exceed_cap(monkeypatch):
-    """At most ``API_USGS_CONCURRENT`` sub-requests are in flight at once,
-    while still running genuinely in parallel up to that cap.
+@pytest.mark.parametrize(
+    ("cap", "expected_high_water"),
+    [
+        pytest.param(2, 2, id="capped"),
+        pytest.param("unbounded", len(_EIGHT_SINGLETON_SITES), id="unbounded"),
+    ],
+)
+def test_fan_out_in_flight_high_water_mark_is_the_cap(
+    monkeypatch, cap, expected_high_water
+):
+    """The fetch-level high-water mark of simultaneous sub-requests IS the
+    ``API_USGS_CONCURRENT`` cap — genuine parallelism up to it, never past
+    it — and ``unbounded`` degenerates to every sub-request at once.

     Regression: the cap used to be enforced only by the shared client's
-    connection-pool size, so every sub-request beyond it queued on
-    connection *acquisition* — subject to the client's pool-acquire
-    timeout and httpcore's thundering-herd reassignment (see
-    ``ChunkedCall._run``). The semaphore parks excess sub-requests
-    before they touch the pool, which this test observes directly: the
-    fetch-level high-water mark IS the cap, not the plan's total.
+    connection-pool size, so sub-requests beyond it queued on connection
+    *acquisition*, subject to the client's pool-acquire timeout and
+    httpcore's thundering-herd reassignment (see ``ChunkedCall._run``).
+    The semaphore parks excess sub-requests before they touch the pool.
     """
     in_flight = {"now": 0, "max": 0}
     fetch = _async_chunked_fetch(
-        monkeypatch, _concurrency_probe(in_flight), max_concurrent=2
+        monkeypatch, _concurrency_probe(in_flight), max_concurrent=cap
     )

     df, _ = fetch({"sites": list(_EIGHT_SINGLETON_SITES)})

     assert len(df) == len(_EIGHT_SINGLETON_SITES)  # all sub-requests completed
-    assert in_flight["max"] == 2  # parallel, but never beyond the cap
-
-
-def test_fan_out_unbounded_dispatches_every_sub_request_at_once(monkeypatch):
-    """``API_USGS_CONCURRENT=unbounded`` disables the gate: every pending
-    sub-request is in flight simultaneously."""
-    in_flight = {"now": 0, "max": 0}
-    fetch = _async_chunked_fetch(
-        monkeypatch, _concurrency_probe(in_flight), max_concurrent="unbounded"
-    )
-
-    df, _ = fetch({"sites": list(_EIGHT_SINGLETON_SITES)})
-
-    assert len(df) == len(_EIGHT_SINGLETON_SITES)
-    assert in_flight["max"] == len(_EIGHT_SINGLETON_SITES)
+    assert in_flight["max"] == expected_high_water


 def test_fan_out_outlives_pool_timeout_on_real_transport(monkeypatch):
     """End-to-end regression for the pool-timeout starvation bug: the
     fan-out must survive every pooled connection staying busy past the
-    client's pool-acquire timeout.
+    client's pool-acquire timeout (the stall mechanism is documented on
+    ``ChunkedCall._run``; at production scale think a batch of large,
+    slowly-streaming pages).

     Sub-requests here send real HTTP to a slow localhost server through
-    the chunker's shared client (fakes can't catch this —
-    ``MockTransport`` bypasses the connection pool). A queued waiter's
-    pool-timeout clock only resets when some response completes, so the
-    repro needs a *stall*: with the pool as the only throttle, 2
-    connections busy for 0.7 s each and a 0.3 s pool timeout pinned
-    below, the 4 queued sub-requests sat through 0.3 s with no
-    completion → ``httpx.PoolTimeout`` → (retries exhausted,
-    ``API_USGS_RETRIES=0``) a spurious resumable ``ServiceInterrupted``.
-    Gated by the semaphore, queued sub-requests never touch the pool
-    and the call completes. (Production scale: 60 s timeout, tripped by
-    a batch of large, slowly-streaming pages.)
+    the chunker's shared client — fakes can't catch this, since
+    ``MockTransport`` bypasses the connection pool. With the pool as the
+    only throttle, 2 connections busy for 0.5 s each and the 0.25 s pool
+    timeout pinned below, the 2 queued sub-requests sat out the full
+    timeout with no completion to reset their clocks →
+    ``httpx.PoolTimeout`` → (retries exhausted, ``API_USGS_RETRIES=0``)
+    a spurious resumable ``ServiceInterrupted``. Gated by the semaphore,
+    queued sub-requests never touch the pool and the call completes.
     """

     class _SlowHandler(http.server.BaseHTTPRequestHandler):
         protocol_version = "HTTP/1.1"  # keepalive, so pooled connections reuse

         def do_GET(self):
-            time.sleep(0.7)  # hold the connection busy past the pool timeout
+            time.sleep(0.5)  # hold the connection busy past the pool timeout
             body = b'{"ok": true}'
             self.send_response(200)
             self.send_header("Content-Length", str(len(body)))
@@ -1592,10 +1585,10 @@ def log_message(self, *args):  # keep pytest output clean
     thread.start()
     url = f"http://127.0.0.1:{server.server_address[1]}/"

-    # Scale the production 60 s pool timeout down to 0.3 s so the
-    # pre-semaphore failure mode reproduces in test time.
+    # Scale the production pool timeout (see ``HTTPX_DEFAULTS``) down to
+    # 0.25 s so the pre-semaphore failure mode reproduces in test time.
     monkeypatch.setitem(
-        HTTPX_DEFAULTS, "timeout", httpx.Timeout(5.0, connect=1.0, pool=0.3)
+        HTTPX_DEFAULTS, "timeout", httpx.Timeout(5.0, connect=1.0, pool=0.25)
     )

     async def fetch_async(args):
@@ -1605,7 +1598,7 @@ async def fetch_async(args):
         assert resp.status_code == 200
         return pd.DataFrame({"id": [_atom_id(args)]}), resp

-    sites = _EIGHT_SINGLETON_SITES[:6]  # 2 in flight + 4 queued, 3 waves
+    sites = _EIGHT_SINGLETON_SITES[:4]  # 2 in flight + 2 queued, 2 waves
     try:
         fetch = _async_chunked_fetch(monkeypatch, fetch_async, max_concurrent=2)
         df, _ = fetch({"sites": sites})
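The "~1 s saved, same >=2x timeout margin" claim in the commit message can be checked from the figures in this diff. The helper below is a hypothetical back-of-the-envelope estimate, not part of the test suite: it takes wall time as waves times handler sleep (ignoring connection and scheduling overhead) and takes the margin to be handler sleep divided by pool timeout.

```python
import math


def repro_budget(sub_requests: int, cap: int, hold_s: float, pool_timeout_s: float) -> dict:
    """Estimate the stall repro's wave count, wall time, and timeout margin."""
    waves = math.ceil(sub_requests / cap)  # sub-requests run `cap` at a time
    return {
        "waves": waves,
        "wall_s": round(waves * hold_s, 3),           # lower bound, overhead ignored
        "margin": round(hold_s / pool_timeout_s, 2),  # how far the stall overshoots the timeout
    }


before = repro_budget(sub_requests=6, cap=2, hold_s=0.7, pool_timeout_s=0.3)
after = repro_budget(sub_requests=4, cap=2, hold_s=0.5, pool_timeout_s=0.25)
saved_s = round(before["wall_s"] - after["wall_s"], 3)
```

Under these assumptions the old repro needs 3 waves (about 2.1 s) with a margin near 2.33x, and the new one needs 2 waves (about 1.0 s) with a margin of 2.0x, so roughly 1.1 s is saved per run.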
