Skip to content

Commit ae0e850

Browse files
committed
tests: update deps
1 parent 2f63362 commit ae0e850

2 files changed

Lines changed: 153 additions & 0 deletions

File tree

test/requirements.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
annotated-types==0.7.0
12
black==22.3.0
23
catkin-pkg==1.0.0
34
click==8.2.1
@@ -17,6 +18,8 @@ platformdirs==4.3.8
1718
pluggy==1.6.0
1819
psutil==7.0.0
1920
pycodestyle==2.14.0
21+
pydantic==2.11.7
22+
pydantic_core==2.33.2
2023
pydocstyle==6.3.0
2124
pyflakes==3.4.0
2225
Pygments==2.19.2
@@ -27,4 +30,5 @@ PyYAML==6.0.2
2730
six==1.17.0
2831
snowballstemmer==3.0.1
2932
tomli==2.2.1
33+
typing-inspection==0.4.1
3034
typing_extensions==4.14.1

test/test_follow_road.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
#!/usr/bin/env python3
2+
3+
"""Tests for the Vacuum Cleaner (no localization) ROS launch file."""
4+
5+
import os
6+
import unittest
7+
import pytest
8+
import time
9+
10+
import launch_testing
11+
from launch import LaunchDescription
12+
from launch.actions import IncludeLaunchDescription
13+
from launch.launch_description_sources import PythonLaunchDescriptionSource
14+
import psutil
15+
16+
import rclpy
17+
from nav_msgs.msg import Odometry
18+
from geometry_msgs.msg import Twist
19+
from sensor_msgs.msg import LaserScan
20+
21+
22+
@pytest.mark.launch_test
23+
def generate_test_description():
24+
"""Generate the launch description for simple circuit exercise tests."""
25+
# Get absolute path to the launcher
26+
launcher_path = os.path.join(
27+
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
28+
"Launchers",
29+
"follow_road.launch.py",
30+
)
31+
32+
# Start the launch file with arguments
33+
launch_description = IncludeLaunchDescription(
34+
PythonLaunchDescriptionSource(launcher_path),
35+
launch_arguments={
36+
"headless": "True",
37+
"use_simulator": "True",
38+
"simulation_config_file": "/opt/jderobot/jderobot_drones/sim_config/gzsim/as2_config.yaml", # <-- Add this line
39+
"world_file": "/opt/jderobot/jderobot_drones/worlds/follow_road_harmonic.world",
40+
"world_name": "follow_road_harmonic",
41+
}.items(),
42+
)
43+
44+
return LaunchDescription(
45+
[
46+
launch_description,
47+
launch_testing.actions.ReadyToTest(),
48+
]
49+
)
50+
51+
52+
class TestTopicMsgs(unittest.TestCase):
53+
"""Unit tests for verifying camera topics and images."""
54+
55+
@classmethod
56+
def setUpClass(cls):
57+
"""Set up ROS node for testing."""
58+
rclpy.init()
59+
cls.node = rclpy.create_node("test_node")
60+
61+
# wait for topics to be setup
62+
time.sleep(10)
63+
64+
@classmethod
65+
def tearDownClass(cls):
66+
"""Clean up ROS resources after tests."""
67+
# Clean up ROS resources
68+
cls.node.destroy_node()
69+
rclpy.shutdown()
70+
# Stop any running Gazebo processes
71+
for proc in psutil.process_iter(["name"]):
72+
if proc.info["name"] == "gzserver":
73+
print(f"Stopping gzserver (PID={proc.pid})")
74+
proc.terminate()
75+
76+
# def test_odom_works(self, proc_output):
77+
# """Test that the odometry state is updated when the vehicle moves."""
78+
# msgs = []
79+
80+
# # Create a subscription to the odometry topic
81+
# sub = self.__class__.node.create_subscription(
82+
# Odometry,
83+
# "/odom",
84+
# lambda msg: msgs.append(msg),
85+
# qos_profile=10,
86+
# )
87+
88+
# # Wait for messages (up to 1 second)
89+
# for _ in range(10):
90+
# rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
91+
# if msgs:
92+
# break
93+
94+
# # Create a publisher to the velocity topic to trigger odometry updates
95+
# pub = self.__class__.node.create_publisher(
96+
# Twist,
97+
# "/cmd_vel",
98+
# qos_profile=10,
99+
# )
100+
101+
# # Publish a high velocity message to move the vehicle
102+
# twist_msg = Twist()
103+
# twist_msg.linear.x = 45.0 # Move forward at 45 m/s
104+
# twist_msg.angular.z = 120.0 # Turn at 120 rad/s
105+
106+
# # Wait for a short time to allow odometry updates
107+
# for _ in range(10):
108+
# pub.publish(twist_msg)
109+
# rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
110+
111+
# # Check that the vehicle moved by comparing the difference in odom msgs
112+
# self.assertNotAlmostEqual(
113+
# msgs[0].pose.pose.position.x,
114+
# msgs[-1].pose.pose.position.x,
115+
# msg="Odometry messages did not change after publishing velocity.",
116+
# )
117+
# self.assertNotAlmostEqual(
118+
# msgs[0].pose.pose.position.y,
119+
# msgs[-1].pose.pose.position.y,
120+
# msg="Odometry messages did not change after publishing velocity.",
121+
# )
122+
123+
# # Clean up subscriptions
124+
# self.__class__.node.destroy_subscription(sub)
125+
# self.__class__.node.destroy_publisher(pub)
126+
127+
# def test_laser_scan_works(self, proc_output):
128+
# """Test that the laser scan topic is published."""
129+
# msgs = []
130+
131+
# # Create a subscription to the laser scan topic
132+
# sub = self.__class__.node.create_subscription(
133+
# LaserScan,
134+
# "/roombaROS/laser/scan",
135+
# lambda msg: msgs.append(msg),
136+
# qos_profile=10,
137+
# )
138+
139+
# # Wait for messages (up to 1 second)
140+
# for _ in range(10):
141+
# rclpy.spin_once(self.__class__.node, timeout_sec=0.1)
142+
# if msgs:
143+
# break
144+
145+
# # Check that we received at least one message
146+
# self.assertGreater(len(msgs), 0, msg="No laser scan messages received.")
147+
148+
# # Clean up subscriptions
149+
# self.__class__.node.destroy_subscription(sub)

0 commit comments

Comments
 (0)