-
Notifications
You must be signed in to change notification settings - Fork 154
Expand file tree
/
Copy pathinterpolation.py
More file actions
418 lines (361 loc) · 15.3 KB
/
interpolation.py
File metadata and controls
418 lines (361 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define interpolation and extrapolation functions for calculating
(local) exceedance frequencies and return periods
"""
import logging
import numpy as np
from scipy import interpolate
LOGGER = logging.getLogger(__name__)
def preprocess_and_interpolate_ev(
exceedance_frequency,
test_values,
frequency,
values,
log_frequency=False,
log_values=False,
value_threshold=None,
method="interpolate",
y_asymptotic=np.nan,
bin_decimals=None,
reverse=False,
):
"""Function to first preprocess (frequency, values) data (if extrapolating, one can bin the
data according to their value, see Notes), compute the cumulative frequencies, and then
inter- and extrapolate either to test frequencies or to test values.
Parameters
----------
exceedance_frequency : array_like
1-D array of test exceedance frequencies for which values (e.g., intensities or impacts) should be
assigned. If given, test_values must be None.
test_values : array_like
1-D array of test values (e.g., intensities or impacts) for which exceedance frequencies should be
assigned. If given, exceedance_frequency must be None.
frequency : array_like
1-D array of frequencies to be interpolated.
values : array_like
1-D array of values (e.g., intensities or impacts) to be interpolated.
log_frequency : bool, optional
If set to True, frequencies are interpolated on log scale. Defaults to False.
log_values : bool, optional
If set to True, values (e.g., intensities) are interpolated on log scale.
Defaults to False.
value_threshold : float, optional
Lower threshold to filter values (e.g., intensities or impacts). Defaults to None.
method : str, optional
Method to interpolate to test x values. Currently available are
"interpolate", "extrapolate", "extrapolate_constant" and "stepfunction". If set to
"interpolate", test x values outside the range of the given x values will be assigned NaN.
If set to "extrapolate_constant" or "stepfunction", test x values larger than given
x values will be assigned largest given y value, and test x values smaller than the given
x values will be assigned y_asymtotic. If set to "extrapolate", values will be extrapolated
(and interpolated). The extrapolation to test frequencies or test values outside of the
data range extends the two interpolations at the edges of the data to outside of
the data. Defaults to "interpolate".
y_asymptotic : float, optional
Has no effect if method is "interpolate". Else, provides return value and if
for test x values larger than given x values, if size < 2 or if method is set
to "extrapolate_constant" or "stepfunction". Defaults to np.nan.
bin_decimals : int, optional
Number of decimals to group and bin the values. Binning results in smoother (and coarser)
interpolation and more stable extrapolation. For more details and sensible values for
bin_decimals, see Notes. If None, values are not binned. Defaults to None.
reverse : bool, optional
If set to True, values are sorted in reverse order (i.e., larger values means
less impact). Defaults to False.
Returns
-------
np.array
interpolated (and extrapolated) values or frequencies for given test frequencies or test
values, respectively.
Raises
------
ValueError
If both test frequencies and test values are given or none of them.
Notes
-----
If an integer bin_decimals is given, the values are binned according to their
bin_decimals decimals, and their corresponding frequencies are summed. This binning leads to
a smoother (and coarser) interpolation, and a more stable extrapolation. For instance, if
bin_decimals=1, the two values 12.01 and 11.97 with corresponding frequencies 0.1 and 0.2 are
combined to a value 12.0 with frequency 0.3. The default bin_decimals=None results in not
binning the values.
E.g., if your values range from 1 to 100, you could use bin_decimals=1, if your values range
from 1e6 to 1e9, you could use bin_decimals=-5, if your values range from 0.0001 to .01, you
could use bin_decimals=5.
"""
# check method
if method not in [
"interpolate",
"extrapolate",
"extrapolate_constant",
"stepfunction",
]:
raise ValueError(f"Unknown method: {method}")
# check that only test frequencies or only test values are given
if exceedance_frequency is not None and test_values is not None:
raise ValueError(
"Both test frequencies and test values are given. This method only handles one of "
"the two. To use this method, please only use one of them."
)
if exceedance_frequency is None and test_values is None:
raise ValueError("No test values or test frequencies are given.")
# sort values and frequencies
sorted_idxs = np.argsort(values)
values = np.squeeze(values[sorted_idxs])
frequency = frequency[sorted_idxs]
if reverse:
values, frequency = (values[::-1], frequency[::-1])
# group similar values together
if isinstance(bin_decimals, int):
frequency, values = _group_frequency(frequency, values, bin_decimals, reverse)
# transform frequencies to cummulative frequencies
frequency = np.cumsum(frequency[::-1])[::-1]
# if test frequencies are provided
if exceedance_frequency is not None:
if method == "stepfunction":
return _stepfunction_ev(
exceedance_frequency,
frequency[::-1],
values[::-1],
y_threshold=value_threshold,
y_asymptotic=y_asymptotic,
)
extrapolation = None if method == "interpolate" else method
return _interpolate_ev(
exceedance_frequency,
frequency[::-1],
values[::-1],
logx=log_frequency,
logy=log_values,
y_threshold=value_threshold,
extrapolation=extrapolation,
y_asymptotic=y_asymptotic,
)
# if test values are provided
if method == "stepfunction":
return _stepfunction_ev(
test_values,
values,
frequency,
x_threshold=value_threshold,
y_asymptotic=y_asymptotic,
)
extrapolation = None if method == "interpolate" else method
return _interpolate_ev(
test_values,
values,
frequency,
logx=log_values,
logy=log_frequency,
x_threshold=value_threshold,
extrapolation=extrapolation,
)
def _interpolate_ev(
x_test,
x_train,
y_train,
logx=False,
logy=False,
x_threshold=None,
y_threshold=None,
extrapolation=None,
y_asymptotic=np.nan,
):
"""
Util function to interpolate (and extrapolate) training data (x_train, y_train)
to new points x_test with several options (log scale, thresholds)
Parameters
----------
x_test : array_like
1-D array of x-values for which training data should be interpolated
x_train : array_like
1-D array of x-values of training data sorted in ascending order
y_train : array_like
1-D array of y-values of training data
logx : bool, optional
If set to True, x_values are converted to log scale. Defaults to False.
logy : bool, optional
If set to True, y_values are converted to log scale. Defaults to False.
x_threshold : float, optional
Lower threshold to filter x_train. Defaults to None.
y_threshold : float, optional
Lower threshold to filter y_train. Defaults to None.
extrapolation : str, optional
If set to 'extrapolate', values will be extrapolated. If set to 'extrapolate_constant',
x_test values smaller than x_train will be assigned y_train[0] (x_train must be sorted
in ascending order), and x_test values larger than x_train will be assigned
y_asymptotic. If set to None, x_test values outside of the range of x_train will be
assigned np.nan. Defaults to None.
y_asymptotic : float, optional
Has no effect if extrapolation is None. Else, provides return value and if
for x_test values larger than x_train, for x_train.size < 2 or if extrapolation is set
to 'extrapolate_constant'. Defaults to np.nan.
Returns
-------
np.array
interpolated values y_test for the test points x_test
"""
# preprocess interpolation data
x_test, x_train, y_train = _preprocess_interpolation_data(
x_test, x_train, y_train, logx, logy, x_threshold, y_threshold
)
# handle case of small training data sizes
if x_train.size < 2:
if not extrapolation:
return np.full_like(x_test, np.nan)
return _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic)
# calculate fill values
if extrapolation == "extrapolate":
fill_value = "extrapolate"
elif extrapolation == "extrapolate_constant":
fill_value = (y_train[0], np.log10(y_asymptotic) if logy else y_asymptotic)
else:
fill_value = np.nan
interpolation = interpolate.interp1d(
x_train, y_train, fill_value=fill_value, bounds_error=False
)
y_test = interpolation(x_test)
# adapt output scale
if logy:
y_test = np.power(10.0, y_test)
return y_test
def _stepfunction_ev(
x_test, x_train, y_train, x_threshold=None, y_threshold=None, y_asymptotic=np.nan
):
"""
Util function to interpolate and extrapolate training data (x_train, y_train)
to new points x_test using a step function
Parameters
----------
x_test : array_like
1-D array of x-values for which training data should be interpolated
x_train : array_like
1-D array of x-values of training data sorted in ascending order
y_train : array_like
1-D array of y-values of training data
x_threshold : float, optional
Lower threshold to filter x_train. Defaults to None.
y_threshold : float, optional
Lower threshold to filter y_train. Defaults to None.
y_asymptotic : float, optional
Return value if x_test > x_train. Defaults to np.nan.
Returns
-------
np.array
interpolated values y_test for the test points x_test
"""
# preprocess interpolation data
x_test, x_train, y_train = _preprocess_interpolation_data(
x_test, x_train, y_train, None, None, x_threshold, y_threshold
)
# handle case of small training data sizes
if x_train.size < 2:
return _interpolate_small_input(x_test, x_train, y_train, None, y_asymptotic)
# find indices of x_test if sorted into x_train
indx = np.searchsorted(x_train, x_test)
y_test = y_train[indx.clip(max=len(x_train) - 1)]
y_test[indx == len(x_train)] = y_asymptotic
return y_test
def _preprocess_interpolation_data(
x_test, x_train, y_train, logx, logy, x_threshold, y_threshold
):
"""
helper function to preprocess interpolation training and test data by filtering data below
thresholds and converting to log scale if required
"""
if x_train.shape != y_train.shape:
raise ValueError(
f"Incompatible shapes of input data, x_train {x_train.shape} "
f"and y_train {y_train.shape}. Should be the same"
)
# transform input to float arrays
x_test, x_train, y_train = (
np.array(x_test).astype(float),
np.array(x_train).astype(float),
np.array(y_train).astype(float),
)
# cut x and y above threshold
if x_threshold or x_threshold == 0:
x_th = np.asarray(x_train > x_threshold).squeeze()
x_train = x_train[x_th]
y_train = y_train[x_th]
if y_threshold or y_threshold == 0:
y_th = np.asarray(y_train > y_threshold).squeeze()
x_train = x_train[y_th]
y_train = y_train[y_th]
# convert to log scale
if logx:
x_train, x_test = np.log10(x_train), np.log10(x_test)
if logy:
y_train = np.log10(y_train)
return (x_test, x_train, y_train)
def _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic):
"""
helper function to handle if interpolation data is small (empty or one point)
"""
# return y_asymptotic if x_train and y_train empty
if x_train.size == 0:
return np.full_like(x_test, y_asymptotic)
# reconvert logarithmic y_train to original y_train
if logy:
y_train = np.power(10.0, y_train)
# if only one (x_train, y_train), return stepfunction with
# y_train if x_test < x_train and y_asymtotic if x_test > x_train
y_test = np.full_like(x_test, y_train[0])
y_test[np.squeeze(x_test) > np.squeeze(x_train)] = y_asymptotic
return y_test
def _group_frequency(frequency, value, bin_decimals, reverse=False):
"""
Util function to aggregate (add) frequencies for equal values
Parameters
----------
frequency : array_like
Frequency array
value : array_like
Value array in ascending order
bin_decimals : int
decimals according to which values are binned and their corresponding frequency are
grouped.
reverse: bool, optional
if frequencies should be summed in reverse order. Defaults to False.
Returns
-------
tuple
(frequency array after aggregation,
unique value array in ascending order)
"""
frequency, value = np.array(frequency), np.array(value)
if frequency.size == 0 and value.size == 0:
return ([], [])
if reverse:
frequency, value = (frequency[::-1], value[::-1])
# round values and group them
value = np.around(value, decimals=bin_decimals)
value_unique, start_indices = np.unique(value, return_index=True)
if value_unique.size != frequency.size:
if not all(sorted(start_indices) == start_indices):
LOGGER.warning(
"After grouping values using to their decimals, the value array is not sorted."
"The values are not binned. This might be due to floating point error while "
"binning. Please choose a larger value of bin_decimals=%s.",
bin_decimals,
)
return frequency, value
# add frequency for equal value
start_indices = np.insert(start_indices, value_unique.size, frequency.size)
frequency = np.add.reduceat(frequency, start_indices[:-1])
value = value_unique
if reverse:
frequency, value = (frequency[::-1], value[::-1])
return frequency, value