Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 119 additions & 10 deletions robosuite/controllers/composite/composite_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ def action_limits(self):
for part_name, controller in self.part_controllers.items():
if part_name not in self.arms:
if part_name in self.grippers.keys():
low_g, high_g = ([-1] * self.grippers[part_name].dof, [1] * self.grippers[part_name].dof)
low_g, high_g = (
[-1] * self.grippers[part_name].dof,
[1] * self.grippers[part_name].dof,
)
low, high = np.concatenate([low, low_g]), np.concatenate([high, high_g])
else:
control_dim = controller.control_dim
Expand All @@ -172,6 +175,61 @@ def action_limits(self):
low, high = np.concatenate([low, low_c]), np.concatenate([high, high_c])
return low, high

def create_action_vector(self, action_dict):
"""
A helper function that creates the action vector given a dictionary
"""
full_action_vector = np.zeros(self.action_limits[0].shape)
for part_name, action_vector in action_dict.items():
if part_name not in self._action_split_indexes:
ROBOSUITE_DEFAULT_LOGGER.debug(f"{part_name} is not specified in the action space")
continue
start_idx, end_idx = self._action_split_indexes[part_name]
if end_idx - start_idx == 0:
# skipping not controlling actions
continue
assert len(action_vector) == (end_idx - start_idx), ROBOSUITE_DEFAULT_LOGGER.error(
f"Action vector for {part_name} is not the correct size. Expected {end_idx - start_idx} for {part_name}, got {len(action_vector)}"
)
full_action_vector[start_idx:end_idx] = action_vector
return full_action_vector

def create_action_dict_from_action_vector(self, action_vector):
action_dict = {}
for part_name, (start_idx, end_idx) in self._action_split_indexes.items():
action_dict[part_name] = action_vector[start_idx:end_idx]
return action_dict

def get_action_info(self):
action_index_info = []
action_dim_info = []
for part_name, (
start_idx,
end_idx,
) in self._action_split_indexes.items():
action_dim_info.append(f"{part_name}: {(end_idx - start_idx)} dim")
action_index_info.append(f"{part_name}: {start_idx}:{end_idx}")

return action_index_info, action_dim_info

def print_action_info(self):
action_index_info, action_dim_info = self.get_action_info()
action_index_info_str = ", ".join(action_index_info)
action_dim_info_str = ", ".join(action_dim_info)
ROBOSUITE_DEFAULT_LOGGER.info(f"Action Dimensions: [{action_dim_info_str}]")
ROBOSUITE_DEFAULT_LOGGER.info(f"Action Indices: [{action_index_info_str}]")

def get_action_info_dict(self):
info_dict = {}
info_dict["Action Dimension"] = self.action_limits[0].shape
info_dict.update(dict(self._action_split_indexes))
return info_dict

def print_action_info_dict(self, name: str = ""):
info_dict = self.get_action_info_dict()
info_dict_str = f"\nAction Info for {name}:\n\n{json.dumps(dict(info_dict), indent=4)}"
ROBOSUITE_DEFAULT_LOGGER.info(info_dict_str)


@register_composite_controller
class HybridMobileBase(CompositeController):
Expand Down Expand Up @@ -217,7 +275,7 @@ def create_action_vector(self, action_dict):
A helper function that creates the action vector given a dictionary
"""
full_action_vector = np.zeros(self.action_limits[0].shape)
for (part_name, action_vector) in action_dict.items():
for part_name, action_vector in action_dict.items():
if part_name not in self._action_split_indexes:
ROBOSUITE_DEFAULT_LOGGER.debug(f"{part_name} is not specified in the action space")
continue
Expand Down Expand Up @@ -323,6 +381,10 @@ def setup_whole_body_controller_action_split_idx(self):
previous_idx = last_idx

def set_goal(self, all_action):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
super().set_goal(all_action)
return

target_qpos = self.joint_action_policy.solve(all_action[: self.joint_action_policy.control_dim])
# create new all_action vector with the IK solver's actions first
all_action = np.concatenate([target_qpos, all_action[self.joint_action_policy.control_dim :]])
Expand All @@ -343,6 +405,9 @@ def action_limits(self):
Returns the action limits for the whole body controller.
Corresponds to each term in the action vector passed to env.step().
"""
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().action_limits

low, high = [], []
# assumption: IK solver's actions come first
low_c, high_c = self.joint_action_policy.control_limits
Expand All @@ -353,7 +418,10 @@ def action_limits(self):
continue
if part_name not in self.arms:
if part_name in self.grippers.keys():
low_g, high_g = ([-1] * self.grippers[part_name].dof, [1] * self.grippers[part_name].dof)
low_g, high_g = (
[-1] * self.grippers[part_name].dof,
[1] * self.grippers[part_name].dof,
)
low, high = np.concatenate([low, low_g]), np.concatenate([high, high_g])
else:
control_dim = controller.control_dim
Expand All @@ -365,8 +433,11 @@ def action_limits(self):
return low, high

def create_action_vector(self, action_dict: Dict[str, np.ndarray]) -> np.ndarray:
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().create_action_vector(action_dict)

full_action_vector = np.zeros(self.action_limits[0].shape)
for (part_name, action_vector) in action_dict.items():
for part_name, action_vector in action_dict.items():
if part_name not in self._whole_body_controller_action_split_indexes:
ROBOSUITE_DEFAULT_LOGGER.debug(f"{part_name} is not specified in the action space")
continue
Expand All @@ -380,24 +451,62 @@ def create_action_vector(self, action_dict: Dict[str, np.ndarray]) -> np.ndarray
full_action_vector[start_idx:end_idx] = action_vector
return full_action_vector

def print_action_info(self):
def create_action_dict_from_action_vector(self, action_vector):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().create_action_dict_from_action_vector(action_vector)

action_dict = {}
for part_name, (start_idx, end_idx) in self._whole_body_controller_action_split_indexes.items():
action_dict[part_name] = action_vector[start_idx:end_idx]
return action_dict

def get_action_info(self):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().get_action_info()

action_index_info = []
action_dim_info = []
for part_name, (start_idx, end_idx) in self._whole_body_controller_action_split_indexes.items():
for part_name, (
start_idx,
end_idx,
) in self._whole_body_controller_action_split_indexes.items():
action_dim_info.append(f"{part_name}: {(end_idx - start_idx)} dim")
action_index_info.append(f"{part_name}: {start_idx}:{end_idx}")
return action_index_info, action_dim_info

action_dim_info_str = ", ".join(action_dim_info)
ROBOSUITE_DEFAULT_LOGGER.info(f"Action Dimensions: [{action_dim_info_str}]")
def print_action_info(self):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().print_action_info()

action_index_info, action_dim_info = self.get_action_info()
action_index_info_str = ", ".join(action_index_info)
action_dim_info_str = ", ".join(action_dim_info)
ROBOSUITE_DEFAULT_LOGGER.info(f"Action Dimensions: [{action_dim_info_str}]")
ROBOSUITE_DEFAULT_LOGGER.info(f"Action Indices: [{action_index_info_str}]")

def print_action_info_dict(self, name: str = ""):
def get_action_info_dict(self):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().get_action_info_dict()

info_dict = {}
info_dict["Action Dimension"] = self.action_limits[0].shape
info_dict.update(dict(self._whole_body_controller_action_split_indexes))
return info_dict

def get_action_info_dict(self):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().get_action_info_dict()

info_dict = {}
info_dict["Action Dimension"] = self.action_limits[0].shape
info_dict.update(dict(self._whole_body_controller_action_split_indexes))
return info_dict

def print_action_info_dict(self, name: str = ""):
if self.composite_controller_specific_config.get("skip_wbc_action", False):
return super().print_action_info_dict(name)

info_dict = self.get_action_info_dict()
info_dict_str = f"\nAction Info for {name}:\n\n{json.dumps(dict(info_dict), indent=4)}"
ROBOSUITE_DEFAULT_LOGGER.info(info_dict_str)

Expand Down Expand Up @@ -473,7 +582,7 @@ def _init_joint_action_policy(self):
max_dq=self.composite_controller_specific_config.get("ik_max_dq", 4),
max_dq_torso=self.composite_controller_specific_config.get("ik_max_dq_torso", 0.2),
input_rotation_repr=self.composite_controller_specific_config.get("ik_input_rotation_repr", "axis_angle"),
input_type=self.composite_controller_specific_config.get("ik_input_type", "axis_angle"),
input_ref_frame=self.composite_controller_specific_config.get("ik_input_ref_frame", "world"),
input_type=self.composite_controller_specific_config.get("ik_input_type", "axis_angle"),
debug=self.composite_controller_specific_config.get("verbose", False),
)
6 changes: 3 additions & 3 deletions robosuite/devices/mjgui.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,16 @@ def _reset_internal_state(self):
target_mat = self.env.sim.data.site_xmat[self.env.sim.model.site_name2id(site_name)]
set_mocap_pose(self.env.sim, target_pos, target_mat, f"{target_name_prefix}_eef_target")

def input2action(self, goal_update_mode="desired") -> Dict[str, np.ndarray]:
def input2action(self, goal_update_mode="target") -> Dict[str, np.ndarray]:
"""
Uses mocap body poses to determine action for robot. Obtain input_type
(i.e. absolute actions or delta actions) and input_ref_frame (i.e. world frame, base frame or eef frame)
from the controller itself.

"""
assert (
goal_update_mode == "desired"
), "goal_update_mode must be 'desired' for MJGUI: targets are based off the pose of the mocap body."
goal_update_mode == "target"
), "goal_update_mode must be 'target' for MJGUI: targets are based off the pose of the mocap body."
# TODO: unify this logic to be independent from controller type.
action: Dict[str, np.ndarray] = {}
gripper_dof = self.env.robots[0].gripper[self.active_end_effector].dof
Expand Down
2 changes: 1 addition & 1 deletion robosuite/models/objects/composite/hammer.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def init_quat(self):
np.array: (x, y, z, w) quaternion orientation for the hammer
"""
# Randomly sample between +/- flip (such that the hammer head faces one way or the other)
return np.array([0.5, -0.5, 0.5, -0.5]) if self.rng.rand() >= 0.5 else np.array([-0.5, -0.5, -0.5, -0.5])
return np.array([0.5, -0.5, 0.5, -0.5]) if self.rng.random() >= 0.5 else np.array([-0.5, -0.5, -0.5, -0.5])

@property
def handle_geoms(self):
Expand Down
56 changes: 55 additions & 1 deletion robosuite/robots/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import os
from collections import OrderedDict
from typing import Optional
from typing import Dict, Optional

import numpy as np

Expand Down Expand Up @@ -328,6 +328,8 @@ def setup_references(self):
self._ref_torso_joint_pos_indexes = [
self.sim.model.get_joint_qpos_addr(x) for x in self.robot_model._torso_joints
]
# indices for all joints (includes arms, base, torso, but not gripper joints, as those are handled separately)
self._ref_all_joint_pos_indexes = [self.sim.model.get_joint_qpos_addr(x) for x in self.robot_model.all_joints]

def setup_observables(self):
"""
Expand Down Expand Up @@ -638,6 +640,58 @@ def set_gripper_joint_positions(self, jpos: np.ndarray, gripper_arm: Optional[st
else:
raise ValueError(f"No gripper found for arm {gripper_arm}")

def set_all_robot_joint_positions(self, joint_positions: Dict[str, float]):
"""
Helper method to force robot joint positions to the passed values.
Assumes valid joint names are passed.

Args:
joint_positions (dict): joint name -> joint position (in angles / radians)
"""
for joint_name, position in joint_positions.items():
self.sim.data.qpos[self.sim.model.joint_name2id(joint_name)] = position
self.sim.forward()

def get_robot_joint_positions(self):
"""
Returns:
np.array: joint positions (in angles / radians)
"""
return self.sim.data.qpos[self._ref_joint_pos_indexes]

def get_all_robot_joint_positions(self) -> Dict[str, float]:
"""
Returns all joint positions including robot joints (arms, base, torso) and gripper joints.

Returns:
dict: joint name -> joint position (in angles / radians)
"""
joint_positions = {}

# Get all robot joint positions (includes arms, base, torso, etc.)
all_joint_names = self.robot_model.all_joints
for i, joint_name in enumerate(all_joint_names):
joint_positions[joint_name] = self.sim.data.qpos[self._ref_all_joint_pos_indexes[i]]

# Add gripper joint positions for all arms
for arm in self.arms:
if self.has_gripper[arm]:
gripper_joint_names = self.gripper_joints[arm]
gripper_positions = self.sim.data.qpos[self._ref_gripper_joint_pos_indexes[arm]]
for joint_name, position in zip(gripper_joint_names, gripper_positions):
joint_positions[joint_name] = position

return joint_positions

def get_gripper_joint_positions(self, gripper_arm: Optional[str] = None):
"""
Returns:
np.array: gripper joint positions (in angles / radians)
"""
if gripper_arm is None:
gripper_arm = self.arms[0]
return self.sim.data.qpos[self._ref_gripper_joint_pos_indexes[gripper_arm]]

@property
def js_energy(self):
"""
Expand Down