Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 import numpy as np
00033 import copy
00034
00035 import roslib
00036 roslib.load_manifest('hrl_pr2_arms')
00037
00038 import rospy
00039 import actionlib
00040 from pr2_controllers_msgs.msg import JointTrajectoryAction, JointTrajectoryGoal
00041 from pr2_controllers_msgs.msg import JointTrajectoryControllerState
00042 from trajectory_msgs.msg import JointTrajectoryPoint
00043
00044 from ep_arm_base import EPArmBase, create_ep_arm
00045
00046
00047
00048
00049
00050 class PR2ArmJointTraj(EPArmBase):
00051 def __init__(self, arm_side, urdf, base_link='torso_lift_link', end_link='%s_gripper_tool_frame',
00052 controller_name='/%s_arm_controller', kdl_tree=None, timeout=1.):
00053 super(PR2ArmJointTraj, self).__init__(arm_side, urdf, base_link, end_link,
00054 controller_name, kdl_tree, timeout)
00055 self.joint_action_client = actionlib.SimpleActionClient(
00056 self.controller_name + '/joint_trajectory_action',
00057 JointTrajectoryAction)
00058
00059 self._state_sub = rospy.Subscriber(self.controller_name + '/state',
00060 JointTrajectoryControllerState, self._ctrl_state_cb)
00061 if self.wait_for_joint_angles(0):
00062 if not self.wait_for_ep(timeout):
00063 rospy.logwarn("[pr2_arm_joint_traj] Timed out waiting for EP.")
00064 elif not self.joint_action_client.wait_for_server(rospy.Duration(timeout)):
00065 rospy.logwarn("[pr2_arm_joint_traj] JointTrajectoryAction action server timed out.")
00066
00067 def _ctrl_state_cb(self, ctrl_state):
00068 self._save_ep(np.array(ctrl_state.desired.positions))
00069 with self.ctrl_state_lock:
00070 self.ctrl_state_dict["frame"] = ctrl_state.header.frame_id
00071 self.ctrl_state_dict["x_desi"] = np.array(ctrl_state.desired.positions)
00072 self.ctrl_state_dict["xd_desi"] = np.array(ctrl_state.desired.velocities)
00073 self.ctrl_state_dict["xdd_desi"] = np.array(ctrl_state.desired.accelerations)
00074 self.ctrl_state_dict["x_act"] = np.array(ctrl_state.actual.positions)
00075 self.ctrl_state_dict["xd_act"] = np.array(ctrl_state.actual.velocities)
00076 self.ctrl_state_dict["xdd_act"] = np.array(ctrl_state.actual.accelerations)
00077 self.ctrl_state_dict["x_err"] = np.array(ctrl_state.error.positions)
00078 self.ctrl_state_dict["xd_err"] = np.array(ctrl_state.error.velocities)
00079 self.ctrl_state_dict["xdd_err"] = np.array(ctrl_state.error.accelerations)
00080
00081
00082
00083
00084 def get_ep(self, wrapped=False):
00085 with self.ep_lock:
00086 ret_ep = copy.copy(self.ep)
00087 if wrapped:
00088 return self.wrap_angles(ret_ep)
00089 else:
00090 return ret_ep
00091
00092
00093
00094
00095
00096
00097 def set_ep(self, jep, duration, delay=0.0):
00098 jtg = JointTrajectoryGoal()
00099 jtg.trajectory.header.stamp = rospy.Time.now() + rospy.Duration(delay)
00100 jtg.trajectory.joint_names = self.get_joint_names()
00101 jtp = JointTrajectoryPoint()
00102 jtp.positions = list(jep)
00103 jtp.time_from_start = rospy.Duration(duration)
00104 jtg.trajectory.points.append(jtp)
00105 self.joint_action_client.send_goal(jtg)
00106
00107
00108
00109
00110
00111
00112
00113
00114 def interpolate_ep(self, ep_a, ep_b, t_vals):
00115 linspace_list = [[ep_a[i] + (ep_b[i] - ep_a[i]) * t for t in t_vals]
00116 for i in range(len(ep_a))]
00117 return np.dstack(linspace_list)[0]