Skip to content

Commit 5f81c1e

Browse files
authored
Merge pull request #50 from Rajat-Arora/feature/informed_rrt_star
Feature/informed rrt star
2 parents 2dc7c50 + 078ee5f commit 5f81c1e

8 files changed

Lines changed: 507 additions & 0 deletions

File tree

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Python sample codes and documents about Autonomous vehicle control algorithm. Th
2727
* [Hybrid A*](#hybrid-a)
2828
* [Dijkstra](#dijkstra)
2929
* [RRT](#rrt)
30+
* [Informed RRT*](#informed-rrt)
3031
* [Path Tracking](#path-tracking)
3132
* [Pure pursuit Path Tracking](#pure-pursuit-path-tracking)
3233
* [Rear wheel feedback Path Tracking](#rear-wheel-feedback-path-tracking)
@@ -126,6 +127,13 @@ Planning(Reduce frames by sampling every nth node to prevent memory exhaustion)
126127
#### RRT
127128
Planning
128129
![](src/simulations/path_planning/rrt_path_planning/rrt_search.gif)
130+
Navigation
131+
![](src/simulations/path_planning/rrt_path_planning/rrt_navigate.gif)
132+
#### Informed RRT*
133+
Planning
134+
![](src/simulations/path_planning/informed_rrt_star_path_planning/informed_rrt_star_search.gif)
135+
Navigation
136+
![](src/simulations/path_planning/informed_rrt_star_path_planning/informed_rrt_star_navigate.gif)
129137
### Path Tracking
130138
#### Pure pursuit Path Tracking
131139
![](src/simulations/path_tracking/pure_pursuit_path_tracking/pure_pursuit_path_tracking.gif)
Lines changed: 373 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,373 @@
1+
"""
2+
informed_rrt_star_path_planner.py
3+
4+
Author: Rajat Arora
5+
Informed RRT* (Informed Rapidly-exploring Random Tree Star) path planner.
6+
"""
7+
8+
import matplotlib
9+
10+
from matplotlib.patches import Ellipse
11+
import numpy as np
12+
import matplotlib.pyplot as plt
13+
import matplotlib.animation as anm
14+
from matplotlib.animation import PillowWriter
15+
from matplotlib.colors import ListedColormap
16+
from pathlib import Path
17+
import json
18+
import sys
19+
20+
abs_dir_path = str(Path(__file__).absolute().parent)
21+
relative_path = "/../../../components/"
22+
sys.path.append(abs_dir_path + relative_path + "mapping/grid")
23+
24+
class InformedRrtStarPathPlanner:
25+
def __init__(
26+
self,
27+
start,
28+
goal,
29+
map_file,
30+
x_lim=None,
31+
y_lim=None,
32+
path_filename=None,
33+
gif_name=None,
34+
max_iterations=5000,
35+
step_size=0.5,
36+
goal_sample_rate=0.05,
37+
visualize_live=False,
38+
):
39+
40+
if visualize_live:
41+
matplotlib.use("TkAgg")
42+
else:
43+
matplotlib.use("Agg")
44+
45+
np.random.seed(42)
46+
47+
self.start = start
48+
self.goal = goal
49+
self.path_filename = path_filename
50+
self.max_iterations = max_iterations
51+
self.step_size = step_size
52+
self.goal_sample_rate = goal_sample_rate
53+
self.visualize_live = visualize_live
54+
55+
self.explored_nodes = []
56+
self.tree_edges = []
57+
self.path = []
58+
59+
self.grid = self.load_grid_from_file(map_file)
60+
61+
x_min, x_max = x_lim.min_value(), x_lim.max_value()
62+
y_min, y_max = y_lim.min_value(), y_lim.max_value()
63+
64+
self.resolution = (x_max - x_min) / self.grid.shape[1]
65+
self.x_range = np.arange(x_min, x_max, self.resolution)
66+
self.y_range = np.arange(y_min, y_max, self.resolution)
67+
68+
self.x_min, self.x_max = x_min, x_max
69+
self.y_min, self.y_max = y_min, y_max
70+
71+
self.search()
72+
73+
if not self.visualize_live:
74+
self.visualize_search(gif_name, frame_skip=5)
75+
76+
def load_grid_from_file(self, file_path):
77+
with open(file_path, "r") as f:
78+
return np.array(json.load(f))
79+
80+
def _world_to_grid(self, p):
81+
gx = int((p[0] - self.x_range[0]) / self.resolution)
82+
gy = int((p[1] - self.y_range[0]) / self.resolution)
83+
return gx, gy
84+
85+
def is_valid(self, x, y):
86+
return (
87+
0 <= x < self.grid.shape[1]
88+
and 0 <= y < self.grid.shape[0]
89+
and self.grid[y, x] == 0
90+
)
91+
92+
def line_collision_check(self, p1, p2, samples=20):
93+
for i in range(samples + 1):
94+
t = i / samples
95+
x = p1[0] + t * (p2[0] - p1[0])
96+
y = p1[1] + t * (p2[1] - p1[1])
97+
gx, gy = self._world_to_grid((x, y))
98+
if not self.is_valid(gx, gy):
99+
return False
100+
return True
101+
102+
def get_nearest_node(self, nodes, p):
103+
return nodes[int(np.argmin([np.linalg.norm(np.array(n) - p) for n in nodes]))]
104+
105+
def get_near_nodes(self, nodes, new_node, radius):
106+
return [
107+
i for i, n in enumerate(nodes)
108+
if np.linalg.norm(np.array(n) - np.array(new_node)) <= radius
109+
]
110+
111+
def get_rewire_radius(self, n):
112+
return 30.0 * np.sqrt(np.log(n) / n)
113+
114+
def extend(self, nearest, rnd):
115+
d = np.array(rnd) - np.array(nearest)
116+
dist = np.linalg.norm(d)
117+
if dist < 1e-6:
118+
return None
119+
return tuple(np.array(nearest) + d / dist * min(self.step_size, dist))
120+
121+
122+
def sample_informed(self, c_best):
123+
c_min = np.linalg.norm(np.array(self.start) - np.array(self.goal))
124+
if c_best < float('inf'):
125+
126+
a = c_best / 2.0
127+
b = np.sqrt(max(0, c_best**2 - c_min**2)) / 2.0
128+
theta = np.arctan2(self.goal[1] - self.start[1], self.goal[0] - self.start[0])
129+
centre = (np.array(self.start) + np.array(self.goal)) / 2.0
130+
131+
cos_theta, sin_theta = np.cos(theta), np.sin(theta)
132+
C = np.array([[cos_theta, -sin_theta],
133+
[sin_theta, cos_theta]])
134+
135+
r = np.sqrt(np.random.uniform(0, 1))
136+
phi = np.random.uniform(0, 2 * np.pi)
137+
x_ball = np.array([[r * a * np.cos(phi)], [r * b * np.sin(phi)]])
138+
139+
x_rand = C @ x_ball + centre.reshape(2,1)
140+
141+
return (x_rand[0,0], x_rand[1,0])
142+
143+
else:
144+
return (
145+
np.random.uniform(self.x_min, self.x_max),
146+
np.random.uniform(self.y_min, self.y_max),
147+
)
148+
149+
150+
def search(self):
151+
nodes = [self.start]
152+
parent = {0: None}
153+
cost = {0: 0.0}
154+
goal_idx = None
155+
self.history = []
156+
self.cost_history = []
157+
self.path_history = []
158+
159+
if self.visualize_live:
160+
plt.ion()
161+
self.fig, self.ax = plt.subplots(figsize=(10, 8))
162+
self.cmap = ListedColormap([[1, 1, 1], [0, 0, 0]])
163+
164+
for iteration in range(self.max_iterations):
165+
166+
# Sample point with informed sampling
167+
c_best = cost[goal_idx] if goal_idx is not None else float('inf')
168+
rnd = self.sample_informed(c_best)
169+
170+
nearest = self.get_nearest_node(nodes, rnd)
171+
nearest_idx = nodes.index(nearest)
172+
new_node = self.extend(nearest, rnd)
173+
174+
175+
if new_node and self.line_collision_check(nearest, new_node):
176+
177+
radius = self.get_rewire_radius(len(nodes))
178+
near_indices = self.get_near_nodes(nodes, new_node, radius)
179+
180+
if goal_idx != None and nearest_idx == goal_idx:
181+
continue
182+
183+
# Optimization1: Choose best parent among near nodes
184+
min_cost = cost[nearest_idx] + np.linalg.norm(np.array(new_node) - np.array(nearest))
185+
best_parent_idx = nearest_idx
186+
for near_idx in near_indices:
187+
near_node = nodes[near_idx]
188+
new_cost = cost[near_idx] + np.linalg.norm(np.array(new_node) - np.array(near_node))
189+
if new_cost < min_cost:
190+
if self.line_collision_check(near_node, new_node):
191+
min_cost = new_cost
192+
best_parent_idx = near_idx
193+
194+
195+
nodes.append(new_node)
196+
new_idx = len(nodes) - 1
197+
parent[new_idx] = best_parent_idx
198+
cost[new_idx] = min_cost
199+
self.tree_edges.append((nodes[best_parent_idx], new_node))
200+
201+
# Optimization 2: Rewire nearby nodes to find better paths
202+
for near_idx in near_indices:
203+
near_node = nodes[near_idx]
204+
new_cost = cost[new_idx] + np.linalg.norm(np.array(near_node) - np.array(new_node))
205+
if new_cost < cost[near_idx]:
206+
if self.line_collision_check(near_node, new_node):
207+
208+
old_parent_idx = parent[near_idx]
209+
old_edge = (nodes[old_parent_idx], near_node)
210+
211+
if old_edge in self.tree_edges:
212+
self.tree_edges.remove(old_edge)
213+
elif (near_node, nodes[old_parent_idx]) in self.tree_edges:
214+
self.tree_edges.remove((near_node, nodes[old_parent_idx]))
215+
216+
parent[near_idx] = new_idx
217+
cost[near_idx] = new_cost
218+
self.tree_edges.append((new_node, near_node))
219+
220+
# Check if goal is reached
221+
dist_to_goal = np.linalg.norm(np.array(new_node) - np.array(self.goal))
222+
if dist_to_goal <= self.step_size:
223+
if self.line_collision_check(new_node, self.goal):
224+
final_cost = cost[new_idx] + dist_to_goal
225+
226+
if goal_idx is None or final_cost < cost[goal_idx]:
227+
228+
if goal_idx is None:
229+
nodes.append(self.goal)
230+
goal_idx = len(nodes) - 1
231+
else:
232+
old_parent_of_goal = nodes[parent[goal_idx]]
233+
old_goal_edge = (old_parent_of_goal, self.goal)
234+
if old_goal_edge in self.tree_edges:
235+
self.tree_edges.remove(old_goal_edge)
236+
237+
parent[goal_idx] = new_idx
238+
cost[goal_idx] = final_cost
239+
self.tree_edges.append((new_node, self.goal))
240+
241+
# Store snapshots for visualization
242+
self.history.append(list(self.tree_edges))
243+
current_c = cost[goal_idx] if goal_idx is not None else float('inf')
244+
self.cost_history.append(current_c)
245+
246+
if goal_idx is not None:
247+
current_path = self.reconstruct_path(nodes, parent, goal_idx)
248+
self.path_history.append(current_path)
249+
else:
250+
self.path_history.append(None)
251+
252+
if self.visualize_live and iteration % 20 == 0:
253+
self.plot_search_iteration(iteration, self.cost_history[-1], nodes, parent, goal_idx)
254+
255+
# Save the final path
256+
if goal_idx is not None:
257+
final_path = self.reconstruct_path(nodes, parent, goal_idx)
258+
sparse_path = self.make_sparse_path(final_path, n=50)
259+
self.save_path(sparse_path)
260+
print(f"Sparse path saved to {self.path_filename}")
261+
262+
if self.visualize_live:
263+
self.plot_search_iteration(self.max_iterations, cost[goal_idx] if goal_idx is not None else float('inf'), nodes, parent, goal_idx)
264+
plt.ioff()
265+
plt.show()
266+
267+
268+
def plot_search_iteration(self, iteration, c_best, nodes, parent, goal_idx):
269+
self.ax.clear()
270+
271+
self.ax.imshow(
272+
self.grid,
273+
extent=[self.x_range[0], self.x_range[-1], self.y_range[0], self.y_range[-1]],
274+
origin="lower",
275+
cmap=self.cmap,
276+
)
277+
278+
for start_node, end_node in self.tree_edges:
279+
self.ax.plot([start_node[0], end_node[0]], [start_node[1], end_node[1]], "b-", alpha=0.3, lw=0.5)
280+
281+
if c_best < float('inf'):
282+
c_min = np.linalg.norm(np.array(self.goal) - np.array(self.start))
283+
center = (np.array(self.start) + np.array(self.goal)) / 2.0
284+
angle = np.degrees(np.arctan2(self.goal[1]-self.start[1], self.goal[0]-self.start[0]))
285+
286+
width = c_best
287+
height = np.sqrt(max(0.1, c_best**2 - c_min**2))
288+
289+
ell = Ellipse(xy=center, width=width, height=height, angle=angle,
290+
edgecolor='r', fc='None', lw=1.5, ls='--', alpha=0.5)
291+
self.ax.add_patch(ell)
292+
293+
if goal_idx is not None:
294+
path = self.reconstruct_path(nodes, parent, goal_idx)
295+
if path:
296+
path = np.array(path)
297+
self.ax.plot(path[:, 0], path[:, 1], "y-", lw=2, alpha=0.9, label="Best Path")
298+
299+
self.ax.plot(self.start[0], self.start[1], "go", markersize=10)
300+
self.ax.plot(self.goal[0], self.goal[1], "ro", markersize=10)
301+
cost_text = f"{c_best:.2f}" if c_best < float('inf') else "inf"
302+
self.ax.set_title(f"Iteration {iteration} | Cost: {cost_text}")
303+
self.ax.legend()
304+
plt.pause(0.01)
305+
306+
def reconstruct_path(self, nodes, parent, idx):
307+
path = []
308+
while idx is not None:
309+
path.append(nodes[idx])
310+
idx = parent[idx]
311+
return path[::-1]
312+
313+
def make_sparse_path(self, path, n=20):
314+
idxs = np.linspace(0, len(path) - 1, min(n, len(path)), dtype=int)
315+
return [path[i] for i in idxs]
316+
317+
def save_path(self, path):
318+
Path(self.path_filename).parent.mkdir(parents=True, exist_ok=True)
319+
with open(self.path_filename, "w") as f:
320+
json.dump(path, f)
321+
322+
def visualize_search(self, gif_name, frame_skip=3):
323+
if not self.tree_edges:
324+
return
325+
326+
Path(gif_name).parent.mkdir(parents=True, exist_ok=True)
327+
328+
fig, ax = plt.subplots(figsize=(10, 8))
329+
cmap = ListedColormap([[1, 1, 1], [0, 0, 0]])
330+
331+
# Only save every frame_skip frames for faster generation
332+
frames = list(range(0, len(self.tree_edges), frame_skip))
333+
print(f"Creating GIF with {len(frames)} frames (skipping every {frame_skip})...")
334+
335+
def update(frame_idx):
336+
i = frames[frame_idx]
337+
ax.clear()
338+
339+
ax.imshow(
340+
self.grid,
341+
extent=[self.x_range[0], self.x_range[-1], self.y_range[0], self.y_range[-1]],
342+
origin="lower",
343+
cmap=cmap,
344+
)
345+
346+
for e in self.history[i]:
347+
ax.plot([e[0][0], e[1][0]], [e[0][1], e[1][1]], "b-", alpha=0.3, lw=0.5)
348+
349+
c_best = self.cost_history[i]
350+
if c_best < float('inf'):
351+
c_min = np.linalg.norm(np.array(self.goal) - np.array(self.start))
352+
center = (np.array(self.start) + np.array(self.goal)) / 2.0
353+
angle = np.degrees(np.arctan2(self.goal[1]-self.start[1], self.goal[0]-self.start[0]))
354+
355+
width = c_best
356+
height = np.sqrt(max(0.1, c_best**2 - c_min**2))
357+
358+
ell = Ellipse(xy=center, width=width, height=height, angle=angle,
359+
edgecolor='r', fc='None', lw=1.5, ls='--', alpha=0.5)
360+
ax.add_patch(ell)
361+
362+
if self.path_history[i] is not None:
363+
path = np.array(self.path_history[i])
364+
ax.plot(path[:, 0], path[:, 1], "y-", lw=2, alpha=0.9, label="Best Path")
365+
366+
ax.plot(self.start[0], self.start[1], "go", markersize=10)
367+
ax.plot(self.goal[0], self.goal[1], "ro", markersize=10)
368+
ax.set_title(f"Iteration {i} | Cost: {c_best:.2f}")
369+
370+
ani = anm.FuncAnimation(fig, update, frames=len(frames), interval=40)
371+
ani.save(gif_name, writer=PillowWriter(fps=25))
372+
print(f"GIF saved to {gif_name}")
373+
plt.close(fig)
324 KB
Loading

0 commit comments

Comments
 (0)