diff --git a/robosuite/controllers/composite/composite_controller.py b/robosuite/controllers/composite/composite_controller.py index e97eb15db2..423f368c18 100644 --- a/robosuite/controllers/composite/composite_controller.py +++ b/robosuite/controllers/composite/composite_controller.py @@ -161,7 +161,10 @@ def action_limits(self): for part_name, controller in self.part_controllers.items(): if part_name not in self.arms: if part_name in self.grippers.keys(): - low_g, high_g = ([-1] * self.grippers[part_name].dof, [1] * self.grippers[part_name].dof) + low_g, high_g = ( + [-1] * self.grippers[part_name].dof, + [1] * self.grippers[part_name].dof, + ) low, high = np.concatenate([low, low_g]), np.concatenate([high, high_g]) else: control_dim = controller.control_dim @@ -172,6 +175,61 @@ def action_limits(self): low, high = np.concatenate([low, low_c]), np.concatenate([high, high_c]) return low, high + def create_action_vector(self, action_dict): + """ + A helper function that creates the action vector given a dictionary + """ + full_action_vector = np.zeros(self.action_limits[0].shape) + for part_name, action_vector in action_dict.items(): + if part_name not in self._action_split_indexes: + ROBOSUITE_DEFAULT_LOGGER.debug(f"{part_name} is not specified in the action space") + continue + start_idx, end_idx = self._action_split_indexes[part_name] + if end_idx - start_idx == 0: + # skipping not controlling actions + continue + assert len(action_vector) == (end_idx - start_idx), ROBOSUITE_DEFAULT_LOGGER.error( + f"Action vector for {part_name} is not the correct size. Expected {end_idx - start_idx} for {part_name}, got {len(action_vector)}" + ) + full_action_vector[start_idx:end_idx] = action_vector + return full_action_vector + + def create_action_dict_from_action_vector(self, action_vector): + action_dict = {} + for part_name, (start_idx, end_idx) in self._action_split_indexes.items(): + action_dict[part_name] = action_vector[start_idx:end_idx] + return action_dict + + def get_action_info(self): + action_index_info = [] + action_dim_info = [] + for part_name, ( + start_idx, + end_idx, + ) in self._action_split_indexes.items(): + action_dim_info.append(f"{part_name}: {(end_idx - start_idx)} dim") + action_index_info.append(f"{part_name}: {start_idx}:{end_idx}") + + return action_index_info, action_dim_info + + def print_action_info(self): + action_index_info, action_dim_info = self.get_action_info() + action_index_info_str = ", ".join(action_index_info) + action_dim_info_str = ", ".join(action_dim_info) + ROBOSUITE_DEFAULT_LOGGER.info(f"Action Dimensions: [{action_dim_info_str}]") + ROBOSUITE_DEFAULT_LOGGER.info(f"Action Indices: [{action_index_info_str}]") + + def get_action_info_dict(self): + info_dict = {} + info_dict["Action Dimension"] = self.action_limits[0].shape + info_dict.update(dict(self._action_split_indexes)) + return info_dict + + def print_action_info_dict(self, name: str = ""): + info_dict = self.get_action_info_dict() + info_dict_str = f"\nAction Info for {name}:\n\n{json.dumps(dict(info_dict), indent=4)}" + ROBOSUITE_DEFAULT_LOGGER.info(info_dict_str) + @register_composite_controller class HybridMobileBase(CompositeController): @@ -217,7 +275,7 @@ def create_action_vector(self, action_dict): A helper function that creates the action vector given a dictionary """ full_action_vector = np.zeros(self.action_limits[0].shape) - for (part_name, action_vector) in action_dict.items(): + for part_name, action_vector in action_dict.items(): if part_name not in self._action_split_indexes: ROBOSUITE_DEFAULT_LOGGER.debug(f"{part_name} is not specified in the action space") continue @@ -323,6 +381,10 @@ def setup_whole_body_controller_action_split_idx(self): previous_idx = last_idx def set_goal(self, all_action): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + super().set_goal(all_action) + return + target_qpos = self.joint_action_policy.solve(all_action[: self.joint_action_policy.control_dim]) # create new all_action vector with the IK solver's actions first all_action = np.concatenate([target_qpos, all_action[self.joint_action_policy.control_dim :]]) @@ -343,6 +405,9 @@ def action_limits(self): Returns the action limits for the whole body controller. Corresponds to each term in the action vector passed to env.step(). """ + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().action_limits + low, high = [], [] # assumption: IK solver's actions come first low_c, high_c = self.joint_action_policy.control_limits @@ -353,7 +418,10 @@ def action_limits(self): continue if part_name not in self.arms: if part_name in self.grippers.keys(): - low_g, high_g = ([-1] * self.grippers[part_name].dof, [1] * self.grippers[part_name].dof) + low_g, high_g = ( + [-1] * self.grippers[part_name].dof, + [1] * self.grippers[part_name].dof, + ) low, high = np.concatenate([low, low_g]), np.concatenate([high, high_g]) else: control_dim = controller.control_dim @@ -365,8 +433,11 @@ def action_limits(self): return low, high def create_action_vector(self, action_dict: Dict[str, np.ndarray]) -> np.ndarray: + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().create_action_vector(action_dict) + full_action_vector = np.zeros(self.action_limits[0].shape) - for (part_name, action_vector) in action_dict.items(): + for part_name, action_vector in action_dict.items(): if part_name not in self._whole_body_controller_action_split_indexes: ROBOSUITE_DEFAULT_LOGGER.debug(f"{part_name} is not specified in the action space") continue @@ -380,24 +451,62 @@ def create_action_vector(self, action_dict: Dict[str, np.ndarray]) -> np.ndarray full_action_vector[start_idx:end_idx] = action_vector return full_action_vector - def print_action_info(self): + def create_action_dict_from_action_vector(self, action_vector): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().create_action_dict_from_action_vector(action_vector) + + action_dict = {} + for part_name, (start_idx, end_idx) in self._whole_body_controller_action_split_indexes.items(): + action_dict[part_name] = action_vector[start_idx:end_idx] + return action_dict + + def get_action_info(self): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().get_action_info() + action_index_info = [] action_dim_info = [] - for part_name, (start_idx, end_idx) in self._whole_body_controller_action_split_indexes.items(): + for part_name, ( + start_idx, + end_idx, + ) in self._whole_body_controller_action_split_indexes.items(): action_dim_info.append(f"{part_name}: {(end_idx - start_idx)} dim") action_index_info.append(f"{part_name}: {start_idx}:{end_idx}") + return action_index_info, action_dim_info - action_dim_info_str = ", ".join(action_dim_info) - ROBOSUITE_DEFAULT_LOGGER.info(f"Action Dimensions: [{action_dim_info_str}]") + def print_action_info(self): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().print_action_info() + action_index_info, action_dim_info = self.get_action_info() action_index_info_str = ", ".join(action_index_info) + action_dim_info_str = ", ".join(action_dim_info) + ROBOSUITE_DEFAULT_LOGGER.info(f"Action Dimensions: [{action_dim_info_str}]") ROBOSUITE_DEFAULT_LOGGER.info(f"Action Indices: [{action_index_info_str}]") - def print_action_info_dict(self, name: str = ""): + def get_action_info_dict(self): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().get_action_info_dict() + info_dict = {} info_dict["Action Dimension"] = self.action_limits[0].shape info_dict.update(dict(self._whole_body_controller_action_split_indexes)) + return info_dict + def get_action_info_dict(self): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().get_action_info_dict() + + info_dict = {} + info_dict["Action Dimension"] = self.action_limits[0].shape + info_dict.update(dict(self._whole_body_controller_action_split_indexes)) + return info_dict + + def print_action_info_dict(self, name: str = ""): + if self.composite_controller_specific_config.get("skip_wbc_action", False): + return super().print_action_info_dict(name) + + info_dict = self.get_action_info_dict() info_dict_str = f"\nAction Info for {name}:\n\n{json.dumps(dict(info_dict), indent=4)}" ROBOSUITE_DEFAULT_LOGGER.info(info_dict_str) @@ -473,7 +582,7 @@ def _init_joint_action_policy(self): max_dq=self.composite_controller_specific_config.get("ik_max_dq", 4), max_dq_torso=self.composite_controller_specific_config.get("ik_max_dq_torso", 0.2), input_rotation_repr=self.composite_controller_specific_config.get("ik_input_rotation_repr", "axis_angle"), - input_type=self.composite_controller_specific_config.get("ik_input_type", "axis_angle"), input_ref_frame=self.composite_controller_specific_config.get("ik_input_ref_frame", "world"), + input_type=self.composite_controller_specific_config.get("ik_input_type", "axis_angle"), debug=self.composite_controller_specific_config.get("verbose", False), ) diff --git a/robosuite/devices/mjgui.py b/robosuite/devices/mjgui.py index e2065941c9..e7741deb5f 100644 --- a/robosuite/devices/mjgui.py +++ b/robosuite/devices/mjgui.py @@ -116,7 +116,7 @@ def _reset_internal_state(self): target_mat = self.env.sim.data.site_xmat[self.env.sim.model.site_name2id(site_name)] set_mocap_pose(self.env.sim, target_pos, target_mat, f"{target_name_prefix}_eef_target") - def input2action(self, goal_update_mode="desired") -> Dict[str, np.ndarray]: + def input2action(self, goal_update_mode="target") -> Dict[str, np.ndarray]: """ Uses mocap body poses to determine action for robot. Obtain input_type (i.e. absolute actions or delta actions) and input_ref_frame (i.e. world frame, base frame or eef frame) @@ -124,8 +124,8 @@ def input2action(self, goal_update_mode="desired") -> Dict[str, np.ndarray]: """ assert ( - goal_update_mode == "desired" - ), "goal_update_mode must be 'desired' for MJGUI: targets are based off the pose of the mocap body." + goal_update_mode == "target" + ), "goal_update_mode must be 'target' for MJGUI: targets are based off the pose of the mocap body." # TODO: unify this logic to be independent from controller type. action: Dict[str, np.ndarray] = {} gripper_dof = self.env.robots[0].gripper[self.active_end_effector].dof diff --git a/robosuite/models/objects/composite/hammer.py b/robosuite/models/objects/composite/hammer.py index 916b7f87ed..b993b7a299 100644 --- a/robosuite/models/objects/composite/hammer.py +++ b/robosuite/models/objects/composite/hammer.py @@ -233,7 +233,7 @@ def init_quat(self): np.array: (x, y, z, w) quaternion orientation for the hammer """ # Randomly sample between +/- flip (such that the hammer head faces one way or the other) - return np.array([0.5, -0.5, 0.5, -0.5]) if self.rng.rand() >= 0.5 else np.array([-0.5, -0.5, -0.5, -0.5]) + return np.array([0.5, -0.5, 0.5, -0.5]) if self.rng.random() >= 0.5 else np.array([-0.5, -0.5, -0.5, -0.5]) @property def handle_geoms(self): diff --git a/robosuite/robots/robot.py b/robosuite/robots/robot.py index 7ba93b0e10..a30c99bf79 100644 --- a/robosuite/robots/robot.py +++ b/robosuite/robots/robot.py @@ -2,7 +2,7 @@ import json import os from collections import OrderedDict -from typing import Optional +from typing import Dict, Optional import numpy as np @@ -328,6 +328,8 @@ def setup_references(self): self._ref_torso_joint_pos_indexes = [ self.sim.model.get_joint_qpos_addr(x) for x in self.robot_model._torso_joints ] + # indices for all joints (includes arms, base, torso, but not gripper joints, as those are handled separately) + self._ref_all_joint_pos_indexes = [self.sim.model.get_joint_qpos_addr(x) for x in self.robot_model.all_joints] def setup_observables(self): """ @@ -638,6 +640,58 @@ def set_gripper_joint_positions(self, jpos: np.ndarray, gripper_arm: Optional[st else: raise ValueError(f"No gripper found for arm {gripper_arm}") + def set_all_robot_joint_positions(self, joint_positions: Dict[str, float]): + """ + Helper method to force robot joint positions to the passed values. + Assumes valid joint names are passed. + + Args: + joint_positions (dict): joint name -> joint position (in angles / radians) + """ + for joint_name, position in joint_positions.items(): + self.sim.data.qpos[self.sim.model.joint_name2id(joint_name)] = position + self.sim.forward() + + def get_robot_joint_positions(self): + """ + Returns: + np.array: joint positions (in angles / radians) + """ + return self.sim.data.qpos[self._ref_joint_pos_indexes] + + def get_all_robot_joint_positions(self) -> Dict[str, float]: + """ + Returns all joint positions including robot joints (arms, base, torso) and gripper joints. + + Returns: + dict: joint name -> joint position (in angles / radians) + """ + joint_positions = {} + + # Get all robot joint positions (includes arms, base, torso, etc.) + all_joint_names = self.robot_model.all_joints + for i, joint_name in enumerate(all_joint_names): + joint_positions[joint_name] = self.sim.data.qpos[self._ref_all_joint_pos_indexes[i]] + + # Add gripper joint positions for all arms + for arm in self.arms: + if self.has_gripper[arm]: + gripper_joint_names = self.gripper_joints[arm] + gripper_positions = self.sim.data.qpos[self._ref_gripper_joint_pos_indexes[arm]] + for joint_name, position in zip(gripper_joint_names, gripper_positions): + joint_positions[joint_name] = position + + return joint_positions + + def get_gripper_joint_positions(self, gripper_arm: Optional[str] = None): + """ + Returns: + np.array: gripper joint positions (in angles / radians) + """ + if gripper_arm is None: + gripper_arm = self.arms[0] + return self.sim.data.qpos[self._ref_gripper_joint_pos_indexes[gripper_arm]] + @property def js_energy(self): """