Skip to content

Commit 1829cb1

Browse files
thodson-usgsclaude
andcommitted
refactor(waterdata): Untangle chunking ↔ utils — strict one-way layer
The chunker module previously had a circular dep with utils: utils imported chunking at top level, but chunking did a lazy import of `RateLimited` / `ServiceUnavailable` from utils inside `_classify_chunk_error`. The lazy form existed only to defeat that cycle. utils.py also reached across the boundary three times for private chunker internals — `chunking._QUOTA_HEADER`, `chunking._chunked_session.get()`, and `chunking._publish_session`. Reasoning about the layer: `RateLimited` / `ServiceUnavailable` exist *because of* the chunker — they're raised by utils.py's `_raise_for_non_200` solely so `_classify_chunk_error` can dispatch on them as resumable transient failures. The motivating consumer is the chunker, so they belong in chunking.py. Changes: - Move `_RetryableTransportError`, `RateLimited`, `ServiceUnavailable` from utils.py → chunking.py. - Drop the lazy `from .utils import …` inside `_classify_chunk_error`. - Add `chunking.get_active_session()` — public accessor for the `_chunked_session` ContextVar so `_session()` in utils.py doesn't reach into a private name. - Hoist `_QUOTA_HEADER` to a top-level explicit import in utils.py instead of `chunking._QUOTA_HEADER` ad-hoc. - Update the two test files that imported `RateLimited` / `ServiceUnavailable` from utils.py. After this commit, chunking.py has zero runtime dependency on utils.py (only docstring cross-references). The dep direction is now strictly utils → chunking. 80/80 chunker + utils unit tests pass; ruff clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 47c82ec commit 1829cb1

4 files changed

Lines changed: 85 additions & 71 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@
113113
@contextmanager
114114
def _publish_session(session: requests.Session) -> Iterator[None]:
115115
"""
116-
Make ``session`` visible to :func:`dataretrieval.waterdata.utils._session`
117-
for the duration of the ``with`` block via the ``_chunked_session``
116+
Make ``session`` visible to :func:`get_active_session` for the
117+
duration of the ``with`` block via the ``_chunked_session``
118118
ContextVar. Wraps the set/reset token dance so callers don't have to.
119119
"""
120120
token = _chunked_session.set(session)
@@ -124,6 +124,23 @@ def _publish_session(session: requests.Session) -> Iterator[None]:
124124
_chunked_session.reset(token)
125125

126126

127+
def get_active_session() -> requests.Session | None:
128+
"""
129+
Return the chunker's currently-published session, or ``None``.
130+
131+
Public accessor for the ``_chunked_session`` ContextVar so
132+
sibling modules (notably :func:`dataretrieval.waterdata.utils._session`)
133+
don't have to reach into the private ContextVar directly.
134+
135+
Returns
136+
-------
137+
requests.Session or None
138+
The session published by :func:`_publish_session` if currently
139+
inside a :class:`ChunkedCall` ``resume`` block; ``None`` otherwise.
140+
"""
141+
return _chunked_session.get()
142+
143+
127144
# Separators the two axis kinds use to join their atoms back into
128145
# URL text. List axes comma-join values
129146
# (``site=USGS-A,USGS-B``); the filter axis OR-joins clauses
@@ -135,6 +152,58 @@ def _publish_session(session: requests.Session) -> Iterator[None]:
135152
_FetchOnce = Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]]
136153

137154

155+
class _RetryableTransportError(RuntimeError):
156+
"""
157+
Base for typed HTTP transport failures the chunker recognizes as
158+
transient.
159+
160+
Raised by :func:`dataretrieval.waterdata.utils._raise_for_non_200`
161+
and walked by :func:`_classify_chunk_error`. One subclass per
162+
recoverable HTTP status family (429 → :class:`RateLimited`,
163+
5xx → :class:`ServiceUnavailable`); ``ChunkedCall`` wraps them as
164+
resumable :class:`ChunkInterrupted` subclasses.
165+
166+
Parameters
167+
----------
168+
message : str
169+
Human-readable error message.
170+
retry_after : float, optional
171+
Seconds to wait before retrying, parsed from the
172+
``Retry-After`` response header.
173+
174+
Attributes
175+
----------
176+
retry_after : float or None
177+
Seconds to wait before retrying, parsed from the
178+
``Retry-After`` response header. ``None`` when the header was
179+
absent or unparseable.
180+
"""
181+
182+
def __init__(self, message: str, *, retry_after: float | None = None) -> None:
183+
super().__init__(message)
184+
self.retry_after = retry_after
185+
186+
187+
class RateLimited(_RetryableTransportError):
188+
"""
189+
A USGS Water Data API request was rejected with HTTP 429.
190+
191+
Exposed as a typed exception so callers (notably the multi-value
192+
chunker) can detect rate-limit failures via ``isinstance`` instead
193+
of string-matching error messages.
194+
"""
195+
196+
197+
class ServiceUnavailable(_RetryableTransportError):
198+
"""
199+
A USGS Water Data API request was rejected with HTTP 5xx.
200+
201+
Surfaced as a typed exception (parallel to :class:`RateLimited`)
202+
so ``ChunkedCall`` can treat transient server failures as
203+
resumable interruptions rather than fatal programmer errors.
204+
"""
205+
206+
138207
class RequestTooLarge(ValueError):
139208
"""
140209
No chunking plan fits the URL byte limit.
@@ -801,13 +870,7 @@ def _classify_chunk_error(
801870
``RuntimeError`` with the typed transport exception linked as
802871
``__cause__``, so this function must walk the chain rather than
803872
just ``isinstance`` the top-level exception.
804-
805-
The import of ``RateLimited`` / ``ServiceUnavailable`` is lazy
806-
because :mod:`dataretrieval.waterdata.utils` imports this module
807-
to decorate ``_fetch_once``; a top-level import would be circular.
808873
"""
809-
from .utils import RateLimited, ServiceUnavailable
810-
811874
cur: BaseException | None = exc
812875
while cur is not None:
813876
if isinstance(cur, RateLimited):

dataretrieval/waterdata/utils.py

Lines changed: 10 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
from dataretrieval import __version__
1717
from dataretrieval.utils import BaseMetadata
1818
from dataretrieval.waterdata import chunking
19+
from dataretrieval.waterdata.chunking import (
20+
_QUOTA_HEADER,
21+
RateLimited,
22+
ServiceUnavailable,
23+
get_active_session,
24+
)
1925
from dataretrieval.waterdata.types import (
2026
PROFILE_LOOKUP,
2127
PROFILES,
@@ -442,58 +448,6 @@ def _parse_retry_after(value: str | None) -> float | None:
442448
return None
443449

444450

445-
class _RetryableTransportError(RuntimeError):
446-
"""
447-
Base for typed transport failures recognized as transient by
448-
:func:`dataretrieval.waterdata.chunking._classify_chunk_error`.
449-
450-
One subclass per recoverable HTTP status family (429 →
451-
:class:`RateLimited`, 5xx → :class:`ServiceUnavailable`);
452-
``ChunkedCall`` wraps them as resumable ``ChunkInterrupted``
453-
subclasses.
454-
455-
Parameters
456-
----------
457-
message : str
458-
Human-readable error message (typically built by
459-
:func:`_error_body`).
460-
retry_after : float, optional
461-
Seconds to wait before retrying, parsed from the
462-
``Retry-After`` response header.
463-
464-
Attributes
465-
----------
466-
retry_after : float or None
467-
Seconds to wait before retrying, parsed from the
468-
``Retry-After`` response header. ``None`` when the header was
469-
absent or unparseable.
470-
"""
471-
472-
def __init__(self, message: str, *, retry_after: float | None = None) -> None:
473-
super().__init__(message)
474-
self.retry_after = retry_after
475-
476-
477-
class RateLimited(_RetryableTransportError):
478-
"""
479-
A USGS Water Data API request was rejected with HTTP 429.
480-
481-
Exposed as a typed exception so callers (notably the multi-value
482-
chunker) can detect rate-limit failures via ``isinstance`` instead
483-
of string-matching error messages.
484-
"""
485-
486-
487-
class ServiceUnavailable(_RetryableTransportError):
488-
"""
489-
A USGS Water Data API request was rejected with HTTP 5xx.
490-
491-
Surfaced as a typed exception (parallel to :class:`RateLimited`)
492-
so ``ChunkedCall`` can treat transient server failures as
493-
resumable interruptions rather than fatal programmer errors.
494-
"""
495-
496-
497451
def _raise_for_non_200(resp: requests.Response) -> None:
498452
"""
499453
Raise a typed exception for any non-200 response.
@@ -716,7 +670,7 @@ def _next_req_url(
716670
if os.getenv("API_USGS_PAT", ""):
717671
logger.info(
718672
"Remaining requests this hour: %s",
719-
header_info.get(chunking._QUOTA_HEADER, ""),
673+
header_info.get(_QUOTA_HEADER, ""),
720674
)
721675
for link in body.get("links", []):
722676
if link.get("rel") == "next":
@@ -794,8 +748,8 @@ def _session(client: requests.Session | None) -> Iterator[requests.Session]:
794748
1. ``client`` if the caller supplied one (borrowed; not closed
795749
here — the caller owns its lifecycle).
796750
2. The chunker's shared session if we're inside a ``ChunkedCall``
797-
fan-out (published via :func:`chunking._publish_session`).
798-
Borrowed; ``ChunkedCall.resume`` closes it on exit.
751+
fan-out (per :func:`chunking.get_active_session`). Borrowed;
752+
``ChunkedCall.resume`` closes it on exit.
799753
3. A fresh short-lived ``requests.Session`` opened here and closed
800754
on context exit.
801755
@@ -813,7 +767,7 @@ def _session(client: requests.Session | None) -> Iterator[requests.Session]:
813767
if client is not None:
814768
yield client
815769
return
816-
shared = chunking._chunked_session.get()
770+
shared = get_active_session()
817771
if shared is not None:
818772
yield shared
819773
return

tests/waterdata_chunking_test.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,17 @@
3434
ChunkInterrupted,
3535
ChunkPlan,
3636
QuotaExhausted,
37+
RateLimited,
3738
RequestExceedsQuota,
3839
RequestTooLarge,
3940
ServiceInterrupted,
41+
ServiceUnavailable,
4042
_chunked_session,
4143
_extract_axes,
4244
_read_remaining,
4345
multi_value_chunked,
4446
)
45-
from dataretrieval.waterdata.utils import (
46-
RateLimited,
47-
ServiceUnavailable,
48-
_construct_api_requests,
49-
)
47+
from dataretrieval.waterdata.utils import _construct_api_requests
5048

5149

5250
class _FakeReq:

tests/waterdata_utils_test.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
import requests
77

88
import dataretrieval.waterdata.utils as _utils_module
9+
from dataretrieval.waterdata.chunking import RateLimited, ServiceUnavailable
910
from dataretrieval.waterdata.utils import (
10-
RateLimited,
11-
ServiceUnavailable,
1211
_arrange_cols,
1312
_error_body,
1413
_format_api_dates,

0 commit comments

Comments
 (0)