diff --git a/src/components/localization/kalman_filter/unscented_kalman_filter_localizer.py b/src/components/localization/kalman_filter/unscented_kalman_filter_localizer.py new file mode 100644 index 00000000..c2e821b8 --- /dev/null +++ b/src/components/localization/kalman_filter/unscented_kalman_filter_localizer.py @@ -0,0 +1,281 @@ +""" +unscented_kalman_filter_localizer.py + +Author: Bruno DOKPOMIWA +""" + +import sys +import numpy as np +import scipy.linalg as spl +from pathlib import Path +from math import cos, sin, sqrt, atan2, pi + +sys.path.append(str(Path(__file__).absolute().parent) + "/../../state") +sys.path.append(str(Path(__file__).absolute().parent) + "/../../array") +sys.path.append(str(Path(__file__).absolute().parent) + "/../../sensors/gnss") +from state import State +from xy_array import XYArray + + +class UnscentedKalmanFilterLocalizer: + """ + Self localization by Unscented Kalman Filter class + """ + + def __init__(self, accel_noise=0.2, yaw_rate_noise=10.0, + obsrv_x_noise=1.0, obsrv_y_noise=1.0, + alpha=0.001, beta=2, kappa=0, color='r'): + """ + Constructor + accel_noise: Standard deviation of acceleration noise + yaw_rate_noise: Standard deviation of yaw rate noise[deg] + obsrv_x_noise: Standard deviation of x observation noise + obsrv_y_noise: Standard deviation of y observation noise + alpha: UKF's alpha parameter (controls spread of sigma points) + beta: UKF's beta parameter (incorporates prior knowledge) + kappa: UKF's kappa parameter (secondary scaling parameter) + color: Color of drawing error covariance ellipse + """ + + # state dimension (x, y, yaw, speed) + self.DIM_NUM = 4 + + # UKF parameters + self.ALPHA = alpha + self.BETA = beta + self.KAPPA = kappa + self._decide_sigma_weights() + + # noise covariance matrices + self.INPUT_NOISE_VAR_MAT = np.diag([accel_noise, np.deg2rad(yaw_rate_noise)]) ** 2 + self.OBSRV_NOISE_VAR_MAT = np.diag([obsrv_x_noise, obsrv_y_noise]) ** 2 + + # initialize state and covariance + self.state = np.zeros((self.DIM_NUM, 1)) + self.cov_mat = np.eye(self.DIM_NUM) + + self.DRAW_COLOR = color + + def _decide_sigma_weights(self): + """ + Private function to decide weights for each sigma points + """ + + self.LAMBDA = self.ALPHA**2 * (self.DIM_NUM + self.KAPPA) - self.DIM_NUM + + # for 2n + 1 sigma points + state_weights, cov_weights = [], [] + DIM_LAMBDA = self.DIM_NUM + self.LAMBDA + state_weights.append(self.LAMBDA / DIM_LAMBDA) # i = 0 + cov_weights.append((self.LAMBDA / DIM_LAMBDA) + (1 - self.ALPHA**2 + self.BETA)) # i = 0 + for i in range(2 * self.DIM_NUM): + state_weights.append(1 / (2 * DIM_LAMBDA)) + cov_weights.append(1 / (2 * DIM_LAMBDA)) + self.STATE_WEIGHTS = np.array([state_weights]) + self.COV_WEIGHTS = np.array([cov_weights]) + + # for generating sigma points + self.GAMMA = sqrt(DIM_LAMBDA) + + def _generate_sigma_points(self, state, cov): + """ + Private function to generate sigma points + state: state vector (x, y, yaw, speed) + cov: covariance matrix 4x4 + Return: array of sigma points (4 x 9) + """ + + sigmas = state + cov_sqr = spl.sqrtm(cov) # matrix square root + + # add each dimension's std to state vector + # positive direction + for i in range(self.DIM_NUM): + sigmas = np.hstack((sigmas, state + self.GAMMA * cov_sqr[:, i:i+1])) + # negative direction + for i in range(self.DIM_NUM): + sigmas = np.hstack((sigmas, state - self.GAMMA * cov_sqr[:, i:i+1])) + return sigmas + + def _predict_sigmas_motion(self, sigmas, input, time_s): + """ + Private function to predict motion of sigma points + sigmas: array of sigma points + input: input vector [accel, yaw_rate] + time_s: simulation interval time[sec] + Return: array of motion predicted sigma points + """ + + pred_sigmas = np.zeros_like(sigmas) + for i in range(sigmas.shape[1]): + pred_sigmas[:, i:i+1] = State.motion_model(sigmas[:, i:i+1], input, time_s) + return pred_sigmas + + def _predict_state_covariance(self, pred_state, pred_sigmas): + """ + Private function to predict state covariance + pred_state: predicted state vector + pred_sigmas: array of motion predicted sigma points + Return: predicted state covariance matrix + """ + + diff = pred_sigmas - pred_state + pred_cov = np.zeros((4, 4)) + for i in range(pred_sigmas.shape[1]): + pred_cov = pred_cov + self.COV_WEIGHTS[0, i] * diff[:, i:i+1] @ diff[:, i:i+1].T + return pred_cov + + def _observation_model(self, state): + """ + Private function of observation model + state: state vector (x, y, yaw, speed) + Return: predicted observation data (x, y) + """ + + return np.array([[state[0, 0]], [state[1, 0]]]) + + def _predict_sigmas_observation(self, sigmas): + """ + Private function to predict observation at each sigma point + sigmas: array of sigma points + Return: predicted observation vector at each sigma point (2 x 9) + """ + + sigmas_num = sigmas.shape[1] + obv_sigmas = np.zeros((2, sigmas_num)) + for i in range(sigmas_num): + obv_sigmas[:, i:i+1] = self._observation_model(sigmas[:, i:i+1]) + return obv_sigmas + + def _predict_observation_covariance(self, pred_obv, pred_obv_sigmas, obsrv_noise_cov): + """ + Private function to predict observation covariance + pred_obv: predicted observation vector + pred_obv_sigmas: predicted observation at each sigma point + obsrv_noise_cov: observation noise covariance matrix + Return: predicted observation covariance matrix + """ + + diff = pred_obv_sigmas - pred_obv + pred_obv_cov = obsrv_noise_cov + for i in range(pred_obv_sigmas.shape[1]): + pred_obv_cov = pred_obv_cov + self.COV_WEIGHTS[0, i] * diff[:, i:i+1] @ diff[:, i:i+1].T + return pred_obv_cov + + def _calculate_correlation_matrix(self, pred_state, pred_sigmas, pred_obv, pred_obv_sigmas): + """ + Private function to calculate correlation matrix between state and observation + pred_state: predicted state vector + pred_sigmas: motion predicted sigma points + pred_obv: predicted observation vector + pred_obv_sigmas: predicted observation at each sigma point + Return: correlation matrix between state and observation + """ + + sigmas_num = pred_sigmas.shape[1] + diff_state = pred_sigmas - pred_state + diff_obv = pred_obv_sigmas - pred_obv + corr_mat = np.zeros((diff_state.shape[0], diff_obv.shape[0])) + for i in range(sigmas_num): + corr_mat = corr_mat + self.COV_WEIGHTS[0, i] * diff_state[:, i:i+1] @ diff_obv[:, i:i+1].T + return corr_mat + + def update(self, state, accel_mps2, yaw_rate_rps, time_s, gnss): + """ + Function to update data + state: Last estimated state data + accel_mps2: Acceleration input from controller + yaw_rate_rps: Yaw rate input from controller + time_s: Simulation interval time[sec] + gnss: GNSS observation data (x, y) + Return: Estimated state data + """ + + last_state = np.array([[state.get_x_m()], + [state.get_y_m()], + [state.get_yaw_rad()], + [state.get_speed_mps()]]) + + # input with noise + input_org = np.array([[accel_mps2], + [yaw_rate_rps]]) + input_noise = np.sqrt(self.INPUT_NOISE_VAR_MAT) @ np.random.randn(2, 1) + next_input = input_org + input_noise + + # augment state with input noise for sigma point generation + # create augmented state [x, y, yaw, speed, accel_noise, yaw_rate_noise] + aug_state = np.vstack((last_state, np.zeros((2, 1)))) + aug_cov = np.block([[self.cov_mat, np.zeros((4, 2))], + [np.zeros((2, 4)), self.INPUT_NOISE_VAR_MAT]]) + + # generate sigma points from augmented state + aug_sigmas = self._generate_sigma_points(aug_state, aug_cov) + + # extract state and input noise from augmented sigma points + state_sigmas = aug_sigmas[:4, :] + input_noise_sigmas = aug_sigmas[4:, :] + + # predict motion of sigma points with input noise + pred_sigmas = np.zeros((4, aug_sigmas.shape[1])) + for i in range(aug_sigmas.shape[1]): + input_with_noise = next_input + input_noise_sigmas[:, i:i+1] + pred_sigmas[:, i:i+1] = State.motion_model(state_sigmas[:, i:i+1], input_with_noise, time_s) + + # compute predicted state mean + pred_state = (self.STATE_WEIGHTS @ pred_sigmas.T).T + + # compute predicted state covariance + pred_cov = self._predict_state_covariance(pred_state, pred_sigmas) + + # predict observation - generate new sigma points from predicted state and covariance + obv_state_sigmas = self._generate_sigma_points(pred_state, pred_cov) + pred_obv_sigmas = self._predict_sigmas_observation(obv_state_sigmas) + pred_obv = (self.STATE_WEIGHTS @ pred_obv_sigmas.T).T + + # compute predicted observation covariance + pred_obv_cov = self._predict_observation_covariance(pred_obv, pred_obv_sigmas, self.OBSRV_NOISE_VAR_MAT) + + # compute correlation matrix between state and observation + corr_mat = self._calculate_correlation_matrix(pred_state, obv_state_sigmas, pred_obv, pred_obv_sigmas) + + # kalman gain + k = corr_mat @ np.linalg.inv(pred_obv_cov) + + # update + inov = gnss - pred_obv + est_state = pred_state + k @ inov + est_cov = pred_cov - k @ pred_obv_cov @ k.T + + self.state = est_state + self.cov_mat = est_cov + + return est_state + + def draw(self, axes, elems, pose): + """ + Function to draw error covariance ellipse + axes: Axes objects of figure + elems: List of plot object + pose: Vehicle's pose[x, y, yaw] + """ + + # extract 2x2 covariance for x and y + xy_cov = self.cov_mat[:2, :2] + eig_val, eig_vec = np.linalg.eig(xy_cov) + if eig_val[0] >= eig_val[1]: + big_idx, small_idx = 0, 1 + else: + big_idx, small_idx = 1, 0 + a, b = sqrt(3.0 * eig_val[big_idx]), sqrt(3.0 * eig_val[small_idx]) + angle = atan2(eig_vec[1, big_idx], eig_vec[0, big_idx]) + + t = np.arange(0, 2 * pi + 0.1, 0.1) + xs = [a * cos(it) for it in t] + ys = [b * sin(it) for it in t] + xys = np.array([xs, ys]) + xys_array = XYArray(xys) + + transformed_xys = xys_array.homogeneous_transformation(pose[0, 0], pose[1, 0], angle) + elip_plot, = axes.plot(transformed_xys.get_x_data(), transformed_xys.get_y_data(), color=self.DRAW_COLOR) + elems.append(elip_plot) + diff --git a/src/simulations/localization/ekf_vs_ukf_comparison/ekf_vs_ukf_comparison.gif b/src/simulations/localization/ekf_vs_ukf_comparison/ekf_vs_ukf_comparison.gif new file mode 100644 index 00000000..6738a706 Binary files /dev/null and b/src/simulations/localization/ekf_vs_ukf_comparison/ekf_vs_ukf_comparison.gif differ diff --git a/src/simulations/localization/ekf_vs_ukf_comparison/ekf_vs_ukf_comparison.py b/src/simulations/localization/ekf_vs_ukf_comparison/ekf_vs_ukf_comparison.py new file mode 100644 index 00000000..f281d1f8 --- /dev/null +++ b/src/simulations/localization/ekf_vs_ukf_comparison/ekf_vs_ukf_comparison.py @@ -0,0 +1,93 @@ +""" +ekf_vs_ukf_comparison.py + +Author: Bruno DOKPOMIWA + +This script tries to compare Extended Kalman Filter (EKF) and Unscented Kalman Filter (UKF) +for vehicle localization. +""" + +# import path setting +import sys +from pathlib import Path + +abs_dir_path = str(Path(__file__).absolute().parent) +relative_path = "/../../../components/" + +sys.path.append(abs_dir_path + relative_path + "visualization") +sys.path.append(abs_dir_path + relative_path + "state") +sys.path.append(abs_dir_path + relative_path + "vehicle") +sys.path.append(abs_dir_path + relative_path + "course/cubic_spline_course") +sys.path.append(abs_dir_path + relative_path + "control/pure_pursuit") +sys.path.append(abs_dir_path + relative_path + "sensors") +sys.path.append(abs_dir_path + relative_path + "sensors/gnss") +sys.path.append(abs_dir_path + relative_path + "localization/kalman_filter") + + +# import component modules +from global_xy_visualizer import GlobalXYVisualizer +from min_max import MinMax +from time_parameters import TimeParameters +from vehicle_specification import VehicleSpecification +from state import State +from four_wheels_vehicle import FourWheelsVehicle +from cubic_spline_course import CubicSplineCourse +from pure_pursuit_controller import PurePursuitController +from sensors import Sensors +from gnss import Gnss +from extended_kalman_filter_localizer import ExtendedKalmanFilterLocalizer +from unscented_kalman_filter_localizer import UnscentedKalmanFilterLocalizer + + +# flag to show plot figure +# when executed as unit test, this flag is set as false +show_plot = True + + +def main(): + """ + Main process function + Comparison between EKF and UKF localization + """ + + # set simulation parameters + x_lim, y_lim = MinMax(-5, 55), MinMax(-20, 25) + vis = GlobalXYVisualizer(x_lim, y_lim, TimeParameters(span_sec=30)) + + # create course data instance (trajectory) + course = CubicSplineCourse([0.0, 10.0, 25, 40, 50], + [0.0, 4, -12, 20, -13], + 20) + vis.add_object(course) + + # create vehicle's spec instance + spec = VehicleSpecification(area_size=20.0) + + gnss = Sensors(gnss=Gnss(x_noise_std=1.0, y_noise_std=1.0, color='g')) + + # create EKF vehicle + ekf_state = State(color='b', x_m=0.0, y_m=0.0, yaw_rad=0.0, speed_mps=0.0) + ekf_controller = PurePursuitController(spec, course, color='m') + ekf_localizer = ExtendedKalmanFilterLocalizer(color='r') + ekf_vehicle = FourWheelsVehicle(ekf_state, spec, controller=ekf_controller, + sensors=gnss, localizer=ekf_localizer) + vis.add_object(ekf_vehicle) + + # create UKF vehicle + ukf_state = State(color='c', x_m=0.0, y_m=0.0, yaw_rad=0.0, speed_mps=0.0) + ukf_controller = PurePursuitController(spec, course, color='m') + ukf_localizer = UnscentedKalmanFilterLocalizer(color='orange') + ukf_vehicle = FourWheelsVehicle(ukf_state, spec, controller=ukf_controller, + sensors=gnss, localizer=ukf_localizer) + vis.add_object(ukf_vehicle) + + # plot figure is not shown when executed as unit test + if not show_plot: vis.not_show_plot() + + # show plot figure + vis.draw() + + +# execute main process +if __name__ == "__main__": + main() diff --git a/src/simulations/localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization.gif b/src/simulations/localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization.gif new file mode 100644 index 00000000..acebcb46 Binary files /dev/null and b/src/simulations/localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization.gif differ diff --git a/src/simulations/localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization.py b/src/simulations/localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization.py new file mode 100644 index 00000000..939e4215 --- /dev/null +++ b/src/simulations/localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization.py @@ -0,0 +1,83 @@ +""" +unscented_kalman_filter_localization.py + +Author: Bruno DOKPOMIWA +""" + +# import path setting +import sys +from pathlib import Path + +abs_dir_path = str(Path(__file__).absolute().parent) +relative_path = "/../../../components/" + +sys.path.append(abs_dir_path + relative_path + "visualization") +sys.path.append(abs_dir_path + relative_path + "state") +sys.path.append(abs_dir_path + relative_path + "vehicle") +sys.path.append(abs_dir_path + relative_path + "course/cubic_spline_course") +sys.path.append(abs_dir_path + relative_path + "control/pure_pursuit") +sys.path.append(abs_dir_path + relative_path + "sensors") +sys.path.append(abs_dir_path + relative_path + "sensors/gnss") +sys.path.append(abs_dir_path + relative_path + "localization/kalman_filter") + + +# import component modules +from global_xy_visualizer import GlobalXYVisualizer +from min_max import MinMax +from time_parameters import TimeParameters +from vehicle_specification import VehicleSpecification +from state import State +from four_wheels_vehicle import FourWheelsVehicle +from cubic_spline_course import CubicSplineCourse +from pure_pursuit_controller import PurePursuitController +from sensors import Sensors +from gnss import Gnss +from unscented_kalman_filter_localizer import UnscentedKalmanFilterLocalizer + + +# flag to show plot figure +# when executed as unit test, this flag is set as false +show_plot = True + + +def main(): + """ + Main process function + """ + + # set simulation parameters + x_lim, y_lim = MinMax(-5, 55), MinMax(-20, 25) + vis = GlobalXYVisualizer(x_lim, y_lim, TimeParameters(span_sec=30)) + + # create course data instance + course = CubicSplineCourse([0.0, 10.0, 25, 40, 50], + [0.0, 4, -12, 20, -13], + 20) + vis.add_object(course) + + # create vehicle's spec instance + spec = VehicleSpecification(area_size=20.0) + + # create vehicle's state instance + state = State(color='b') + + # create controller instance + pure_pursuit = PurePursuitController(spec, course, color='m') + + # create vehicle instance + # set state, spec, controller, sensors and localizer instances as arguments + gnss = Sensors(gnss=Gnss(x_noise_std=1.0, y_noise_std=1.0)) + ukf = UnscentedKalmanFilterLocalizer() + vehicle = FourWheelsVehicle(state, spec, controller=pure_pursuit, sensors=gnss, localizer=ukf) + vis.add_object(vehicle) + + # plot figure is not shown when executed as unit test + if not show_plot: vis.not_show_plot() + + # show plot figure + vis.draw() + +# execute main process +if __name__ == "__main__": + main() + diff --git a/test/test_ekf_vs_ukf_comparison.py b/test/test_ekf_vs_ukf_comparison.py new file mode 100644 index 00000000..40d1930f --- /dev/null +++ b/test/test_ekf_vs_ukf_comparison.py @@ -0,0 +1,19 @@ +""" +Test of EKF vs UKF Localization Comparison Simulation + +Author: Bruno DOKPOMIWA +""" + +from pathlib import Path +import sys +import pytest + +sys.path.append(str(Path(__file__).absolute().parent) + "/../src/simulations/localization/ekf_vs_ukf_comparison") +import ekf_vs_ukf_comparison + + +def test_simulation(): + ekf_vs_ukf_comparison.show_plot = False + + ekf_vs_ukf_comparison.main() + diff --git a/test/test_ukf_localization_path_tracking.py b/test/test_ukf_localization_path_tracking.py new file mode 100644 index 00000000..032d2983 --- /dev/null +++ b/test/test_ukf_localization_path_tracking.py @@ -0,0 +1,19 @@ +""" +Test of Localization and Path tracking Simulation by UKF and Pure pursuit + +Author: Bruno DOKPOMIWA +""" + +from pathlib import Path +import sys +import pytest + +sys.path.append(str(Path(__file__).absolute().parent) + "/../src/simulations/localization/unscented_kalman_filter_localization") +import unscented_kalman_filter_localization + + +def test_simulation(): + unscented_kalman_filter_localization.show_plot = False + + unscented_kalman_filter_localization.main() +