Skip to content

Commit 69c61e6

Browse files
committed
tests: follow_person_following_cam test
1 parent ef67bb6 commit 69c61e6

1 file changed

Lines changed: 118 additions & 0 deletions

File tree

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#!/usr/bin/env python3
2+
3+
"""Tests for follow person following camera ROS launch file."""
4+
5+
import os
6+
import time
7+
import unittest
8+
import psutil
9+
import pytest
10+
11+
import launch_testing
12+
from launch import LaunchDescription
13+
from launch.actions import IncludeLaunchDescription
14+
from launch.launch_description_sources import PythonLaunchDescriptionSource
15+
16+
import rclpy
17+
from sensor_msgs.msg import Imu, JointState
18+
from nav_msgs.msg import Odometry
19+
20+
21+
@pytest.mark.launch_test
22+
def generate_test_description():
23+
"""Generate the launch description for follow person following camera tests."""
24+
# Get absolute path to the launcher
25+
launcher_path = os.path.join(
26+
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
27+
"Launchers",
28+
"follow_person_followingcam.launch.py",
29+
)
30+
31+
# Start the launch file with arguments
32+
launch_description = IncludeLaunchDescription(
33+
PythonLaunchDescriptionSource(launcher_path),
34+
launch_arguments={"headless": "True", "use_simulator": "True"}.items(),
35+
)
36+
37+
return LaunchDescription(
38+
[
39+
launch_description,
40+
launch_testing.actions.ReadyToTest(),
41+
]
42+
)
43+
44+
45+
class TestLaunchProcesses(unittest.TestCase):
46+
"""Test for verifying the launch file starts correctly."""
47+
48+
@classmethod
49+
def setUpClass(cls):
50+
"""Set up ROS node and initialize test counters before all tests."""
51+
rclpy.init()
52+
cls.node = rclpy.create_node("test_node")
53+
54+
# wait for topics to be setup
55+
time.sleep(30)
56+
57+
@classmethod
58+
def tearDownClass(cls):
59+
"""Clean up ROS node and shutdown rclpy after all tests."""
60+
cls.node.destroy_node()
61+
rclpy.shutdown()
62+
# Stop the gzserver process if it is running
63+
for proc in psutil.process_iter(["name"]):
64+
if proc.info["name"] == "gzserver":
65+
print(f"Stopping gzserver (PID={proc.pid})")
66+
proc.terminate()
67+
68+
def test_joint_state_topics(self, proc_output):
69+
"""Test that the joint state topics are published correctly."""
70+
# Check if we can get joint state-related topics
71+
msgs = []
72+
sub = self.__class__.node.create_subscription(
73+
JointState,
74+
"/joint_states",
75+
lambda msg: msgs.append(msg),
76+
10,
77+
)
78+
79+
# Wait briefly to see if topics are available
80+
for _ in range(30):
81+
rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
82+
83+
# Check if we received any messages
84+
self.assertGreater(len(msgs), 0, "No messages received on /joint_states topic.")
85+
86+
self.__class__.node.destroy_subscription(sub)
87+
88+
def test_imu_and_odom_topics(self, proc_output):
89+
"""Test that the IMU and Odometry topics are published correctly."""
90+
91+
# Check if we can get IMU-related topics
92+
imu_msgs = []
93+
imu_sub = self.__class__.node.create_subscription(
94+
Imu,
95+
"/imu",
96+
lambda msg: imu_msgs.append(msg),
97+
10,
98+
)
99+
100+
# Check if we can get Odometry-related topics
101+
odom_msgs = []
102+
odom_sub = self.__class__.node.create_subscription(
103+
Odometry,
104+
"/odom",
105+
lambda msg: odom_msgs.append(msg),
106+
10,
107+
)
108+
109+
# Wait briefly to see if topics are available
110+
for _ in range(30):
111+
rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
112+
113+
# Check if we received any messages
114+
self.assertGreater(len(imu_msgs), 0, "No messages received on /imu topic.")
115+
self.assertGreater(len(odom_msgs), 0, "No messages received on /odom topic.")
116+
117+
self.__class__.node.destroy_subscription(imu_sub)
118+
self.__class__.node.destroy_subscription(odom_sub)

0 commit comments

Comments
 (0)