-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalgorithms.py
More file actions
238 lines (227 loc) · 9.15 KB
/
algorithms.py
File metadata and controls
238 lines (227 loc) · 9.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import time
import heapq
from collections import deque
# Utility reconstruction
def reconstruct_path(parent, start, goal):
    """Rebuild the path from ``start`` to ``goal`` out of a predecessor map.

    Args:
        parent: Mapping of node -> the node it was reached from. The root of
            the search maps to ``None``.
        start: Node the path must begin at.
        goal: Node the path must end at.

    Returns:
        The list ``[start, ..., goal]``, or ``[]`` when ``goal`` is not in
        ``parent`` or when following the links from ``goal`` never arrives
        at ``start``.
    """
    if goal not in parent:
        return []
    path = [goal]
    while path[-1] != start:
        previous = parent.get(path[-1])
        if previous is None:
            # The chain hit a root (or a missing entry) that is not `start`:
            # there is no start->goal path recorded in this map.
            return []
        path.append(previous)
    path.reverse()
    return path
# Wrapper to normalize results
def make_result(name, path, visited_order, cost=None, nodes_expanded=None, start_time=None, error=None):
    """Package the outcome of a search into a uniform result dictionary.

    Args:
        name: Label stored under ``"algorithm"``.
        path: Sequence of nodes found, or an empty sequence when none.
        visited_order: Nodes in the order the search processed them.
        cost: Value stored under ``"total_cost"`` (left as ``None`` if omitted).
        nodes_expanded: Explicit expansion count; defaults to
            ``len(visited_order)`` when ``None``.
        start_time: ``time.time()`` timestamp taken when the search began;
            when ``None`` no elapsed time is reported.
        error: Error message, or ``None``.

    Returns:
        A dict with keys ``algorithm``, ``path``, ``visited``, ``path_length``
        (edges in the path, 0 for an empty path), ``total_cost``,
        ``nodes_expanded``, ``exec_time_s``, ``status`` (``"error"`` when an
        error is given, else ``"ok"`` for a non-empty path, else
        ``"no-path"``) and ``error``.
    """
    if start_time is None:
        elapsed = None
    else:
        elapsed = time.time() - start_time

    # An error always wins; otherwise the status reflects whether a path exists.
    if error is not None:
        status = "error"
    elif path:
        status = "ok"
    else:
        status = "no-path"

    edge_count = len(path) - 1 if path else 0
    expanded = len(visited_order) if nodes_expanded is None else nodes_expanded

    result = {}
    result["algorithm"] = name
    result["path"] = path
    result["visited"] = visited_order
    result["path_length"] = edge_count
    result["total_cost"] = cost
    result["nodes_expanded"] = expanded
    result["exec_time_s"] = elapsed
    result["status"] = status
    result["error"] = error
    return result
# Generic goal check supporting intermediate & multi-goal
class GoalStrategy:
    """Goal test with an optional intermediate node and one or more final goals.

    While an intermediate node is configured and has not been seen yet, no
    node counts as a goal. Once it has been seen, any member of
    ``final_goals`` is a goal.

    The "seen" state is a single boolean on the instance: it is set the first
    time the intermediate node is observed by the search and is not tracked
    per path. Create a fresh instance for every search run.
    """

    def __init__(self, intermediate=None, final_goals=None):
        """Store the goal configuration.

        Args:
            intermediate: Node that must be seen first. ``None``, ``""`` and
                the string ``"None"`` all mean "no intermediate node".
            final_goals: Iterable of goal nodes; ``None`` means no goals.
        """
        self.intermediate = intermediate if intermediate not in ("None", "", None) else None
        self.final_goals = set(final_goals or [])
        self.intermediate_reached = False

    def _awaiting_intermediate(self):
        """Return True while a configured intermediate node is still unseen."""
        # Compare against None rather than truth-testing the node itself, so
        # falsy node ids such as 0 are honoured as intermediates.
        return self.intermediate is not None and not self.intermediate_reached

    def is_goal(self, node):
        """Return True if ``node`` is a final goal and the intermediate is satisfied.

        Calling this on the intermediate node itself marks it as reached but
        still returns False for that call.
        """
        if self._awaiting_intermediate():
            if node == self.intermediate:
                self.intermediate_reached = True
            return False
        return node in self.final_goals

    def observe(self, node):
        """Record that the search has processed ``node``."""
        if self._awaiting_intermediate() and node == self.intermediate:
            self.intermediate_reached = True
# BFS (unweighted, shortest in edges)
def bfs(graph, start, goal_strategy: GoalStrategy):
    """Breadth-first search from ``start`` until ``goal_strategy`` accepts a node.

    Args:
        graph: Object exposing ``neighbors(node)``.
        start: Node to search from.
        goal_strategy: Receives ``observe(node)`` and then ``is_goal(node)``
            for every node taken off the queue.

    Returns:
        A ``make_result`` dict labelled ``"BFS"``; its path is empty when the
        queue runs dry without a goal being accepted.
    """
    began = time.time()
    pending = deque((start,))
    came_from = {start: None}
    expanded = []
    while pending:
        current = pending.popleft()
        expanded.append(current)
        goal_strategy.observe(current)
        if goal_strategy.is_goal(current):
            route = reconstruct_path(came_from, start, current)
            return make_result("BFS", route, expanded, start_time=began)
        for neighbour in graph.neighbors(current):
            # A node is claimed the first time it is discovered.
            if neighbour in came_from:
                continue
            came_from[neighbour] = current
            pending.append(neighbour)
    return make_result("BFS", [], expanded, start_time=began)
# Uniform Cost Search (Dijkstra)
def ucs(graph, start, goal_strategy: GoalStrategy, cost_fn):
    """Uniform-cost search: expand nodes in order of accumulated edge cost.

    Args:
        graph: Object exposing ``neighbors(node)`` and
            ``get_edge_data(u, v, default)``.
        start: Node to search from.
        goal_strategy: Receives ``observe(node)`` and then ``is_goal(node)``
            for every expanded node.
        cost_fn: Called with an edge's data and returns that edge's cost.

    Returns:
        A ``make_result`` dict labelled ``"UCS"``; ``total_cost`` is the
        accumulated cost of the accepted node, and the path is empty when no
        goal is accepted.

    Note:
        Heap entries are ``(cost, node)`` tuples, so two entries with equal
        cost are ordered by comparing the nodes themselves.
    """
    began = time.time()
    unreached = float("inf")
    heap = [(0, start)]
    came_from = {start: None}
    cheapest = {start: 0}
    expanded = []
    while heap:
        path_cost, current = heapq.heappop(heap)
        # Skip stale entries superseded by a cheaper route found later.
        if path_cost > cheapest.get(current, unreached):
            continue
        expanded.append(current)
        goal_strategy.observe(current)
        if goal_strategy.is_goal(current):
            route = reconstruct_path(came_from, start, current)
            return make_result("UCS", route, expanded, path_cost, start_time=began)
        for neighbour in graph.neighbors(current):
            attrs = graph.get_edge_data(current, neighbour, {})
            candidate = path_cost + cost_fn(attrs)
            if candidate < cheapest.get(neighbour, unreached):
                cheapest[neighbour] = candidate
                came_from[neighbour] = current
                heapq.heappush(heap, (candidate, neighbour))
    return make_result("UCS", [], expanded, start_time=began)
# DFS (iterative, using an explicit stack)
def dfs(graph, start, goal_strategy: GoalStrategy, limit=None):
    """Depth-first search with an explicit stack and an optional depth limit.

    Args:
        graph: Object exposing ``neighbors(node)``.
        start: Node to search from (depth 0).
        goal_strategy: Receives ``observe(node)`` and then ``is_goal(node)``
            for every node popped from the stack.
        limit: When not ``None``, nodes whose recorded depth is ``limit`` or
            more are still goal-tested but their neighbours are not pushed.

    Returns:
        A ``make_result`` dict labelled ``"DFS"``; its path is empty when the
        stack empties without a goal being accepted.

    Note:
        A node's parent and depth are fixed when it is first pushed, so each
        node enters the stack at most once and is therefore visited at most
        once.
    """
    t0 = time.time()
    stack = [start]
    parent = {start: None}
    depth = {start: 0}
    visited_order = []
    while stack:
        node = stack.pop()
        # No membership test needed here: the `parent` guard below guarantees
        # a node is pushed (and hence popped) only once.
        visited_order.append(node)
        goal_strategy.observe(node)
        if goal_strategy.is_goal(node):
            path = reconstruct_path(parent, start, node)
            return make_result("DFS", path, visited_order, start_time=t0)
        if limit is None or depth[node] < limit:
            # Push in reverse so the first neighbour is popped first.
            for nbr in reversed(list(graph.neighbors(node))):
                if nbr not in parent:
                    parent[nbr] = node
                    depth[nbr] = depth[node] + 1
                    stack.append(nbr)
    return make_result("DFS", [], visited_order, start_time=t0)
# Depth-Limited Search
def dls(graph, start, goal_strategy: GoalStrategy, limit):
    """Depth-limited search: run ``dfs`` with ``limit`` as its depth limit.

    Returns whatever ``dfs`` returns, unchanged.
    """
    return dfs(graph, start, goal_strategy, limit)
# Iterative Deepening Search
def ids(graph, start, goal_strategy: GoalStrategy, max_limit=20):
    """Iterative deepening: rerun ``dfs`` with limits 0, 1, ..., ``max_limit``.

    Args:
        graph: Graph object passed through to ``dfs``.
        start: Node to search from.
        goal_strategy: Template strategy; only its ``intermediate`` and
            ``final_goals`` are read. Each round gets a fresh
            ``GoalStrategy`` so the intermediate-reached state does not leak
            between depth limits.
        max_limit: Largest depth limit tried (inclusive).

    Returns:
        The result dict of the first round that found a path, with
        ``visited`` replaced by the de-duplicated nodes visited across all
        rounds so far, ``algorithm`` set to ``"IDS(depth=<limit>)"`` and
        ``exec_time_s`` covering the whole call. If no round finds a path, a
        ``make_result`` dict labelled ``"IDS"`` with an empty path.
    """
    t0 = time.time()
    aggregate_visited = []
    # Companion set so the de-duplication below is a hash lookup instead of a
    # scan of the growing list.
    already_listed = set()
    for depth_limit in range(max_limit + 1):
        gs = GoalStrategy(goal_strategy.intermediate, goal_strategy.final_goals)
        res = dfs(graph, start, gs, limit=depth_limit)
        fresh = [n for n in res["visited"] if n not in already_listed]
        aggregate_visited.extend(fresh)
        already_listed.update(fresh)
        if res["path"]:
            res["visited"] = aggregate_visited
            res["algorithm"] = f"IDS(depth={depth_limit})"
            res["exec_time_s"] = time.time() - t0
            return res
    return make_result("IDS", [], aggregate_visited, start_time=t0)
# Bidirectional (without intermediate support)
def bidirectional_search(graph, start, goals):
    """Level-by-level search forward from ``start`` and backward from ``goals``.

    Each round first processes the whole forward frontier (following
    ``graph.neighbors``) and then the whole backward frontier (following
    ``graph.predecessors``). The search stops as soon as a node being
    processed on one side is in the other side's current frontier.

    Args:
        graph: Object exposing ``neighbors(node)`` and ``predecessors(node)``.
        start: Node the forward search begins at.
        goals: Iterable of goal nodes; the backward search begins at all of
            them.

    Returns:
        A ``make_result`` dict labelled ``"Bidirectional"``. ``visited`` lists
        the forward-visited nodes followed by the backward-visited ones. When
        ``goals`` is empty the result carries the error
        ``"No goals provided"``; when either frontier empties first the path
        is empty.

    Note:
        Frontiers are sets, so the order in which nodes of one level are
        processed follows set iteration order.
    """
    t0 = time.time()
    if not goals:
        return make_result("Bidirectional", [], [], start_time=t0, error="No goals provided")
    goals = set(goals)
    frontier_f = {start}
    frontier_b = set(goals)
    parent_f = {start: None}
    parent_b = {g: None for g in goals}  # goals are the roots of the backward tree
    visited_f = []
    visited_b = []

    def finish(meet):
        """Join start->meet (forward tree) with meet->goal (backward tree)."""
        path_f = reconstruct_path(parent_f, start, meet)
        path_b = [meet]
        cur = meet
        while parent_b[cur] is not None:
            cur = parent_b[cur]
            path_b.append(cur)
        # `meet` ends path_f and starts path_b; keep it only once.
        path = path_f + path_b[1:]
        return make_result("Bidirectional", path, visited_f + visited_b, start_time=t0)

    while frontier_f and frontier_b:
        new_frontier_f = set()
        for node in frontier_f:
            visited_f.append(node)
            if node in frontier_b:
                return finish(node)
            for nbr in graph.neighbors(node):
                if nbr not in parent_f:
                    parent_f[nbr] = node
                    new_frontier_f.add(nbr)
        frontier_f = new_frontier_f

        new_frontier_b = set()
        for node in frontier_b:
            visited_b.append(node)
            # Checked against the forward frontier that was just advanced.
            if node in frontier_f:
                return finish(node)
            for pred in graph.predecessors(node):
                if pred not in parent_b:
                    parent_b[pred] = node
                    new_frontier_b.add(pred)
        frontier_b = new_frontier_b
    return make_result("Bidirectional", [], visited_f + visited_b, start_time=t0)
# Greedy Best-First
def greedy_best_first_search(graph, start, goal_strategy: GoalStrategy, heuristic_fn):
    """Greedy best-first search: always expand the node with the lowest heuristic.

    Args:
        graph: Object exposing ``neighbors(node)``.
        start: Node to search from.
        goal_strategy: Receives ``observe(node)`` and then ``is_goal(node)``
            for every expanded node.
        heuristic_fn: Called with a node; its return value is the heap
            priority for that node.

    Returns:
        A ``make_result`` dict labelled ``"Greedy"``; its path is empty when
        the frontier empties without a goal being accepted.

    Note:
        Heap entries are ``(heuristic, node)`` tuples, so entries with equal
        heuristic values are ordered by comparing the nodes themselves.
    """
    began = time.time()
    heap = [(heuristic_fn(start), start)]
    came_from = {start: None}
    expanded = []
    closed = set()
    while heap:
        _, current = heapq.heappop(heap)
        if current in closed:
            continue
        closed.add(current)
        expanded.append(current)
        goal_strategy.observe(current)
        if goal_strategy.is_goal(current):
            route = reconstruct_path(came_from, start, current)
            return make_result("Greedy", route, expanded, start_time=began)
        for neighbour in graph.neighbors(current):
            if neighbour in closed:
                continue
            # Keep the first discoverer as parent, but still (re)queue the node.
            if neighbour not in came_from:
                came_from[neighbour] = current
            heapq.heappush(heap, (heuristic_fn(neighbour), neighbour))
    return make_result("Greedy", [], expanded, start_time=began)
# A* Search
def a_star_search(graph, start, goal_strategy: GoalStrategy, cost_fn, heuristic_fn):
    """A* search: expand nodes in order of path cost so far plus heuristic.

    Args:
        graph: Object exposing ``neighbors(node)`` and
            ``get_edge_data(u, v, default)``.
        start: Node to search from.
        goal_strategy: Receives ``observe(node)`` and then ``is_goal(node)``
            for every expanded node.
        cost_fn: Called with an edge's data and returns that edge's cost.
        heuristic_fn: Called with a node and returns its heuristic estimate.

    Returns:
        A ``make_result`` dict labelled ``"A*"``; ``total_cost`` is the path
        cost carried by the heap entry of the accepted node, and the path is
        empty when no goal is accepted.

    Note:
        A node is expanded only the first time it is popped. Heap entries are
        ``(f, g, node)`` tuples, so entries equal in both numbers are ordered
        by comparing the nodes themselves.
    """
    began = time.time()
    unreached = float("inf")
    heap = [(heuristic_fn(start), 0, start)]
    came_from = {start: None}
    cheapest = {start: 0}
    expanded = []
    closed = set()
    while heap:
        _, travelled, current = heapq.heappop(heap)
        if current in closed:
            continue
        closed.add(current)
        expanded.append(current)
        goal_strategy.observe(current)
        if goal_strategy.is_goal(current):
            route = reconstruct_path(came_from, start, current)
            return make_result("A*", route, expanded, travelled, start_time=began)
        for neighbour in graph.neighbors(current):
            attrs = graph.get_edge_data(current, neighbour, {})
            candidate = travelled + cost_fn(attrs)
            if candidate < cheapest.get(neighbour, unreached):
                cheapest[neighbour] = candidate
                came_from[neighbour] = current
                priority = candidate + heuristic_fn(neighbour)
                heapq.heappush(heap, (priority, candidate, neighbour))
    return make_result("A*", [], expanded, start_time=began)