00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 import numpy as np, math
00023 from threading import RLock, Timer
00024 import sys, copy
00025
00026 import roslib; roslib.load_manifest('sttr_behaviors')
00027 import rospy
00028 import actionlib
00029 import tf
00030
00031 from sensor_msgs.msg import JointState
00032 from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
00033 from hrl_msgs.msg import FloatArrayBare
00034
00035 from equilibrium_point_control.hrl_arm import HRLArm
00036 from pykdl_utils.kdl_kinematics import create_kdl_kin
00037
00038 class CronaArm(HRLArm):
00039 def __init__(self, arm, tf_listener=None):
00040 if arm != 'l' and arm != 'r':
00041 raise Exception, 'Arm should only be "l" or "r"'
00042 kinematics = create_kdl_kin('/torso_chest_link', arm + '_hand_link')
00043 HRLArm.__init__(self, kinematics)
00044 self.joint_names_list = kinematics.get_joint_names()
00045 self.torso_position = None
00046 self.arm_efforts = None
00047 self.delta_jep = None
00048
00049 try:
00050 self.kp = [rospy.get_param(arm+'_arm_controller/gains/'+nm+'/p') for nm in self.joint_names_list]
00051 except:
00052 print "kp is not on param server ... exiting"
00053 assert(False)
00054
00055
00056
00057
00058
00059 try:
00060 self.kd = [rospy.get_param(arm+'_arm_controller/gains/'+nm+'/d') for nm in self.joint_names_list]
00061 except:
00062 print "kd is not on param server ... exiting"
00063 assert(False)
00064
00065 rospy.Subscriber('/joint_states', JointState, self.joint_states_cb)
00066
00067
00068 rospy.Subscriber ("/haptic_mpc/q_des", FloatArrayBare, self.set_ep_ros)
00069 rospy.Subscriber ("/haptic_mpc/delta_q_des", FloatArrayBare, self.set_delta_ep_ros)
00070
00071
00072
00073
00074
00075 try:
00076 if tf_listener == None:
00077 self.tf_lstnr = tf.TransformListener()
00078 else:
00079 self.tf_lstnr = tf_listener
00080 except rospy.ServiceException, e:
00081 rospy.loginfo("ServiceException caught while instantiating a TF listener. This seems to be normal.")
00082 pass
00083
00084 self.joint_angles_pub = rospy.Publisher(arm+'_arm_controller/command',
00085 JointTrajectory)
00086
00087
00088
00089
00090
00091 def joint_states_cb(self, data):
00092 arm_angles = []
00093 arm_efforts = []
00094 arm_vel = []
00095 jt_idx_list = [0]*len(self.joint_names_list)
00096 for i, jt_nm in enumerate(self.joint_names_list):
00097 jt_idx_list[i] = data.name.index(jt_nm)
00098
00099 for i, idx in enumerate(jt_idx_list):
00100 if data.name[idx] != self.joint_names_list[i]:
00101 raise RuntimeError('joint angle name does not match.')
00102 arm_angles.append(data.position[idx])
00103 arm_efforts.append(data.effort[idx])
00104 arm_vel.append(data.velocity[idx])
00105
00106 with self.lock:
00107 self.q = arm_angles
00108 self.arm_efforts = arm_efforts
00109 self.qdot = arm_vel
00110
00111 torso_idx = data.name.index('torso_chest_joint')
00112 self.torso_position = data.position[torso_idx]
00113
00114 def set_ep(self, jep, duration=0.15):
00115 jep = copy.copy(jep)
00116 if jep is None or len(jep) != len(self.joint_names_list):
00117 raise RuntimeError("set_jep value is " + str(jep))
00118
00119 with self.lock:
00120 trajectory = JointTrajectory()
00121 trajectory.joint_names = self.joint_names_list
00122 jtp = JointTrajectoryPoint()
00123 jtp.positions = jep
00124 jtp.time_from_start = rospy.Duration(duration)
00125 trajectory.points.append(jtp)
00126 self.joint_angles_pub.publish(trajectory)
00127 self.ep = jep
00128
00129 def set_delta_ep_ros(self, msg):
00130 delta_jep = copy.copy(msg.data)
00131 if delta_jep is None or len(delta_jep) != len(self.joint_names_list):
00132 raise RuntimeError("set_jep value is " + str(delta_jep))
00133
00134 with self.lock:
00135 if self.ep == None:
00136 self.ep = self.get_joint_angles()
00137 jep = (np.array(self.ep) + np.array(delta_jep)).tolist()
00138 trajectory = JointTrajectory()
00139 trajectory.joint_names = self.joint_names_list
00140 jtp = JointTrajectoryPoint()
00141 jtp.positions = jep
00142 jtp.time_from_start = rospy.Duration(0.15)
00143 trajectory.points.append(jtp)
00144 self.joint_angles_pub.publish(trajectory)
00145 self.ep = jep
00146
00147 def set_ep_ros(self, msg):
00148 with self.lock:
00149 des_jep = copy.copy(msg.data)
00150 if des_jep is None or len(des_jep) != len(self.joint_names_list):
00151 raise RuntimeError("set_jep value is " + str(des_jep))
00152
00153 jep = (np.array(des_jep)).tolist()
00154 trajectory = JointTrajectory()
00155 trajectory.joint_names = self.joint_names_list
00156 jtp = JointTrajectoryPoint()
00157 jtp.positions = jep
00158 jtp.time_from_start = rospy.Duration(0.15)
00159 trajectory.points.append(jtp)
00160 self.joint_angles_pub.publish(trajectory)
00161 self.ep = jep
00162
00163 def wrap_angles(self, q):
00164 for ind in [4, 6]:
00165 while q[ind] < -np.pi:
00166 q[ind] += 2*np.pi
00167 while q[ind] > np.pi:
00168 q[ind] -= 2*np.pi
00169 return q
00170
00171 def publish_rviz_markers(self):
00172
00173 o = np.matrix([0.,0.,0.,1.]).T
00174 jep = self.get_ep()
00175 cep, r = self.kinematics.FK(jep)
00176 cep_marker = hv.single_marker(cep, o, 'sphere',
00177 '/torso_lift_link', color=(0., 0., 1., 1.),
00178 scale = (0.02, 0.02, 0.02), duration=0.,
00179 m_id=1)
00180 cep_marker.header.stamp = rospy.Time.now()
00181 self.marker_pub.publish(cep_marker)
00182
00183 q = self.get_joint_angles()
00184 ee, r = self.kinematics.FK(q)
00185 ee_marker = hv.single_marker(ee, o, 'sphere',
00186 '/torso_lift_link', color=(0., 1., 0., 1.),
00187 scale = (0.02, 0.02, 0.02), duration=0.,
00188 m_id=2)
00189 ee_marker.header.stamp = rospy.Time.now()
00190 self.marker_pub.publish(ee_marker)
00191
00192
00193 if __name__ == '__main__':
00194 rospy.init_node('crona_arms_test')
00195 robot = CronaArm('l')