|
| 1 | +""" |
| 2 | +mppi_controller.py |
| 3 | +
|
| 4 | +Model Predictive Path Integral (MPPI) controller for path tracking. |
| 5 | +Follows the standard MPPI formulation: warm-started control sequence, additive |
| 6 | +noise sampling, stage and terminal costs, information-theoretic weighting, |
| 7 | +and optional moving-average smoothing. Uses (steer, accel) as control input; |
| 8 | +converts to (accel, yaw_rate) for the existing State.motion_model. Visualizes |
| 9 | +optimal and optionally all sampled trajectories. |
| 10 | +""" |
| 11 | + |
| 12 | +import math |
| 13 | +import sys |
| 14 | +from pathlib import Path |
| 15 | +from math import atan2, cos, sin, tan |
| 16 | +import numpy as np |
| 17 | + |
| 18 | +abs_dir_path = str(Path(__file__).absolute().parent) |
| 19 | +relative_path = "/../../../components/" |
| 20 | +sys.path.append(abs_dir_path + relative_path + "state") |
| 21 | +sys.path.append(abs_dir_path + relative_path + "course/cubic_spline_course") |
| 22 | + |
| 23 | +from state import State |
| 24 | + |
| 25 | + |
| 26 | +class _StateView: |
| 27 | + """Minimal state-like object for course methods (x, y, yaw, speed).""" |
| 28 | + |
| 29 | + def __init__(self, x_m, y_m, yaw_rad, speed_mps): |
| 30 | + self.x_m = x_m |
| 31 | + self.y_m = y_m |
| 32 | + self.yaw_rad = yaw_rad |
| 33 | + self.speed_mps = speed_mps |
| 34 | + |
| 35 | + def get_x_m(self): |
| 36 | + return self.x_m |
| 37 | + |
| 38 | + def get_y_m(self): |
| 39 | + return self.y_m |
| 40 | + |
| 41 | + def get_yaw_rad(self): |
| 42 | + return self.yaw_rad |
| 43 | + |
| 44 | + def get_speed_mps(self): |
| 45 | + return self.speed_mps |
| 46 | + |
| 47 | + |
| 48 | +class MppiController: |
| 49 | + """ |
| 50 | + MPPI path-tracking controller aligned with standard formulation: |
| 51 | + warm start (u_prev), exploitation/exploration sampling, stage + terminal cost, |
| 52 | + control cost term (param_gamma * u.T @ inv(Sigma) @ v), and optional smoothing. |
| 53 | + Control input is (steer_rad, accel_mps2); outputs match vehicle interface |
| 54 | + (accel, yaw_rate, steer for draw). |
| 55 | + """ |
| 56 | + |
| 57 | + def __init__( |
| 58 | + self, |
| 59 | + spec, |
| 60 | + course=None, |
| 61 | + color="g", |
| 62 | + delta_t=0.05, |
| 63 | + horizon_step_T=20, |
| 64 | + number_of_samples_K=256, |
| 65 | + param_exploration=0.0, |
| 66 | + param_lambda=50.0, |
| 67 | + param_alpha=1.0, |
| 68 | + sigma_steer=0.1, |
| 69 | + sigma_accel=0.5, |
| 70 | + max_steer_abs=0.523, |
| 71 | + max_accel_abs=2.0, |
| 72 | + stage_cost_weight=None, |
| 73 | + terminal_cost_weight=None, |
| 74 | + moving_average_window=0, |
| 75 | + visualize_optimal_traj=True, |
| 76 | + visualize_sampled_trajs=True, |
| 77 | + ): |
| 78 | + """ |
| 79 | + spec: Vehicle specification (wheel_base_m used for motion model). |
| 80 | + course: Reference path with search_nearest_point_index, point_x_m, point_y_m, |
| 81 | + point_yaw_rad, point_speed_mps. |
| 82 | + color: Color for optimal trajectory. |
| 83 | + delta_t: Time step for rollout [s]. |
| 84 | + horizon_step_T: Prediction horizon (number of steps). |
| 85 | + number_of_samples_K: Number of sample trajectories. |
| 86 | + param_exploration: Fraction of samples that use pure exploration (no u_prev). |
| 87 | + param_lambda: MPPI temperature (lambda). |
| 88 | + param_alpha: MPPI alpha; param_gamma = param_lambda * (1 - param_alpha). |
| 89 | + sigma_steer, sigma_accel: Std for steer and accel noise (used to build Sigma). |
| 90 | + max_steer_abs: Maximum steering angle [rad]. |
| 91 | + max_accel_abs: Maximum acceleration [m/s^2]. |
| 92 | + stage_cost_weight: [x, y, yaw, v] stage cost weights (default [50, 50, 1, 20]). |
| 93 | + terminal_cost_weight: [x, y, yaw, v] terminal cost weights (default same as stage). |
| 94 | + moving_average_window: Window for smoothing w_epsilon (0 = disable). |
| 95 | + visualize_optimal_traj: If True, draw optimal trajectory. |
| 96 | + visualize_sampled_trajs: If True, draw all sampled trajectories. |
| 97 | + """ |
| 98 | + self.WHEEL_BASE_M = spec.wheel_base_m |
| 99 | + self.course = course |
| 100 | + self.DRAW_COLOR = color |
| 101 | + self.delta_t = delta_t |
| 102 | + self.T = horizon_step_T |
| 103 | + self.K = number_of_samples_K |
| 104 | + self.param_exploration = max(0.0, min(1.0, param_exploration)) |
| 105 | + self.param_lambda = max(1e-6, param_lambda) |
| 106 | + self.param_alpha = param_alpha |
| 107 | + self.param_gamma = self.param_lambda * (1.0 - self.param_alpha) |
| 108 | + self.max_steer_abs = max_steer_abs |
| 109 | + self.max_accel_abs = max_accel_abs |
| 110 | + self.moving_average_window = max(0, moving_average_window) |
| 111 | + self.visualize_optimal_traj = visualize_optimal_traj |
| 112 | + self.visualize_sampled_trajs = visualize_sampled_trajs |
| 113 | + |
| 114 | + self.Sigma = np.array([[sigma_steer**2, 0.0], [0.0, sigma_accel**2]]) |
| 115 | + self.stage_cost_weight = np.asarray( |
| 116 | + stage_cost_weight |
| 117 | + if stage_cost_weight is not None |
| 118 | + else [50.0, 50.0, 1.0, 20.0] |
| 119 | + ) |
| 120 | + self.terminal_cost_weight = np.asarray( |
| 121 | + terminal_cost_weight |
| 122 | + if terminal_cost_weight is not None |
| 123 | + else self.stage_cost_weight.copy() |
| 124 | + ) |
| 125 | + |
| 126 | + self.u_prev = np.zeros((self.T, 2)) # (steer, accel) per step |
| 127 | + self.prev_waypoints_idx = 0 |
| 128 | + |
| 129 | + self.target_accel_mps2 = 0.0 |
| 130 | + self.target_speed_mps = 0.0 |
| 131 | + self.target_steer_rad = 0.0 |
| 132 | + self.target_yaw_rate_rps = 0.0 |
| 133 | + self.optimal_trajectory = None # (x_list, y_list) |
| 134 | + self.sampled_trajectories = [] # list of (x_list, y_list) |
| 135 | + self.weights = [] |
| 136 | + |
| 137 | + def _get_nearest_waypoint(self, x, y, update_prev_idx=False): |
| 138 | + """Return (ref_x, ref_y, ref_yaw, ref_v) from course at nearest point to (x, y).""" |
| 139 | + if not self.course: |
| 140 | + return 0.0, 0.0, 0.0, 0.0 |
| 141 | + view = _StateView(x, y, 0.0, 0.0) |
| 142 | + nearest_idx = self.course.search_nearest_point_index(view) |
| 143 | + ref_x = self.course.point_x_m(nearest_idx) |
| 144 | + ref_y = self.course.point_y_m(nearest_idx) |
| 145 | + ref_yaw = self.course.point_yaw_rad(nearest_idx) |
| 146 | + ref_v = self.course.point_speed_mps(nearest_idx) |
| 147 | + if update_prev_idx: |
| 148 | + self.prev_waypoints_idx = nearest_idx |
| 149 | + return ref_x, ref_y, ref_yaw, ref_v |
| 150 | + |
| 151 | + def _g(self, v): |
| 152 | + """Clamp control (steer, accel) to limits.""" |
| 153 | + steer = np.clip(v[0], -self.max_steer_abs, self.max_steer_abs) |
| 154 | + accel = np.clip(v[1], -self.max_accel_abs, self.max_accel_abs) |
| 155 | + return np.array([steer, accel]) |
| 156 | + |
| 157 | + def _F(self, x_t, v_t): |
| 158 | + """Next state from (x,y,yaw,v) and control (steer, accel). Uses State.motion_model with (accel, yaw_rate).""" |
| 159 | + x, y, yaw, v = x_t[0, 0], x_t[1, 0], x_t[2, 0], x_t[3, 0] |
| 160 | + steer, accel = float(v_t[0]), float(v_t[1]) |
| 161 | + if abs(v) < 1e-9: |
| 162 | + yaw_rate = 0.0 |
| 163 | + else: |
| 164 | + yaw_rate = v / self.WHEEL_BASE_M * tan(steer) |
| 165 | + state_vec = np.array([[x], [y], [yaw], [v]]) |
| 166 | + input_vec = np.array([[accel], [yaw_rate]]) |
| 167 | + return State.motion_model(state_vec, input_vec, self.delta_t) |
| 168 | + |
| 169 | + def _c(self, x_t): |
| 170 | + """Stage cost: weighted squared error to reference + control cost term (filled in by caller with u_prev, v).""" |
| 171 | + x, y, yaw, v = ( |
| 172 | + float(x_t[0, 0]), |
| 173 | + float(x_t[1, 0]), |
| 174 | + float(x_t[2, 0]), |
| 175 | + float(x_t[3, 0]), |
| 176 | + ) |
| 177 | + yaw = (yaw + 2.0 * np.pi) % (2.0 * np.pi) |
| 178 | + ref_x, ref_y, ref_yaw, ref_v = self._get_nearest_waypoint(x, y) |
| 179 | + ref_yaw = (ref_yaw + 2.0 * np.pi) % (2.0 * np.pi) |
| 180 | + yaw_diff = np.arctan2(np.sin(yaw - ref_yaw), np.cos(yaw - ref_yaw)) |
| 181 | + cost = ( |
| 182 | + self.stage_cost_weight[0] * (x - ref_x) ** 2 |
| 183 | + + self.stage_cost_weight[1] * (y - ref_y) ** 2 |
| 184 | + + self.stage_cost_weight[2] * (yaw_diff**2) |
| 185 | + + self.stage_cost_weight[3] * (v - ref_v) ** 2 |
| 186 | + ) |
| 187 | + return cost |
| 188 | + |
| 189 | + def _phi(self, x_T): |
| 190 | + """Terminal cost.""" |
| 191 | + x, y, yaw, v = ( |
| 192 | + float(x_T[0, 0]), |
| 193 | + float(x_T[1, 0]), |
| 194 | + float(x_T[2, 0]), |
| 195 | + float(x_T[3, 0]), |
| 196 | + ) |
| 197 | + yaw = (yaw + 2.0 * np.pi) % (2.0 * np.pi) |
| 198 | + ref_x, ref_y, ref_yaw, ref_v = self._get_nearest_waypoint(x, y) |
| 199 | + ref_yaw = (ref_yaw + 2.0 * np.pi) % (2.0 * np.pi) |
| 200 | + yaw_diff = np.arctan2(np.sin(yaw - ref_yaw), np.cos(yaw - ref_yaw)) |
| 201 | + cost = ( |
| 202 | + self.terminal_cost_weight[0] * (x - ref_x) ** 2 |
| 203 | + + self.terminal_cost_weight[1] * (y - ref_y) ** 2 |
| 204 | + + self.terminal_cost_weight[2] * (yaw_diff**2) |
| 205 | + + self.terminal_cost_weight[3] * (v - ref_v) ** 2 |
| 206 | + ) |
| 207 | + return cost |
| 208 | + |
| 209 | + def _calc_epsilon(self): |
| 210 | + """Sample epsilon (K, T, 2) from N(0, Sigma).""" |
| 211 | + mu = np.zeros(2) |
| 212 | + epsilon = np.random.multivariate_normal(mu, self.Sigma, (self.K, self.T)) |
| 213 | + return epsilon |
| 214 | + |
| 215 | + def _compute_weights(self, S): |
| 216 | + """Information-theoretic weights: rho = min(S), w[k] = (1/eta) * exp(-(S[k]-rho)/lambda).""" |
| 217 | + rho = S.min() |
| 218 | + eta = np.sum(np.exp((-1.0 / self.param_lambda) * (S - rho))) |
| 219 | + w = (1.0 / eta) * np.exp((-1.0 / self.param_lambda) * (S - rho)) |
| 220 | + return w |
| 221 | + |
| 222 | + def _moving_average_filter(self, xx, window_size): |
| 223 | + """Smooth each column of xx (T, 2) with moving average. Same logic as reference.""" |
| 224 | + if window_size < 2: |
| 225 | + return xx |
| 226 | + b = np.ones(window_size) / window_size |
| 227 | + xx_mean = np.zeros_like(xx) |
| 228 | + for d in range(xx.shape[1]): |
| 229 | + xx_mean[:, d] = np.convolve(xx[:, d], b, mode="same") |
| 230 | + n_conv = math.ceil(window_size / 2) |
| 231 | + xx_mean[0, d] *= window_size / n_conv |
| 232 | + for i in range(1, n_conv): |
| 233 | + xx_mean[i, d] *= window_size / (i + n_conv) |
| 234 | + xx_mean[-i, d] *= window_size / (i + n_conv - (window_size % 2)) |
| 235 | + return xx_mean |
| 236 | + |
| 237 | + def update(self, state, time_s): |
| 238 | + """ |
| 239 | + Run one MPPI step: warm start, sample, rollout, cost, weight, update u_prev, |
| 240 | + set target from first control. Store optimal and sampled trajectories for draw(). |
| 241 | + """ |
| 242 | + if not self.course: |
| 243 | + self.target_accel_mps2 = 0.0 |
| 244 | + self.target_yaw_rate_rps = 0.0 |
| 245 | + self.target_steer_rad = 0.0 |
| 246 | + self.target_speed_mps = state.get_speed_mps() |
| 247 | + self.optimal_trajectory = None |
| 248 | + self.sampled_trajectories = [] |
| 249 | + self.weights = [] |
| 250 | + return |
| 251 | + |
| 252 | + x0 = np.array( |
| 253 | + [ |
| 254 | + [state.get_x_m()], |
| 255 | + [state.get_y_m()], |
| 256 | + [state.get_yaw_rad()], |
| 257 | + [state.get_speed_mps()], |
| 258 | + ] |
| 259 | + ) |
| 260 | + self._get_nearest_waypoint(x0[0, 0], x0[1, 0], update_prev_idx=True) |
| 261 | + |
| 262 | + u = self.u_prev.copy() |
| 263 | + epsilon = self._calc_epsilon() |
| 264 | + v = np.zeros((self.K, self.T, 2)) |
| 265 | + n_exploit = int((1.0 - self.param_exploration) * self.K) |
| 266 | + for k in range(self.K): |
| 267 | + for t in range(self.T): |
| 268 | + if k < n_exploit: |
| 269 | + v[k, t] = u[t] + epsilon[k, t] |
| 270 | + else: |
| 271 | + v[k, t] = epsilon[k, t] |
| 272 | + v[k, t] = self._g(v[k, t]) |
| 273 | + |
| 274 | + S = np.zeros(self.K) |
| 275 | + Sigma_inv = np.linalg.inv(self.Sigma) |
| 276 | + for k in range(self.K): |
| 277 | + x = x0.copy() |
| 278 | + for t in range(self.T): |
| 279 | + u_t = u[t] |
| 280 | + v_t = v[k, t] |
| 281 | + S[k] += self._c(x) + self.param_gamma * (u_t.T @ Sigma_inv @ v_t) |
| 282 | + x = self._F(x, v_t) |
| 283 | + S[k] += self._phi(x) |
| 284 | + |
| 285 | + w = self._compute_weights(S) |
| 286 | + self.weights = w.tolist() |
| 287 | + |
| 288 | + w_epsilon = np.zeros((self.T, 2)) |
| 289 | + for t in range(self.T): |
| 290 | + for k in range(self.K): |
| 291 | + w_epsilon[t] += w[k] * epsilon[k, t] |
| 292 | + if self.moving_average_window >= 2: |
| 293 | + w_epsilon = self._moving_average_filter( |
| 294 | + w_epsilon, self.moving_average_window |
| 295 | + ) |
| 296 | + u = u + w_epsilon |
| 297 | + u = np.clip( |
| 298 | + u, |
| 299 | + [-self.max_steer_abs, -self.max_accel_abs], |
| 300 | + [self.max_steer_abs, self.max_accel_abs], |
| 301 | + ) |
| 302 | + |
| 303 | + steer0 = float(u[0, 0]) |
| 304 | + accel0 = float(u[0, 1]) |
| 305 | + self.target_steer_rad = steer0 |
| 306 | + self.target_accel_mps2 = accel0 |
| 307 | + v0 = state.get_speed_mps() |
| 308 | + if abs(v0) < 1e-9: |
| 309 | + self.target_yaw_rate_rps = 0.0 |
| 310 | + else: |
| 311 | + self.target_yaw_rate_rps = v0 / self.WHEEL_BASE_M * tan(steer0) |
| 312 | + self.target_speed_mps = v0 |
| 313 | + |
| 314 | + if self.visualize_optimal_traj: |
| 315 | + x = x0.copy() |
| 316 | + x_list = [float(x[0, 0])] |
| 317 | + y_list = [float(x[1, 0])] |
| 318 | + for t in range(self.T): |
| 319 | + x = self._F(x, u[t]) |
| 320 | + x_list.append(float(x[0, 0])) |
| 321 | + y_list.append(float(x[1, 0])) |
| 322 | + self.optimal_trajectory = (x_list, y_list) |
| 323 | + else: |
| 324 | + self.optimal_trajectory = None |
| 325 | + |
| 326 | + self.sampled_trajectories = [] |
| 327 | + if self.visualize_sampled_trajs: |
| 328 | + for k in range(self.K): |
| 329 | + x = x0.copy() |
| 330 | + x_list = [float(x[0, 0])] |
| 331 | + y_list = [float(x[1, 0])] |
| 332 | + for t in range(self.T): |
| 333 | + x = self._F(x, v[k, t]) |
| 334 | + x_list.append(float(x[0, 0])) |
| 335 | + y_list.append(float(x[1, 0])) |
| 336 | + self.sampled_trajectories.append((x_list, y_list)) |
| 337 | + |
| 338 | + self.u_prev[:-1] = u[1:] |
| 339 | + self.u_prev[-1] = u[-1] |
| 340 | + |
| 341 | + def get_target_accel_mps2(self): |
| 342 | + return self.target_accel_mps2 |
| 343 | + |
| 344 | + def get_target_steer_rad(self): |
| 345 | + return self.target_steer_rad |
| 346 | + |
| 347 | + def get_target_yaw_rate_rps(self): |
| 348 | + return self.target_yaw_rate_rps |
| 349 | + |
| 350 | + def draw(self, axes, elems): |
| 351 | + """Draw sampled trajectories (if enabled) and optimal trajectory (if enabled).""" |
| 352 | + if self.visualize_sampled_trajs and self.sampled_trajectories: |
| 353 | + for (x_list, y_list), w in zip(self.sampled_trajectories, self.weights): |
| 354 | + alpha = 0.06 + 0.12 * min(1.0, float(w) * self.K) |
| 355 | + (line,) = axes.plot(x_list, y_list, "b-", linewidth=0.35, alpha=alpha) |
| 356 | + elems.append(line) |
| 357 | + if self.visualize_optimal_traj and self.optimal_trajectory: |
| 358 | + x_list, y_list = self.optimal_trajectory |
| 359 | + (line,) = axes.plot( |
| 360 | + x_list, |
| 361 | + y_list, |
| 362 | + color=self.DRAW_COLOR, |
| 363 | + linewidth=2.0, |
| 364 | + alpha=0.9, |
| 365 | + label="MPPI trajectory", |
| 366 | + ) |
| 367 | + elems.append(line) |
0 commit comments