-
Notifications
You must be signed in to change notification settings - Fork 157
Expand file tree
/
Copy pathxarray.py
More file actions
422 lines (357 loc) · 15.4 KB
/
Copy pathxarray.py
File metadata and controls
422 lines (357 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Hazard input using xarray
"""
import copy
import itertools
import logging
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Callable, Hashable
import numpy as np
import pandas as pd
import sparse as sp
import xarray as xr
from scipy import sparse
import climada.util.constants as u_const
import climada.util.dates_times as u_dt
from climada.hazard.centroids.centr import Centroids
LOGGER = logging.getLogger(__name__)
DEF_COORDS = {"event": "time", "longitude": "longitude", "latitude": "latitude"}
"""Default coordinates when reading Hazard data from an xarray Dataset"""
DEF_DATA_VARS = ["fraction", "frequency", "event_id", "event_name", "date"]
"""Default keys for optional Hazard attributes when reading from an xarray Dataset"""
def _to_csr_matrix(array: xr.DataArray) -> sparse.csr_matrix:
"""Store a numpy array as sparse matrix, optimizing storage space
The CSR matrix stores NaNs explicitly, so we set them to zero.
"""
array = array.where(lambda x: ~np.isnan(x), 0)
array = xr.apply_ufunc(
sp.COO.from_numpy,
array,
dask="parallelized",
output_dtypes=[array.dtype],
)
sparse_coo = array.compute().data # Load into memory
return sparse.csr_matrix(
sparse_coo.tocsr()
) # Convert sparse.COO to scipy.sparse.csr_array
# Define accessors for xarray DataArrays
def _default_accessor(array: xr.DataArray) -> np.ndarray:
"""Take a DataArray and return its numpy representation"""
return array.to_numpy()
def _strict_positive_int_accessor(array: xr.DataArray) -> np.ndarray:
"""Take a positive int DataArray and return its numpy representation
Raises
------
TypeError
If the underlying data type is not integer
ValueError
If any value is zero or less
"""
if not np.issubdtype(array.dtype, np.integer):
raise TypeError(f"'{array.name}' data array must be integers")
if not (array > 0).all():
raise ValueError(f"'{array.name}' data must be larger than zero")
return array.to_numpy()
def _date_to_ordinal_accessor(array: xr.DataArray, strict: bool = True) -> np.ndarray:
"""Take a DataArray and transform it into ordinals"""
try:
if np.issubdtype(array.dtype, np.integer):
# Assume that data is ordinals
return _strict_positive_int_accessor(array)
# Try transforming to ordinals
return np.array(u_dt.datetime64_to_ordinal(array.to_numpy()))
# Handle access errors
except (ValueError, TypeError, AttributeError) as err:
if strict:
raise err
LOGGER.warning(
"Failed to read values of '%s' as dates or ordinals. Hazard.date "
"will be ones only",
array.name,
)
return np.ones(array.shape)
def _year_month_day_accessor(array: xr.DataArray, strict: bool = True) -> np.ndarray:
"""Take an array and return am array of YYYY-MM-DD strings"""
try:
return array.dt.strftime("%Y-%m-%d").to_numpy()
# Handle access errors
except (ValueError, TypeError, AttributeError) as err:
if strict:
raise err
LOGGER.warning(
"Failed to read values of '%s' as dates. Hazard.event_name will be "
"empty strings",
array.name,
)
return np.full(array.shape, "")
def _maybe_repeat(values: np.ndarray, times: int) -> np.ndarray:
"""Return the array or repeat a single-valued array
If ``values`` has size 1, return an array that repeats this value ``times``
times. If the size is different, just return the array.
"""
if values.size == 1:
return np.array(list(itertools.repeat(values.flat[0], times)))
return values
def _load_from_xarray_or_return_default(
data: xr.Dataset,
user_key: str | None,
default_key: str,
hazard_attr: str,
accessor: Callable[[xr.DataArray], Any],
default_value: Any,
) -> Any:
"""Load data for a single Hazard attribute or return the default value
Does the following based on the ``user_key``:
* If the key is an empty string, return the default value
* If the key is a non-empty string, load the data for that key and return it.
* If the key is ``None``, look for the ``default_key`` in the data. If it exists,
return that data. If not, return the default value.
Parameters
----------
user_key : str or None
The key set by the user to identify the DataArray to read data from.
default_key : str
The default key identifying the DataArray to read data from.
hazard_attr : str
The name of the attribute of ``Hazard`` where the data will be stored in.
accessor : Callable
A callable that takes the DataArray as argument and returns the data structure
that is required by the ``Hazard`` attribute.
default_value
The default value/array in case the data could not be found.
Returns
-------
The object that will be stored in the ``Hazard`` attribute with name ``hazard_attr``
Raises
------
KeyError
If ``user_key`` was a non-empty string but no such key was found in the data
RuntimeError
If the data structure loaded has a different shape than the default data
structure
"""
# User does not want to read data
if user_key == "":
LOGGER.debug("Using default values for Hazard.%s per user request", hazard_attr)
return default_value
if not pd.isna(user_key):
# Read key exclusively
LOGGER.debug(
"Reading data for Hazard.%s from DataArray '%s'", hazard_attr, user_key
)
val = accessor(data[user_key])
else:
# Try default key
try:
val = accessor(data[default_key])
LOGGER.debug(
"Reading data for Hazard.%s from DataArray '%s'",
hazard_attr,
default_key,
)
except KeyError:
LOGGER.debug(
"Using default values for Hazard.%s. No data found", hazard_attr
)
return default_value
def vshape(array):
"""Return a shape tuple for any array-like type we use"""
if isinstance(array, list):
return len(array)
if isinstance(array, sparse.csr_matrix):
return array.get_shape()
return array.shape
# Check size for read data
if default_value is not None and not np.array_equal(
vshape(val), vshape(default_value)
):
raise RuntimeError(
f"'{user_key if user_key else default_key}' must have shape "
f"{vshape(default_value)}, but shape is {vshape(val)}"
)
# Return the data
return val
@dataclass(repr=False, eq=False)
class HazardXarrayReader:
"""A helper class for creating a Hazard object from an xarray dataset
Initialize this class, then use :py:meth:`get_hazard_kwargs` to retrieve the kwargs
to be passed to the :py:class:`~climada.hazard.base.Hazard` initializer.
Attributes
----------
data : xr.Dataset
The data to be read as hazard.
intensity : str
The name of the variable containing the hazard intensity information.
Default: ``"intensity"``
coordinate_vars : dict(str, str)
Mapping from default coordinate names to coordinate names in the dataset.
data_vars : dict(str, str)
Mapping from default variable names to variable names in the dataset.
crs : str
Coordinate reference system of the data to be read. Defaults to ``"EPSG:4326"``
(WGS 84).
rechunk : bool
If ``False``, automatically rechunk the data for more efficient reads from disk.
Default: ``False``.
"""
data: xr.Dataset
intensity: str = "intensity"
coordinate_vars: InitVar[dict[str, str] | None] = field(default=None, kw_only=True)
data_vars: dict[str, str] | None = field(default=None, kw_only=True)
crs: str = field(default=u_const.DEF_CRS, kw_only=True)
rechunk: bool = field(default=False, kw_only=True)
coords: dict[str, str] = field(
init=False, default_factory=lambda: copy.deepcopy(DEF_COORDS)
)
data_dims: dict[str, tuple[Hashable, ...]] = field(init=False, default_factory=dict)
hazard_kwargs: dict[str, Any] = field(init=False, default_factory=dict)
def __post_init__(self, coordinate_vars):
"""Update coordinate and dimension names"""
# Update coordinate identifiers
coordinate_vars = coordinate_vars or {}
unknown_coords = [co for co in coordinate_vars if co not in self.coords]
if unknown_coords:
raise ValueError(
f"Unknown coordinates passed: '{unknown_coords}'. Supported "
f"coordinates are {list(self.coords.keys())}."
)
self.coords.update(coordinate_vars)
# Retrieve dimensions of coordinates
try:
self.data_dims = {
"event": self.data[self.coords["event"]].dims,
"longitude": self.data[self.coords["longitude"]].dims,
"latitude": self.data[self.coords["latitude"]].dims,
}
# Handle KeyError for better error message
except KeyError as err:
key = err.args[0]
raise RuntimeError(
f"Dataset is missing dimension/coordinate: {key}. Dataset dimensions: "
f"{list(self.data.dims.keys())}"
) from err
# Check for unexpected keys
self.data_vars = self.data_vars or {}
default_keys = copy.deepcopy(DEF_DATA_VARS)
unknown_keys = [key for key in self.data_vars.keys() if key not in default_keys]
if unknown_keys:
raise ValueError(
f"Unknown data variables passed: '{unknown_keys}'. Supported data "
f"variables are {default_keys}."
)
@classmethod
def from_file(cls, filename: Path | str, *args, open_dataset_kws=None, **kwargs):
"""Open reader from a file"""
open_dataset_kws = open_dataset_kws or {}
open_dataset_kws = {"chunks": "auto"} | open_dataset_kws
with xr.open_dataset(filename, **open_dataset_kws) as dset:
return cls(dset, *args, **kwargs)
def rechunk_data(self, data: xr.Dataset) -> xr.Dataset:
"""Try to rechunk the data to optimize the stack operation afterwards."""
chunks = (
# We want one event to be contained in one chunk
{dim: -1 for dim in self.data_dims["longitude"]}
| {dim: -1 for dim in self.data_dims["latitude"]}
# Automated chunking in the event dimensions (as many as fit)
| {dim: "auto" for dim in self.data_dims["event"]}
)
return data.chunk(chunks=chunks)
def get_hazard_kwargs(self) -> dict[str, Any]:
"""Return kwargs to initialize the hazard"""
# Shallow copy of the data
data = self.data.copy()
# Try promoting single-value coordinates to dimensions
for key, val in self.data_dims.items():
if not val:
coord = self.coords[key]
LOGGER.debug("Promoting Dataset coordinate '%s' to dimension", coord)
data = data.expand_dims(coord)
self.data_dims[key] = data[coord].dims
# Maybe rechunk
if self.rechunk:
data = self.rechunk_data(data)
# Stack (vectorize) the entire dataset into 2D (time, lat/lon)
# NOTE: We want the set union of the dimensions, but Python 'set' does not
# preserve order. However, we want longitude to run faster than latitude.
# So we use 'dict' without values, as 'dict' preserves insertion order
# (dict keys behave like a set).
data = data.stack(
event=self.data_dims["event"],
lat_lon=list(
dict.fromkeys(
self.data_dims["latitude"] + self.data_dims["longitude"]
).keys()
),
)
# Transform coordinates into centroids
centroids = Centroids(
lat=data[self.coords["latitude"]].to_numpy(),
lon=data[self.coords["longitude"]].to_numpy(),
crs=self.crs,
)
# Read the intensity data
LOGGER.debug("Loading Hazard intensity from DataArray '%s'", self.intensity)
intensity_matrix = _to_csr_matrix(data[self.intensity])
# Create a DataFrame storing access information for each of data_vars
# NOTE: Each row will be passed as arguments to
# `load_from_xarray_or_return_default`, see its docstring for further
# explanation of the DataFrame columns / keywords.
num_events = data.sizes["event"]
data_ident = pd.DataFrame(
data={
# The attribute of the Hazard class where the data will be stored
"hazard_attr": DEF_DATA_VARS,
# The identifier and default key used in this method
"default_key": DEF_DATA_VARS,
# The key assigned by the user
"user_key": None,
# The default value for each attribute
"default_value": [
None,
np.ones(num_events),
np.array(range(num_events), dtype=int) + 1,
list(
_year_month_day_accessor(
data[self.coords["event"]], strict=False
).flat
),
_date_to_ordinal_accessor(data[self.coords["event"]], strict=False),
],
# The accessor for the data in the Dataset
"accessor": [
_to_csr_matrix,
lambda x: _maybe_repeat(_default_accessor(x), num_events),
_strict_positive_int_accessor,
lambda x: list(
_maybe_repeat(_default_accessor(x), num_events).flat
),
lambda x: _maybe_repeat(_date_to_ordinal_accessor(x), num_events),
],
}
)
# Update with keys provided by the user
# NOTE: Keys in 'default_keys' missing from 'data_vars' will be set to 'None'
# (which is exactly what we want) and the result is written into
# 'user_key'. 'default_keys' is not modified.
data_ident["user_key"] = data_ident["default_key"].map(self.data_vars)
# Set the Hazard attributes
for _, ident in data_ident.iterrows():
self.hazard_kwargs[ident["hazard_attr"]] = (
_load_from_xarray_or_return_default(data=data, **ident)
)
# Done!
LOGGER.debug("Hazard successfully loaded. Number of events: %i", num_events)
self.hazard_kwargs.update(centroids=centroids, intensity=intensity_matrix)
return self.hazard_kwargs