-
-
Notifications
You must be signed in to change notification settings - Fork 7.2k
Expand file tree
/
Copy pathConflictBasedSearch.py
More file actions
228 lines (189 loc) · 10.5 KB
/
ConflictBasedSearch.py
File metadata and controls
228 lines (189 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
Conflict Based Search generates paths in 3 dimensions (x, y, time) for a set of agents. It does
so by performing searches on two levels. The top level search applies constraints that agents
must avoid, and the bottom level performs a single-agent search for individual agents given
the constraints provided by the top level search. Initially, paths are generated for each agent
with no constraints. The paths are checked for conflicts with one another. If any are found, the
top level search generates two nodes. These nodes apply a constraint at the point of conflict for
one of the two agents in conflict. This process repeats until a set of paths are found where no
agents are in conflict. The top level search chooses successor nodes based on the sum of the
cost of all paths, which guarantees optimiality of the final solution.
The full algorithm is defined in this paper: https://cdn.aaai.org/ojs/8140/8140-13-11667-1-2-20201228.pdf
"""
from copy import deepcopy
from enum import Enum
import numpy as np
from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import (
Grid,
ObstacleArrangement,
Position,
)
import time
from PathPlanning.TimeBasedPathPlanning.BaseClasses import MultiAgentPlanner, StartAndGoal
from PathPlanning.TimeBasedPathPlanning.Node import NodePath
from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner
from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner
from PathPlanning.TimeBasedPathPlanning.SpaceTimeAStar import SpaceTimeAStar
from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePaths
from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AgentId, AppliedConstraint, ConstrainedAgent, ConstraintTree, ConstraintTreeNode, ForkingConstraint
class ConflictBasedSearch(MultiAgentPlanner):
@staticmethod
def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> dict[AgentId, NodePath]:
"""
Generate a path from the start to the goal for each agent in the `start_and_goals` list.
"""
print(f"Using single-agent planner: {single_agent_planner_class}")
constraint_tree: ConstraintTree = ConflictBasedSearch.initialize_constraint_tree(grid, start_and_goals, single_agent_planner_class, verbose)
attempted_constraint_combos: set = set()
while constraint_tree.nodes_to_expand:
constraint_tree_node = constraint_tree.get_next_node_to_expand()
if constraint_tree_node is None:
raise RuntimeError("No more nodes to expand in the constraint tree.")
ancestor_constraints = constraint_tree.get_ancestor_constraints(constraint_tree_node.parent_idx)
if verbose:
print(f"Expanding node with constraint {constraint_tree_node.constraint} and parent {constraint_tree_node.parent_idx}")
print(f"\tCOST: {constraint_tree_node.cost}")
if not constraint_tree_node.constraint:
# This means we found a solution!
print(f"\nFound a path with {len(constraint_tree_node.all_constraints)} constraints after {constraint_tree.expanded_node_count()} expansions")
print(f"Final cost: {constraint_tree_node.cost}")
final_paths = {}
for start_and_goal in start_and_goals:
final_paths[start_and_goal.agent_id] = constraint_tree_node.paths[start_and_goal.agent_id]
return final_paths
if not isinstance(constraint_tree_node.constraint, ForkingConstraint):
raise ValueError(f"Expected a ForkingConstraint, but got: {constraint_tree_node.constraint}")
for constrained_agent in constraint_tree_node.constraint.constrained_agents:
applied_constraint = AppliedConstraint(constrained_agent.constraint, constrained_agent.agent)
all_constraints = ancestor_constraints
all_constraints.append(applied_constraint)
new_path = ConflictBasedSearch.plan_for_agent(
constrained_agent,
all_constraints,
constraint_tree,
attempted_constraint_combos,
grid,
single_agent_planner_class,
start_and_goals,
verbose
)
if not new_path:
continue
# Deepcopy to update with applied constraint and new paths
applied_constraint_parent = deepcopy(constraint_tree_node)
applied_constraint_parent.paths[constrained_agent.agent] = new_path
applied_constraint_parent.constraint = applied_constraint
parent_idx = constraint_tree.add_expanded_node(applied_constraint_parent)
new_constraint_tree_node = ConstraintTreeNode(applied_constraint_parent.paths, parent_idx, all_constraints)
if verbose:
print(f"Adding new constraint tree node with constraint: {new_constraint_tree_node.constraint}")
constraint_tree.add_node_to_tree(new_constraint_tree_node)
raise RuntimeError("No solution found")
@staticmethod
def get_agents_start_and_goal(start_and_goal_list: list[StartAndGoal], target_index: AgentId) -> StartAndGoal:
"""
Returns the start and goal of a specific agent
"""
for item in start_and_goal_list:
if item.agent_id == target_index:
return item
raise RuntimeError(f"Could not find agent with index {target_index} in {start_and_goal_list}")
@staticmethod
def plan_for_agent(constrained_agent: ConstrainedAgent,
all_constraints: list[AppliedConstraint],
constraint_tree: ConstraintTree,
attempted_constraint_combos: set,
grid: Grid,
single_agent_planner_class: SingleAgentPlanner,
start_and_goals: list[StartAndGoal],
verbose: bool) -> NodePath | None:
"""
Attempt to generate a path plan for a single agent
"""
num_expansions = constraint_tree.expanded_node_count()
if num_expansions % 50 == 0:
print(f"Expanded {num_expansions} nodes so far...")
print(f"\tNumber of constraints on expanded node: {len(all_constraints)}")
# Skip if we have already tried this set of constraints
constraint_hash = hash(frozenset(all_constraints))
if constraint_hash in attempted_constraint_combos:
if verbose:
print(f"\tSkipping already attempted constraint combination: {all_constraints}")
return None
else:
attempted_constraint_combos.add(constraint_hash)
if verbose:
print(f"\tAll constraints: {all_constraints}")
grid.clear_constraint_reservations()
grid.apply_constraint_reservations(all_constraints)
# Just plan for agent with new constraint
start_and_goal: StartAndGoal = ConflictBasedSearch.get_agents_start_and_goal(start_and_goals, constrained_agent.agent)
try:
if verbose:
print("\tplanning for: {}", start_and_goal)
new_path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, start_and_goal.agent_id, verbose)
return new_path
except Exception as e:
# Note: single agent planners can fail if constraints make planning for an agent impossible. The upper level search does not
# consider if a constraint will make it impossible for an agent to plan when adding a new constraint.
if verbose:
print(f"Error: {e}")
return None
@staticmethod
def initialize_constraint_tree(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool) -> ConstraintTree:
"""
Generate an initial solution by planning for each agent with no obstacles
"""
initial_solution: dict[AgentId, NodePath] = {}
# Generate initial solution (no constraints)
for start_and_goal in start_and_goals:
path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, start_and_goal.agent_id, verbose)
initial_solution[start_and_goal.agent_id] = path
if verbose:
print("Initial solution:")
for (agent_idx, path) in initial_solution.items():
print(f"\nAgent {agent_idx} path:\n {path}")
return ConstraintTree(initial_solution)
class Scenario(Enum):
# Five robots all trying to get through a single cell choke point to reach their goals
NARROW_CORRIDOR = 0
# Three robots in a narrow hallway that requires intelligent conflict resolution
HALLWAY_CROSS = 1
scenario = Scenario.HALLWAY_CROSS
verbose = False
show_animation = True
use_sipp = True # Condition here mainly to appease the linter
np.random.seed(42) # For reproducibility
def main():
grid_side_length = 21
# Default: no obstacles
obstacle_arrangement = ObstacleArrangement.NONE
start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(10)]
if scenario == Scenario.NARROW_CORRIDOR:
obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR
start_and_goals = [StartAndGoal(i, Position(1, 8+i), Position(19, 19-i)) for i in range(5)]
elif scenario == Scenario.HALLWAY_CROSS:
obstacle_arrangement=ObstacleArrangement.HALLWAY
start_and_goals = [StartAndGoal(0, Position(6, 10), Position(13, 10)),
StartAndGoal(1, Position(11, 10), Position(6, 10)),
StartAndGoal(2, Position(13, 10), Position(7, 10))]
grid = Grid(
np.array([grid_side_length, grid_side_length]),
num_obstacles=250,
obstacle_avoid_points=[pos for item in start_and_goals for pos in (item.start, item.goal)],
obstacle_arrangement=obstacle_arrangement,
)
single_agent_planner = SafeIntervalPathPlanner if use_sipp else SpaceTimeAStar
start_time = time.time()
paths = ConflictBasedSearch.plan(grid, start_and_goals, single_agent_planner, verbose)
runtime = time.time() - start_time
print(f"\nPlanning took: {runtime:.5f} seconds")
if verbose:
print("Paths:")
for path in paths.values():
print(f"{path}\n")
if not show_animation:
return
PlotNodePaths(grid, start_and_goals, paths)
if __name__ == "__main__":
main()