Commit d454cd9

thodson-usgs and claude committed
refactor(ogc): reuse + unify the OGC engine — pager, aggregation, ambient state
Reuse and unify the OGC engine's HTTP pagination / aggregation / error-recovery / ambient-state plumbing instead of carrying parallel implementations. Net source reduction of ~66 LOC, behavior-preserving, plus one rate-limit correctness fix.

wateruse reuse:
- wateruse drives the engine's generic `_paginate` (with an injected `raise_for_status` for the NWDC `{detail}` envelope), `_run_sync` (anyio portal, Jupyter-safe), and `_combine_chunk_frames` / `_combine_chunk_responses` aggregators, replacing a hand-rolled pager, thread bridge, and bespoke aggregation. `_resolve_locations` becomes a `_LOCATION_BUILDERS` table dispatch, dropping a 3-way if/elif and a duplicated selector enumeration.

Engine unification:
- `planning._merge_response`: one low-level "fold N responses into one" behind both pagination (`_paginate`) and chunked/fan-out aggregation (`_combine_chunk_responses`), replacing two near-duplicate implementations; deletes `engine._aggregate_paginated_response`.
- `utils.Ambient[T]`: a generic ContextVar-with-scope class collapsing each per-call ambient (`_row_cap`, `_ogc_base_url`, `_dialect`, the chunker's `_chunked_client`) from a var + hand-written `@contextmanager` setter pair into one declaration. `with _x(value):` call sites unchanged; readers shorten to `_x.get()`.
- `_paginate`'s verbatim per-page progress block deduped into a `report_page` closure.
- `_combine_chunk_responses`: dropped a dead single-response branch.
- `_QUOTA_HEADER` moved to the base `planning` module: dedups the literal and fixes a layering inversion (planning had hard-coded it, unable to import from chunking).
- `_cql2_param`: CQL2 filter list built as a comprehension.
- `engine._check_id_format`: inlined into its only caller; dead re-export dropped.

Rate-limit correctness fix:
- `x-ratelimit-remaining` now reports the LOWEST value any concurrent sub-request saw (the quota actually left after a fan-out), via a shared `_lowest_remaining`, instead of the last-by-index, fixing a latent inaccuracy in the OGC chunker too.

Behavior-preserving (live-verified); offline OGC/wateruse/utils/progress suites green; ruff + mypy --strict clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
1 parent 9345743 commit d454cd9
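
The rate-limit fix described in the commit message can be illustrated with a minimal sketch. This is not the commit's `_lowest_remaining` (its body is not shown on this page); the function name `lowest_remaining`, the constant `QUOTA_HEADER`, and the use of plain dicts in place of response headers are assumptions made for illustration. It shows why taking the minimum across sub-responses differs from taking the last one by index when concurrent requests complete out of order.

```python
from __future__ import annotations

from collections.abc import Iterable, Mapping

# Header name taken from the commit message; the constant name is illustrative.
QUOTA_HEADER = "x-ratelimit-remaining"


def lowest_remaining(header_sets: Iterable[Mapping[str, str]]) -> int | None:
    """Return the smallest advertised quota across sub-responses.

    Hypothetical helper: responses whose header is missing or not an
    integer are skipped; returns None if no response carried a usable value.
    """
    lowest: int | None = None
    for headers in header_sets:
        raw = headers.get(QUOTA_HEADER)
        if raw is None:
            continue
        try:
            value = int(raw)
        except ValueError:
            continue
        if lowest is None or value < lowest:
            lowest = value
    return lowest


# Three sub-requests indexed 0..2; the server answered index 1 last,
# so index 1 carries the true remaining quota after the fan-out.
responses = [
    {QUOTA_HEADER: "97"},
    {QUOTA_HEADER: "95"},
    {QUOTA_HEADER: "96"},
]
last_by_index = int(responses[-1][QUOTA_HEADER])
after_fan_out = lowest_remaining(responses)
```

In this example the last-by-index value is 96 while the minimum is 95, which is the gap the commit message calls a latent inaccuracy. Real `httpx` headers are case-insensitive; the plain dicts here assume lowercase keys.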

10 files changed

Lines changed: 297 additions & 354 deletions

dataretrieval/ogc/chunking.py

Lines changed: 10 additions & 41 deletions
@@ -69,15 +69,14 @@
 import functools
 import os
 from collections.abc import Callable, Iterator
-from contextlib import contextmanager
-from contextvars import ContextVar, copy_context
+from contextvars import copy_context
 from typing import Any, cast

 import httpx
 import pandas as pd
 from anyio.from_thread import start_blocking_portal

-from dataretrieval.utils import HTTPX_DEFAULTS
+from dataretrieval.utils import HTTPX_DEFAULTS, Ambient

 from . import progress as _progress
 from .interruptions import (
@@ -106,9 +105,6 @@
 _OGC_URL_BYTE_LIMIT = 8000


-# Response header USGS uses to advertise remaining hourly quota.
-_QUOTA_HEADER = "x-ratelimit-remaining"
-
 # Fan-out concurrency cap, read at call time (not import) so test
 # ``monkeypatch.setenv`` applies. Value grammar in :func:`_read_concurrency_env`;
 # the concurrency model is in the module docstring.
@@ -152,38 +148,11 @@ def _read_concurrency_env() -> int | None:
     return value


-# Shared per-call ``httpx.AsyncClient``, published via :func:`_publish`
-# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``)
-# reuse the same connection pool across every sub-request. ``None``
-# outside a chunked call — paginated helpers then open their own
-# short-lived client.
-_chunked_client: ContextVar[httpx.AsyncClient | None] = ContextVar(
-    "_chunked_client", default=None
-)
-
-
-@contextmanager
-def _publish(client: httpx.AsyncClient) -> Iterator[None]:
-    """
-    Publish ``client`` on the ``_chunked_client`` ContextVar so the
-    paginated-loop helpers can borrow it via :func:`get_active_client`
-    for the duration of the ``with`` block.
-
-    Parameters
-    ----------
-    client : httpx.AsyncClient
-        The client to publish.
-
-    Yields
-    ------
-    None
-        Yields once, for the duration of the bind.
-    """
-    token = _chunked_client.set(client)
-    try:
-        yield
-    finally:
-        _chunked_client.reset(token)
+# Shared per-call ``httpx.AsyncClient``, scoped via ``with _chunked_client(c):``
+# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``) reuse
+# the same connection pool across every sub-request. ``None`` outside a chunked
+# call — paginated helpers then open their own short-lived client.
+_chunked_client: Ambient[httpx.AsyncClient | None] = Ambient("_chunked_client", None)


 def get_active_client() -> httpx.AsyncClient | None:
@@ -197,8 +166,8 @@ def get_active_client() -> httpx.AsyncClient | None:
     Returns
     -------
     httpx.AsyncClient or None
-        The client published via :func:`_publish` if currently inside a
-        :class:`ChunkedCall` run; ``None`` otherwise.
+        The client scoped via ``with _chunked_client(...)`` if currently inside
+        a :class:`ChunkedCall` run; ``None`` otherwise.
     """
     return _chunked_client.get()

@@ -541,7 +510,7 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
         )

         async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
-            with _publish(client):
+            with _chunked_client(client):
                 reporter = _progress.current()
                 if reporter is not None:
                     reporter.set_chunks(self.plan.total)
