11"""Interfaces to PyBullet robots."""
22from __future__ import annotations
33
4- from typing import TYPE_CHECKING , Callable , Collection , Dict , Iterator , List , \
5- Optional , Sequence , Tuple , cast
4+ from typing import TYPE_CHECKING , Collection , Iterator , List , Optional , \
5+ Sequence
66
77import numpy as np
88import pybullet as p
9- from gym .spaces import Box
109
1110from predicators .src import utils
1211from predicators .src .settings import CFG
13- from predicators .src .structs import Action , Array , JointsState , Object , \
14- ParameterizedOption , Pose3D , State , Type
12+ from predicators .src .structs import JointsState
1513
1614if TYPE_CHECKING :
1715 from predicators .src .pybullet_helpers .robots import SingleArmPyBulletRobot
1816
19- ################################# Controllers #################################
20-
21-
22- def create_move_end_effector_to_pose_option (
23- robot : SingleArmPyBulletRobot ,
24- name : str ,
25- types : Sequence [Type ],
26- params_space : Box ,
27- get_current_and_target_pose_and_finger_status : Callable [
28- [State , Sequence [Object ], Array ], Tuple [Pose3D , Pose3D , str ]],
29- move_to_pose_tol : float ,
30- max_vel_norm : float ,
31- finger_action_nudge_magnitude : float ,
32- ) -> ParameterizedOption :
33- """A generic utility that creates a ParameterizedOption for moving the end
34- effector to a target pose, given a function that takes in the current
35- state, objects, and parameters, and returns the current pose and target
36- pose of the end effector, and the finger status."""
37-
38- assert robot .get_name () == "fetch" , "Move end effector to pose option " + \
39- f"not implemented for robot { robot .get_name ()} ."
40-
41- def _policy (state : State , memory : Dict , objects : Sequence [Object ],
42- params : Array ) -> Action :
43- del memory # unused
44- # First handle the main arm joints.
45- current , target , finger_status = \
46- get_current_and_target_pose_and_finger_status (
47- state , objects , params )
48- # Run IK to determine the target joint positions.
49- ee_delta = np .subtract (target , current )
50- # Reduce the target to conform to the max velocity constraint.
51- ee_norm = np .linalg .norm (ee_delta )
52- if ee_norm > max_vel_norm :
53- ee_delta = ee_delta * max_vel_norm / ee_norm
54- ee_action = np .add (current , ee_delta )
55- # Keep validate as False because validate=True would update the
56- # state of the robot during simulation, which overrides physics.
57- joints_state = robot .inverse_kinematics (
58- (ee_action [0 ], ee_action [1 ], ee_action [2 ]), validate = False )
59- # Handle the fingers. Fingers drift if left alone.
60- # When the fingers are not explicitly being opened or closed, we
61- # nudge the fingers toward being open or closed according to the
62- # finger status.
63- if finger_status == "open" :
64- finger_delta = finger_action_nudge_magnitude
65- else :
66- assert finger_status == "closed"
67- finger_delta = - finger_action_nudge_magnitude
68- # Extract the current finger state.
69- state = cast (utils .PyBulletState , state )
70- finger_state = state .joints_state [robot .left_finger_joint_idx ]
71- # The finger action is an absolute joint position for the fingers.
72- f_action = finger_state + finger_delta
73- # Override the meaningless finger values in joint_action.
74- joints_state [robot .left_finger_joint_idx ] = f_action
75- joints_state [robot .right_finger_joint_idx ] = f_action
76- action_arr = np .array (joints_state , dtype = np .float32 )
77- # This clipping is needed sometimes for the joint limits.
78- action_arr = np .clip (action_arr , robot .action_space .low ,
79- robot .action_space .high )
80- assert robot .action_space .contains (action_arr )
81- return Action (action_arr )
82-
83- def _terminal (state : State , memory : Dict , objects : Sequence [Object ],
84- params : Array ) -> bool :
85- del memory # unused
86- current , target , _ = \
87- get_current_and_target_pose_and_finger_status (
88- state , objects , params )
89- squared_dist = np .sum (np .square (np .subtract (current , target )))
90- return squared_dist < move_to_pose_tol
91-
92- return ParameterizedOption (name ,
93- types = types ,
94- params_space = params_space ,
95- policy = _policy ,
96- initiable = lambda _1 , _2 , _3 , _4 : True ,
97- terminal = _terminal )
98-
99-
100- def create_change_fingers_option (
101- robot : SingleArmPyBulletRobot ,
102- name : str ,
103- types : Sequence [Type ],
104- params_space : Box ,
105- get_current_and_target_val : Callable [[State , Sequence [Object ], Array ],
106- Tuple [float , float ]],
107- max_vel_norm : float ,
108- grasp_tol : float ,
109- ) -> ParameterizedOption :
110- """A generic utility that creates a ParameterizedOption for changing the
111- robot fingers, given a function that takes in the current state, objects,
112- and parameters, and returns the current and target finger joint values."""
113-
114- assert robot .get_name () == "fetch" , "Change fingers option not " + \
115- f"implemented for robot { robot .get_name ()} ."
116-
117- def _policy (state : State , memory : Dict , objects : Sequence [Object ],
118- params : Array ) -> Action :
119- del memory # unused
120- current_val , target_val = get_current_and_target_val (
121- state , objects , params )
122- f_delta = target_val - current_val
123- f_delta = np .clip (f_delta , - max_vel_norm , max_vel_norm )
124- f_action = current_val + f_delta
125- # Don't change the rest of the joints.
126- state = cast (utils .PyBulletState , state )
127- target = np .array (state .joints_state , dtype = np .float32 )
128- target [robot .left_finger_joint_idx ] = f_action
129- target [robot .right_finger_joint_idx ] = f_action
130- # This clipping is needed sometimes for the joint limits.
131- target = np .clip (target , robot .action_space .low ,
132- robot .action_space .high )
133- assert robot .action_space .contains (target )
134- return Action (target )
135-
136- def _terminal (state : State , memory : Dict , objects : Sequence [Object ],
137- params : Array ) -> bool :
138- del memory # unused
139- current_val , target_val = get_current_and_target_val (
140- state , objects , params )
141- squared_dist = (target_val - current_val )** 2
142- return squared_dist < grasp_tol
143-
144- return ParameterizedOption (name ,
145- types = types ,
146- params_space = params_space ,
147- policy = _policy ,
148- initiable = lambda _1 , _2 , _3 , _4 : True ,
149- terminal = _terminal )
150-
151-
15217########################### Other utility functions ###########################
15318
15419
@@ -169,89 +34,6 @@ def get_kinematic_chain(robot: int, end_effector: int,
16934 return kinematic_chain
17035
17136
172- def pybullet_inverse_kinematics (
173- robot : int ,
174- end_effector : int ,
175- target_position : Pose3D ,
176- target_orientation : Sequence [float ],
177- joints : Sequence [int ],
178- physics_client_id : int ,
179- validate : bool = True ,
180- ) -> JointsState :
181- """Runs IK and returns a joints state for the given (free) joints.
182-
183- If validate is True, the PyBullet IK solver is called multiple
184- times, resetting the robot state each time, until the target
185- position is reached. If the target position is not reached after a
186- maximum number of iters, an exception is raised.
187- """
188- # Figure out which joint each dimension of the return of IK corresponds to.
189- free_joints = []
190- num_joints = p .getNumJoints (robot , physicsClientId = physics_client_id )
191- for idx in range (num_joints ):
192- joint_info = p .getJointInfo (robot ,
193- idx ,
194- physicsClientId = physics_client_id )
195- if joint_info [3 ] > - 1 :
196- free_joints .append (idx )
197- assert set (joints ).issubset (set (free_joints ))
198-
199- # Record the initial state of the joints so that we can reset them after.
200- if validate :
201- initial_joints_states = p .getJointStates (
202- robot , free_joints , physicsClientId = physics_client_id )
203- assert len (initial_joints_states ) == len (free_joints )
204-
205- # Running IK once is often insufficient, so we run it multiple times until
206- # convergence. If it does not converge, an error is raised.
207- convergence_tol = CFG .pybullet_ik_tol
208- for _ in range (CFG .pybullet_max_ik_iters ):
209- free_joint_vals = p .calculateInverseKinematics (
210- robot ,
211- end_effector ,
212- target_position ,
213- targetOrientation = target_orientation ,
214- physicsClientId = physics_client_id )
215- assert len (free_joints ) == len (free_joint_vals )
216- if not validate :
217- break
218- # Update the robot state and check if the desired position and
219- # orientation are reached.
220- for joint , joint_val in zip (free_joints , free_joint_vals ):
221- p .resetJointState (robot ,
222- joint ,
223- targetValue = joint_val ,
224- physicsClientId = physics_client_id )
225- ee_link_state = p .getLinkState (robot ,
226- end_effector ,
227- computeForwardKinematics = True ,
228- physicsClientId = physics_client_id )
229- position = ee_link_state [4 ]
230- # Note: we are checking positions only for convergence.
231- if np .allclose (position , target_position , atol = convergence_tol ):
232- break
233- else :
234- raise Exception ("Inverse kinematics failed to converge." )
235-
236- # Reset the joint states to their initial values to avoid modifying the
237- # PyBullet internal state.
238- if validate :
239- for joint , (pos , vel , _ , _ ) in zip (free_joints , initial_joints_states ):
240- p .resetJointState (robot ,
241- joint ,
242- targetValue = pos ,
243- targetVelocity = vel ,
244- physicsClientId = physics_client_id )
245- # Order the found free_joint_vals based on the requested joints.
246- joint_vals = []
247- for joint in joints :
248- free_joint_idx = free_joints .index (joint )
249- joint_val = free_joint_vals [free_joint_idx ]
250- joint_vals .append (joint_val )
251-
252- return joint_vals
253-
254-
25537def run_motion_planning (
25638 robot : SingleArmPyBulletRobot , initial_state : JointsState ,
25739 target_state : JointsState , collision_bodies : Collection [int ],
0 commit comments