diff --git a/README.md b/README.md index 30216e37..dce5e08a 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ Python sample codes and documents about Autonomous vehicle control algorithm. Th * [Rear wheel feedback Path Tracking](#rear-wheel-feedback-path-tracking) * [LQR(Linear Quadratic Regulator) Path Tracking](#lqrlinear-quadratic-regulator-path-tracking) * [Stanley steering control Path tracking](#stanley-steering-control-path-tracking) + * [MPPI Path Tracking](#mppi-path-tracking) * [Perception](#perception) * [Rectangle fitting Detection](#rectangle-fitting-detection) * [Sensor's Extrinsic Parameters Estimation](#sensors-extrinsic-parameters-estimation) @@ -137,6 +138,8 @@ Navigation ![](src/simulations/path_tracking/lqr_path_tracking/lqr_path_tracking.gif) #### Stanley steering control Path Tracking ![](src/simulations/path_tracking/stanley_path_tracking/stanley_path_tracking.gif) +#### MPPI Path Tracking +![](src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.gif) ### Perception #### Rectangle fitting Detection ![](src/simulations/perception/point_cloud_rectangle_fitting/point_cloud_rectangle_fitting.gif) diff --git a/src/components/control/mppi/mppi_controller.py b/src/components/control/mppi/mppi_controller.py new file mode 100644 index 00000000..2d5963a8 --- /dev/null +++ b/src/components/control/mppi/mppi_controller.py @@ -0,0 +1,367 @@ +""" +mppi_controller.py + +Model Predictive Path Integral (MPPI) controller for path tracking. +Follows the standard MPPI formulation: warm-started control sequence, additive +noise sampling, stage and terminal costs, information-theoretic weighting, +and optional moving-average smoothing. Uses (steer, accel) as control input; +converts to (accel, yaw_rate) for the existing State.motion_model. Visualizes +optimal and optionally all sampled trajectories. +""" + +import math +import sys +from pathlib import Path +from math import atan2, cos, sin, tan +import numpy as np + +abs_dir_path = str(Path(__file__).absolute().parent) +relative_path = "/../../../components/" +sys.path.append(abs_dir_path + relative_path + "state") +sys.path.append(abs_dir_path + relative_path + "course/cubic_spline_course") + +from state import State + + +class _StateView: + """Minimal state-like object for course methods (x, y, yaw, speed).""" + + def __init__(self, x_m, y_m, yaw_rad, speed_mps): + self.x_m = x_m + self.y_m = y_m + self.yaw_rad = yaw_rad + self.speed_mps = speed_mps + + def get_x_m(self): + return self.x_m + + def get_y_m(self): + return self.y_m + + def get_yaw_rad(self): + return self.yaw_rad + + def get_speed_mps(self): + return self.speed_mps + + +class MppiController: + """ + MPPI path-tracking controller aligned with standard formulation: + warm start (u_prev), exploitation/exploration sampling, stage + terminal cost, + control cost term (param_gamma * u.T @ inv(Sigma) @ v), and optional smoothing. + Control input is (steer_rad, accel_mps2); outputs match vehicle interface + (accel, yaw_rate, steer for draw). + """ + + def __init__( + self, + spec, + course=None, + color="g", + delta_t=0.05, + horizon_step_T=20, + number_of_samples_K=256, + param_exploration=0.0, + param_lambda=50.0, + param_alpha=1.0, + sigma_steer=0.1, + sigma_accel=0.5, + max_steer_abs=0.523, + max_accel_abs=2.0, + stage_cost_weight=None, + terminal_cost_weight=None, + moving_average_window=0, + visualize_optimal_traj=True, + visualize_sampled_trajs=True, + ): + """ + spec: Vehicle specification (wheel_base_m used for motion model). + course: Reference path with search_nearest_point_index, point_x_m, point_y_m, + point_yaw_rad, point_speed_mps. + color: Color for optimal trajectory. + delta_t: Time step for rollout [s]. + horizon_step_T: Prediction horizon (number of steps). + number_of_samples_K: Number of sample trajectories. + param_exploration: Fraction of samples that use pure exploration (no u_prev). + param_lambda: MPPI temperature (lambda). + param_alpha: MPPI alpha; param_gamma = param_lambda * (1 - param_alpha). + sigma_steer, sigma_accel: Std for steer and accel noise (used to build Sigma). + max_steer_abs: Maximum steering angle [rad]. + max_accel_abs: Maximum acceleration [m/s^2]. + stage_cost_weight: [x, y, yaw, v] stage cost weights (default [50, 50, 1, 20]). + terminal_cost_weight: [x, y, yaw, v] terminal cost weights (default same as stage). + moving_average_window: Window for smoothing w_epsilon (0 = disable). + visualize_optimal_traj: If True, draw optimal trajectory. + visualize_sampled_trajs: If True, draw all sampled trajectories. + """ + self.WHEEL_BASE_M = spec.wheel_base_m + self.course = course + self.DRAW_COLOR = color + self.delta_t = delta_t + self.T = horizon_step_T + self.K = number_of_samples_K + self.param_exploration = max(0.0, min(1.0, param_exploration)) + self.param_lambda = max(1e-6, param_lambda) + self.param_alpha = param_alpha + self.param_gamma = self.param_lambda * (1.0 - self.param_alpha) + self.max_steer_abs = max_steer_abs + self.max_accel_abs = max_accel_abs + self.moving_average_window = max(0, moving_average_window) + self.visualize_optimal_traj = visualize_optimal_traj + self.visualize_sampled_trajs = visualize_sampled_trajs + + self.Sigma = np.array([[sigma_steer**2, 0.0], [0.0, sigma_accel**2]]) + self.stage_cost_weight = np.asarray( + stage_cost_weight + if stage_cost_weight is not None + else [50.0, 50.0, 1.0, 20.0] + ) + self.terminal_cost_weight = np.asarray( + terminal_cost_weight + if terminal_cost_weight is not None + else self.stage_cost_weight.copy() + ) + + self.u_prev = np.zeros((self.T, 2)) # (steer, accel) per step + self.prev_waypoints_idx = 0 + + self.target_accel_mps2 = 0.0 + self.target_speed_mps = 0.0 + self.target_steer_rad = 0.0 + self.target_yaw_rate_rps = 0.0 + self.optimal_trajectory = None # (x_list, y_list) + self.sampled_trajectories = [] # list of (x_list, y_list) + self.weights = [] + + def _get_nearest_waypoint(self, x, y, update_prev_idx=False): + """Return (ref_x, ref_y, ref_yaw, ref_v) from course at nearest point to (x, y).""" + if not self.course: + return 0.0, 0.0, 0.0, 0.0 + view = _StateView(x, y, 0.0, 0.0) + nearest_idx = self.course.search_nearest_point_index(view) + ref_x = self.course.point_x_m(nearest_idx) + ref_y = self.course.point_y_m(nearest_idx) + ref_yaw = self.course.point_yaw_rad(nearest_idx) + ref_v = self.course.point_speed_mps(nearest_idx) + if update_prev_idx: + self.prev_waypoints_idx = nearest_idx + return ref_x, ref_y, ref_yaw, ref_v + + def _g(self, v): + """Clamp control (steer, accel) to limits.""" + steer = np.clip(v[0], -self.max_steer_abs, self.max_steer_abs) + accel = np.clip(v[1], -self.max_accel_abs, self.max_accel_abs) + return np.array([steer, accel]) + + def _F(self, x_t, v_t): + """Next state from (x,y,yaw,v) and control (steer, accel). Uses State.motion_model with (accel, yaw_rate).""" + x, y, yaw, v = x_t[0, 0], x_t[1, 0], x_t[2, 0], x_t[3, 0] + steer, accel = float(v_t[0]), float(v_t[1]) + if abs(v) < 1e-9: + yaw_rate = 0.0 + else: + yaw_rate = v / self.WHEEL_BASE_M * tan(steer) + state_vec = np.array([[x], [y], [yaw], [v]]) + input_vec = np.array([[accel], [yaw_rate]]) + return State.motion_model(state_vec, input_vec, self.delta_t) + + def _c(self, x_t): + """Stage cost: weighted squared error to reference + control cost term (filled in by caller with u_prev, v).""" + x, y, yaw, v = ( + float(x_t[0, 0]), + float(x_t[1, 0]), + float(x_t[2, 0]), + float(x_t[3, 0]), + ) + yaw = (yaw + 2.0 * np.pi) % (2.0 * np.pi) + ref_x, ref_y, ref_yaw, ref_v = self._get_nearest_waypoint(x, y) + ref_yaw = (ref_yaw + 2.0 * np.pi) % (2.0 * np.pi) + yaw_diff = np.arctan2(np.sin(yaw - ref_yaw), np.cos(yaw - ref_yaw)) + cost = ( + self.stage_cost_weight[0] * (x - ref_x) ** 2 + + self.stage_cost_weight[1] * (y - ref_y) ** 2 + + self.stage_cost_weight[2] * (yaw_diff**2) + + self.stage_cost_weight[3] * (v - ref_v) ** 2 + ) + return cost + + def _phi(self, x_T): + """Terminal cost.""" + x, y, yaw, v = ( + float(x_T[0, 0]), + float(x_T[1, 0]), + float(x_T[2, 0]), + float(x_T[3, 0]), + ) + yaw = (yaw + 2.0 * np.pi) % (2.0 * np.pi) + ref_x, ref_y, ref_yaw, ref_v = self._get_nearest_waypoint(x, y) + ref_yaw = (ref_yaw + 2.0 * np.pi) % (2.0 * np.pi) + yaw_diff = np.arctan2(np.sin(yaw - ref_yaw), np.cos(yaw - ref_yaw)) + cost = ( + self.terminal_cost_weight[0] * (x - ref_x) ** 2 + + self.terminal_cost_weight[1] * (y - ref_y) ** 2 + + self.terminal_cost_weight[2] * (yaw_diff**2) + + self.terminal_cost_weight[3] * (v - ref_v) ** 2 + ) + return cost + + def _calc_epsilon(self): + """Sample epsilon (K, T, 2) from N(0, Sigma).""" + mu = np.zeros(2) + epsilon = np.random.multivariate_normal(mu, self.Sigma, (self.K, self.T)) + return epsilon + + def _compute_weights(self, S): + """Information-theoretic weights: rho = min(S), w[k] = (1/eta) * exp(-(S[k]-rho)/lambda).""" + rho = S.min() + eta = np.sum(np.exp((-1.0 / self.param_lambda) * (S - rho))) + w = (1.0 / eta) * np.exp((-1.0 / self.param_lambda) * (S - rho)) + return w + + def _moving_average_filter(self, xx, window_size): + """Smooth each column of xx (T, 2) with moving average. Same logic as reference.""" + if window_size < 2: + return xx + b = np.ones(window_size) / window_size + xx_mean = np.zeros_like(xx) + for d in range(xx.shape[1]): + xx_mean[:, d] = np.convolve(xx[:, d], b, mode="same") + n_conv = math.ceil(window_size / 2) + xx_mean[0, d] *= window_size / n_conv + for i in range(1, n_conv): + xx_mean[i, d] *= window_size / (i + n_conv) + xx_mean[-i, d] *= window_size / (i + n_conv - (window_size % 2)) + return xx_mean + + def update(self, state, time_s): + """ + Run one MPPI step: warm start, sample, rollout, cost, weight, update u_prev, + set target from first control. Store optimal and sampled trajectories for draw(). + """ + if not self.course: + self.target_accel_mps2 = 0.0 + self.target_yaw_rate_rps = 0.0 + self.target_steer_rad = 0.0 + self.target_speed_mps = state.get_speed_mps() + self.optimal_trajectory = None + self.sampled_trajectories = [] + self.weights = [] + return + + x0 = np.array( + [ + [state.get_x_m()], + [state.get_y_m()], + [state.get_yaw_rad()], + [state.get_speed_mps()], + ] + ) + self._get_nearest_waypoint(x0[0, 0], x0[1, 0], update_prev_idx=True) + + u = self.u_prev.copy() + epsilon = self._calc_epsilon() + v = np.zeros((self.K, self.T, 2)) + n_exploit = int((1.0 - self.param_exploration) * self.K) + for k in range(self.K): + for t in range(self.T): + if k < n_exploit: + v[k, t] = u[t] + epsilon[k, t] + else: + v[k, t] = epsilon[k, t] + v[k, t] = self._g(v[k, t]) + + S = np.zeros(self.K) + Sigma_inv = np.linalg.inv(self.Sigma) + for k in range(self.K): + x = x0.copy() + for t in range(self.T): + u_t = u[t] + v_t = v[k, t] + S[k] += self._c(x) + self.param_gamma * (u_t.T @ Sigma_inv @ v_t) + x = self._F(x, v_t) + S[k] += self._phi(x) + + w = self._compute_weights(S) + self.weights = w.tolist() + + w_epsilon = np.zeros((self.T, 2)) + for t in range(self.T): + for k in range(self.K): + w_epsilon[t] += w[k] * epsilon[k, t] + if self.moving_average_window >= 2: + w_epsilon = self._moving_average_filter( + w_epsilon, self.moving_average_window + ) + u = u + w_epsilon + u = np.clip( + u, + [-self.max_steer_abs, -self.max_accel_abs], + [self.max_steer_abs, self.max_accel_abs], + ) + + steer0 = float(u[0, 0]) + accel0 = float(u[0, 1]) + self.target_steer_rad = steer0 + self.target_accel_mps2 = accel0 + v0 = state.get_speed_mps() + if abs(v0) < 1e-9: + self.target_yaw_rate_rps = 0.0 + else: + self.target_yaw_rate_rps = v0 / self.WHEEL_BASE_M * tan(steer0) + self.target_speed_mps = v0 + + if self.visualize_optimal_traj: + x = x0.copy() + x_list = [float(x[0, 0])] + y_list = [float(x[1, 0])] + for t in range(self.T): + x = self._F(x, u[t]) + x_list.append(float(x[0, 0])) + y_list.append(float(x[1, 0])) + self.optimal_trajectory = (x_list, y_list) + else: + self.optimal_trajectory = None + + self.sampled_trajectories = [] + if self.visualize_sampled_trajs: + for k in range(self.K): + x = x0.copy() + x_list = [float(x[0, 0])] + y_list = [float(x[1, 0])] + for t in range(self.T): + x = self._F(x, v[k, t]) + x_list.append(float(x[0, 0])) + y_list.append(float(x[1, 0])) + self.sampled_trajectories.append((x_list, y_list)) + + self.u_prev[:-1] = u[1:] + self.u_prev[-1] = u[-1] + + def get_target_accel_mps2(self): + return self.target_accel_mps2 + + def get_target_steer_rad(self): + return self.target_steer_rad + + def get_target_yaw_rate_rps(self): + return self.target_yaw_rate_rps + + def draw(self, axes, elems): + """Draw sampled trajectories (if enabled) and optimal trajectory (if enabled).""" + if self.visualize_sampled_trajs and self.sampled_trajectories: + for (x_list, y_list), w in zip(self.sampled_trajectories, self.weights): + alpha = 0.06 + 0.12 * min(1.0, float(w) * self.K) + (line,) = axes.plot(x_list, y_list, "b-", linewidth=0.35, alpha=alpha) + elems.append(line) + if self.visualize_optimal_traj and self.optimal_trajectory: + x_list, y_list = self.optimal_trajectory + (line,) = axes.plot( + x_list, + y_list, + color=self.DRAW_COLOR, + linewidth=2.0, + alpha=0.9, + label="MPPI trajectory", + ) + elems.append(line) diff --git a/src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.gif b/src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.gif new file mode 100644 index 00000000..35b99c1b Binary files /dev/null and b/src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.gif differ diff --git a/src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.py b/src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.py new file mode 100644 index 00000000..3e7d8489 --- /dev/null +++ b/src/simulations/path_tracking/mppi_path_tracking/mppi_path_tracking.py @@ -0,0 +1,75 @@ +""" +mppi_path_tracking.py + +Path tracking simulation using MPPI (Model Predictive Path Integral) controller. +Visualizes all evaluated trajectories and the chosen trajectory. +""" + +import sys +from pathlib import Path + +abs_dir_path = str(Path(__file__).absolute().parent) +relative_path = "/../../../components/" + +sys.path.append(abs_dir_path + relative_path + "visualization") +sys.path.append(abs_dir_path + relative_path + "state") +sys.path.append(abs_dir_path + relative_path + "vehicle") +sys.path.append(abs_dir_path + relative_path + "course/cubic_spline_course") +sys.path.append(abs_dir_path + relative_path + "control/mppi") + +from global_xy_visualizer import GlobalXYVisualizer +from min_max import MinMax +from time_parameters import TimeParameters +from vehicle_specification import VehicleSpecification +from state import State +from four_wheels_vehicle import FourWheelsVehicle +from cubic_spline_course import CubicSplineCourse +from mppi_controller import MppiController + +show_plot = True + + +def main(): + x_lim, y_lim = MinMax(-5, 55), MinMax(-20, 25) + gif_path = str(Path(__file__).absolute().parent / "mppi_path_tracking.gif") + vis = GlobalXYVisualizer( + x_lim, y_lim, TimeParameters(span_sec=25), gif_name=gif_path + ) + + course = CubicSplineCourse([0.0, 10.0, 25, 40, 50], [0.0, 4, -12, 20, -13], 20) + vis.add_object(course) + + spec = VehicleSpecification(area_size=20.0) + state = State(color=spec.color) + + mppi = MppiController( + spec, + course, + color="g", + delta_t=0.1, + horizon_step_T=22, + number_of_samples_K=200, + param_exploration=0.05, + param_lambda=50.0, + param_alpha=0.98, + sigma_steer=0.2, + sigma_accel=0.8, + max_steer_abs=0.523, + max_accel_abs=2.0, + stage_cost_weight=[50.0, 50.0, 1.0, 20.0], + terminal_cost_weight=[50.0, 50.0, 1.0, 20.0], + moving_average_window=5, + visualize_optimal_traj=True, + visualize_sampled_trajs=True, + ) + + vehicle = FourWheelsVehicle(state, spec, controller=mppi) + vis.add_object(vehicle) + + if not show_plot: + vis.not_show_plot() + vis.draw() + + +if __name__ == "__main__": + main() diff --git a/test/test_mppi_path_tracking.py b/test/test_mppi_path_tracking.py new file mode 100644 index 00000000..5b8ca134 --- /dev/null +++ b/test/test_mppi_path_tracking.py @@ -0,0 +1,18 @@ +""" +Test of path tracking simulation by MPPI (Model Predictive Path Integral) algorithm +""" + +from pathlib import Path +import sys +import pytest + +sys.path.append( + str(Path(__file__).absolute().parent) + + "/../src/simulations/path_tracking/mppi_path_tracking" +) +import mppi_path_tracking + + +def test_simulation(): + mppi_path_tracking.show_plot = False + mppi_path_tracking.main()