55import contextlib
66import copy
77import itertools
8+ import sys
9+ import time
810from functools import partial
911from typing import Callable
1012
2729 assert_for_message ,
2830 call_trigger_service ,
2931 wait_for_register ,
32+ wait_for_subscriber ,
3033)
3134from rclpy .node import Node
3235from rclpy .time import Time
3336from sensor_msgs .msg import JointState , NavSatFix
34- from std_srvs .srv import Trigger
3537from tf2_ros import TransformException # ty: ignore[unresolved-import]
3638from tf2_ros .buffer import Buffer
3739from tf2_ros .transform_listener import TransformListener
3840
3941launch_fixtures = []
4042
4143
42- def get_tests (namespace_vehicle : str , namespace_gps : str , timeout : int ) -> dict : # noqa: PLR0915
44+ def get_tests (namespace_vehicle : str , namespace_gps : str ) -> dict : # noqa: PLR0915
4345 """Get the Nav2 tests.
4446
4547 Args:
4648 namespace_vehicle (str): The namespace of the vehicle.
4749 namespace_gps (str): The namespace of the GPS.
48- timeout (int): The timeout in seconds.
4950
5051 Returns:
5152 dict: A dictionary containing the tests.
5253 """
5354
54- def test_wait_for_register (_self : object ) -> None :
55+ def test_wait_for_register (_self : object , timeout : int ) -> None :
5556 """Test that the panther core is registered in the RCDT.
5657
5758 Args:
5859 _self (object): The test class instance.
60+ timeout (int): The timeout in seconds.
5961 """
6062 wait_for_register (timeout = timeout )
6163
62- def test_joint_states_published (_self : object ) -> None :
64+ def test_joint_states_published (_self : object , timeout : int ) -> None :
6365 """Test that the joint states are published.
6466
6567 Args:
6668 _self (object): The test class instance.
69+ timeout (int): The timeout in seconds.
6770 """
6871 assert_for_message (
6972 JointState , f"/{ namespace_vehicle } /joint_states" , timeout = timeout
7073 )
7174
7275 def test_e_stop_request_reset (
73- _self : object , request : SubRequest , test_node : Node
76+ _self : object , request : SubRequest , test_node : Node , timeout : int
7477 ) -> None :
7578 """Test that the E-Stop request service can be called to unlock the Panther.
7679
7780 Args:
7881 _self (object): The test class instance.
7982 request (SubRequest): The pytest request object, used to access command line options
8083 test_node (Node): The ROS 2 node to use for the test.
84+ timeout (int): The timeout in seconds.
8185 """
8286 if request .config .getoption ("simulation" ):
8387 pytest .skip ("E-Stop is not available." ) # ty: ignore[call-non-callable]
@@ -91,26 +95,36 @@ def test_e_stop_request_reset(
9195 )
9296
9397 def test_goal_pose (
94- _self : object , test_node : Node , navigation_distance_tolerance : float
98+ _self : object ,
99+ test_node : Node ,
100+ navigation_distance_tolerance : float ,
101+ timeout : int ,
95102 ) -> None :
96103 """Test that navigation to a goal pose is successful.
97104
98105 Args:
99106 _self (object): The test class instance.
100107 test_node (Node): The ROS 2 node to use for the test.
101108 navigation_distance_tolerance (float): The tolerance for navigation.
109+ timeout (int): The timeout in seconds.
110+
111+ Raises:
112+ AssertionError: When a timeout occurs.
102113 """
103114 # 1) Obtain current pose in map frame:
104115 tf_buffer = Buffer ()
105116 TransformListener (tf_buffer , test_node )
106117 current_pose = TransformStamped ()
107118
119+ start_time = time .time ()
108120 while current_pose == TransformStamped ():
109121 rclpy .spin_once (test_node , timeout_sec = 0 )
110122 with contextlib .suppress (TransformException ):
111123 current_pose = tf_buffer .lookup_transform (
112124 "map" , f"{ namespace_vehicle } /base_link" , Time ()
113125 )
126+ if time .time () - start_time > timeout :
127+ raise AssertionError ("Timeout while waiting for current pose." )
114128
115129 # 2) Publish goal pose 1 meter in front of current position:
116130 goal_pose = PoseStamped ()
@@ -120,39 +134,45 @@ def test_goal_pose(
120134 goal_pose .pose .position .z = current_pose .transform .translation .z
121135
122136 publisher = test_node .create_publisher (PoseStamped , "/goal_pose" , 10 )
137+ wait_for_subscriber (publisher , timeout )
123138 publisher .publish (goal_pose )
139+ test_node .get_logger ().info ("Published goal pose for navigation." )
124140
125141 # 3) Wait until goal is reached within tolerance:
126- while (
127- abs (current_pose .transform .translation .x - goal_pose .pose .position .x )
128- > navigation_distance_tolerance
129- ):
142+ start_time = time .time ()
143+ distance : float = sys .float_info .max
144+ while distance > navigation_distance_tolerance :
130145 rclpy .spin_once (test_node , timeout_sec = 0 )
131146 with contextlib .suppress (TransformException ):
132147 current_pose = tf_buffer .lookup_transform (
133148 "map" , f"{ namespace_vehicle } /base_link" , Time ()
134149 )
150+ distance = abs (
151+ current_pose .transform .translation .x - goal_pose .pose .position .x
152+ )
153+ if time .time () - start_time > timeout :
154+ raise AssertionError (
155+ f"Timeout. Distance is { distance } while tolerance is { navigation_distance_tolerance } ."
156+ )
135157
136- # 4) Stop the navigation, since the test can finish before the goal is reached due to the tolerance:
137- stop_navigation_client = test_node .create_client (
138- Trigger , f"/{ namespace_vehicle } /nav2_manager/stop"
139- )
140- future = stop_navigation_client .call_async (Trigger .Request ())
141- rclpy .spin_until_future_complete (test_node , future , timeout_sec = timeout )
142- service_response : Trigger .Response = future .result ()
143- assert False if not service_response else service_response .success , (
144- "Failed to stop navigation."
158+ # 4) Stop navigation, since the goal can be reached before the navigation is finished due to tolerance:
159+ assert call_trigger_service (
160+ test_node , f"/{ namespace_vehicle } /nav2_manager/stop" , timeout
145161 )
146162
147163 def test_gps_navigation (
148- _self : object , test_node : Node , navigation_degree_tolerance : float
164+ _self : object , test_node : Node , navigation_degree_tolerance : float , timeout : int
149165 ) -> None :
150166 """Test that GPS navigation to a goal pose is successful.
151167
152168 Args:
153169 _self (object): The test class instance.
154170 test_node (Node): The ROS 2 node to use for the test.
155171 navigation_degree_tolerance (float): The tolerance for navigation.
172+ timeout (int): The timeout in seconds.
173+
174+ Raises:
175+ AssertionError: When a timeout occurs.
156176 """
157177 # 1) Obtain current GPS location:
158178 current_nav_sat = NavSatFix ()
@@ -165,14 +185,18 @@ def callback(msg: NavSatFix) -> None:
165185 NavSatFix , f"/{ namespace_gps } /gps/fix" , callback , 10
166186 )
167187
188+ start_time = time .time ()
168189 while current_nav_sat == NavSatFix ():
169190 rclpy .spin_once (test_node , timeout_sec = 0 )
191+ if time .time () - start_time > timeout :
192+ raise AssertionError ("Timeout while waiting for current GPS location." )
170193
171194 # 2) Publish goal GPS location 1e-5 degrees north of current location:
172195 goal_nav_sat = copy .deepcopy (current_nav_sat )
173196 goal_nav_sat .latitude += 1e-5
174197
175198 publisher = test_node .create_publisher (GeoPath , "/gps_waypoints" , 10 )
199+ wait_for_subscriber (publisher , timeout )
176200 goal_msg = GeoPath ()
177201 goal_pose = GeoPoseStamped ()
178202 goal_pose .pose .position .latitude = goal_nav_sat .latitude
@@ -181,21 +205,19 @@ def callback(msg: NavSatFix) -> None:
181205 publisher .publish (goal_msg )
182206
183207 # 3) Wait until goal is reached within tolerance:
184- while (
185- abs (goal_nav_sat .latitude - current_nav_sat .latitude )
186- > navigation_degree_tolerance
187- ):
208+ start_time = time .time ()
209+ distance : float = sys .float_info .max
210+ while distance > navigation_degree_tolerance :
188211 rclpy .spin_once (test_node , timeout_sec = 0 )
212+ distance = abs (current_nav_sat .latitude - goal_nav_sat .latitude )
213+ if time .time () - start_time > timeout :
214+ raise AssertionError (
215+ f"Timeout. Distance is { distance } while tolerance is { navigation_degree_tolerance } ."
216+ )
189217
190- # 4) Stop the navigation, since the test can finish before the goal is reached due to the tolerance:
191- stop_navigation_client = test_node .create_client (
192- Trigger , f"/{ namespace_vehicle } /nav2_manager/stop"
193- )
194- future = stop_navigation_client .call_async (Trigger .Request ())
195- rclpy .spin_until_future_complete (test_node , future , timeout_sec = timeout )
196- service_response : Trigger .Response = future .result ()
197- assert False if not service_response else service_response .success , (
198- "Failed to stop navigation."
218+ # 4) Stop navigation, since the goal can be reached before the navigation is finished due to tolerance:
219+ assert call_trigger_service (
220+ test_node , f"/{ namespace_vehicle } /nav2_manager/stop" , timeout
199221 )
200222
201223 # Collect all test methods defined above
@@ -280,7 +302,7 @@ def launch_fixture(
280302 test_class = type (
281303 f"TestNav2_{ vehicle_platform } _{ lidar_platform } _{ gps_platform } " ,
282304 (object ,),
283- get_tests (vehicle_platform , gps_platform , timeout = 150 ),
305+ get_tests (vehicle_platform , gps_platform ),
284306 )
285307 pytest .mark .launch (fixture = launch_fixture_wrapper )(test_class )
286308 globals ()[test_class .__name__ ] = test_class
0 commit comments