-
Notifications
You must be signed in to change notification settings - Fork 59
Expand file tree
/
Copy pathutils.py
More file actions
364 lines (294 loc) · 11.3 KB
/
Copy pathutils.py
File metadata and controls
364 lines (294 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
"""
Useful utilities for data munging.
"""
import warnings
from collections.abc import Iterable
import httpx
import pandas as pd
import dataretrieval
from dataretrieval.codes import tz
# Keyword arguments that ``query`` unpacks into its ``httpx.get`` call.
HTTPX_DEFAULTS = {
    # Follow HTTP redirects rather than handing back the redirect response.
    "follow_redirects": True,
    # 60.0 is the default applied to every timeout phase; only ``connect`` is
    # overridden, to 10.0 (httpx expresses timeouts in seconds).
    "timeout": httpx.Timeout(60.0, connect=10.0),
}
def to_str(listlike, delimiter=","):
    """Join the items of a list-like object into one delimited string.

    Parameters
    ----------
    listlike: list-like object
        Any iterable (e.g., a list or a ``pandas.core.series.Series``).
        A plain string is handed back untouched rather than being split
        into characters.
    delimiter: string, optional
        Separator placed between the stringified items. Default value is
        a comma.

    Returns
    -------
    listlike: string or None
        The items of ``listlike`` converted with ``str`` and joined by
        ``delimiter``; the input itself when it already is a string; or
        ``None`` when the input is not iterable (e.g., an int or a bool).

    Examples
    --------
    .. doctest::

        >>> dataretrieval.utils.to_str([1, "a", 2])
        '1,a,2'

        >>> dataretrieval.utils.to_str([0, 10, 42], delimiter="+")
        '0+10+42'

    """
    # Strings are iterable too, so they have to be singled out first.
    if isinstance(listlike, str):
        return listlike

    # Scalars have no items to join; signal that with None.
    if not isinstance(listlike, Iterable):
        return None

    return delimiter.join(str(item) for item in listlike)
def format_datetime(df, date_field, time_field, tz_field):
    """Build a UTC 'datetime' column from separate date, time, and
    time zone columns.

    Assumes ISO 8601.

    Parameters
    ----------
    df: ``pandas.DataFrame``
        A data frame containing date, time, and timezone fields. It is
        modified in place: the ``tz_field`` column is replaced by its values
        mapped through ``dataretrieval.codes.tz`` and a 'datetime' column
        is added (or overwritten).
    date_field: string
        Name of date column in df.
    time_field: string
        Name of time column in df.
    tz_field: string
        Name of time zone column in df.

    Returns
    -------
    df: ``pandas.DataFrame``
        The same data frame object, now carrying a 'datetime' column.

    Warns
    -----
    UserWarning
        When at least one value in the resulting 'datetime' column is
        missing.

    """
    # Swap each time zone code for its entry in the ``tz`` lookup table.
    df[tz_field] = df[tz_field].map(tz)

    # Glue the three pieces into one string per row and let pandas parse it.
    stamp_text = df[date_field] + " " + df[time_field] + " " + df[tz_field]
    df["datetime"] = pd.to_datetime(stamp_text, format="mixed", utc=True)

    # If any row failed to produce a timestamp, tell the user how many.
    missing = df["datetime"].isna()
    if missing.any():
        count = missing.sum()
        warnings.warn(
            f"Warning: {count} incomplete dates found, "
            + "consider setting datetime_index to False.",
            UserWarning,
            stacklevel=2,
        )

    return df
# (time-suffix, tz-suffix) pairs that follow a "<prefix>Date" column.
# ``_attach_datetime_columns`` tries the pairs in the order listed here and
# stops at the first pair whose two columns are both present.
_TIME_TZ_SUFFIXES = (
    # WQX3 / Samples, e.g.
    #   Activity_StartDate / Activity_StartTime / Activity_StartTimeZone
    ("Time", "TimeZone"),
    # Legacy WQP (slash-separated), e.g.
    #   ActivityStartDate / ActivityStartTime/Time / ActivityStartTime/TimeZoneCode
    ("Time/Time", "Time/TimeZoneCode"),
)
def _build_utc_datetime(
    date_series: pd.Series, time_series: pd.Series, tz_series: pd.Series
) -> pd.Series:
    """Merge date, time, and tz-abbreviation columns into one UTC Series.

    Each timezone code is looked up in ``tz``; the date, the time, and the
    looked-up value are then joined with single spaces and parsed with the
    fixed format ``%Y-%m-%d %H:%M:%S %z``.

    Rows with an unknown timezone code, rows missing any of the three
    values, and rows that do not match the format come back as ``NaT``.
    The input columns are not mutated.
    """
    utc_offsets = tz_series.map(tz)

    # Cast to the nullable string dtype so a missing piece makes the whole
    # concatenated row missing instead of raising.
    date_text, time_text, offset_text = (
        part.astype("string") for part in (date_series, time_series, utc_offsets)
    )
    stamp_text = date_text + " " + time_text + " " + offset_text

    return pd.to_datetime(
        stamp_text, format="%Y-%m-%d %H:%M:%S %z", utc=True, errors="coerce"
    )
def _attach_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Add ``<prefix>DateTime`` UTC columns for any Date/Time/TimeZone triplets
and sort the frame by the activity-start datetime.
Detects two naming patterns that appear in USGS Samples and Water Quality
Portal CSV responses:
* **WQX3** — ``<prefix>Date``, ``<prefix>Time``, ``<prefix>TimeZone``
* **Legacy WQP** — ``<prefix>Date``, ``<prefix>Time/Time``,
``<prefix>Time/TimeZoneCode``
For every triplet present, a new ``<prefix>DateTime`` column is appended
holding a UTC ``Timestamp`` (offsets resolved via
:data:`dataretrieval.codes.tz`). The original Date/Time/TimeZone columns
are left intact, and an existing ``<prefix>DateTime`` column is never
overwritten.
Rows are sorted (and the index reset) by the canonical activity-start
datetime when present — ``Activity_StartDateTime`` (WQX3) or
``ActivityStartDateTime`` (legacy WQP) — falling back to the first
detected ``*Date`` column. Mirrors R ``dataRetrieval``'s
end-of-pipeline sort in ``importWQP.R``.
Parameters
----------
df : ``pandas.DataFrame``
DataFrame returned from a Samples or WQP CSV endpoint.
Returns
-------
df : ``pandas.DataFrame``
A new DataFrame with derivable ``<prefix>DateTime`` columns appended
and rows sorted by the activity-start datetime (if any date column
was detected).
"""
columns = set(df.columns)
new_columns = {}
first_date_col = None
for col in df.columns:
if not col.endswith("Date"):
continue
if first_date_col is None:
first_date_col = col
prefix = col.removesuffix("Date")
target = prefix + "DateTime"
if target in columns or target in new_columns:
continue
for time_suffix, tz_suffix in _TIME_TZ_SUFFIXES:
time_col = prefix + time_suffix
tz_col = prefix + tz_suffix
if time_col in columns and tz_col in columns:
new_columns[target] = _build_utc_datetime(
df[col], df[time_col], df[tz_col]
)
break
if new_columns:
# Concat in one shot — per-column assignment on a wide CSV-derived
# frame triggers pandas' fragmentation PerformanceWarning.
df = pd.concat([df, pd.DataFrame(new_columns, index=df.index)], axis=1)
if "Activity_StartDateTime" in df.columns:
sort_key = "Activity_StartDateTime"
elif "ActivityStartDateTime" in df.columns:
sort_key = "ActivityStartDateTime"
else:
sort_key = first_date_col
if sort_key is not None:
df = df.sort_values(by=sort_key, ignore_index=True)
return df
class BaseMetadata:
    """Base class for the metadata returned alongside a service's data.

    A concrete value object that records the response URL, the query time,
    and the response headers; the modern ``waterdata`` getters return it
    directly.

    ``site_info`` and ``variable_info`` are legacy hooks: the ``nwis`` / ``wqp``
    metadata subclasses override them to look up site (or, historically,
    variable) details for the query. They are not part of the modern
    ``waterdata`` contract, so on the base they raise ``NotImplementedError``.

    Attributes
    ----------
    url : str
        Response url
    query_time: datetime.timedelta
        Response elapsed time
    header: httpx.Headers
        Response headers
    comment: None
        Initialised to ``None`` by this class.

    """

    def __init__(self, response) -> None:
        """Populate the standard metadata fields from a response.

        Parameters
        ----------
        response: Response
            Response object from httpx module; its ``url``, ``elapsed`` and
            ``headers`` attributes are read.

        """
        # Coerce httpx.URL -> str: BaseMetadata.url has always been str.
        self.url = str(response.url)
        self.query_time = response.elapsed
        self.header = response.headers
        self.comment = None

        # NOTE: ``statistic_info`` and ``disclaimer`` are deliberately not set
        # here; the former's meaning is unclear and the latter seems to be
        # only part of importWaterML1.

    # These properties are to be set by `nwis` or `wqp`-specific metadata classes.
    @property
    def site_info(self):
        """Legacy hook; always raises ``NotImplementedError`` on the base."""
        message = "site_info must be implemented by utils.BaseMetadata children"
        raise NotImplementedError(message)

    @property
    def variable_info(self):
        """Legacy hook; always raises ``NotImplementedError`` on the base."""
        message = "variable_info must be implemented by utils.BaseMetadata children"
        raise NotImplementedError(message)

    def __repr__(self) -> str:
        """Return ``<ClassName>(url=<url>)`` using the runtime class name."""
        class_name = type(self).__name__
        return "{}(url={})".format(class_name, self.url)
# Pseudo-code snippet that ``_url_too_long_error`` appends to its error message
# to show how a large site list can be split into several smaller requests.
# NOTE: this is a regular (non-raw) string, so every ``\n`` below is an escape
# that adds a line break on top of the literal end-of-line that follows it.
_URL_TOO_LONG_EXAMPLE = """
# n is the number of chunks to divide the query into \n
split_list = np.array_split(site_list, n)
data_list = [] # list to store chunk results in \n
# loop through chunks and make requests \n
for site_list in split_list: \n
data = nwis.get_record(sites=site_list, service='dv', \n
start=start, end=end) \n
data_list.append(data) # append results to list"""
def _url_too_long_error(detail: str) -> ValueError:
    """Build (without raising) the ``ValueError`` for an over-long request URL.

    Parameters
    ----------
    detail : str
        Text describing what rejected the URL; it is placed in the middle
        of the message, followed by a period.

    Returns
    -------
    ValueError
        Exception whose message advises using fewer sites and ends with the
        ``_URL_TOO_LONG_EXAMPLE`` pseudo-code. The caller is expected to
        ``raise`` it.

    """
    message_parts = [
        "Request URL too long. Modify your query to use fewer sites. ",
        f"{detail}. Pseudo-code example of how to split your query: ",
        f"\n {_URL_TOO_LONG_EXAMPLE}",
    ]
    return ValueError("".join(message_parts))
def query(url, payload, delimiter=",", ssl_check=True):
    """Send a GET request and return the response after basic error checks.

    Wrapper for ``httpx.get`` that converts list-like query parameters to
    delimited strings, leaves out parameters that have no string form, and
    turns common failure responses into exceptions.

    Parameters
    ----------
    url: string
        URL to query
    payload: dict
        Query parameters. Every value is passed through ``to_str``; entries
        for which ``to_str`` returns ``None`` are left out of the request.
        The dict itself is not modified.
    delimiter: string
        delimiter to use with lists
    ssl_check: bool
        If True, check SSL certificates, if False, do not check SSL,
        default is True

    Returns
    -------
    response: ``httpx.Response``
        The response from the ``httpx.get`` function call.

    Raises
    ------
    ValueError
        If httpx rejects the URL client-side (``httpx.InvalidURL``), or the
        response status is 400, 404, 414, or anywhere in the 500-599 range.
    NoSitesError
        If the response body starts with ``"No sites/data"``.

    """
    # Normalise into a fresh dict so the caller's ``payload`` is never
    # written to (lists would otherwise be replaced by strings, and scalars
    # by None, behind the caller's back).
    # httpx serializes None params as ``foo=``; USGS rejects with 400, so
    # values ``to_str`` cannot render (it returns None for non-iterable
    # scalars like bools) are skipped.
    params = {}
    for key, value in payload.items():
        text = to_str(value, delimiter)
        if text is not None:
            params[key] = text

    user_agent = {"user-agent": f"python-dataretrieval/{dataretrieval.__version__}"}

    try:
        response = httpx.get(
            url,
            params=params,
            headers=user_agent,
            verify=ssl_check,
            **HTTPX_DEFAULTS,
        )
    except httpx.InvalidURL as exc:
        raise _url_too_long_error(f"httpx rejected the URL client-side: {exc}") from exc

    if response.status_code == 400:
        raise ValueError(
            f"Bad Request, check that your parameters are correct. URL: {response.url}"
        )
    elif response.status_code == 404:
        raise ValueError(
            "Page Not Found Error. May be the result of an empty query. "
            + f"URL: {response.url}"
        )
    elif response.status_code == 414:
        raise _url_too_long_error(f"API response reason: {response.reason_phrase}")
    elif 500 <= response.status_code < 600:
        raise ValueError(
            f"Service Unavailable: {response.status_code} {response.reason_phrase}. "
            + f"The service at {response.url} may be down or experiencing issues."
        )

    # An otherwise successful response can still carry a "nothing matched"
    # message in its body.
    if response.text.startswith("No sites/data"):
        raise NoSitesError(response.url)

    return response
class NoSitesError(Exception):
    """Custom error raised when selection criteria return no sites/data.

    Attributes
    ----------
    url
        The URL of the request that produced no results; it is included in
        the error message.

    """

    def __init__(self, url):
        """Remember the ``url`` of the empty query."""
        self.url = url

    def __str__(self):
        """Return the user-facing message, ending with the stored URL."""
        prefix = "No sites/data found using the selection criteria specified in "
        return f"{prefix}url: {self.url}"