Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 import numpy as np
00033 import copy
00034 
00035 import roslib
00036 roslib.load_manifest('hrl_pr2_arms')
00037 
00038 import rospy
00039 import actionlib
00040 from pr2_controllers_msgs.msg import JointTrajectoryAction, JointTrajectoryGoal 
00041 from pr2_controllers_msgs.msg import JointTrajectoryControllerState 
00042 from trajectory_msgs.msg import JointTrajectoryPoint
00043 
00044 from ep_arm_base import EPArmBase, create_ep_arm
00045 
00046 
00047 
00048 
00049 
00050 class PR2ArmJointTraj(EPArmBase):
00051     def __init__(self, arm_side, urdf, base_link='torso_lift_link', end_link='%s_gripper_tool_frame', 
00052                  controller_name='/%s_arm_controller', kdl_tree=None, timeout=1.):
00053         super(PR2ArmJointTraj, self).__init__(arm_side, urdf, base_link, end_link, 
00054                                               controller_name, kdl_tree, timeout)
00055         self.joint_action_client = actionlib.SimpleActionClient(
00056                                        self.controller_name + '/joint_trajectory_action',
00057                                        JointTrajectoryAction)
00058 
00059         self._state_sub = rospy.Subscriber(self.controller_name + '/state', 
00060                                            JointTrajectoryControllerState, self._ctrl_state_cb)
00061         if self.wait_for_joint_angles(0):
00062             if not self.wait_for_ep(timeout):
00063                 rospy.logwarn("[pr2_arm_joint_traj] Timed out waiting for EP.")
00064             elif not self.joint_action_client.wait_for_server(rospy.Duration(timeout)):
00065                 rospy.logwarn("[pr2_arm_joint_traj] JointTrajectoryAction action server timed out.")
00066 
00067     def _ctrl_state_cb(self, ctrl_state):
00068         self._save_ep(np.array(ctrl_state.desired.positions))
00069         with self.ctrl_state_lock:
00070             self.ctrl_state_dict["frame"] = ctrl_state.header.frame_id
00071             self.ctrl_state_dict["x_desi"] = np.array(ctrl_state.desired.positions)
00072             self.ctrl_state_dict["xd_desi"] = np.array(ctrl_state.desired.velocities)
00073             self.ctrl_state_dict["xdd_desi"] = np.array(ctrl_state.desired.accelerations)
00074             self.ctrl_state_dict["x_act"] = np.array(ctrl_state.actual.positions)
00075             self.ctrl_state_dict["xd_act"] = np.array(ctrl_state.actual.velocities)
00076             self.ctrl_state_dict["xdd_act"] = np.array(ctrl_state.actual.accelerations)
00077             self.ctrl_state_dict["x_err"] = np.array(ctrl_state.error.positions)
00078             self.ctrl_state_dict["xd_err"] = np.array(ctrl_state.error.velocities)
00079             self.ctrl_state_dict["xdd_err"] = np.array(ctrl_state.error.accelerations)
00080 
00081     
00082     
00083     
00084     def get_ep(self, wrapped=False):
00085         with self.ep_lock:
00086             ret_ep = copy.copy(self.ep)
00087         if wrapped:
00088             return self.wrap_angles(ret_ep)
00089         else:
00090             return ret_ep
00091             
00092     
00093     
00094     
00095     
00096     
00097     def set_ep(self, jep, duration, delay=0.0):
00098         jtg = JointTrajectoryGoal()
00099         jtg.trajectory.header.stamp = rospy.Time.now() + rospy.Duration(delay)
00100         jtg.trajectory.joint_names = self.get_joint_names()
00101         jtp = JointTrajectoryPoint()
00102         jtp.positions = list(jep)
00103         jtp.time_from_start = rospy.Duration(duration)
00104         jtg.trajectory.points.append(jtp)
00105         self.joint_action_client.send_goal(jtg)
00106 
00107     
00108     
00109     
00110     
00111     
00112     
00113     
00114     def interpolate_ep(self, ep_a, ep_b, t_vals):
00115         linspace_list = [[ep_a[i] + (ep_b[i] - ep_a[i]) * t for t in t_vals] 
00116                          for i in range(len(ep_a))]
00117         return np.dstack(linspace_list)[0]