[lightx2v_ros]: add RoboTwin 2.0 simulator and reconstruct ros#1207
[lightx2v_ros]: add RoboTwin 2.0 simulator and reconstruct ros#1207fuheaven wants to merge 1 commit into
Conversation
…iven refactor Introduce a contract-driven ROS architecture that generalizes the FastWAM policy/simulator/viewer stack across environments and add RoboTwin 2.0 support: - common: new EnvContract single-source-of-truth package (LIBERO + RoboTwin namespaces, cameras, action/state dims, policy profile, normalization mode, and a new /<ns>/episode topic for episode-boundary signaling). - simulator: generic BaseSimEnv + SimulatorNode driving any env via the contract; new RoboTwinEnv adapter (SAPIEN dual-arm), and continuous-eval loop (loop / max_episode_steps params) that auto-resets to a fresh episode on success or step cap instead of stopping. - inference: FastWAMPolicy generalized to multi-env (LinearNormalizer with min/max|q01/q99|z-score, per-profile camera composition, optional gripper post-processing); fastwam_node resets policy state on episode boundaries. - configs: add fastwam/robotwin_i2va.json and align fastwam/libero_i2va.json to the new schema (policy_profile / normalize_mode / gripper_postprocess). - vendored RoboTwin is referenced as a git submodule (RoboTwin-Platform/robotwin, branch main) instead of being committed in-tree.
|
You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard. |
There was a problem hiding this comment.
Code Review
This pull request refactors the ROS integration to be environment-agnostic by introducing a shared EnvContract and a generic SimulatorNode that supports both LIBERO and RoboTwin simulation environments. It also updates the policy runner to handle multiple profiles, normalization modes, and camera layouts dynamically. The review feedback highlights several critical improvements: scaling and clipping float RGB values from RoboTwin before casting to uint8 to avoid black images, validating the policy_profile parameter to prevent silent failures, implementing a retry mechanism during initial episode setup to handle unstable seeds, and properly handling the step cap when continuous looping is disabled.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| rgb = np.asarray(cameras[cam]["rgb"])[..., :3] | ||
| images[cam] = np.ascontiguousarray(rgb.astype(np.uint8)) |
There was a problem hiding this comment.
如果 RoboTwin 仿真器返回的 RGB 图像数据类型是浮点型(即像素值在 [0, 1] 之间),直接使用 rgb.astype(np.uint8) 会导致所有像素值被截断为 0 或 1,从而使图像几乎完全变黑。
为了保证兼容性和鲁棒性,建议先检查数据类型,如果是浮点型则先乘以 255 并进行裁剪,再转换为 uint8。
| rgb = np.asarray(cameras[cam]["rgb"])[..., :3] | |
| images[cam] = np.ascontiguousarray(rgb.astype(np.uint8)) | |
| rgb = np.asarray(cameras[cam]["rgb"])[..., :3] | |
| if np.issubdtype(rgb.dtype, np.floating): | |
| rgb = (rgb * 255.0).clip(0, 255) | |
| images[cam] = np.ascontiguousarray(rgb.astype(np.uint8)) |
| self.seed = None if seed is None or int(seed) < 0 else int(seed) | ||
| self.binarize_gripper = bool(binarize_gripper) | ||
| self.default_prompt = str(default_prompt) | ||
| self.policy_profile = str(policy_profile).strip().lower() |
There was a problem hiding this comment.
目前代码中没有对 policy_profile 的有效性进行校验。如果用户传入了不支持的 profile(例如拼写错误),代码会默认回退到 "libero" 的处理逻辑,这可能会导致难以排查的维度不匹配或 KeyError 错误。
建议在初始化时对 policy_profile 进行校验,仅允许 "libero" 和 "robotwin"。
| self.policy_profile = str(policy_profile).strip().lower() | |
| self.policy_profile = str(policy_profile).strip().lower() | |
| if self.policy_profile not in {"libero", "robotwin"}: | |
| raise ValueError(f"Unsupported policy_profile: {self.policy_profile}. Expected 'libero' or 'robotwin'.") |
| self._prepare_runtime() | ||
| self.args = self._build_task_args() | ||
| self.env = self._instantiate_task() | ||
| self._setup_episode() |
There was a problem hiding this comment.
在初始化 RoboTwinEnv 时,直接调用了 self._setup_episode()。由于 RoboTwin 仿真(基于 SAPIEN/physics)在初始随机化物体位置时可能会因为物理碰撞或不稳定状态(例如 UnStableError)而导致设置失败,如果初始 seed 不巧触发了该错误,整个 ROS 节点会在启动时直接崩溃。
建议在初始化时也引入类似于 new_episode 中的重试机制,自动尝试后续的 seed,以提高节点启动的稳定性。
| self._setup_episode() | |
| # Try to setup the initial episode with retries to handle unstable seeds | |
| last_err = None | |
| for _ in range(25): | |
| try: | |
| self._setup_episode() | |
| break | |
| except Exception as exc: | |
| last_err = exc | |
| self.seed += 1 | |
| else: | |
| raise RuntimeError(f"RoboTwin failed to set up the initial episode: {last_err}") |
| def on_action(self, msg): | ||
| if self.success: | ||
| # In loop mode `success` is reset to False synchronously in | ||
| # `_start_next_episode`, so this only drops late actions that raced the | ||
| # episode boundary; in single-episode mode it stops the rollout. | ||
| if not self.loop: | ||
| self.get_logger().warning("episode already succeeded; ignoring action") | ||
| return | ||
|
|
||
| action = np.asarray(msg.data, dtype=np.float32).reshape(-1) | ||
| if action.size != self.contract.action_dim: | ||
| self.get_logger().error( | ||
| f"expected action length {self.contract.action_dim}, got {action.size}" | ||
| ) | ||
| return | ||
|
|
||
| self.obs, success = self.env.step(action) | ||
| self.step_index += 1 | ||
| self.episode_step += 1 | ||
| self.success = bool(success) | ||
|
|
||
| capped = self.max_episode_steps > 0 and self.episode_step >= self.max_episode_steps | ||
| if self.loop and (self.success or capped): | ||
| outcome = "SUCCESS" if self.success else f"step cap ({self.max_episode_steps})" | ||
| self.get_logger().info( | ||
| f"episode {self.episode_index} ended [{outcome}] after {self.episode_step} steps " | ||
| f"(global step {self.step_index}); starting next episode..." | ||
| ) | ||
| # Emit the final frame (success flag reflects the outcome) before rebuilding. | ||
| self.publish_observation() | ||
| self._start_next_episode() | ||
| return | ||
|
|
||
| self.publish_observation() | ||
|
|
||
| if self.success and not self.loop: | ||
| self.get_logger().info(f"episode succeeded at step {self.step_index}") | ||
| self._slow_down_timer() |
There was a problem hiding this comment.
当 self.loop 为 False(单次评估模式)且步数达到上限 max_episode_steps 时,节点目前没有进行任何处理(既没有停止接收动作,也没有降低发布频率或输出警告)。这会导致节点在达到步数上限后依然继续运行,且用户无法直观获知评估已超时失败。
建议在单次评估模式下,无论是成功(self.success)还是达到步数上限(capped),都停止接收后续动作,并降低定时器发布频率。
def on_action(self, msg):
capped = self.max_episode_steps > 0 and self.episode_step >= self.max_episode_steps
if not self.loop and (self.success or capped):
self.get_logger().warning("episode already ended; ignoring action")
return
action = np.asarray(msg.data, dtype=np.float32).reshape(-1)
if action.size != self.contract.action_dim:
self.get_logger().error(
f"expected action length {self.contract.action_dim}, got {action.size}"
)
return
self.obs, success = self.env.step(action)
self.step_index += 1
self.episode_step += 1
self.success = bool(success)
capped = self.max_episode_steps > 0 and self.episode_step >= self.max_episode_steps
if self.loop and (self.success or capped):
outcome = "SUCCESS" if self.success else f"step cap ({self.max_episode_steps})"
self.get_logger().info(
f"episode {self.episode_index} ended [{outcome}] after {self.episode_step} steps "
f"(global step {self.step_index}); starting next episode..."
)
# Emit the final frame (success flag reflects the outcome) before rebuilding.
self.publish_observation()
self._start_next_episode()
return
self.publish_observation()
if not self.loop:
if self.success:
self.get_logger().info(f"episode succeeded at step {self.step_index}")
self._slow_down_timer()
elif capped:
self.get_logger().warning(f"episode failed: reached step cap ({self.max_episode_steps})")
self._slow_down_timer()
重构了ros相关的代码:
之后新增一个环境只需要新增一个envcontract实例
把"仿真环境"和"ROS 管线"解耦成两层:
BaseSimEnv(抽象接口):每个具体仿真器只需产出统一的 Observation(相机 RGB 字典 + 扁平 proprio 向量)并消费一个动作向量,暴露 reset() / step()。
SimulatorNode(通用节点):只跟 BaseSimEnv 接口对话,所有 topic/维度都从 EnvContract 派生。它通过一个 env_factory(node) -> BaseSimEnv 回调,让各环境在构造前自行声明自己的 ROS 参数。
于是 LIBERO 和 RoboTwin 的节点入口(libero_node/main.py、robotwin_node/main.py)被压缩成"取契约 + 传 env 工厂"的几行,新增环境完全不用碰 topic/线程/发布订阅这些管线代码。
这些都从 config_json 读取(policy_profile / normalize_mode / gripper_postprocess),配置即契约,策略代码不再区分环境。