77import numpy as np
88from PathPlanning .TimeBasedPathPlanning .GridWithDynamicObstacles import (
99 Grid ,
10- Interval ,
1110 ObstacleArrangement ,
1211 Position ,
1312)
13+ from copy import deepcopy
1414from PathPlanning .TimeBasedPathPlanning .BaseClasses import MultiAgentPlanner , StartAndGoal
1515from PathPlanning .TimeBasedPathPlanning .Node import NodePath
1616from PathPlanning .TimeBasedPathPlanning .BaseClasses import SingleAgentPlanner
1717from PathPlanning .TimeBasedPathPlanning .SafeInterval import SafeIntervalPathPlanner
18+ from PathPlanning .TimeBasedPathPlanning .SpaceTimeAStar import SpaceTimeAStar
1819from PathPlanning .TimeBasedPathPlanning .Plotting import PlotNodePaths
20+ from PathPlanning .TimeBasedPathPlanning .ConstraintTree import AgentId , AppliedConstraint , ConstraintTree , ConstraintTreeNode , ForkingConstraint
1921import time
2022
2123class ConflictBasedSearch (MultiAgentPlanner ):
@@ -29,59 +31,90 @@ def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_c
2931 """
3032 print (f"Using single-agent planner: { single_agent_planner_class } " )
3133
34+ initial_solution : dict [AgentId , NodePath ] = {}
35+
3236 # Reserve initial positions
33- for start_and_goal in start_and_goals :
34- grid .reserve_position (start_and_goal .start , start_and_goal .index , Interval (0 , 10 ))
37+ for agent_idx , start_and_goal in enumerate (start_and_goals ):
38+ # grid.reserve_position(start_and_goal.start, start_and_goal.index, Interval(0, 10))
39+ path = single_agent_planner_class .plan (grid , start_and_goal .start , start_and_goal .goal , agent_idx , verbose )
40+ initial_solution [AgentId (agent_idx )] = path
41+
42+ constraint_tree = ConstraintTree (initial_solution )
43+
44+ while constraint_tree .nodes_to_expand :
45+ constraint_tree_node = constraint_tree .get_next_node_to_expand ()
46+ ancestor_constraints = constraint_tree .get_ancestor_constraints (constraint_tree_node .parent_idx )
47+ print (f"Expanded node: { constraint_tree_node .constraint } with parent: { constraint_tree_node .parent_idx } " )
48+ print (f"\t Ancestor constraints: { ancestor_constraints } " )
49+
50+ if verbose :
51+ print (f"Expanding node with constraint { constraint_tree_node .constraint } and parent { constraint_tree_node .parent_idx } " )
52+
53+ if constraint_tree_node is None :
54+ raise RuntimeError ("No more nodes to expand in the constraint tree." )
55+ if not constraint_tree_node .constraint :
56+ # This means we found a solution!
57+ return (start_and_goals , [constraint_tree_node .paths [AgentId (i )] for i in range (len (start_and_goals ))])
58+
59+ if not isinstance (constraint_tree_node .constraint , ForkingConstraint ):
60+ raise ValueError (f"Expected a ForkingConstraint, but got: { constraint_tree_node .constraint } " )
61+
62+ # TODO: contents of this loop should probably be in a helper?
63+ for constrained_agent in constraint_tree_node .constraint .constrained_agents :
64+ paths : dict [AgentId , NodePath ] = {}
3565
36- # Plan in descending order of distance from start to goal
37- # TODO: dont bother doing this
38- start_and_goals = sorted (start_and_goals ,
39- key = lambda item : item .distance_start_to_goal (),
40- reverse = True )
66+ if verbose :
67+ print (f"\n Outer loop step for agent { constrained_agent } " )
4168
42- # first, plan optimally for each agent
43- # now in a loop:
44- # *
69+ applied_constraint = AppliedConstraint ( constraint_tree_node . constraint . constraint , constrained_agent )
70+ all_constraints = deepcopy ( ancestor_constraints )
71+ all_constraints . append ( applied_constraint )
4572
46- # paths = []
47- # for start_and_goal in start_and_goals:
48- # if verbose:
49- # print(f"\nPlanning for agent: {start_and_goal}" )
73+ if verbose :
74+ print (f"\t all constraints: { all_constraints } " )
5075
51- # grid.clear_initial_reservation(start_and_goal.start, start_and_goal.index )
52- # path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, verbose )
76+ grid .clear_constraint_points ( )
77+ grid . apply_constraint_points ( all_constraints )
5378
54- # if path is None:
55- # print(f"Failed to find path for {start_and_goal}")
56- # return []
79+ for agent_idx , start_and_goal in enumerate (start_and_goals ):
80+ path = single_agent_planner_class .plan (grid , start_and_goal .start , start_and_goal .goal , agent_idx , verbose )
81+ if path is None :
82+ raise RuntimeError (f"Failed to find path for { start_and_goal } " )
83+ paths [AgentId (start_and_goal .index )] = path
5784
58- # agent_index = start_and_goal.index
59- # grid.reserve_path(path, agent_index)
60- # paths.append(path )
85+ applied_constraint_parent = deepcopy ( constraint_tree_node ) #TODO: not sure if deepcopy is actually needed
86+ applied_constraint_parent . constraint = applied_constraint
87+ parent_idx = constraint_tree . add_expanded_node ( applied_constraint_parent )
6188
62- return (start_and_goals , paths )
89+ new_constraint_tree_node = ConstraintTreeNode (paths , parent_idx )
90+ if new_constraint_tree_node .constraint is None :
91+ # This means we found a solution!
92+ return (start_and_goals , [paths [AgentId (i )] for i in range (len (start_and_goals ))])
6393
64- def cbs ():
94+ if verbose :
95+ print (f"Adding new constraint tree node with constraint: { new_constraint_tree_node .constraint } " )
96+ constraint_tree .add_node_to_tree (new_constraint_tree_node )
6597
6698verbose = False
6799show_animation = True
68100def main ():
69101 grid_side_length = 21
70102
71- start_and_goals = [StartAndGoal (i , Position (1 , i ), Position (19 , 19 - i )) for i in range (1 , 16 )]
103+ # start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 16)]
104+ start_and_goals = [StartAndGoal (i , Position (1 , 8 + i ), Position (19 , 19 - i )) for i in range (5 )]
72105 obstacle_avoid_points = [pos for item in start_and_goals for pos in (item .start , item .goal )]
73106
74107 grid = Grid (
75108 np .array ([grid_side_length , grid_side_length ]),
76109 num_obstacles = 250 ,
77110 obstacle_avoid_points = obstacle_avoid_points ,
78- # obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR,
79- obstacle_arrangement = ObstacleArrangement .ARRANGEMENT1 ,
111+ obstacle_arrangement = ObstacleArrangement .NARROW_CORRIDOR ,
112+ # obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1,
80113 # obstacle_arrangement=ObstacleArrangement.RANDOM,
81114 )
82115
83116 start_time = time .time ()
84- start_and_goals , paths = ConflictBasedSearch .plan (grid , start_and_goals , SafeIntervalPathPlanner , verbose )
117+ start_and_goals , paths = ConflictBasedSearch .plan (grid , start_and_goals , SpaceTimeAStar , verbose )
85118
86119 runtime = time .time () - start_time
87120 print (f"\n Planning took: { runtime :.5f} seconds" )
0 commit comments