Skip to content

Commit c63da1b

Browse files
thodson-usgsclaude
andcommitted
refactor(waterdata): address PR 285 review (compact retry doc, drop _ASLEEP, inline trivial methods)
- Module docstring (L29-31): apply suggested wording — drop the "isn't slept off inline" / "doesn't block the call" rationale; the one-line escalation statement is enough. - Drop the ``_ASLEEP = asyncio.sleep`` module-level test hook in favor of a direct ``await asyncio.sleep(delay)``; tests now patch ``chunking.asyncio.sleep`` (still scoped to the chunking module's asyncio binding, no extra indirection in production). - Inline ``ChunkedCall.record(index, pair)`` into the one call site in ``_run.track``; the "single writer of ``_chunks``" invariant moves to a comment on ``self._chunks`` initialization. - Inline ``ChunkedCall.combined()`` into ``_run``'s return; the ``partial_*`` bypass note moves to a comment at the return site, where it's more useful than buried in a removed helper's docstring. No behavior change; 296 offline tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ad1208e commit c63da1b

2 files changed

Lines changed: 21 additions & 61 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 9 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
5xx, connect/read timeout) with exponential backoff + full jitter,
2727
honoring a server ``Retry-After`` when present. ``API_USGS_RETRIES``
2828
sets the cap (default 4; ``0`` disables). A ``Retry-After`` longer
29-
than the per-call ceiling isn't slept off inline — it escalates to
30-
the resumable interruption below so a multi-minute quota-window
31-
reset doesn't block the call.
29+
than the per-call ceiling escalates to a resumable interruption.
3230
3331
Interruption: any mid-stream transient failure — 429, 5xx, or a bare
3432
transport error (connect/read timeout, oversize follow-up URL) — surfaces
@@ -1128,12 +1126,6 @@ def _retryable(exc: BaseException) -> tuple[bool, float | None]:
11281126
return False, None
11291127

11301128

1131-
# Sleep hook, indirected through a module global so tests can
1132-
# ``monkeypatch.setattr`` it to a no-op instead of waiting for real
1133-
# backoff. Production uses the stdlib call.
1134-
_ASLEEP = asyncio.sleep
1135-
1136-
11371129
def _retry_delay(exc: BaseException, attempt: int, policy: RetryPolicy) -> float | None:
11381130
"""
11391131
Decide the backoff for a just-failed ``attempt`` (1-based), or ``None``
@@ -1206,7 +1198,7 @@ async def _retry(
12061198
delay = _retry_delay(exc, attempt, policy)
12071199
if delay is None:
12081200
raise
1209-
await _ASLEEP(delay)
1201+
await asyncio.sleep(delay)
12101202

12111203

12121204
def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
@@ -1401,26 +1393,10 @@ def __init__(
14011393
self.finalize = finalize
14021394
# Completed (frame, response) pairs keyed by sub-args index; sparse
14031395
# (gathered sub-requests complete out of order — see class docstring).
1396+
# ``_run``'s ``track`` closure is the only writer, so ``dict`` insertion
1397+
# order is completion order (relied on by :meth:`_combine_raw`).
14041398
self._chunks: dict[int, tuple[pd.DataFrame, httpx.Response]] = {}
14051399

1406-
def record(self, index: int, pair: tuple[pd.DataFrame, httpx.Response]) -> None:
1407-
"""
1408-
Record a completed sub-request's ``(frame, response)`` pair under
1409-
its sub-args index.
1410-
1411-
The single writer of ``self._chunks`` — used by the gather in
1412-
:meth:`_run` — so ``dict`` insertion order is completion order
1413-
(see :meth:`_combine_raw`).
1414-
1415-
Parameters
1416-
----------
1417-
index : int
1418-
The sub-args index this completed pair belongs to.
1419-
pair : tuple of (pandas.DataFrame, httpx.Response)
1420-
The completed sub-request's ``(frame, response)`` pair.
1421-
"""
1422-
self._chunks[index] = pair
1423-
14241400
def wrap_failure(self, exc: BaseException) -> ChunkInterrupted | None:
14251401
"""
14261402
Build the matching :class:`ChunkInterrupted` carrying this
@@ -1483,25 +1459,6 @@ def _combine_raw(self) -> tuple[pd.DataFrame, httpx.Response]:
14831459
_combine_chunk_responses(responses, self.plan.canonical_url),
14841460
)
14851461

1486-
def combined(self) -> tuple[pd.DataFrame, Any]:
1487-
"""
1488-
Combine every recorded sub-request and apply :attr:`finalize`.
1489-
1490-
Returned by :meth:`_run` on a completed call (first run or
1491-
resume). The ``partial_*`` accessors deliberately do NOT route
1492-
through here — they return the raw :meth:`_combine_raw` snapshot
1493-
to stay cheap and side-effect-free.
1494-
1495-
Returns
1496-
-------
1497-
tuple of (pandas.DataFrame, finalized response)
1498-
The combined frame and whatever :attr:`finalize` produces —
1499-
a raw :class:`httpx.Response` by default, or the OGC
1500-
getters' type-coerced / column-arranged frame plus
1501-
``BaseMetadata``.
1502-
"""
1503-
return self.finalize(*self._combine_raw())
1504-
15051462
@property
15061463
def partial_frame(self) -> pd.DataFrame:
15071464
"""
@@ -1669,7 +1626,7 @@ async def track(
16691626
) -> tuple[pd.DataFrame, httpx.Response]:
16701627
"""One sub-request (with retry) + record + progress tick."""
16711628
result = await _retry(lambda: self.fetch(args), self.retry_policy)
1672-
self.record(index, result)
1629+
self._chunks[index] = result
16731630
if reporter is not None:
16741631
# Chunks finish out of order under gather, so tick the
16751632
# completed *count* rather than a positional index.
@@ -1710,7 +1667,10 @@ async def track(
17101667
interrupted, exc = first_transient
17111668
raise interrupted from exc
17121669

1713-
return self.combined()
1670+
# Apply the injected ``finalize`` to the raw combined result.
1671+
# ``partial_frame`` / ``partial_response`` deliberately bypass
1672+
# ``finalize`` to stay cheap and side-effect-free.
1673+
return self.finalize(*self._combine_raw())
17141674

17151675

17161676
def multi_value_chunked(

tests/waterdata_chunking_test.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@
6060

6161

6262
def _aiozero(_d):
63-
"""An async no-op sleep — monkeypatched over ``chunking._ASLEEP`` so
64-
retry backoff doesn't actually wait in tests."""
63+
"""An async no-op sleep — monkeypatched over the ``chunking`` module's
64+
``asyncio.sleep`` so retry backoff doesn't actually wait in tests."""
6565

6666
async def _noop():
6767
return None
@@ -1489,7 +1489,7 @@ def test_combine_chunk_responses_does_not_mutate_input_urls():
14891489
# ---------------------------------------------------------------------------
14901490
# Retry-with-backoff: RetryPolicy + _retryable + drivers + decorator wiring.
14911491
# Conftest pins API_USGS_RETRIES=0, so these tests opt in explicitly and
1492-
# patch chunking._SLEEP / chunking._ASLEEP to no-ops (no real backoff).
1492+
# patch the chunking module's ``asyncio.sleep`` to a no-op (no real backoff).
14931493
# ---------------------------------------------------------------------------
14941494

14951495

@@ -1601,7 +1601,7 @@ def test_retryable_skips_wrapped_midpagination_transient():
16011601

16021602

16031603
def test_retry_transient_then_recovers(monkeypatch):
1604-
monkeypatch.setattr(_chunking, "_ASLEEP", _aiozero)
1604+
monkeypatch.setattr(_chunking.asyncio, "sleep", _aiozero)
16051605
calls = {"n": 0}
16061606

16071607
async def afn():
@@ -1616,7 +1616,7 @@ async def afn():
16161616

16171617

16181618
def test_retry_exhausted_reraises(monkeypatch):
1619-
monkeypatch.setattr(_chunking, "_ASLEEP", _aiozero)
1619+
monkeypatch.setattr(_chunking.asyncio, "sleep", _aiozero)
16201620
calls = {"n": 0}
16211621

16221622
async def afn():
@@ -1635,7 +1635,7 @@ def _record(delay):
16351635
slept.append(delay)
16361636
return _aiozero(delay)
16371637

1638-
monkeypatch.setattr(_chunking, "_ASLEEP", _record)
1638+
monkeypatch.setattr(_chunking.asyncio, "sleep", _record)
16391639
calls = {"n": 0}
16401640

16411641
async def afn():
@@ -1654,7 +1654,7 @@ def _record(delay):
16541654
slept.append(delay)
16551655
return _aiozero(delay)
16561656

1657-
monkeypatch.setattr(_chunking, "_ASLEEP", _record)
1657+
monkeypatch.setattr(_chunking.asyncio, "sleep", _record)
16581658
calls = {"n": 0}
16591659

16601660
async def afn():
@@ -1673,7 +1673,7 @@ def test_retry_transient_then_success(monkeypatch):
16731673
async def _noslept(_d):
16741674
return None
16751675

1676-
monkeypatch.setattr(_chunking, "_ASLEEP", _noslept)
1676+
monkeypatch.setattr(_chunking.asyncio, "sleep", _noslept)
16771677
calls = {"n": 0}
16781678

16791679
async def afn():
@@ -1693,7 +1693,7 @@ def test_chunker_retries_transient_then_completes(monkeypatch):
16931693
"""A transient on one sub-request is retried transparently; the
16941694
decorated call completes with no ChunkInterrupted."""
16951695
monkeypatch.setenv("API_USGS_RETRIES", "3")
1696-
monkeypatch.setattr(_chunking, "_ASLEEP", _aiozero)
1696+
monkeypatch.setattr(_chunking.asyncio, "sleep", _aiozero)
16971697
state = {"failed": False}
16981698

16991699
async def fetch(args):
@@ -1713,7 +1713,7 @@ def test_chunker_exhausted_retries_still_resumable(monkeypatch):
17131713
"""When retries are exhausted the failure still surfaces as a
17141714
resumable ChunkInterrupted — retries don't swallow the escape hatch."""
17151715
monkeypatch.setenv("API_USGS_RETRIES", "2")
1716-
monkeypatch.setattr(_chunking, "_ASLEEP", _aiozero)
1716+
monkeypatch.setattr(_chunking.asyncio, "sleep", _aiozero)
17171717
attempts = {"n": 0}
17181718

17191719
async def fetch(args):
@@ -1737,7 +1737,7 @@ def test_async_fan_out_retries_transient_then_completes(monkeypatch):
17371737
async def _noslept(_d):
17381738
return None
17391739

1740-
monkeypatch.setattr(_chunking, "_ASLEEP", _noslept)
1740+
monkeypatch.setattr(_chunking.asyncio, "sleep", _noslept)
17411741
state = {"failed": False}
17421742

17431743
async def fetch_async(args):
@@ -1759,7 +1759,7 @@ def test_async_fan_out_surfaces_fatal_over_transient(monkeypatch):
17591759
async def _noslept(_d):
17601760
return None
17611761

1762-
monkeypatch.setattr(_chunking, "_ASLEEP", _noslept)
1762+
monkeypatch.setattr(_chunking.asyncio, "sleep", _noslept)
17631763

17641764
async def fetch_async(args):
17651765
# One chunk carries a deterministic programmer error; the rest are

0 commit comments

Comments
 (0)