|
1 | 1 | import numpy as np |
2 | 2 | from warnings import warn |
3 | | -from scipy.linalg import expm |
| 3 | +from scipy.linalg import expm, sqrtm |
| 4 | + |
| 5 | +try: import cvxpy |
| 6 | +except ImportError: pass |
4 | 7 |
|
5 | 8 |
|
6 | 9 | def kalman_filter(y, _t, xhat0, P0, A, Q, C, R, B=None, u=None, save_P=True): |
@@ -251,3 +254,83 @@ def constant_jerk(x, dt, params=None, options=None, r=None, q=None, forwardbackw |
251 | 254 | raise ValueError("`q` and `r` must be given.") |
252 | 255 |
|
253 | 256 | return rtsdiff(x, dt, 3, q/r, forwardbackward) |
| 257 | + |
| 258 | + |
| 259 | +def robustdiff(x, dt, order, qr_ratio): |
| 260 | + """Perform robust differentiation using L1-norm optimization instead of L2-norm RTS smoothing. |
| 261 | + |
| 262 | + This function solves the L1-normed MAP optimization problem for outlier-resistant differentiation: |
| 263 | + min_{x_n} sum_{n=0}^{N-1} ||R^(-1/2)(y_n - C x_n)||_1 + sum_{n=1}^{N-1} ||Q^(-1/2)(x_n - A x_{n-1})||_1 |
| 264 | + |
| 265 | + The L1 norm provides better robustness to outliers compared to the L2 norm used in standard |
| 266 | + Kalman smoothing. Uses CVXPY for convex optimization. |
| 267 | +
|
| 268 | + :param np.array[float] x: data series to differentiate |
| 269 | + :param float dt: step size |
| 270 | + :param int order: which derivative to stabilize in the constant-derivative model (1=velocity, 2=acceleration, 3=jerk) |
| 271 | + :param float qr_ratio: the process noise level of the divided by the measurement noise level, because the result is |
| 272 | + dependent on the relative rather than absolute size of :math:`q` and :math:`r`. |
| 273 | +
|
| 274 | + :return: tuple[np.array, np.array] of\n |
| 275 | + - **x_hat** -- estimated (smoothed) x |
| 276 | + - **dxdt_hat** -- estimated derivative of x |
| 277 | + """ |
| 278 | + #q = 10**int(np.log10(qr_ratio)) # even-ish split of the powers across 0 |
| 279 | + #r = q/qr_ratio |
| 280 | + q = 1e4 |
| 281 | + r = q/qr_ratio |
| 282 | + |
| 283 | + A_c = np.diag(np.ones(order), 1) # continuous-time A just has 1s on the first diagonal (where 0th is main diagonal) |
| 284 | + Q_c = np.zeros(A_c.shape); Q_c[-1,-1] = q # continuous-time uncertainty around the last derivative |
| 285 | + C = np.zeros((1, order+1)); C[0,0] = 1 # we measure only y = noisy x |
| 286 | + R = np.array([[r]]) # 1 observed state, so this is 1x1 |
| 287 | + |
| 288 | + # convert to discrete time using matrix exponential |
| 289 | + eM = expm(np.block([[A_c, Q_c], [np.zeros(A_c.shape), -A_c.T]]) * dt) |
| 290 | + A_d = eM[:order+1, :order+1] |
| 291 | + Q_d = eM[:order+1, order+1:] @ A_d.T |
| 292 | + if np.linalg.cond(Q_d) > 1e12: Q_d += np.eye(order + 1)*1e-15 # for numerical stability with convex solver. Doesn't change answers appreciably (or at all). |
| 293 | + |
| 294 | + print(f"order = {order}, qr_ratio = {qr_ratio:.3e}, (q,r) = {(q,r)}, cond(Q_d) = {np.linalg.cond(Q_d):.3e}") |
| 295 | + |
| 296 | + # Solve the L1-norm optimization problem |
| 297 | + x_states = convex_smooth(x, A_d, Q_d, R, C) |
| 298 | + return x_states[:, 0], x_states[:, 1] |
| 299 | + |
| 300 | + |
| 301 | +def convex_smooth(y, A, Q, R, C, huberM=1): |
| 302 | + """Solve the L1-norm optimization problem for robust smoothing using CVXPY. |
| 303 | +
|
| 304 | + :param np.array[float] y: measurements |
| 305 | + :param np.array A: discrete-time state transition matrix |
| 306 | + :param np.array Q: discrete-time process noise covariance matrix |
| 307 | + :param np.array R: measurement noise covariance matrix |
| 308 | + :param np.array C: measurement matrix |
| 309 | + :param float huberM: radius where quadratic of Huber loss function turns linear. M=0 reduces to the :math:`\\ell_1` norm. |
| 310 | + :return: np.array -- state estimates (N x state_dim) |
| 311 | + """ |
| 312 | + N = len(y) |
| 313 | + x_states = cvxpy.Variable((N, A.shape[0])) # each row is [position, velocity, acceleration, ...] at step n |
| 314 | + |
| 315 | + R_sqrt_inv = np.linalg.inv(sqrtm(R)) |
| 316 | + Q_sqrt_inv = np.linalg.inv(sqrtm(Q)) |
| 317 | + objective = cvxpy.sum([cvxpy.norm(R_sqrt_inv @ (y[n] - C @ x_states[n]), 1) if huberM == 0 |
| 318 | + else cvxpy.sum(cvxpy.huber(R_sqrt_inv @ (y[n] - C @ x_states[n]), huberM)) for n in range(N)]) # Measurement terms: sum of |R^(-1/2)(y_n - C x_n)|_1 |
| 319 | + objective += cvxpy.sum([cvxpy.norm(Q_sqrt_inv @ (x_states[n] - A @ x_states[n-1]), 1) if huberM == 0 |
| 320 | + else cvxpy.sum(cvxpy.huber(Q_sqrt_inv @ (x_states[n] - A @ x_states[n-1]), huberM)) for n in range(1, N)]) # Process terms: sum of |Q^(-1/2)(x_n - A x_{n-1})|_1 |
| 321 | + |
| 322 | + problem = cvxpy.Problem(cvxpy.Minimize(objective)) |
| 323 | + try: |
| 324 | + problem.solve(solver=cvxpy.CLARABEL) |
| 325 | + except cvxpy.error.SolverError as e1: |
| 326 | + print(f"CLARABEL failed ({e1}). Retrying with SCS.") |
| 327 | + try: |
| 328 | + problem.solve(solver=cvxpy.SCS) # SCS is a lot slower but pretty bulletproof even with big condition numbers |
| 329 | + except cvxpy.error.SolverError as e2: |
| 330 | + print(f"SCS failed ({e2}). Returning NaNs.") |
| 331 | + |
| 332 | + if x_states.value is None: |
| 333 | + print(f"Solver returned None. Status: {problem.status}") |
| 334 | + return np.full((N, A.shape[0]), np.nan) |
| 335 | + |
| 336 | + return x_states.value |
0 commit comments