Skip to content

Commit 2f63362

Browse files
committed
tests: add test for vaccum cleaner exercise
1 parent 5fa8abb commit 2f63362

2 files changed

Lines changed: 144 additions & 1 deletion

File tree

test/test_simple_circuit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22

3-
"""Tests for 3D reconstruction ROS launch file."""
3+
"""Tests for Line Follower (Simple Circuit) ROS launch file."""
44

55
import os
66
import unittest

test/test_vacuum_cleaner.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#!/usr/bin/env python3
2+
3+
"""Tests for the Vacuum Cleaner (no localization) ROS launch file."""
4+
5+
import os
6+
import unittest
7+
import pytest
8+
import time
9+
10+
import launch_testing
11+
from launch import LaunchDescription
12+
from launch.actions import IncludeLaunchDescription
13+
from launch.launch_description_sources import PythonLaunchDescriptionSource
14+
import psutil
15+
16+
import rclpy
17+
from nav_msgs.msg import Odometry
18+
from geometry_msgs.msg import Twist
19+
from sensor_msgs.msg import LaserScan
20+
21+
22+
@pytest.mark.launch_test
23+
def generate_test_description():
24+
"""Generate the launch description for simple circuit exercise tests."""
25+
# Get absolute path to the launcher
26+
launcher_path = os.path.join(
27+
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
28+
"Launchers",
29+
"vacuum_cleaner_headless.launch.py",
30+
)
31+
32+
# Start the launch file with arguments
33+
launch_description = IncludeLaunchDescription(
34+
PythonLaunchDescriptionSource(launcher_path),
35+
launch_arguments={"headless": "True", "use_simulator": "True"}.items(),
36+
)
37+
38+
return LaunchDescription(
39+
[
40+
launch_description,
41+
launch_testing.actions.ReadyToTest(),
42+
]
43+
)
44+
45+
46+
class TestTopicMsgs(unittest.TestCase):
47+
"""Unit tests for verifying camera topics and images."""
48+
49+
@classmethod
50+
def setUpClass(cls):
51+
"""Set up ROS node for testing."""
52+
rclpy.init()
53+
cls.node = rclpy.create_node("test_node")
54+
55+
# wait for topics to be setup
56+
time.sleep(10)
57+
58+
@classmethod
59+
def tearDownClass(cls):
60+
"""Clean up ROS resources after tests."""
61+
# Clean up ROS resources
62+
cls.node.destroy_node()
63+
rclpy.shutdown()
64+
# Stop any running Gazebo processes
65+
for proc in psutil.process_iter(["name"]):
66+
if proc.info["name"] == "gzserver":
67+
print(f"Stopping gzserver (PID={proc.pid})")
68+
proc.terminate()
69+
70+
def test_odom_works(self, proc_output):
71+
"""Test that the odometry state is updated when the vehicle moves."""
72+
msgs = []
73+
74+
# Create a subscription to the odometry topic
75+
sub = self.__class__.node.create_subscription(
76+
Odometry,
77+
"/odom",
78+
lambda msg: msgs.append(msg),
79+
qos_profile=10,
80+
)
81+
82+
# Wait for messages (up to 1 second)
83+
for _ in range(10):
84+
rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
85+
if msgs:
86+
break
87+
88+
# Create a publisher to the velocity topic to trigger odometry updates
89+
pub = self.__class__.node.create_publisher(
90+
Twist,
91+
"/cmd_vel",
92+
qos_profile=10,
93+
)
94+
95+
# Publish a high velocity message to move the vehicle
96+
twist_msg = Twist()
97+
twist_msg.linear.x = 45.0 # Move forward at 45 m/s
98+
twist_msg.angular.z = 120.0 # Turn at 120 rad/s
99+
100+
# Wait for a short time to allow odometry updates
101+
for _ in range(10):
102+
pub.publish(twist_msg)
103+
rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
104+
105+
# Check that the vehicle moved by comparing the difference in odom msgs
106+
self.assertNotAlmostEqual(
107+
msgs[0].pose.pose.position.x,
108+
msgs[-1].pose.pose.position.x,
109+
msg="Odometry messages did not change after publishing velocity.",
110+
)
111+
self.assertNotAlmostEqual(
112+
msgs[0].pose.pose.position.y,
113+
msgs[-1].pose.pose.position.y,
114+
msg="Odometry messages did not change after publishing velocity.",
115+
)
116+
117+
# Clean up subscriptions
118+
self.__class__.node.destroy_subscription(sub)
119+
self.__class__.node.destroy_publisher(pub)
120+
121+
def test_laser_scan_works(self, proc_output):
122+
"""Test that the laser scan topic is published."""
123+
msgs = []
124+
125+
# Create a subscription to the laser scan topic
126+
sub = self.__class__.node.create_subscription(
127+
LaserScan,
128+
"/roombaROS/laser/scan",
129+
lambda msg: msgs.append(msg),
130+
qos_profile=10,
131+
)
132+
133+
# Wait for messages (up to 1 second)
134+
for _ in range(10):
135+
rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
136+
if msgs:
137+
break
138+
139+
# Check that we received at least one message
140+
self.assertGreater(len(msgs), 0, msg="No laser scan messages received.")
141+
142+
# Clean up subscriptions
143+
self.__class__.node.destroy_subscription(sub)

0 commit comments

Comments
 (0)