1414import psutil
1515
1616import rclpy
17- from nav_msgs .msg import Odometry
1817from geometry_msgs .msg import Twist
19- from sensor_msgs .msg import LaserScan
18+ from sensor_msgs .msg import Imu
2019
2120
2221@pytest .mark .launch_test
@@ -29,15 +28,18 @@ def generate_test_description():
2928 "follow_road.launch.py" ,
3029 )
3130
31+ config_file = "/opt/jderobot/jderobot_drones/sim_config/gzsim/world.json"
32+
3233 # Start the launch file with arguments
3334 launch_description = IncludeLaunchDescription (
3435 PythonLaunchDescriptionSource (launcher_path ),
3536 launch_arguments = {
3637 "headless" : "True" ,
3738 "use_simulator" : "True" ,
38- "simulation_config_file" : "/opt/jderobot/jderobot_drones/sim_config/gzsim/as2_config.yaml" , # <-- Add this line
39- "world_file" : "/opt/jderobot/jderobot_drones/worlds/follow_road_harmonic.world" ,
40- "world_name" : "follow_road_harmonic" ,
39+ "use_sim_time" : "True" ,
40+ "simulation_config_file" : config_file ,
41+ "plugin_name" : "differential_flatness_controller" ,
42+ # "plugin_name": "pid_speed_controller",
4143 }.items (),
4244 )
4345
@@ -69,81 +71,65 @@ def tearDownClass(cls):
6971 rclpy .shutdown ()
7072 # Stop any running Gazebo processes
7173 for proc in psutil .process_iter (["name" ]):
72- if proc .info ["name" ] == "gzserver" :
74+ if proc .info ["name" ] in [ "gzserver" , "gazebo" , "ign gazebo" , "gz" ] :
7375 print (f"Stopping gzserver (PID={ proc .pid } )" )
7476 proc .terminate ()
7577
76- # def test_odom_works(self, proc_output):
77- # """Test that the odometry state is updated when the vehicle moves."""
78- # msgs = []
79-
80- # # Create a subscription to the odometry topic
81- # sub = self.__class__.node.create_subscription(
82- # Odometry,
83- # "/odom",
84- # lambda msg: msgs.append(msg),
85- # qos_profile=10,
86- # )
87-
88- # # Wait for messages (up to 1 second)
89- # for _ in range(10):
90- # rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
91- # if msgs:
92- # break
93-
94- # # Create a publisher to the velocity topic to trigger odometry updates
95- # pub = self.__class__.node.create_publisher(
96- # Twist,
97- # "/cmd_vel",
98- # qos_profile=10,
99- # )
100-
101- # # Publish a high velocity message to move the vehicle
102- # twist_msg = Twist()
103- # twist_msg.linear.x = 45.0 # Move forward at 45 m/s
104- # twist_msg.angular.z = 120.0 # Turn at 120 rad/s
105-
106- # # Wait for a short time to allow odometry updates
107- # for _ in range(10):
108- # pub.publish(twist_msg)
109- # rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
110-
111- # # Check that the vehicle moved by comparing the difference in odom msgs
112- # self.assertNotAlmostEqual(
113- # msgs[0].pose.pose.position.x,
114- # msgs[-1].pose.pose.position.x,
115- # msg="Odometry messages did not change after publishing velocity.",
116- # )
117- # self.assertNotAlmostEqual(
118- # msgs[0].pose.pose.position.y,
119- # msgs[-1].pose.pose.position.y,
120- # msg="Odometry messages did not change after publishing velocity.",
121- # )
122-
123- # # Clean up subscriptions
124- # self.__class__.node.destroy_subscription(sub)
125- # self.__class__.node.destroy_publisher(pub)
126-
127- # def test_laser_scan_works(self, proc_output):
128- # """Test that the laser scan topic is published."""
129- # msgs = []
130-
131- # # Create a subscription to the laser scan topic
132- # sub = self.__class__.node.create_subscription(
133- # LaserScan,
134- # "/roombaROS/laser/scan",
135- # lambda msg: msgs.append(msg),
136- # qos_profile=10,
137- # )
138-
139- # # Wait for messages (up to 1 second)
140- # for _ in range(10):
141- # rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
142- # if msgs:
143- # break
144-
145- # # Check that we received at least one message
146- # self.assertGreater(len(msgs), 0, msg="No laser scan messages received.")
147-
148- # # Clean up subscriptions
149- # self.__class__.node.destroy_subscription(sub)
78+ def test_imu_works (self , proc_output ):
79+ """Test that the imu topic works."""
80+ msgs = []
81+
82+ sub = self .__class__ .node .create_subscription (
83+ Imu ,
84+ "/drone0/sensor_measurements/imu" ,
85+ lambda msg : msgs .append (msg ),
86+ qos_profile = 10 ,
87+ )
88+
89+ # Wait for messages (up to 1 second)
90+ for _ in range (10 ):
91+ rclpy .spin_once (self .__class__ .node , timeout_sec = 0.1 )
92+ if msgs :
93+ break
94+
95+ # Check that we received msgs
96+ self .assertGreater (len (msgs ), 0 , msg = "No IMU messages received." )
97+
98+ # Clean up subscriptions
99+ self .__class__ .node .destroy_subscription (sub )
100+
101+ def test_movement_works (self , proc_output ):
102+ """Test that the velocity topic works correctly."""
103+ msgs = []
104+
105+ # Create a subscription to the velocity topic
106+ sub = self .__class__ .node .create_subscription (
107+ Twist ,
108+ "/gz/drone0/cmd_vel" ,
109+ lambda msg : msgs .append (msg ),
110+ qos_profile = 10 ,
111+ )
112+
113+ # Create a publisher to the velocity topic
114+ pub = self .__class__ .node .create_publisher (
115+ Twist ,
116+ "/gz/drone0/cmd_vel" ,
117+ qos_profile = 10 ,
118+ )
119+
120+ # Publish a high velocity message to move the vehicle
121+ twist_msg = Twist ()
122+ twist_msg .linear .x = 45.0 # Move forward at 45 m/s
123+ twist_msg .angular .z = 120.0 # Turn at 120 rad/s
124+
125+ # Wait for a short time to allow odometry updates
126+ for _ in range (15 ):
127+ pub .publish (twist_msg )
128+ rclpy .spin_once (self .__class__ .node , timeout_sec = 0.1 )
129+
130+ # Check that we received some messages
131+ self .assertNotEqual (len (msgs ), 0 , msg = "No cmd_vel messages received." )
132+
133+ # Clean up subscriptions
134+ self .__class__ .node .destroy_subscription (sub )
135+ self .__class__ .node .destroy_publisher (pub )
0 commit comments