Skip to content

Commit 16fc920

Browse files
thodson-usgs and claude committed
fix(waterdata): never let the progress line crash or truncate a query
Code review surfaced two real defects:

- ProgressReporter rendering wrote to stderr unguarded. Because the per-page `reporter.add_page()` call sits inside `_walk_pages`' broad `except Exception`, a render failure — BrokenPipeError when output is piped to `head`, or UnicodeEncodeError for the `·` separator on a non-UTF-8 stderr — was misread as a failed request and silently truncated the result; on the first (pre-loop) page it crashed the query outright. `_render` and `close` now swallow stream errors and disable further rendering, so progress is strictly best-effort.
- The "Geopandas not installed" advisory lived in the per-page `_handle_stats_nesting`, so a multi-page stats query (now at WARNING) emitted one warning per page. Moved it to `get_stats_data` so it fires once, matching `_walk_pages`.

Also harden rate-limit formatting against non-decimal unicode digits (`isascii() and isdigit()`), and add regression tests: a broken progress stream must not truncate pagination, and the geopandas advisory fires once.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c940fcc commit 16fc920

4 files changed

Lines changed: 95 additions & 38 deletions

File tree

dataretrieval/waterdata/_progress.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,28 @@ def _format(self) -> str:
110110
parts.append(f"{self.rows:,} rows")
111111
if self.rate_remaining is not None:
112112
# The header is a string; group it like the row count when it's a
113-
# plain integer, otherwise show it verbatim.
113+
# plain ASCII integer, otherwise show it verbatim. (``str.isdigit``
114+
# alone is True for non-decimal unicode digits that ``int`` rejects.)
114115
rate = self.rate_remaining
115-
rate = f"{int(rate):,}" if rate.isdigit() else rate
116+
rate = f"{int(rate):,}" if rate.isascii() and rate.isdigit() else rate
116117
parts.append(f"{rate} requests left")
117118
return "Progress: " + " · ".join(parts)
118119

119120
def _render(self) -> None:
120121
if not self.enabled or self._closed:
121122
return
122-
line = self._format()
123-
pad = max(self._last_len - len(line), 0)
124-
self._stream.write("\r" + line + " " * pad)
125-
self._stream.flush()
126-
self._last_len = len(line)
123+
try:
124+
line = self._format()
125+
pad = max(self._last_len - len(line), 0)
126+
self._stream.write("\r" + line + " " * pad)
127+
self._stream.flush()
128+
self._last_len = len(line)
129+
except Exception: # noqa: BLE001
130+
# Progress output is best-effort cosmetics; a broken pipe (output
131+
# piped to ``head``), a closed stream, or an encoding error must
132+
# never disturb — let alone truncate — the query. Disable so we
133+
# don't retry on every subsequent page.
134+
self.enabled = False
127135

128136
def close(self) -> None:
129137
"""Finalize the line with a trailing newline so it persists on screen.
@@ -137,9 +145,12 @@ def close(self) -> None:
137145
self._closed = True
138146
if not (self.enabled and (self.pages or self.current_chunk)):
139147
return
140-
self._stream.write("\n")
141-
self._maybe_hint_api_key()
142-
self._stream.flush()
148+
try:
149+
self._stream.write("\n")
150+
self._maybe_hint_api_key()
151+
self._stream.flush()
152+
except Exception: # noqa: BLE001
153+
self.enabled = False
143154

144155
def _maybe_hint_api_key(self) -> None:
145156
global _api_key_hint_shown

dataretrieval/waterdata/utils.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -963,12 +963,6 @@ def _handle_stats_nesting(
963963
if body is None:
964964
return pd.DataFrame()
965965

966-
if not geopd:
967-
logger.warning(
968-
"Geopandas not installed. Geometries will be flattened "
969-
"into pandas DataFrames."
970-
)
971-
972966
# If geopandas not installed, return a pandas dataframe
973967
# otherwise return a geodataframe
974968
if not geopd:
@@ -1119,6 +1113,12 @@ def get_stats_data(
11191113
req = request.prepare()
11201114
logger.debug("Request: %s", req.url)
11211115

1116+
if not GEOPANDAS:
1117+
logger.warning(
1118+
"Geopandas not installed. Geometries will be flattened "
1119+
"into pandas DataFrames."
1120+
)
1121+
11221122
# create temp client if not provided
11231123
# and close it after the request is done
11241124
close_client = client is None

tests/waterdata_progress_test.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ def test_close_without_activity_writes_nothing():
102102
assert stream.getvalue() == ""
103103

104104

105+
class _RaisingStream:
106+
"""A stream whose writes always fail, e.g. a broken pipe (output | head)."""
107+
108+
def write(self, *_):
109+
raise BrokenPipeError("broken pipe")
110+
111+
def flush(self):
112+
pass
113+
114+
115+
def test_reporter_swallows_stream_errors_and_disables(monkeypatch):
116+
monkeypatch.delenv("API_USGS_PAT", raising=False)
117+
reporter = ProgressReporter(stream=_RaisingStream(), enabled=True)
118+
reporter.add_page(rows=1) # render write raises -> must be swallowed
119+
reporter.close() # newline + hint writes raise -> must be swallowed
120+
assert reporter.enabled is False
121+
122+
105123
# -- API-key pointer -----------------------------------------------------------
106124

107125

@@ -262,3 +280,25 @@ def test_walk_pages_without_context_does_not_error():
262280
df, _ = _walk_pages(geopd=False, req=req, client=client)
263281
assert len(df) == 1
264282
assert current() is None
283+
284+
285+
def test_broken_progress_stream_does_not_truncate_pagination():
286+
# A render failure (broken pipe) lands inside _walk_pages' per-page try;
287+
# it must NOT be mistaken for a failed request and silently drop pages.
288+
resp1 = _resp(
289+
[{"id": "1", "properties": {"v": "a"}}], next_url="https://example.com/p2"
290+
)
291+
resp2 = _resp([{"id": "2", "properties": {"v": "b"}}])
292+
client = mock.MagicMock(spec=requests.Session)
293+
client.send.return_value = resp1
294+
client.request.return_value = resp2
295+
296+
req = mock.MagicMock(spec=requests.PreparedRequest)
297+
req.method = "GET"
298+
req.headers = {}
299+
req.url = "https://example.com/p1"
300+
301+
with progress_context(stream=_RaisingStream(), enabled=True):
302+
df, _ = _walk_pages(geopd=False, req=req, client=client)
303+
304+
assert len(df) == 2 # both pages returned despite the broken progress stream

tests/waterdata_utils_test.py

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -307,31 +307,37 @@ def test_walk_pages_warns_when_geopandas_missing(caplog):
307307
assert any("Geopandas not installed" in m for m in _warning_messages(caplog))
308308

309309

310-
def test_handle_stats_nesting_warns_when_geopandas_missing(caplog):
311-
# This path previously logged at INFO (silent by default); it must warn.
310+
def test_get_stats_data_warns_once_when_geopandas_missing(caplog, monkeypatch):
311+
# The advisory must fire once per query, not once per paginated page
312+
# (it previously lived in the per-page _handle_stats_nesting helper).
313+
from dataretrieval.waterdata.utils import get_stats_data
314+
315+
monkeypatch.setattr(_utils_module, "GEOPANDAS", False)
316+
monkeypatch.setattr(
317+
_utils_module,
318+
"_handle_stats_nesting",
319+
mock.MagicMock(return_value=pd.DataFrame()),
320+
)
312321
caplog.set_level(logging.WARNING, logger=_LOGGER_NAME)
313-
body = {
314-
"next": None,
315-
"features": [
316-
{
317-
"properties": {
318-
"monitoring_location_id": "USGS-12345",
319-
"data": [
320-
{
321-
"parameter_code": "00060",
322-
"unit_of_measure": "ft^3/s",
323-
"parent_time_series_id": "ts-1",
324-
"values": [{"statistic_id": "mean", "value": 10.0}],
325-
}
326-
],
327-
},
328-
}
329-
],
330-
}
331322

332-
_handle_stats_nesting(body, geopd=False)
323+
page1 = mock.MagicMock(status_code=200, headers={})
324+
page1.json.return_value = {"next": "tok", "features": []}
325+
page2 = mock.MagicMock(status_code=200, headers={})
326+
page2.json.return_value = {"next": None, "features": []}
333327

334-
assert any("Geopandas not installed" in m for m in _warning_messages(caplog))
328+
client = mock.MagicMock(spec=requests.Session)
329+
client.send.return_value = page1
330+
client.request.return_value = page2
331+
332+
get_stats_data(
333+
args={"monitoring_location_id": "USGS-1"},
334+
service="observationNormals",
335+
expand_percentiles=False,
336+
client=client,
337+
)
338+
339+
geo = [m for m in _warning_messages(caplog) if "Geopandas not installed" in m]
340+
assert len(geo) == 1
335341

336342

337343
# --- _arrange_cols ----------------------------------------------------------

0 commit comments

Comments
 (0)