-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathmujoco_simulation.launch.py
More file actions
218 lines (184 loc) · 7.8 KB
/
mujoco_simulation.launch.py
File metadata and controls
218 lines (184 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import os
from pathlib import Path
import yaml
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import (
DeclareLaunchArgument,
ExecuteProcess,
LogInfo,
OpaqueFunction,
TimerAction,
)
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
# Teamplayer arguments to expose (name, description)
# Empty string default means "not set" - will use teamplayer's default
# sim is always true for mujoco simulation
TEAMPLAYER_ARGS = [
("audio", "Whether the audio system should be started"),
("behavior", "Whether the behavior control system should be started"),
("behavior_dsd_file", "The behavior dsd file that should be used"),
("game_controller", "Whether the Gamecontroller module should be started"),
("ipm", "Whether the inverse perspective mapping should be started"),
("localization", "Whether the localization system should be started"),
("motion", "Whether the motion control system should be started"),
("path_planning", "Whether the path planning should be started"),
("teamcom", "Whether the team communication system should be started"),
("vision", "Whether the vision system should be started"),
("world_model", "Whether the world model should be started"),
("monitoring", "Whether the system monitor and udp bridge should be started"),
("record", "Whether the ros bag recording should be started"),
("tts", "Whether to speak"),
]
def generate_domain_bridge_config(robot_domain: int, output_dir: Path) -> Path:
"""Generate domain bridge config file for a single robot.
Returns the config file path.
"""
main_domain = int(os.getenv("ROS_DOMAIN_ID", "0"))
output_dir.mkdir(parents=True, exist_ok=True)
namespace = f"robot{robot_domain}"
config = {
"name": f"robot{robot_domain}_bridge",
"from_domain": main_domain,
"to_domain": robot_domain,
"topics": {
# Clock: main → robot domain
"clock": {
"type": "rosgraph_msgs/msg/Clock",
},
},
}
# Sensor topics: main → robot domain (with remap to remove namespace)
sensor_topics = [
("joint_states", "sensor_msgs/msg/JointState"),
("imu/data", "sensor_msgs/msg/Imu"),
("camera/image_proc", "sensor_msgs/msg/Image"),
("camera/camera_info", "sensor_msgs/msg/CameraInfo"),
("foot_pressure_left/filtered", "bitbots_msgs/msg/FootPressure"),
("foot_pressure_right/filtered", "bitbots_msgs/msg/FootPressure"),
("foot_center_of_pressure_left", "geometry_msgs/msg/PointStamped"),
("foot_center_of_pressure_right", "geometry_msgs/msg/PointStamped"),
]
for topic_suffix, msg_type in sensor_topics:
src_topic = f"{namespace}/{topic_suffix}"
config["topics"][src_topic] = {
"type": msg_type,
"remap": topic_suffix,
}
# Command topic: robot domain → main (reversed direction)
# Key is source topic in from_domain, remap is destination in to_domain
config["topics"]["DynamixelController/command"] = {
"type": "bitbots_msgs/msg/JointCommand",
"from_domain": robot_domain,
"to_domain": main_domain,
"remap": f"{namespace}/DynamixelController/command",
}
config_path = output_dir / f"robot{robot_domain}_bridge.yaml"
with open(config_path, "w") as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
return config_path
def generate_world_xml(num_robots: int, package_share: str, robot_type: str) -> Path:
"""Generate MuJoCo world XML with the correct number of robots."""
template_path = Path(package_share) / "xml" / "adult_field.xml"
output_path = Path(package_share) / "xml" / "generated_world.xml"
offset = 6 * (1/num_robots) # this makes the offset be the default value when there are 4 robots and increse the less robots there are
with open(template_path) as f:
template = f.read()
# Replace placeholder with actual robot count
world_xml = template.replace("{{NUM_ROBOTS}}", str(num_robots)).replace("{{OFFSET}}", str(offset)).replace("{{ROBOT_TYPE}}", robot_type)
with open(output_path, "w") as f:
f.write(world_xml)
return output_path
def launch_setup(context):
"""Dynamically set up launches based on num_robots."""
num_robots = int(LaunchConfiguration("num_robots").perform(context))
robot_type = str(LaunchConfiguration("robot_type").perform(context))
package_share = get_package_share_directory("bitbots_mujoco_sim")
bridge_config_dir = Path(package_share) / "config" / "domain_bridges"
# Get teamplayer argument values - only pass if explicitly set (not empty)
teamplayer_args = ["sim:=true"] # sim is always true for mujoco simulation
for arg_name, _ in TEAMPLAYER_ARGS:
value = LaunchConfiguration(arg_name).perform(context)
if value: # Only pass if not empty string
teamplayer_args.append(f"{arg_name}:={value}")
world_file = generate_world_xml(num_robots, package_share, robot_type)
actions = []
actions.append(
LogInfo(msg=f"Starting MuJoCo simulation with {num_robots} robot(s)"),
)
actions.append(
Node(
package="bitbots_mujoco_sim",
executable="sim",
name="sim_interface",
output="screen",
emulate_tty=True,
parameters=[{"world_file": str(world_file)}],
),
)
for robot_domain in range(num_robots + 11): # 11 is the standart starting id for our robots
config_file = generate_domain_bridge_config(robot_domain, bridge_config_dir)
actions.append(
LogInfo(msg=f"Starting domain bridge for robot{robot_domain} (domain {robot_domain})"),
)
actions.append(
Node(
package="domain_bridge",
executable="domain_bridge",
name=f"domain_bridge_robot{robot_domain}",
arguments=[str(config_file)],
output="screen",
emulate_tty=True,
),
)
actions.append(
TimerAction(
period=3.0,
actions=[
LogInfo(msg=f"Launching teamplayer stack for robot{robot_domain} in domain {robot_domain}"),
ExecuteProcess(
cmd=[
"ros2",
"launch",
"bitbots_bringup",
"teamplayer.launch",
]
+ teamplayer_args,
output="screen",
additional_env={"ROS_DOMAIN_ID": str(robot_domain)},
),
],
)
)
return actions
def generate_launch_description():
"""Launch MuJoCo simulation with domain bridge for multi-robot support."""
declared_args = [
DeclareLaunchArgument(
"num_robots",
default_value="1",
description="Number of robots in the simulation",
),
DeclareLaunchArgument(
"robot_type",
default_value="wolfgang",
description="Set the type of robot used (wolfgang, piplus, x02)",
),
]
# Add all teamplayer arguments with empty default (means use teamplayer's default)
for arg_name, description in TEAMPLAYER_ARGS:
declared_args.append(
DeclareLaunchArgument(
arg_name,
default_value="",
description=description,
)
)
return LaunchDescription(
declared_args
+ [
# All setup happens in OpaqueFunction to ensure proper ordering
OpaqueFunction(function=launch_setup),
]
)