fix(waterdata): gate the chunked fan-out with a semaphore, not the connection pool#322
Draft
thodson-usgs wants to merge 1 commit into
Draft
fix(waterdata): gate the chunked fan-out with a semaphore, not the connection pool#322thodson-usgs wants to merge 1 commit into
thodson-usgs wants to merge 1 commit into
Conversation
ChunkedCall._run dispatched every pending sub-request into one asyncio.gather and relied on the shared httpx.AsyncClient connection pool as the only concurrency throttle (max_connections sized from API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, from HTTPX_DEFAULTS): - Time a sub-request spends queued waiting for a pooled connection counts against that timeout, and a queued waiter's clock only resets when some response completes and httpcore reassigns the freed connection. So whenever every pooled connection stays busy past the timeout with no completion in between (a batch of large, slowly-streaming pages), the queued tail of the fan-out fails with httpx.PoolTimeout. Being a TransportError it burns the per-sub-request retry budget and ultimately surfaces as a bogus *resumable* ServiceInterrupted telling the user to wait for the upstream to recover — for a failure the server never saw. - Pool queueing also thunders: httpcore assigns a freed connection to every queued waiter, which all wake and race for it, the losers re-queuing — O(pending) wakeups per page completion under a large fan-out. Gate each fetch attempt with an asyncio.Semaphore sized from API_USGS_CONCURRENT instead; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool, so no transport clock runs while they wait and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt, so a sub-request sleeping off a retry backoff doesn't hold one. "unbounded" degenerates to a semaphore sized at the plan total, so there is a single gated code path. Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics. Tests: - in-flight high-water-mark probe (parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code. - real-localhost-server end-to-end test — mock transports bypass the pool, so this drives the chunker's shared client against a slow server past a scaled-down pool timeout; reproduces the spurious resumable ServiceInterrupted on the pre-fix code and completes on this branch. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
f4ecc31 to
1c676b8
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
ChunkedCall._rundispatches every pending sub-request into oneasyncio.gatherand relies on the sharedhttpx.AsyncClient's connection pool as the only concurrency throttle (max_connections = API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, fromHTTPX_DEFAULTS):httpx.PoolTimeout. Being aTransportErrorit burns the per-sub-request retry budget and ultimately surfaces as a bogus resumableServiceInterruptedtelling the user to wait for the upstream service to recover — for a failure the server never saw.Fix
Gate each fetch attempt with an
asyncio.Semaphoresized fromAPI_USGS_CONCURRENT; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool — no transport clock runs while they wait, semaphore releases wake one waiter each, and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt (inside the retry driver), so a sub-request sleeping off a retry backoff doesn't hold one.unboundeddegenerates to a semaphore sized at the plan total, so there is a single gated code path.Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics.
Why not just disable the pool timeout?
Setting
pool=Noneon the client would suppress the spurious failure but lose the stuck-checkout protection, keep the thundering-herd churn, and leave dispatch breadth-first in one FIFO. The semaphore removes the failure mode, keeps the timeout meaningful, and bounds in-flight work explicitly.Tests
test_fan_out_in_flight_high_water_mark_is_the_cap(parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code.test_fan_out_outlives_pool_timeout_on_real_transport— mock transports bypass the pool, so this drives the chunker's shared client against a slow localhost server past a scaled-down pool timeout; it reproduces the spurious resumableServiceInterruptedon the pre-fix code and completes on this branch.Verified end-to-end against the live USGS API: with the pre-fix code 6/8 runs interrupted (in-flight peak 8, pool the only throttle); with the fix 0/8 (in-flight peak 2, semaphore gating). Full suite green,
ruffandmypy --strict dataretrieval/clean.🤖 Generated with Claude Code