11# -*- coding: utf-8 -*-
22from __future__ import annotations
3+ from collections .abc import Sequence
34from math import sqrt
5+ from typing import Any
46from typing import Optional
7+ from typing import TypeAlias
8+
9+ FloatVectorLike : TypeAlias = Any
10+ FloatMatrixLike : TypeAlias = Any
511
612class ExponentialMovingAverage :
713 """
@@ -12,7 +18,7 @@ class ExponentialMovingAverage:
1218 Ex: if alpha = 0, use latest value only.
1319 """
1420
15- def __init__ (self , alpha : float ):
21+ def __init__ (self , alpha : float ) -> None :
1622 if alpha < 0 or alpha > 1 :
1723 raise ValueError
1824 self .value : Optional [float ] = None
@@ -51,7 +57,7 @@ def __init__(
5157 ema_alpha : Optional [float ] = None ,
5258 initial_std_value : Optional [float ] = None ,
5359 initial_mean_value : Optional [float ] = None ,
54- ):
60+ ) -> None :
5561 self ._var_value = initial_std_value ** 2 if initial_std_value is not None else None
5662 self .value : Optional [float ] = initial_std_value if initial_std_value is not None else None
5763 self .alpha = alpha
@@ -195,16 +201,19 @@ class CultureGrowthEKF:
195201
196202 def __init__ (
197203 self ,
198- initial_state ,
199- initial_covariance ,
200- process_noise_covariance ,
201- observation_noise_covariance ,
202- angles : list [str ],
204+ initial_state : FloatVectorLike ,
205+ initial_covariance : FloatMatrixLike ,
206+ process_noise_covariance : FloatMatrixLike ,
207+ observation_noise_covariance : FloatMatrixLike ,
208+ angles : Sequence [str ],
203209 outlier_std_threshold : float ,
204210 ) -> None :
205211 import numpy as np
206212
207- initial_state = np .asarray (initial_state )
213+ initial_state = np .asarray (initial_state , dtype = float )
214+ initial_covariance = np .asarray (initial_covariance , dtype = float )
215+ process_noise_covariance = np .asarray (process_noise_covariance , dtype = float )
216+ observation_noise_covariance = np .asarray (observation_noise_covariance , dtype = float )
208217
209218 assert initial_state .shape [0 ] == 2
210219 assert (
@@ -228,10 +237,15 @@ def __init__(
228237 0.975 , 0.80 , initial_std_value = np .sqrt (observation_noise_covariance [0 ][0 ])
229238 )
230239
231- def update (self , obs : list [float ], dt : float , recent_dilution = False ):
240+ def update (
241+ self ,
242+ obs : FloatVectorLike ,
243+ dt : float ,
244+ recent_dilution : bool = False ,
245+ ) -> tuple [FloatVectorLike , FloatMatrixLike ]:
232246 import numpy as np
233247
234- observation = np .asarray (obs )
248+ observation = np .asarray (obs , dtype = float )
235249 assert observation .shape [0 ] == self .n_sensors , (observation , self .n_sensors )
236250
237251 # Predict
@@ -287,7 +301,6 @@ def update(self, obs: list[float], dt: float, recent_dilution=False):
287301
288302 # update gr process covariance if required
289303 nis = (residual_state [0 ] ** 2 ) / residual_covariance [0 , 0 ]
290-
291304 if nis > _NIS_THRESHOLD :
292305 self .process_noise_covariance [1 , 1 ] *= 2
293306 else :
@@ -300,13 +313,14 @@ def update(self, obs: list[float], dt: float, recent_dilution=False):
300313
301314 return self .state_ , self .covariance_
302315
303- def update_observation_noise_cov (self , residual_state ) :
316+ def update_observation_noise_cov (self , residual_state : FloatVectorLike ) -> FloatMatrixLike :
304317 """
305318 Exponentially-weighted measurement noise covariance.
306319 """
307320 import numpy as np
308321
309322 lambda_ = 0.97 # controls the “memory” of the estimator. 0.97 means the filter effectively averages the last ≈ 1 / (1-0.97) ≈ 33 timesteps.
323+ residual_state = np .asarray (residual_state , dtype = float )
310324 rrT = np .outer (residual_state , residual_state )
311325
312326 observation_noise_covariance = lambda_ * self .observation_noise_covariance + (1.0 - lambda_ ) * rrT
@@ -317,7 +331,7 @@ def update_observation_noise_cov(self, residual_state):
317331
318332 return observation_noise_covariance
319333
320- def update_state_from_previous_state (self , state , dt : float ):
334+ def update_state_from_previous_state (self , state : FloatVectorLike , dt : float ) -> FloatVectorLike :
321335 """
322336 Denoted "f" in literature, x_{k} = f(x_{k-1})
323337
@@ -329,10 +343,10 @@ def update_state_from_previous_state(self, state, dt: float):
329343 """
330344 import numpy as np
331345
332- od , rate = state
346+ od , rate = np . asarray ( state , dtype = float )
333347 return np .array ([od * np .exp (rate * dt ), rate ])
334348
335- def _J_update_observations_from_state (self , state_prediction ) :
349+ def _J_update_observations_from_state (self , state_prediction : FloatVectorLike ) -> FloatMatrixLike :
336350 """
337351 Jacobian of observations model, encoded as update_observations_from_state
338352
@@ -362,7 +376,17 @@ def _J_update_observations_from_state(self, state_prediction):
362376 J [i , 0 ] = 1.0 if (angle != "180" ) else - exp (- (od - 1 ))
363377 return J
364378
365- def update_covariance_from_old_covariance (self , state , covariance , dt : float , recent_dilution : bool ):
379+ def update_covariance_from_old_covariance (
380+ self ,
381+ state : FloatVectorLike ,
382+ covariance : FloatMatrixLike ,
383+ dt : float ,
384+ recent_dilution : bool ,
385+ ) -> FloatMatrixLike :
386+ import numpy as np
387+
388+ state = np .asarray (state , dtype = float )
389+ covariance = np .asarray (covariance , dtype = float )
366390 Q = self .process_noise_covariance .copy ().astype (float )
367391
368392 if recent_dilution :
@@ -372,7 +396,7 @@ def update_covariance_from_old_covariance(self, state, covariance, dt: float, re
372396 jacobian = self ._J_update_state_from_previous_state (state , dt )
373397 return jacobian @ covariance @ jacobian .T + Q
374398
375- def update_observations_from_state (self , state_predictions ) :
399+ def update_observations_from_state (self , state_predictions : FloatVectorLike ) -> FloatVectorLike :
376400 """
377401 "h" in the literature, z_k = h(x_k).
378402
@@ -388,7 +412,7 @@ def update_observations_from_state(self, state_predictions):
388412 obs [i ] = od if (angle != "180" ) else np .exp (- (od - 1 ))
389413 return obs
390414
391- def _J_update_state_from_previous_state (self , state , dt : float ):
415+ def _J_update_state_from_previous_state (self , state : FloatVectorLike , dt : float ) -> FloatMatrixLike :
392416 """
393417 The prediction process is (encoded in update_state_from_previous_state)
394418
@@ -410,7 +434,7 @@ def _J_update_state_from_previous_state(self, state, dt: float):
410434
411435 J = np .zeros ((2 , 2 ))
412436
413- od , rate = state
437+ od , rate = np . asarray ( state , dtype = float )
414438 J [0 , 0 ] = np .exp (rate * dt )
415439 J [1 , 1 ] = 1
416440
@@ -419,9 +443,10 @@ def _J_update_state_from_previous_state(self, state, dt: float):
419443 return J
420444
421445 @staticmethod
422- def _is_positive_definite (A ) -> bool :
446+ def _is_positive_definite (A : FloatMatrixLike ) -> bool :
423447 import numpy as np
424448
449+ A = np .asarray (A , dtype = float )
425450 if np .array_equal (A , A .T ):
426451 try :
427452 return True
0 commit comments