-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathevaluate.py
More file actions
136 lines (110 loc) · 5.75 KB
/
Copy pathevaluate.py
File metadata and controls
136 lines (110 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Some tools to help evaluate and plot performance, used in optimization and in jupyter notebooks"""
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from pynumdiff.utils import utility
# pylint: disable-msg=too-many-locals, too-many-arguments
def plot(x, dt, x_hat, dxdt_hat, x_truth, dxdt_truth, xlim=None, show_error=True, markersize=5):
"""Make comparison plots of 'x (blue) vs x_truth (black) vs x_hat (red)' and 'dxdt_truth
(black) vs dxdt_hat (red)'
:param np.array[float] x: array of noisy data
:param float dt: a float number representing the step size
:param np.array[float] x_hat: array of smoothed estimation of x
:param np.array[float] dxdt_hat: array of estimated derivative
:param np.array[float] x_truth: array of noise-free time series
:param np.array[float] dxdt_truth: array of true derivative
:param list[int] xlim: a list specifying range of x
:param bool show_error: whether to show the rmse
:param int markersize: marker size of noisy observations
:return: Display two plots
"""
t = np.arange(0, dt*len(x), dt)
if xlim is None:
xlim = [t[0], t[-1]]
fig = plt.figure(figsize=(18, 6))
ax_x = fig.add_subplot(121)
ax_dxdt = fig.add_subplot(122)
ax_x.plot(t, x_truth, '--', color='black', linewidth=3, label=r"true $x$")
ax_x.plot(t, x, '.', color='blue', zorder=-100, markersize=markersize, label=r"noisy data")
ax_x.plot(t, x_hat, color='red', label=r"estimated $\hat{x}$")
ax_x.set_ylabel('Position', fontsize=18)
ax_x.set_xlabel('Time', fontsize=18)
ax_x.set_xlim(xlim[0], xlim[-1])
ax_x.tick_params(axis='x', labelsize=15)
ax_x.tick_params(axis='y', labelsize=15)
ax_x.legend(loc='lower right', fontsize=12)
ax_x.set_rasterization_zorder(0)
ax_dxdt.plot(t, dxdt_truth, '--', color='black', linewidth=3, label=r"true $\frac{dx}{dt}$")
ax_dxdt.plot(t, dxdt_hat, color='red', label=r"est. $\hat{\frac{dx}{dt}}$")
ax_dxdt.set_ylabel('Velocity', fontsize=18)
ax_dxdt.set_xlabel('Time', fontsize=18)
ax_dxdt.set_xlim(xlim[0], xlim[-1])
ax_dxdt.tick_params(axis='x', labelsize=15)
ax_dxdt.tick_params(axis='y', labelsize=15)
ax_dxdt.legend(loc='lower right', fontsize=12)
ax_dxdt.set_rasterization_zorder(0)
fig.tight_layout()
if show_error:
_, _, rms_dxdt = metrics(x, dt, x_hat, dxdt_hat, x_truth, dxdt_truth)
print('RMS error in velocity: ', rms_dxdt)
def metrics(x, dt, x_hat, dxdt_hat, x_truth=None, dxdt_truth=None, padding=0):
"""Evaluate x_hat based on various metrics, depending on whether dxdt_truth and x_truth are known or not.
:param np.array[float] x: data that was differentiated
:param float dt: step size
:param np.array[float] x_hat: estimated (smoothed) x
:param np.array[float] dxdt_hat: estimated xdot
:param np.array[float] x_truth: true value of x, if known
:param np.array[float] dxdt_truth: true value of dxdt, if known, optional
:param int padding: number of snapshots on either side of the array to ignore when calculating
the metric. If :code:`'auto'`, defaults to 2.5% of the size of x
:return: tuple[float, float, float] containing\n
- **rms_rec_x** -- RMS error between the integral of dxdt_hat and x
- **rms_x** -- RMS error between x_hat and x_truth, returns None if x_truth is None
- **rms_dxdt** -- RMS error between dxdt_hat and dxdt_truth, returns None if dxdt_hat is None
"""
if np.isnan(x_hat).any():
return np.nan, np.nan, np.nan
if padding == 'auto':
padding = int(0.025*len(x))
padding = max(padding, 1)
s = slice(padding, len(x)-padding) # slice out data we want to measure
# RMS of dxdt and x_hat
root = np.sqrt(s.stop - s.start)
rms_dxdt = np.linalg.norm(dxdt_hat[s] - dxdt_truth[s]) / root if dxdt_truth is not None else None
rms_x = np.linalg.norm(x_hat[s] - x_truth[s]) / root if x_truth is not None else None
# RMS reconstructed x from integrating dxdt vs given raw x, available even in the absence of ground truth
rec_x = utility.integrate_dxdt_hat(dxdt_hat, dt)
x0 = utility.estimate_integration_constant(x, rec_x)
rec_x = rec_x + x0
rms_rec_x = np.linalg.norm(rec_x[s] - x[s]) / root
return rms_rec_x, rms_x, rms_dxdt
def error_correlation(dxdt_hat, dxdt_truth, padding=0):
"""Calculate the error correlation (pearsons correlation coefficient) between the estimated
dxdt and true dxdt
:param np.array[float] dxdt_hat: estimated xdot
:param np.array[float] dxdt_truth: true value of dxdt, if known, optional
:param int padding: number of snapshots on either side of the array to ignore when calculating
the metric. If :code:`'auto'`, defaults to 2.5% of the size of x
:return: (float) -- r-squared correlation coefficient
"""
if padding == 'auto':
padding = int(0.025*len(dxdt_hat))
padding = max(padding, 1)
s = slice(padding, len(dxdt_hat)-padding) # slice out data we want to measure
errors = (dxdt_hat[s] - dxdt_truth[s])
r = stats.linregress(dxdt_truth[s] - np.mean(dxdt_truth[s]), errors)
return r.rvalue**2
def total_variation(x, padding=0):
"""Calculate the total variation of an array. Used by optimizer.
:param np.array[float] x: data
:param int padding: number of snapshots on either side of the array to ignore when calculating
the metric. If :code:`'auto'`, defaults to 2.5% of the size of x
:return: (float) -- total variation
"""
if np.isnan(x).any():
return np.nan
if padding == 'auto':
padding = int(0.025*len(x))
padding = max(padding, 1)
x = x[padding:len(x)-padding]
return np.linalg.norm(x[1:]-x[:-1], 1)/len(x) # normalized version of what cvxpy.tv does