Skip to content

Commit 51bdc6f

Browse files
thodson-usgsclaude
andauthored
feat(waterdata): replace per-page logger.info with a single progress line (#288)
Paginated and chunked Water Data queries display a progress bar that gets updated as data arrives: waterdata · chunk 2/5 · 14 pages · 8,421 rows · 4,870 requests left Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 092f1b0 commit 51bdc6f

6 files changed

Lines changed: 680 additions & 52 deletions

File tree

README.md

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -109,22 +109,13 @@ Visit the
109109
[API Reference](https://doi-usgs.github.io/dataretrieval-python/reference/waterdata.html)
110110
for more information and examples on available services and input parameters.
111111

112-
**NEW:** This module implements
113-
[logging](https://docs.python.org/3/howto/logging.html#logging-basic-tutorial)
114-
so you can view the URL requests sent to the USGS Water Data APIs and the
115-
number of requests remaining each hour. These messages can be helpful for
116-
troubleshooting and support. To enable logging in your Python console or
117-
notebook:
112+
For verbose troubleshooting and support — including the request URL sent to the
113+
API — enable debug-level
114+
[logging](https://docs.python.org/3/howto/logging.html#logging-basic-tutorial):
118115

119116
```python
120117
import logging
121-
logging.basicConfig(level=logging.INFO)
122-
```
123-
To log messages to a file, you can specify a filename in the
124-
`basicConfig` call:
125-
126-
```python
127-
logging.basicConfig(filename='waterdata.log', level=logging.INFO)
118+
logging.basicConfig(level=logging.DEBUG)
128119
```
129120

130121
### Water Quality Portal (WQP)
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
"""A single self-updating status line for paginated / chunked Water Data queries.
2+
3+
Water Data getters fan out two ways the caller can't see: large multi-value
4+
requests are split into URL-length-safe *chunks* (``chunking`` module), and each
5+
request follows ``next`` links across an unknown number of *pages*
6+
(``utils._paginate``). This module surfaces that work as one line on stderr,
7+
rewritten in place as data arrives::
8+
9+
Retrieving: daily · 6 pages · 2,881 rows · 995/1,000 requests remaining
10+
11+
It replaces the per-page ``logger.info`` calls that previously narrated the same
12+
events one line at a time.
13+
14+
The active reporter lives in a :class:`~contextvars.ContextVar` rather than being
15+
threaded through every signature: progress is a cross-cutting concern that the
16+
chunk orchestrator (outer, chunk counts) and the page-walking loop (inner,
17+
page/row/rate-limit counts) both update without knowing about each other. Call
18+
:func:`progress_context` to activate one and :func:`current` to reach it.
19+
20+
By default the line is shown for interactive use — an interactive terminal or a
21+
Jupyter/IPython kernel (like ``tqdm``) — while redirected logs and CI stay clean.
22+
``API_USGS_PROGRESS`` forces it on (``1``/``true``) or off (``0``/``false``).
23+
"""
24+
25+
from __future__ import annotations
26+
27+
import contextvars
28+
import os
29+
import sys
30+
from collections.abc import Iterator
31+
from contextlib import contextmanager
32+
from typing import TextIO
33+
34+
35+
def _group_int(value: str) -> str:
36+
"""Comma-group a plain ASCII integer string; pass anything else through.
37+
38+
(``str.isdigit`` alone is True for non-decimal unicode digits that ``int``
39+
rejects, hence the ``isascii`` guard.)
40+
"""
41+
return f"{int(value):,}" if value.isascii() and value.isdigit() else value
42+
43+
44+
# The reporter active for the current query. A ContextVar (not a module global)
45+
# so the chunk orchestrator and the page loop resolve to the same reporter
46+
# within one query, and an unrelated query in another context can't clobber its
47+
# state. (It does not give concurrent queries sharing one stderr separate
48+
# lines — they would still interleave.)
49+
_active: contextvars.ContextVar[ProgressReporter | None] = contextvars.ContextVar(
50+
"waterdata_progress", default=None
51+
)
52+
53+
# Where to register for an API key. Surfaced once when a query runs without an
54+
# API key configured (no API_USGS_PAT), since unauthenticated callers hit much
55+
# lower rate limits (see the API_USGS_PAT note in the README).
56+
SIGNUP_URL = "https://api.waterdata.usgs.gov/signup/"
57+
58+
# Process-level latch so the "no API key" pointer is shown at most once.
59+
_api_key_hint_shown = False
60+
61+
62+
def _in_jupyter_kernel() -> bool:
63+
"""True when running inside a Jupyter/IPython *kernel* (notebook, lab,
64+
qtconsole).
65+
66+
A kernel's ``stderr`` isn't a TTY, but it honors carriage-return rewrites in
67+
the cell output area — the same mechanism ``tqdm`` rides on — so the line is
68+
worth showing there. The plain IPython terminal REPL is a
69+
``TerminalInteractiveShell`` (already a TTY), so only the ZMQ kernel needs
70+
this extra signal. Detected without importing IPython: if it isn't already
71+
imported, we aren't in a shell.
72+
"""
73+
ipython = sys.modules.get("IPython")
74+
if ipython is None:
75+
return False
76+
shell = ipython.get_ipython()
77+
return shell is not None and type(shell).__name__ == "ZMQInteractiveShell"
78+
79+
80+
def _enabled_default(stream: TextIO) -> bool:
81+
"""Whether to draw the line by default.
82+
83+
``API_USGS_PROGRESS`` wins when set. Otherwise show it for interactive use —
84+
a TTY or a Jupyter/IPython kernel — and stay quiet for redirected output,
85+
logs, and CI.
86+
"""
87+
override = os.getenv("API_USGS_PROGRESS")
88+
if override is not None:
89+
return override.strip().lower() not in {"", "0", "false", "no", "off"}
90+
if _in_jupyter_kernel():
91+
return True
92+
return hasattr(stream, "isatty") and stream.isatty()
93+
94+
95+
class ProgressReporter:
96+
"""Accumulates query progress and rewrites a single status line in place.
97+
98+
Every update method is a no-op when the reporter is disabled, so call sites
99+
need no ``if enabled`` guards. The line is redrawn with a leading carriage
100+
return and padded to erase the previous (possibly longer) contents;
101+
:meth:`close` terminates it with a newline so the final state persists.
102+
"""
103+
104+
def __init__(
105+
self,
106+
*,
107+
service: str | None = None,
108+
stream: TextIO | None = None,
109+
enabled: bool | None = None,
110+
) -> None:
111+
self._stream = stream if stream is not None else sys.stderr
112+
self.enabled = _enabled_default(self._stream) if enabled is None else enabled
113+
# The service/collection being retrieved (e.g. "daily", "peaks"),
114+
# shown as the line's leading label.
115+
self.service = service
116+
self.total_chunks = 1
117+
self.current_chunk = 0
118+
self.pages = 0
119+
self.rows = 0
120+
self.rate_remaining: str | None = None
121+
# The hourly request quota (``x-ratelimit-limit``), shown as the
122+
# denominator when the server reports it.
123+
self.rate_limit: str | None = None
124+
self._last_len = 0
125+
# Whether anything was actually written to the stream — drives whether
126+
# close() needs a terminating newline. (``current_chunk`` is a poor
127+
# proxy: ``start_chunk`` sets it even when it doesn't render.)
128+
self._rendered = False
129+
self._closed = False
130+
131+
def set_chunks(self, total: int) -> None:
132+
"""Record how many filter chunks this query was split into."""
133+
self.total_chunks = max(int(total), 1)
134+
135+
def start_chunk(self, index: int) -> None:
136+
"""Mark the start of chunk ``index`` (1-based) and redraw.
137+
138+
Only redraws when actually chunking (``total_chunks > 1``); a
139+
single-chunk plan has nothing chunk-specific to show yet, so it
140+
avoids a premature "0 pages" frame before the first page arrives.
141+
"""
142+
self.current_chunk = index
143+
if self.total_chunks > 1:
144+
self._render()
145+
146+
def add_page(self, rows: int = 0) -> None:
147+
"""Record one fetched page carrying ``rows`` rows and redraw."""
148+
self.pages += 1
149+
self.rows += int(rows)
150+
self._render()
151+
152+
def set_rate_remaining(
153+
self, value: str | int | None, limit: str | int | None = None
154+
) -> None:
155+
"""Update the rate-limit display from the response headers.
156+
157+
``value`` is ``x-ratelimit-remaining``; ``limit`` is the optional
158+
``x-ratelimit-limit`` quota, shown as the denominator. Empty/missing
159+
values are ignored so a page that omits a header doesn't blank out the
160+
last known value.
161+
"""
162+
if value not in (None, ""):
163+
self.rate_remaining = str(value)
164+
if limit not in (None, ""):
165+
self.rate_limit = str(limit)
166+
167+
def _format(self) -> str:
168+
parts: list[str] = []
169+
if self.total_chunks > 1:
170+
parts.append(f"chunk {self.current_chunk}/{self.total_chunks}")
171+
parts.append(f"{self.pages} page" + ("" if self.pages == 1 else "s"))
172+
if self.rows:
173+
parts.append(f"{self.rows:,} rows")
174+
if self.rate_remaining is not None:
175+
remaining = _group_int(self.rate_remaining)
176+
if self.rate_limit is not None:
177+
limit = _group_int(self.rate_limit)
178+
segment = f"{remaining}/{limit} requests remaining"
179+
else:
180+
segment = f"{remaining} requests remaining"
181+
parts.append(segment)
182+
if self.service:
183+
return f"Retrieving: {self.service} · " + " · ".join(parts)
184+
return "Progress: " + " · ".join(parts)
185+
186+
def _render(self) -> None:
187+
if not self.enabled or self._closed:
188+
return
189+
try:
190+
line = self._format()
191+
pad = max(self._last_len - len(line), 0)
192+
self._stream.write("\r" + line + " " * pad)
193+
self._stream.flush()
194+
self._last_len = len(line)
195+
self._rendered = True
196+
except Exception: # noqa: BLE001
197+
# Progress output is best-effort cosmetics; a broken pipe (output
198+
# piped to ``head``), a closed stream, or an encoding error must
199+
# never disturb — let alone truncate — the query. Disable so we
200+
# don't retry on every subsequent page.
201+
self.enabled = False
202+
203+
def close(self) -> None:
204+
"""Finalize the line with a trailing newline so it persists on screen.
205+
206+
If no API key is configured (no ``API_USGS_PAT``), append a one-time
207+
pointer to API-key registration, since unauthenticated callers hit much
208+
lower rate limits.
209+
"""
210+
if self._closed:
211+
return
212+
self._closed = True
213+
if not (self.enabled and self._rendered):
214+
return
215+
try:
216+
self._stream.write("\n")
217+
self._maybe_hint_api_key()
218+
self._stream.flush()
219+
except Exception: # noqa: BLE001
220+
self.enabled = False
221+
222+
def _maybe_hint_api_key(self) -> None:
223+
global _api_key_hint_shown
224+
if _api_key_hint_shown or os.getenv("API_USGS_PAT"):
225+
return
226+
# Set the once-per-process latch only after a successful write, so a
227+
# failed write (broken pipe) doesn't silently burn the hint for every
228+
# later query in the process.
229+
self._stream.write(
230+
f"No API key detected — register for higher rate limits at {SIGNUP_URL}\n"
231+
)
232+
_api_key_hint_shown = True
233+
234+
235+
@contextmanager
236+
def progress_context(
237+
*,
238+
service: str | None = None,
239+
stream: TextIO | None = None,
240+
enabled: bool | None = None,
241+
) -> Iterator[ProgressReporter]:
242+
"""Activate a :class:`ProgressReporter` for the duration of a query.
243+
244+
``service`` labels the line (e.g. ``"Retrieving: daily ..."``). If a reporter
245+
is already active (a nested call), the existing one is yielded unchanged so
246+
the outermost query owns the single line; only the outermost context closes
247+
it (and ``service``/``stream``/``enabled`` of a nested call are ignored).
248+
"""
249+
existing = _active.get()
250+
if existing is not None:
251+
yield existing
252+
return
253+
reporter = ProgressReporter(service=service, stream=stream, enabled=enabled)
254+
token = _active.set(reporter)
255+
try:
256+
yield reporter
257+
finally:
258+
_active.reset(token)
259+
reporter.close()
260+
261+
262+
def current() -> ProgressReporter | None:
263+
"""Return the reporter active for the current query, or ``None``."""
264+
return _active.get()

dataretrieval/waterdata/api.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2338,7 +2338,7 @@ def get_samples(
23382338

23392339
req = PreparedRequest()
23402340
req.prepare_url(url, params=params)
2341-
logger.info("Request: %s", req.url)
2341+
logger.debug("Request: %s", req.url)
23422342

23432343
response = requests.get(
23442344
url, params=params, verify=ssl_check, headers=_default_headers()
@@ -2410,7 +2410,7 @@ def get_samples_summary(
24102410

24112411
req = PreparedRequest()
24122412
req.prepare_url(url, params=params)
2413-
logger.info("Request: %s", req.url)
2413+
logger.debug("Request: %s", req.url)
24142414

24152415
response = requests.get(
24162416
url, params=params, verify=ssl_check, headers=_default_headers()

dataretrieval/waterdata/chunking.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import requests
5050
from requests.structures import CaseInsensitiveDict
5151

52+
from . import _progress
5253
from .filters import (
5354
_check_numeric_filter_pitfall,
5455
_is_chunkable,
@@ -1126,10 +1127,15 @@ def resume(self) -> tuple[pd.DataFrame, requests.Response]:
11261127
(checked after the first sub-request).
11271128
"""
11281129
with requests.Session() as session, _publish_session(session):
1130+
reporter = _progress.current()
1131+
if reporter is not None:
1132+
reporter.set_chunks(self.plan.total)
11291133
completed = len(self._chunks)
11301134
for i, sub_args in enumerate(self.plan.iter_sub_args()):
11311135
if i < completed:
11321136
continue
1137+
if reporter is not None:
1138+
reporter.start_chunk(i + 1)
11331139
self._issue(sub_args)
11341140
frames = [frame for frame, _ in self._chunks]
11351141
responses = [resp for _, resp in self._chunks]

0 commit comments

Comments
 (0)