diff --git a/src/components/mapping/potential/potential_field_map.py b/src/components/mapping/potential/potential_field_map.py new file mode 100644 index 00000000..c4b12e13 --- /dev/null +++ b/src/components/mapping/potential/potential_field_map.py @@ -0,0 +1,245 @@ +""" +potential_field_map.py + +Author: Panav Arpit Raaj +""" + +import numpy as np +import sys +from pathlib import Path +import matplotlib.patches as patches +import matplotlib.cm as cm +sys.path.append(str(Path(__file__).absolute().parent) + "/../grid") +from grid_map import GridMap +from grid_map import FloatGrid + + +class PotentialFieldMap: + """ + Potential field map class with persistent obstacle memory. + + The field maintains a memory of all previously observed obstacles, + ensuring stable path planning even when obstacles are outside the + current sensor field of view. + + Uses incremental updates for efficiency - only recalculates the + repulsive field for cells affected by newly observed obstacles. + """ + + def __init__(self, width_m=60.0, height_m=60.0, resolution_m=1.0, + center_x_m=0.0, center_y_m=0.0, zeta=0.001, eta=10.0, rho=4.0, + goal_x_m=0.0, goal_y_m=0.0): + """ + Constructor + zeta: Attractive scaling gain + eta: Repulsive scaling gain + rho: Obstacle Influence distance + goal_x_m: Goal x position of map[m] + goal_y_m: Goal y position of map[m] + """ + + self.map = GridMap(width_m, height_m, resolution_m, + center_x_m, center_y_m) + self.zeta = zeta + self.eta = eta + self.rho = rho + self.goal_x_m = goal_x_m + self.goal_y_m = goal_y_m + self.max_cost = 1000.0 + self.free_cost = 0.0 + + # Persistent obstacle memory: stores grid indices of observed obstacles + # Using a set for O(1) lookup and automatic deduplication + self.obstacle_memory = set() + + # Pre-compute and cache the attractive potential (only depends on goal) + self._attractive_cache = None + self._repulsive_cache = None + self._initialize_caches() + + def _initialize_caches(self): + """Initialize the potential caches.""" + num_grids = self.map.all_grids_num + self._attractive_cache = np.zeros(num_grids) + self._repulsive_cache = np.zeros(num_grids) + + # Pre-compute attractive potential (constant for a given goal) + for vector_idx in range(num_grids): + center_x, center_y = self.map.calculate_grid_center_xy_pos_from_vector_index(vector_idx) + self._attractive_cache[vector_idx] = self._compute_attractive_potential(center_x, center_y) + + def clear_memory(self): + """ + Clear the persistent obstacle memory. + Use this when starting a new navigation session or + when the environment has changed significantly. + """ + self.obstacle_memory.clear() + self._repulsive_cache.fill(0.0) + # Update the map with just attractive potential + self._update_total_potential() + + def _discretize_position(self, x_m, y_m): + """ + Convert continuous position to a discretized grid-aligned position. + This ensures consistent obstacle representation in memory. + + Returns: Tuple of (discretized_x, discretized_y) or None if outside map + """ + # Get grid indices for this position using the correct API + x_idx = self.map.calculate_xy_index_from_position( + x_m, self.map.left_bottom_x_m, self.map.width_grids_num - 1) + y_idx = self.map.calculate_xy_index_from_position( + y_m, self.map.left_bottom_y_m, self.map.height_grids_num - 1) + + # Check if position is within map bounds + if x_idx is None or y_idx is None: + return None + + # Convert back to grid center position for consistent representation + vector_idx = self.map.calculate_vector_index_from_xy_index(x_idx, y_idx) + if vector_idx is None: + return None + + center_x, center_y = self.map.calculate_grid_center_xy_pos_from_vector_index(vector_idx) + return (center_x, center_y) + + def _get_affected_grid_range(self, obs_x, obs_y): + """ + Get the range of grid indices affected by an obstacle at (obs_x, obs_y). + Only grids within the influence radius (rho) need to be updated. + """ + # Calculate index bounds based on influence radius + min_x = obs_x - self.rho + max_x = obs_x + self.rho + min_y = obs_y - self.rho + max_y = obs_y + self.rho + + # Convert to grid indices + min_x_idx = self.map.calculate_xy_index_from_position( + min_x, self.map.left_bottom_x_m, self.map.width_grids_num - 1) + max_x_idx = self.map.calculate_xy_index_from_position( + max_x, self.map.left_bottom_x_m, self.map.width_grids_num - 1) + min_y_idx = self.map.calculate_xy_index_from_position( + min_y, self.map.left_bottom_y_m, self.map.height_grids_num - 1) + max_y_idx = self.map.calculate_xy_index_from_position( + max_y, self.map.left_bottom_y_m, self.map.height_grids_num - 1) + + # Handle out-of-bounds cases + if min_x_idx is None: min_x_idx = 0 + if max_x_idx is None: max_x_idx = self.map.width_grids_num - 1 + if min_y_idx is None: min_y_idx = 0 + if max_y_idx is None: max_y_idx = self.map.height_grids_num - 1 + + return min_x_idx, max_x_idx, min_y_idx, max_y_idx + + def _add_obstacle_repulsion(self, obs_x, obs_y): + """ + Add repulsive potential for a single obstacle to the cache. + Only updates grid cells within the influence radius. + """ + min_x_idx, max_x_idx, min_y_idx, max_y_idx = self._get_affected_grid_range(obs_x, obs_y) + eps = 1e-9 + + for x_idx in range(min_x_idx, max_x_idx + 1): + for y_idx in range(min_y_idx, max_y_idx + 1): + vector_idx = self.map.calculate_vector_index_from_xy_index(x_idx, y_idx) + center_x, center_y = self.map.calculate_grid_center_xy_pos_from_xy_index(x_idx, y_idx) + + d = np.sqrt((center_x - obs_x)**2 + (center_y - obs_y)**2) + if d <= self.rho: + u_rep = 0.5 * self.eta * (1.0/(d + eps) - 1.0/self.rho)**2 + self._repulsive_cache[vector_idx] += u_rep + + def _update_total_potential(self): + """Update the total potential in the grid map from caches.""" + for vector_idx in range(self.map.all_grids_num): + total = min(self._attractive_cache[vector_idx] + self._repulsive_cache[vector_idx], + self.max_cost) + self.map.set_grid_data(vector_idx, FloatGrid(value=total)) + + def update_map(self, points_x_list, points_y_list): + """ + Function to update potential field map with persistent memory. + + New obstacle observations are incrementally added to the cached + repulsive field, ensuring efficient updates even as the obstacle + memory grows. + + points_x_list: List of x coordinates of point cloud + points_y_list: List of y coordinates of point cloud + """ + new_obstacles = [] + + # Accumulate new obstacle observations into persistent memory + for x, y in zip(points_x_list, points_y_list): + discretized = self._discretize_position(x, y) + if discretized is not None and discretized not in self.obstacle_memory: + self.obstacle_memory.add(discretized) + new_obstacles.append(discretized) + + # Only update repulsive field for NEW obstacles (incremental update) + for obs_x, obs_y in new_obstacles: + self._add_obstacle_repulsion(obs_x, obs_y) + + # Update total potential map + self._update_total_potential() + + def _compute_attractive_potential(self, x_m, y_m): + """ + Compute attractive potential at a specific position. + """ + d_squared = (x_m - self.goal_x_m)**2 + (y_m - self.goal_y_m)**2 + return 0.5 * self.zeta * d_squared + + def calculate_attractive_potential(self, x_m, y_m): + """ + Function to calculate attractive potential at a specific position + x_m: x coordinate position[m] + y_m: y coordinate position[m] + Return: Attractive potential at position + """ + return self._compute_attractive_potential(x_m, y_m) + + def calculate_repulsive_potential(self, x_m, y_m, obstacle_positions): + """ + Function to calculate total repulsive potential at a specific position + x_m: x coordinate position[m] + y_m: y coordinate position[m] + obstacle_positions: List of (x, y) tuples for obstacle positions + Return: Total repulsive potential at position + """ + u_rep_total = 0.0 + eps = 1e-9 + for obs_x_m, obs_y_m in obstacle_positions: + d = np.sqrt((x_m - obs_x_m)**2 + (y_m - obs_y_m)**2) + if d <= self.rho: + u_rep_total += 0.5 * self.eta * (1.0/(d + eps) - 1.0/self.rho)**2 + return u_rep_total + + def draw_map(self, axes, elems, colormap='jet'): + """ + Function to draw cost map data with color gradient using pcolormesh + axes: Axes object of figure + elems: List of plot object + colormap: Matplotlib colormap name + """ + + # Create grid coordinates for pcolormesh + x_range = np.arange(self.map.width_grids_num + 1) * self.map.resolution_m + self.map.left_bottom_x_m + y_range = np.arange(self.map.height_grids_num + 1) * self.map.resolution_m + self.map.left_bottom_y_m + X, Y = np.meshgrid(x_range, y_range) + + # Reshape grid data into 2D array (height x width) + Z = np.zeros((self.map.height_grids_num, self.map.width_grids_num)) + + for vector_idx in range(self.map.all_grids_num): + val = self.map.get_grid_data(vector_idx).get_data() + if val > self.free_cost: + x_idx, y_idx = self.map.calculate_xy_index_from_vector_index(vector_idx) + Z[y_idx][x_idx] = val + + # Use pcolormesh for efficient heatmap rendering + # vmin/vmax will auto-scale the colors + pcm = axes.pcolormesh(X, Y, Z, cmap=colormap, alpha=0.5, shading='flat') + elems.append(pcm) diff --git a/src/components/mapping/potential/potential_field_mapper.py b/src/components/mapping/potential/potential_field_mapper.py new file mode 100644 index 00000000..0e4129e5 --- /dev/null +++ b/src/components/mapping/potential/potential_field_mapper.py @@ -0,0 +1,84 @@ +""" +potential_field_mapper.py + +Author: Panav Arpit Raaj +""" + +from potential_field_map import PotentialFieldMap + + +class PotentialFieldMapper: + """ + Potential field map construction class + """ + + def __init__(self, width_m=60.0, height_m=60.0, resolution_m=1.0, + center_x_m=0.0, center_y_m=0.0, sensor_params=None, + zeta=1.0, eta=100.0, rho=5.0, + goal_x_m=0.0, goal_y_m=0.0): + """ + Constructor + width_m: Width size of map[m] + height_m: Height size of map[m] + resolution_m: Size of each cells[m] + center_x_m: Center x position of map[m] + center_y_m: Center y position of map[m] + sensor_params: Parameters object of sensor + zeta: Attractive scaling gain + eta: Repulsive scaling gain + rho: Obstacle influence distance[m] + goal_x_m: Goal x position[m] + goal_y_m: Goal y position[m] + """ + + self.map = PotentialFieldMap(width_m, height_m, resolution_m, + center_x_m, center_y_m, + zeta, eta, rho, + goal_x_m, goal_y_m) + self.params = sensor_params + + def update(self, point_cloud, state): + """ + Function to update potential field map + point_cloud: List of points from LiDAR + state: Vehicle's state to transform into global coordinate + """ + + vehicle_pose = state.x_y_yaw() + + points_x_list, points_y_list = [], [] + for point in point_cloud: + global_x, global_y = point.get_transformed_data( + self.params.INST_LON_M, + self.params.INST_LAT_M, + self.params.INST_YAW_RAD, + vehicle_pose[0, 0], + vehicle_pose[1, 0], + vehicle_pose[2, 0] + ) + points_x_list.append(global_x) + points_y_list.append(global_y) + + self.map.update_map(points_x_list, points_y_list) + + def clear_memory(self): + """ + Clear the persistent obstacle memory. + Useful for resetting the field when starting a new navigation goal. + """ + self.map.clear_memory() + + def draw(self, axes, elems, colormap='jet'): + """ + Function to draw potential field data + axes: Axes object of figure + elems: List of plot object + colormap: Matplotlib colormap name for visualization + """ + + self.map.draw_map(axes, elems, colormap) + + # Draw goal position as a star marker + goal_marker, = axes.plot(self.map.goal_x_m, self.map.goal_y_m, + 'r*', markersize=15, label='Goal') + elems.append(goal_marker) diff --git a/src/simulations/mapping/potential_field_map_construction/potential_field_demo.gif b/src/simulations/mapping/potential_field_map_construction/potential_field_demo.gif new file mode 100644 index 00000000..d5429679 Binary files /dev/null and b/src/simulations/mapping/potential_field_map_construction/potential_field_demo.gif differ diff --git a/src/simulations/mapping/potential_field_map_construction/potential_field_map_construction.py b/src/simulations/mapping/potential_field_map_construction/potential_field_map_construction.py new file mode 100644 index 00000000..0cad7d22 --- /dev/null +++ b/src/simulations/mapping/potential_field_map_construction/potential_field_map_construction.py @@ -0,0 +1,107 @@ +""" +potential_field_map_construction.py +Author: Panav Arpit Raaj +""" + +# import path setting +import sys +import numpy as np +from pathlib import Path + +abs_dir_path = str(Path(__file__).absolute().parent) +relative_path = "/../../../components/" + +sys.path.append(abs_dir_path + relative_path + "visualization") +sys.path.append(abs_dir_path + relative_path + "state") +sys.path.append(abs_dir_path + relative_path + "vehicle") +sys.path.append(abs_dir_path + relative_path + "obstacle") +sys.path.append(abs_dir_path + relative_path + "sensors") +sys.path.append(abs_dir_path + relative_path + "sensors/lidar") +sys.path.append(abs_dir_path + relative_path + "mapping/potential") +sys.path.append(abs_dir_path + relative_path + "course/cubic_spline_course") +sys.path.append(abs_dir_path + relative_path + "control/pure_pursuit") + + +# import component modules +from global_xy_visualizer import GlobalXYVisualizer +from min_max import MinMax +from time_parameters import TimeParameters +from vehicle_specification import VehicleSpecification +from state import State +from four_wheels_vehicle import FourWheelsVehicle +from obstacle import Obstacle +from obstacle_list import ObstacleList +from sensors import Sensors +from sensor_parameters import SensorParameters +from omni_directional_lidar import OmniDirectionalLidar +from potential_field_mapper import PotentialFieldMapper +from cubic_spline_course import CubicSplineCourse +from pure_pursuit_controller import PurePursuitController + + +# flag to show plot figure +# when executed as unit test, this flag is set as false +show_plot = True + + +def main(): + """ + Main process function + """ + + # set simulation parameters + x_lim, y_lim = MinMax(-5, 55), MinMax(-20, 25) + gif_path = abs_dir_path + "/potential_field_demo.gif" + vis = GlobalXYVisualizer(x_lim, y_lim, TimeParameters(span_sec=30), show_zoom=False, gif_name=gif_path) + + # create course data instance + course = CubicSplineCourse([0.0, 10.0, 25, 40, 50], + [0.0, 4, -12, 20, -13], + 20) + vis.add_object(course) + + # create obstacle instances + obst_list = ObstacleList() + obst_list.add_obstacle(Obstacle(State(x_m=10.0, y_m=15.0), length_m=10, width_m=8)) + obst_list.add_obstacle(Obstacle(State(x_m=40.0, y_m=0.0), length_m=2, width_m=10)) + obst_list.add_obstacle(Obstacle(State(x_m=10.0, y_m=-10.0, yaw_rad=np.rad2deg(45)), length_m=5, width_m=5)) + obst_list.add_obstacle(Obstacle(State(x_m=30.0, y_m=15.0, yaw_rad=np.rad2deg(10)), length_m=5, width_m=2)) + obst_list.add_obstacle(Obstacle(State(x_m=50.0, y_m=15.0, yaw_rad=np.rad2deg(15)), length_m=5, width_m=2)) + obst_list.add_obstacle(Obstacle(State(x_m=25.0, y_m=0.0), length_m=2, width_m=2)) + obst_list.add_obstacle(Obstacle(State(x_m=35.0, y_m=-15.0), length_m=7, width_m=2)) + vis.add_object(obst_list) + + # create vehicle instance with potential field mapper + spec = VehicleSpecification() + pure_pursuit = PurePursuitController(spec, course) + sensor_params = SensorParameters(lon_m=spec.wheel_base_m/2, max_m=15, dist_std_rate=0.05) + lidar = OmniDirectionalLidar(obst_list, sensor_params) + + # Create potential field mapper with goal at end of course + # zeta: attractive gain, eta: repulsive gain, rho: obstacle influence distance + mapper = PotentialFieldMapper(sensor_params=sensor_params, + center_x_m=25.0, + center_y_m=5.0, + zeta=0.1, + eta=200.0, + rho=5.0, + goal_x_m=50.0, + goal_y_m=-13.0) + + vehicle = FourWheelsVehicle(State(color=spec.color), spec, + controller=pure_pursuit, + sensors=Sensors(lidar=lidar), + mapper=mapper, + show_zoom=False) + vis.add_object(vehicle) + + # plot figure is not shown when executed as unit test + if not show_plot: vis.not_show_plot() + + # show plot figure + vis.draw() + + +# execute main process +if __name__ == "__main__": + main() diff --git a/test/test_potential_field_map_construction.py b/test/test_potential_field_map_construction.py new file mode 100644 index 00000000..35a4f783 --- /dev/null +++ b/test/test_potential_field_map_construction.py @@ -0,0 +1,18 @@ +""" +Test of Potential field map construction + +Author: Panav Arpit Raaj +""" + +from pathlib import Path +import sys +import pytest + +sys.path.append(str(Path(__file__).absolute().parent) + "/../src/simulations/mapping/potential_field_map_construction") +import potential_field_map_construction + + +def test_simulation(): + potential_field_map_construction.show_plot = False + + potential_field_map_construction.main()