1414
1515import logging
1616import os
17- from typing import Any , Literal , get_args
17+ from typing import Any , Iterable , Literal , get_args
1818
1919import pandas as pd
2020import requests
2121
22- # Rating files use the same USGS RDB shape as NWIS responses (comment
23- # block prefixed with ``#``, header row, format-spec row, then tab-separated
24- # data), so we reuse the parser already in ``nwis``. ``_read_rdb`` is private;
25- # if it ever moves or its contract changes we want a loud failure here, hence
26- # the explicit import rather than a copy.
22+ # Rating files use the same USGS RDB shape as NWIS responses, so we reuse
23+ # the parser already in ``nwis``. ``_read_rdb`` is private; if it ever
24+ # moves we want a loud failure here, hence the explicit import.
2725from dataretrieval .nwis import _read_rdb
2826
29- from .utils import BASE_URL , _default_headers , _format_api_dates
27+ from .utils import _DURATION_RE , BASE_URL , _default_headers , _format_api_dates
3028
3129logger = logging .getLogger (__name__ )
3230
3634_VALID_FILE_TYPES = get_args (RATING_FILE_TYPE )
3735
3836
39- def _quote_cql_str (value : str ) -> str :
40- """Escape a string for inclusion in a single-quoted CQL literal.
41-
42- CQL escapes a single quote by doubling it. Most monitoring-location IDs
43- can never contain a quote, but the function accepts arbitrary strings,
44- so we defend against malformed filters / injection regardless.
45- """
46- return value .replace ("'" , "''" )
47-
48-
49- def _build_filter (
50- monitoring_location_id : str | list [str ] | None ,
51- file_type : str | None ,
52- ) -> str | None :
53- """Compose the CQL filter sent to STAC ``/search``.
54-
55- Mirrors R's logic: only pin ``file_type`` when a single value was given,
56- so a multi-type request returns every matching site and the file-type
57- filtering happens client-side from the per-feature properties.
58- """
59- parts : list [str ] = []
60- if monitoring_location_id is not None :
61- ids = (
62- [monitoring_location_id ]
63- if isinstance (monitoring_location_id , str )
64- else list (monitoring_location_id )
65- )
66- joined = "', '" .join (_quote_cql_str (i ) for i in ids )
67- parts .append (f"monitoring_location_id IN ('{ joined } ')" )
68- if file_type is not None :
69- parts .append (f"file_type = '{ _quote_cql_str (file_type )} '" )
70- return " AND " .join (parts ) if parts else None
71-
72-
73- def _search (
74- filter_str : str | None ,
75- time_str : str | None ,
76- bbox : list [float ] | None ,
77- limit : int ,
78- ssl_check : bool ,
79- ) -> list [dict [str , Any ]]:
80- """Run a single STAC ``/search`` request and return its features."""
81- params : dict [str , Any ] = {"limit" : limit }
82- if filter_str is not None :
83- params ["filter" ] = filter_str
84- if time_str is not None :
85- params ["datetime" ] = time_str
86- if bbox is not None :
87- params ["bbox" ] = "," .join (str (b ) for b in bbox )
88-
89- response = requests .get (
90- f"{ STAC_URL } /search" ,
91- params = params ,
92- headers = _default_headers (),
93- verify = ssl_check ,
94- )
95- response .raise_for_status ()
96- return response .json ().get ("features" , [])
97-
98-
99- def _extract_rdb_comment (rdb : str ) -> list [str ]:
100- """Return the RDB ``#``-prefixed comment block as a list of header lines.
101-
102- The comment block carries useful per-rating metadata — rating id,
103- parameter description, expansion type, last-shifted timestamp, etc.
104- R's ``read_waterdata_ratings`` exposes this via ``comment(df)``; we
105- attach it to ``df.attrs["comment"]`` so callers can inspect or log
106- provenance without re-reading the on-disk RDB.
107- """
108- return [line for line in rdb .splitlines () if line .startswith ("#" )]
109-
110-
111- def _download_and_parse (
112- feature : dict [str , Any ],
113- file_path : str | None ,
114- ssl_check : bool ,
115- ) -> pd .DataFrame :
116- """Fetch the feature's data asset, parse RDB, optionally persist to disk."""
117- url = feature ["assets" ]["data" ]["href" ]
118- fid = feature ["id" ]
119-
120- response = requests .get (url , headers = _default_headers (), verify = ssl_check )
121- response .raise_for_status ()
122-
123- if file_path is not None :
124- with open (os .path .join (file_path , fid ), "w" ) as f :
125- f .write (response .text )
126-
127- df = _read_rdb (response .text )
128- df .attrs ["comment" ] = _extract_rdb_comment (response .text )
129- df .attrs ["url" ] = url
130- return df
131-
132-
13337def get_ratings (
13438 monitoring_location_id : str | list [str ] | None = None ,
13539 file_type : RATING_FILE_TYPE | list [RATING_FILE_TYPE ] = "exsa" ,
@@ -240,29 +144,22 @@ def get_ratings(
240144 ... )
241145
242146 """
243- file_types = [ file_type ] if isinstance ( file_type , str ) else list (file_type )
147+ file_types = _as_list (file_type )
244148 invalid = [ft for ft in file_types if ft not in _VALID_FILE_TYPES ]
245149 if invalid :
246150 raise ValueError (
247- f"Invalid file_type { invalid !r} . Valid options: { list (_VALID_FILE_TYPES )} ."
151+ f"Invalid file_type { invalid !r} ; "
152+ f"valid options are { list (_VALID_FILE_TYPES )} ."
153+ )
154+
155+ if time is not None and any (_DURATION_RE .match (str (v )) for v in _as_list (time )):
156+ raise ValueError (
157+ "ISO 8601 durations (e.g. 'P7D') are not supported in `time` "
158+ "for the rating-curve service. Provide a date or interval instead."
248159 )
160+ time_str = _format_api_dates (time ) if time is not None else None
249161
250- if time is not None :
251- # The rating-curve STAC service rejects ISO 8601 durations; surface a
252- # clear error rather than letting the server return a confusing 4xx.
253- time_values = time if isinstance (time , list ) else [time ]
254- if any (v is not None and "P" in str (v ).upper () for v in time_values ):
255- raise ValueError (
256- "ISO 8601 durations (e.g. 'P7D') are not supported in "
257- "`time` for the rating-curve service. Provide a date or "
258- "interval instead."
259- )
260- time_str = _format_api_dates (time , date = False )
261- else :
262- time_str = None
263-
264- # Mirror R: only pin file_type in the server-side filter when one type
265- # is requested. With multiple types, fetch all and filter locally.
162+ # Mirror R: pin file_type server-side only when one type is requested.
266163 server_file_type = file_types [0 ] if len (file_types ) == 1 else None
267164 filter_str = _build_filter (monitoring_location_id , server_file_type )
268165
@@ -271,22 +168,108 @@ def get_ratings(
271168 if not download_and_parse :
272169 return features
273170
171+ requested = set (file_types )
172+ matching = [
173+ f for f in features if f .get ("properties" , {}).get ("file_type" ) in requested
174+ ]
175+
274176 if file_path is not None :
275177 os .makedirs (file_path , exist_ok = True )
276178
277179 out : dict [str , pd .DataFrame ] = {}
278- requested = set (file_types )
279- for feature in features :
280- # Multi-type requests skip the server-side file_type filter, so
281- # filter here on the per-feature property (more reliable than
282- # substring-matching the URL).
283- feat_type = feature .get ("properties" , {}).get ("file_type" )
284- if feat_type not in requested :
285- continue
180+ for feature in matching :
286181 fid = feature ["id" ]
287182 try :
288183 out [fid ] = _download_and_parse (feature , file_path , ssl_check )
289184 except (requests .RequestException , ValueError , OSError ) as e :
290185 logger .warning ("Failed to download / parse %s: %s" , fid , e )
291186
292187 return out
188+
189+
190+ def _as_list (x : str | Iterable [str ]) -> list [str ]:
191+ """Normalize a string or iterable-of-strings to a list."""
192+ return [x ] if isinstance (x , str ) else list (x )
193+
194+
195+ def _quote_cql_str (value : str ) -> str :
196+ """Escape a single-quoted CQL literal by doubling embedded quotes.
197+
198+ Defends against malformed filters / injection on arbitrary user input,
199+ even though valid USGS monitoring-location IDs cannot contain a quote.
200+ """
201+ return value .replace ("'" , "''" )
202+
203+
204+ def _build_filter (
205+ monitoring_location_id : str | list [str ] | None ,
206+ file_type : str | None ,
207+ ) -> str | None :
208+ """Compose the CQL filter sent to STAC ``/search``.
209+
210+ Returns ``None`` when neither argument constrains the search.
211+ """
212+ parts : list [str ] = []
213+ if monitoring_location_id is not None :
214+ ids = _as_list (monitoring_location_id )
215+ joined = "', '" .join (_quote_cql_str (i ) for i in ids )
216+ parts .append (f"monitoring_location_id IN ('{ joined } ')" )
217+ if file_type is not None :
218+ parts .append (f"file_type = '{ _quote_cql_str (file_type )} '" )
219+ return " AND " .join (parts ) if parts else None
220+
221+
222+ def _search (
223+ filter_str : str | None ,
224+ time_str : str | None ,
225+ bbox : list [float ] | None ,
226+ limit : int ,
227+ ssl_check : bool ,
228+ ) -> list [dict [str , Any ]]:
229+ """Run a single STAC ``/search`` request and return its features."""
230+ params : dict [str , Any ] = {"limit" : limit }
231+ if filter_str is not None :
232+ params ["filter" ] = filter_str
233+ if time_str is not None :
234+ params ["datetime" ] = time_str
235+ if bbox is not None :
236+ params ["bbox" ] = "," .join (map (str , bbox ))
237+
238+ response = requests .get (
239+ f"{ STAC_URL } /search" ,
240+ params = params ,
241+ headers = _default_headers (),
242+ verify = ssl_check ,
243+ )
244+ response .raise_for_status ()
245+ return response .json ().get ("features" , [])
246+
247+
248+ def _extract_rdb_comment (rdb : str ) -> list [str ]:
249+ """Return the RDB ``#``-prefixed comment block as a list of header lines.
250+
251+ Carries useful per-rating provenance — rating id, parameter description,
252+ expansion type, last-shifted timestamp, etc. R's ``read_waterdata_ratings``
253+ exposes the equivalent via ``comment(df)``.
254+ """
255+ return [line for line in rdb .splitlines () if line .startswith ("#" )]
256+
257+
258+ def _download_and_parse (
259+ feature : dict [str , Any ],
260+ file_path : str | None ,
261+ ssl_check : bool ,
262+ ) -> pd .DataFrame :
263+ """Fetch the feature's data asset, parse RDB, optionally persist to disk."""
264+ url = feature ["assets" ]["data" ]["href" ]
265+ response = requests .get (url , headers = _default_headers (), verify = ssl_check )
266+ response .raise_for_status ()
267+
268+ if file_path is not None :
269+ with open (os .path .join (file_path , feature ["id" ]), "w" ) as f :
270+ f .write (response .text )
271+
272+ df = _read_rdb (response .text )
273+ df .attrs ["comment" ] = _extract_rdb_comment (response .text )
274+ df .attrs ["url" ] = url
275+ return df
0 commit comments