11import os
22
3- import yaml
4- from ament_index_python .packages import get_package_share_directory
53from launch import LaunchDescription
64from launch .actions import (
75 ExecuteProcess ,
108 SetLaunchConfiguration ,
119 IncludeLaunchDescription ,
1210)
13- from launch .conditions import IfCondition
1411from launch .launch_description_sources import PythonLaunchDescriptionSource
15- from launch .substitutions import Command , FindExecutable , PathJoinSubstitution , LaunchConfiguration , PythonExpression , \
16- ThisLaunchFileDir
12+ from launch .substitutions import Command , FindExecutable , PathJoinSubstitution , LaunchConfiguration , PythonExpression
1713from launch_ros .actions import Node
1814from launch_ros .substitutions import FindPackageShare
19-
20-
21- # --------------------------
22- # Internal: minimal generic override
23- # --------------------------
24- def generate_temp_config (config_path , package_name , kv_pairs ):
25- """
26- Load <package_name>/<config_path>, apply overrides from kv_pairs,
27- and write to /tmp/<package_name>/temp_controllers.yaml. Returns the path.
28- kv_pairs: list of (dotted_key, raw_value_str)
29- """
30- pkg_dir = get_package_share_directory (package_name )
31- src_path = os .path .join (pkg_dir , config_path )
32- dst_path = os .path .join ('/tmp' , package_name , 'temp_controllers.yaml' )
33- os .makedirs (os .path .dirname (dst_path ), exist_ok = True )
34-
35- with open (src_path , 'r' ) as f :
36- cfg = yaml .safe_load (f ) or {}
37-
38- for dotted_key , raw_val in kv_pairs :
39- parts = [p for p in dotted_key .split ('.' ) if p ]
40- if len (parts ) < 2 :
41- raise ValueError (
42- f"Key '{ dotted_key } ' is incomplete; expected '<ns>.[ros__parameters.]foo.bar'"
43- )
44- # Auto-insert ros__parameters right after namespace if omitted
45- if parts [1 ] != 'ros__parameters' :
46- parts .insert (1 , 'ros__parameters' )
47-
48- try :
49- val = yaml .safe_load (raw_val )
50- except Exception :
51- val = raw_val
52-
53- cur = cfg
54- for k in parts [:- 1 ]:
55- if not isinstance (cur .get (k ), dict ):
56- cur [k ] = {}
57- cur = cur [k ]
58- cur [parts [- 1 ]] = val
59-
60- with open (dst_path , 'w' ) as f :
61- yaml .dump (cfg , f , sort_keys = False )
62- print (f"[launch] Temp controllers.yaml written to { dst_path } " )
63-
64- return dst_path
65-
66-
67- # --------------------------
68- # ROS nodes / launch wiring
69- # --------------------------
70- def control_spawner (names , inactive = False ):
71- args = list (names )
72- args += ['--param-file' , LaunchConfiguration ('controllers_yaml' )]
73- if inactive :
74- args .append ('--inactive' )
75- return Node (
76- package = 'controller_manager' ,
77- executable = 'spawner' ,
78- arguments = args ,
79- output = 'screen'
80- )
15+ from legged_bringup .launch_utils import (
16+ get_controller_names , generate_temp_config , resolve_policy_paths , download_wandb_onnx , control_spawner
17+ )
8118
8219
8320def setup_controllers (context ):
8421 robot_type_value = LaunchConfiguration ('robot_type' ).perform (context )
8522 policy_path_value = LaunchConfiguration ('policy_path' ).perform (context )
23+ wandb_path_value = LaunchConfiguration ('wandb_path' ).perform (context )
8624 start_step_value = LaunchConfiguration ('start_step' ).perform (context )
8725 ext_pos_corr = LaunchConfiguration ('ext_pos_corr' ).perform (context )
8826
89- kv_pairs = []
27+ if not policy_path_value and wandb_path_value :
28+ policy_path_value = download_wandb_onnx (wandb_path_value )
29+
30+ controllers_config_path = f'config/{ robot_type_value } /controllers.yaml'
31+
32+ kv_pairs = resolve_policy_paths (controllers_config_path , 'motion_tracking_controller' )
9033 if policy_path_value :
9134 abs_path = os .path .abspath (os .path .expanduser (os .path .expandvars (policy_path_value )))
9235 kv_pairs .append (('walking_controller.policy.path' , abs_path ))
@@ -96,7 +39,6 @@ def setup_controllers(context):
9639 kv_pairs .append (('state_estimator.estimation.contact.height_sensor_noise' , 1e10 ))
9740 kv_pairs .append (('state_estimator.estimation.position.topic' , "/glim/odom" ))
9841
99- controllers_config_path = f'config/{ robot_type_value } /controllers.yaml'
10042 temp_controllers_config_path = generate_temp_config (
10143 controllers_config_path ,
10244 'motion_tracking_controller' ,
@@ -108,11 +50,13 @@ def setup_controllers(context):
10850 value = temp_controllers_config_path
10951 )
11052
53+ all_controllers = get_controller_names (controllers_config_path , 'motion_tracking_controller' )
11154 active_list = ["state_estimator" , "standby_controller" ]
112- inactive_list = ["walking_controller" ]
55+ inactive_list = [c for c in all_controllers if c not in active_list ]
11356
114- active_spawner = control_spawner (active_list )
115- inactive_spawner = control_spawner (inactive_list , inactive = True )
57+ param_file = LaunchConfiguration ('controllers_yaml' )
58+ active_spawner = control_spawner (active_list , param_file = param_file )
59+ inactive_spawner = control_spawner (inactive_list , inactive = True , param_file = param_file )
11660
11761 return [set_controllers_yaml , active_spawner , inactive_spawner ]
11862
@@ -144,7 +88,6 @@ def generate_launch_description():
14488 output = 'screen' ,
14589 parameters = [robot_description , {
14690 'publish_frequency' : 500.0 ,
147- 'use_sim_time' : True
14891 }],
14992 )
15093
@@ -156,16 +99,6 @@ def generate_launch_description():
15699 respawn = True ,
157100 )
158101
159- wandb = IncludeLaunchDescription (
160- PythonLaunchDescriptionSource ([ThisLaunchFileDir (), "/wandb.launch.py" ]),
161- launch_arguments = {
162- "wandb_path" : LaunchConfiguration ("wandb_path" )
163- }.items (),
164- condition = IfCondition (
165- PythonExpression (["'" , LaunchConfiguration ('policy_path' ), "' == ''" ])
166- )
167- )
168-
169102 controllers_opaque_func = OpaqueFunction (function = setup_controllers )
170103
171104 # Exclude all Unitree topics... it should start from the same namespace, fuck Unitree!
@@ -220,12 +153,17 @@ def generate_launch_description():
220153 default_value = 'false' ,
221154 description = 'Enable external position correction'
222155 ),
223- wandb ,
156+ DeclareLaunchArgument (
157+ 'wandb_path' ,
158+ default_value = '' ,
159+ description = 'W&B run path to download ONNX from (used when policy_path is empty)'
160+ ),
224161 controllers_opaque_func ,
225162 control_node ,
226163 node_robot_state_publisher ,
227164 rosbag2 ,
228165 IncludeLaunchDescription (
229- PythonLaunchDescriptionSource (teleop )
166+ PythonLaunchDescriptionSource (teleop ),
167+ launch_arguments = {'robot_type' : robot_type }.items ()
230168 )
231169 ])
0 commit comments