1313# limitations under the License.
1414
1515import os
16+ import time
1617import subprocess
18+ import threading
1719import unittest
1820
1921from ament_index_python .packages import get_package_prefix
2022from ament_index_python .packages import get_package_share_directory
23+ from control_msgs .msg import HardwareDeviceStatus
24+ from control_msgs .msg import HardwareStatus
2125from launch import LaunchDescription
2226import launch_testing
2327from launch_testing .actions import ReadyToTest
2630import pytest
2731import rclpy
2832
33+ from controller_manager .controller_manager_services import list_controllers
2934from controller_manager .launch_utils import generate_controllers_spawner_launch_description
3035from controller_manager .test_utils import check_controllers_running
3136from controller_manager .test_utils import check_node_running
@@ -94,16 +99,86 @@ def setUp(self):
9499 def tearDown (self ):
95100 self .node .destroy_node ()
96101
102+ def controller_states (self ):
103+ controllers = list_controllers (self .node , "controller_manager" , 5.0 ).controller
104+ return {controller .name : controller .state for controller in controllers }
105+
106+ def ensure_ctrl1_active (self ):
107+ states = self .controller_states ()
108+ if "ctrl_1" not in states :
109+ result = run_cli (
110+ "load_controller" , "--set-state" , "active" , "ctrl_1" , self .robot_controllers
111+ )
112+ self .assertEqual (result .returncode , 0 , result .stderr )
113+ elif states ["ctrl_1" ] == "unconfigured" :
114+ result = run_cli ("set_controller_state" , "ctrl_1" , "inactive" )
115+ self .assertEqual (result .returncode , 0 , result .stderr )
116+ result = run_cli ("set_controller_state" , "ctrl_1" , "active" )
117+ self .assertEqual (result .returncode , 0 , result .stderr )
118+ elif states ["ctrl_1" ] == "inactive" :
119+ result = run_cli ("set_controller_state" , "ctrl_1" , "active" )
120+ self .assertEqual (result .returncode , 0 , result .stderr )
121+
122+ check_controllers_running (self .node , ["ctrl_1" ], state = "active" )
123+
124+ def ensure_ctrl2_inactive (self ):
125+ states = self .controller_states ()
126+ if "ctrl_2" not in states :
127+ result = run_cli (
128+ "load_controller" ,
129+ "--set-state" ,
130+ "inactive" ,
131+ "ctrl_2" ,
132+ self .robot_controllers ,
133+ )
134+ self .assertEqual (result .returncode , 0 , result .stderr )
135+ elif states ["ctrl_2" ] == "unconfigured" :
136+ result = run_cli ("set_controller_state" , "ctrl_2" , "inactive" )
137+ self .assertEqual (result .returncode , 0 , result .stderr )
138+ elif states ["ctrl_2" ] == "active" :
139+ result = run_cli ("set_controller_state" , "ctrl_2" , "inactive" )
140+ self .assertEqual (result .returncode , 0 , result .stderr )
141+
142+ check_controllers_running (self .node , ["ctrl_2" ], state = "inactive" )
143+
144+ def unload_ctrl2_if_loaded (self ):
145+ states = self .controller_states ()
146+ if "ctrl_2" not in states :
147+ return
148+
149+ if states ["ctrl_2" ] == "active" :
150+ result = run_cli ("set_controller_state" , "ctrl_2" , "inactive" )
151+ self .assertEqual (result .returncode , 0 , result .stderr )
152+
153+ result = run_cli ("unload_controller" , "ctrl_2" )
154+ self .assertEqual (result .returncode , 0 , result .stderr )
155+ self .assertNotIn ("ctrl_2" , self .controller_states ())
156+
97157 def test_node_start (self ):
98158 check_node_running (self .node , "controller_manager" )
99159 check_controllers_running (self .node , ["ctrl_1" ], state = "active" )
100160
161+ def test_list_controllers (self ):
162+ self .ensure_ctrl1_active ()
163+ result = run_cli ("list_controllers" )
164+
165+ self .assertEqual (result .returncode , 0 , result .stderr )
166+ self .assertIn ("ctrl_1" , result .stdout )
167+ self .assertIn ("active" , result .stdout )
168+
101169 def test_list_controller_types (self ):
102170 result = run_cli ("list_controller_types" )
103171
104172 self .assertEqual (result .returncode , 0 , result .stderr )
105173 self .assertIn ("controller_manager/test_controller" , result .stdout )
106174
175+ def test_list_hardware_components (self ):
176+ result = run_cli ("list_hardware_components" )
177+
178+ self .assertEqual (result .returncode , 0 , result .stderr )
179+ self .assertIn ("TestSystemComponent" , result .stdout )
180+ self .assertIn ("TestSensorComponent" , result .stdout )
181+
107182 def test_list_hardware_interfaces (self ):
108183 result = run_cli ("list_hardware_interfaces" )
109184
@@ -121,19 +196,52 @@ def test_reload_controller_libraries_force_kill(self):
121196 self .assertEqual (result .returncode , 0 , result .stderr )
122197 check_controllers_running (self .node , ["ctrl_1" ], state = "active" )
123198
124- def test_switch_controllers_best_effort (self ):
199+ def test_reload_controller_libraries_without_force_kill_fails (self ):
200+ self .ensure_ctrl1_active ()
201+
202+ result = run_cli ("reload_controller_libraries" )
203+
204+ self .assertNotEqual (result .returncode , 0 )
125205 check_controllers_running (self .node , ["ctrl_1" ], state = "active" )
126206
207+ def test_load_controller (self ):
208+ self .unload_ctrl2_if_loaded ()
209+
127210 result = run_cli (
128- "load_controller" ,
129- "--set-state" ,
130- "inactive" ,
131- "ctrl_2" ,
132- self .robot_controllers ,
211+ "load_controller" , "--set-state" , "inactive" , "ctrl_2" , self .robot_controllers
133212 )
134213 self .assertEqual (result .returncode , 0 , result .stderr )
135214 check_controllers_running (self .node , ["ctrl_2" ], state = "inactive" )
136215
216+ def test_set_controller_state (self ):
217+ self .ensure_ctrl2_inactive ()
218+
219+ result = run_cli ("set_controller_state" , "ctrl_2" , "active" )
220+ self .assertEqual (result .returncode , 0 , result .stderr )
221+ check_controllers_running (self .node , ["ctrl_2" ], state = "active" )
222+
223+ result = run_cli ("set_controller_state" , "ctrl_2" , "inactive" )
224+ self .assertEqual (result .returncode , 0 , result .stderr )
225+ check_controllers_running (self .node , ["ctrl_2" ], state = "inactive" )
226+
227+ def test_cleanup_controller (self ):
228+ self .ensure_ctrl2_inactive ()
229+
230+ result = run_cli ("cleanup_controller" , "ctrl_2" )
231+ self .assertEqual (result .returncode , 0 , result .stderr )
232+ check_controllers_running (self .node , ["ctrl_2" ], state = "unconfigured" )
233+
234+ def test_unload_controller (self ):
235+ self .ensure_ctrl2_inactive ()
236+
237+ result = run_cli ("unload_controller" , "ctrl_2" )
238+ self .assertEqual (result .returncode , 0 , result .stderr )
239+ self .assertNotIn ("ctrl_2" , self .controller_states ())
240+
241+ def test_switch_controllers_best_effort (self ):
242+ self .ensure_ctrl1_active ()
243+ self .ensure_ctrl2_inactive ()
244+
137245 result = run_cli (
138246 "switch_controllers" ,
139247 "--best-effort" ,
@@ -147,6 +255,48 @@ def test_switch_controllers_best_effort(self):
147255 check_controllers_running (self .node , ["ctrl_1" ], state = "inactive" )
148256 check_controllers_running (self .node , ["ctrl_2" ], state = "active" )
149257
258+ def test_view_hardware_status (self ):
259+ publisher = self .node .create_publisher (HardwareStatus , "/test/hardware_status" , 10 )
260+ stop_publishing = threading .Event ()
261+
262+ def publish_status ():
263+ while not stop_publishing .is_set ():
264+ msg = HardwareStatus ()
265+ msg .header .stamp = self .node .get_clock ().now ().to_msg ()
266+ msg .hardware_id = "test_hardware_component"
267+
268+ device_status = HardwareDeviceStatus ()
269+ device_status .device_id = "test_device"
270+ msg .hardware_device_states .append (device_status )
271+
272+ publisher .publish (msg )
273+ time .sleep (0.1 )
274+
275+ process = subprocess .Popen (
276+ ["ros2" , "control" , "view_hardware_status" , "--hardware-id" , "missing_hardware" ],
277+ stdout = subprocess .PIPE ,
278+ stderr = subprocess .PIPE ,
279+ text = True ,
280+ )
281+
282+ publisher_thread = threading .Thread (target = publish_status , daemon = True )
283+ publisher_thread .start ()
284+
285+ try :
286+ stdout , stderr = process .communicate (timeout = 10.0 )
287+ finally :
288+ stop_publishing .set ()
289+ publisher_thread .join (timeout = 1.0 )
290+ if process .poll () is None :
291+ process .kill ()
292+ process .communicate ()
293+
294+ self .assertEqual (process .returncode , 0 , stderr )
295+ self .assertIn ("Subscribing to the following topics" , stdout )
296+ self .assertIn ("/test/hardware_status" , stdout )
297+ self .assertIn ("Available Hardware IDs" , stdout )
298+ self .assertIn ("test_hardware_component" , stdout )
299+
150300
151301@launch_testing .post_shutdown_test ()
152302class TestShutdown (unittest .TestCase ):
0 commit comments