@@ -46,68 +46,76 @@ def __lt__(self, other) -> bool:
4646 return self .cost < other .cost
4747
4848 def get_constraint_point (self , verbose = False ) -> Optional [ForkingConstraint ]:
49+ """
50+ Check paths for any constraints, and if any are found return the earliest one.
51+ """
4952
5053 final_t = max (path .goal_reached_time () for path in self .paths .values ())
5154 positions_at_time : dict [PositionAtTime , AgentId ] = {}
5255 for t in range (final_t + 1 ):
53- possible_constraints : list [ForkingConstraint ] = []
54- for agent_id , path in self .paths .items ():
55- # Check for edge conflicts
56- last_position = None
57- if t > 0 :
58- last_position = path .get_position (t - 1 )
59-
60- position = path .get_position (t )
61- if position is None :
62- continue
63- position_at_time = PositionAtTime (position , t )
64- if position_at_time not in positions_at_time :
65- positions_at_time [position_at_time ] = AgentId (agent_id )
66-
67- # edge conflict
68- if last_position :
69- new_position_at_last_time = PositionAtTime (position , t - 1 )
70- old_position_at_new_time = PositionAtTime (last_position , t )
71- if new_position_at_last_time in positions_at_time and old_position_at_new_time in positions_at_time :
72- conflicting_agent_id1 = positions_at_time [new_position_at_last_time ]
73- conflicting_agent_id2 = positions_at_time [old_position_at_new_time ]
74-
75- if conflicting_agent_id1 == conflicting_agent_id2 and conflicting_agent_id1 != agent_id :
76- if verbose :
77- print (f"Found edge constraint between with agent { conflicting_agent_id1 } for { agent_id } " )
78- print (f"\t positions old: { old_position_at_new_time } , new: { position_at_time } " )
79- new_constraint = ForkingConstraint ((
80- ConstrainedAgent (agent_id , position_at_time ),
81- ConstrainedAgent (conflicting_agent_id1 , Constraint (position = last_position , time = t ))
82- ))
83- possible_constraints .append (new_constraint )
84- continue
85-
86- # double reservation at a (cell, time) combination
87- if positions_at_time [position_at_time ] != agent_id :
88- conflicting_agent_id = positions_at_time [position_at_time ]
89-
90- constraint = Constraint (position = position , time = t )
91- possible_constraints .append (ForkingConstraint ((
92- ConstrainedAgent (agent_id , constraint ), ConstrainedAgent (conflicting_agent_id , constraint )
93- )))
94- continue
95- if possible_constraints :
96- if verbose :
97- print (f"Choosing best constraint of { possible_constraints } " )
98- # first check for edge constraints
99- for constraint in possible_constraints :
100- if constraint .constrained_agents [0 ].constraint .position != constraint .constrained_agents [1 ].constraint .position :
101- if verbose :
102- print (f"\t Found edge conflict constraint: { constraint } " )
103- return constraint
104- # if none, then return first normal constraint
105- if verbose :
106- print (f"\t Returning normal constraint: { possible_constraints [0 ]} " )
107- return possible_constraints [0 ]
56+ possible_constraints : list [ForkingConstraint ] = self .check_for_constraints_at_time (positions_at_time , t , verbose )
57+ if not possible_constraints :
58+ continue
59+
60+ if verbose :
61+ print (f"Choosing best constraint of { possible_constraints } " )
62+ # first check for edge constraints
63+ for constraint in possible_constraints :
64+ if constraint .constrained_agents [0 ].constraint .position != constraint .constrained_agents [1 ].constraint .position :
65+ if verbose :
66+ print (f"\t Found edge conflict constraint: { constraint } " )
67+ return constraint
68+ # if none, then return first normal constraint
69+ if verbose :
70+ print (f"\t Returning normal constraint: { possible_constraints [0 ]} " )
71+ return possible_constraints [0 ]
10872
10973 return None
11074
75+ def check_for_constraints_at_time (self , positions_at_time : dict [PositionAtTime , AgentId ], t : int , verbose : bool ) -> list [ForkingConstraint ]:
76+ """
77+ Check for constraints between paths at a particular time step
78+ """
79+ possible_constraints : list [ForkingConstraint ] = []
80+ for agent_id , path in self .paths .items ():
81+
82+ position = path .get_position (t )
83+ if position is None :
84+ continue
85+ position_at_time = PositionAtTime (position , t )
86+ if position_at_time not in positions_at_time :
87+ positions_at_time [position_at_time ] = AgentId (agent_id )
88+
89+ # double reservation at a (cell, time) combination
90+ if positions_at_time [position_at_time ] != agent_id :
91+ conflicting_agent_id = positions_at_time [position_at_time ]
92+ constraint = Constraint (position = position , time = t )
93+ possible_constraints .append (ForkingConstraint ((
94+ ConstrainedAgent (agent_id , constraint ), ConstrainedAgent (conflicting_agent_id , constraint )
95+ )))
96+
97+ # Check for edge conflicts (can only happen after first time step)
98+ if t == 0 :
99+ continue
100+ last_position = path .get_position (t - 1 )
101+ new_position_at_last_time = PositionAtTime (position , t - 1 )
102+ old_position_at_new_time = PositionAtTime (last_position , t )
103+ if new_position_at_last_time in positions_at_time and old_position_at_new_time in positions_at_time :
104+ conflicting_agent_id1 = positions_at_time [new_position_at_last_time ]
105+ conflicting_agent_id2 = positions_at_time [old_position_at_new_time ]
106+
107+ if conflicting_agent_id1 == conflicting_agent_id2 and conflicting_agent_id1 != agent_id :
108+ if verbose :
109+ print (f"Found edge constraint between with agent { conflicting_agent_id1 } for { agent_id } " )
110+ print (f"\t positions old: { old_position_at_new_time } , new: { position_at_time } " )
111+ new_constraint = ForkingConstraint ((
112+ ConstrainedAgent (agent_id , position_at_time ),
113+ ConstrainedAgent (conflicting_agent_id1 , Constraint (position = last_position , time = t ))
114+ ))
115+ possible_constraints .append (new_constraint )
116+
117+ return possible_constraints
118+
111119
112120class ConstraintTree :
113121 # Child nodes have been created (Maps node_index to ConstraintTreeNode)
0 commit comments