00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 import numpy as np
00034 
00035 import rospy
00036 from sensor_msgs.msg import JointState
00037 
00038 from urdf_parser_py.urdf import URDF
00039 from pykdl_utils.kdl_kinematics import KDLKinematics
00040 
00041 def create_joint_kin(base_link, end_link, urdf_filename=None, timeout=1., wait=False, description_param="robot_description"):
00042     if urdf_filename is None:
00043         robot = URDF.from_parameter_server(key=description_param)
00044     else:
00045         f = file(urdf_filename, 'r')
00046         robot = Robot.from_xml_string(f.read())
00047         f.close()
00048     if not wait:
00049         return JointKinematics(robot, base_link, end_link, timeout=timeout)
00050     else:
00051         return JointKinematicsWait(robot, base_link, end_link)
00052 
00053 class JointKinematicsBase(KDLKinematics):
00054     
00055     
00056     
00057     
00058     
00059     
00060     
00061     def forward(self, q=None, end_link=None, base_link=None):
00062         if q is None:
00063             q = self.get_joint_angles()
00064             if q is None:
00065                 return None
00066         return super(JointKinematicsBase, self).forward(q, end_link, base_link)
00067 
00068     
00069     
00070     
00071     
00072     def jacobian(self, q=None):
00073         if q is None:
00074             q = self.get_joint_angles()
00075             if q is None:
00076                 return None
00077         return super(JointKinematicsBase, self).jacobian(q)
00078 
00079     
00080     
00081     
00082     
00083     def inertia(self, q=None):
00084         if q is None:
00085             q = self.get_joint_angles()
00086             if q is None:
00087                 return None
00088         return super(JointKinematicsBase, self).inertia(q)
00089 
00090     
00091     
00092     
00093     
00094     def cart_inertia(self, q=None):
00095         if q is None:
00096             q = self.get_joint_angles()
00097             if q is None:
00098                 return None
00099         return super(JointKinematicsBase, self).cart_inertia(q)
00100 
00101     
00102     
00103     
00104     
00105     def wrap_angles(self, q):
00106         contins = self.joint_types == 'continuous'
00107         return np.where(contins, np.mod(q, 2 * np.pi), q)
00108 
00109     
00110     
00111     def end_effector_force(self):
00112         J = self.jacobian()
00113         tau = self.get_joint_efforts()
00114         if J is None or tau is None:
00115             return None
00116         f, _, _, _ = np.linalg.lstsq(J.T, tau)
00117         return f
00118 
00119 
00120 
00121 
00122 
00123 class JointKinematics(JointKinematicsBase):
00124     
00125     
00126     
00127     
00128     
00129     
00130     
00131     
00132     def __init__(self, urdf, base_link, end_link, kdl_tree=None, timeout=1.):
00133         super(JointKinematics, self).__init__(urdf, base_link, end_link, kdl_tree)
00134         self._joint_angles = None
00135         self._joint_velocities = None
00136         self._joint_efforts = None
00137         self._joint_state_inds = None
00138 
00139         rospy.Subscriber('/joint_states', JointState, self._joint_state_cb)
00140 
00141         if timeout > 0:
00142             self.wait_for_joint_angles(timeout)
00143 
00144     
00145     
00146     def _joint_state_cb(self, msg):
00147         if self._joint_state_inds is None:
00148             joint_names_list = self.get_joint_names()
00149             self._joint_state_inds = [msg.name.index(joint_name) for
00150                                      joint_name in joint_names_list]
00151         self._joint_angles = [msg.position[i] for i in self._joint_state_inds]
00152         self._joint_velocities = [msg.velocity[i] for i in self._joint_state_inds]
00153         self._joint_efforts = [msg.effort[i] for i in self._joint_state_inds]
00154 
00155     
00156     
00157     
00158     def wait_for_joint_angles(self, timeout=1.):
00159         if timeout <= 0:
00160             return self._joint_angles is not None
00161         start_time = rospy.get_time()
00162         r = rospy.Rate(100)
00163         while not rospy.is_shutdown() and rospy.get_time() - start_time < timeout:
00164             if self._joint_efforts is not None:
00165                 return True
00166             r.sleep()
00167         if not rospy.is_shutdown():
00168             rospy.logwarn("[joint_state_kdl_kin] Cannot read joint angles, timing out.")
00169         return False
00170 
00171     
00172     
00173     
00174     
00175     def get_joint_angles(self, wrapped=False):
00176         if self._joint_angles is None:
00177             rospy.logwarn("[joint_state_kdl_kin] Joint angles haven't been filled yet.")
00178             return None
00179         if wrapped:
00180             return super(JointKinematics, self).wrap_angles(self._joint_angles)
00181         else:
00182             return np.array(self._joint_angles).copy()
00183 
00184     
00185     
00186     def get_joint_velocities(self):
00187         if self._joint_velocities is None:
00188             rospy.logwarn("[joint_state_kdl_kin] Joint velocities haven't been filled yet.")
00189             return None
00190         return np.array(self._joint_velocities).copy()
00191 
00192     
00193     
00194     def get_joint_efforts(self):
00195         if self._joint_efforts is None:
00196             rospy.logwarn("[joint_state_kdl_kin] Joint efforts haven't been filled yet.")
00197             return None
00198         return np.array(self._joint_efforts).copy()
00199 
00200 
00201 
00202 class JointKinematicsWait(JointKinematicsBase):
00203     
00204     
00205     
00206     
00207     
00208     
00209     
00210     
00211     def __init__(self, urdf, base_link, end_link, kdl_tree=None):
00212         super(JointKinematicsWait, self).__init__(urdf, base_link, end_link, kdl_tree)
00213 
00214     def get_joint_state(self, timeout=1.0):
00215         try:
00216             js = rospy.wait_for_message('/joint_states', JointState, timeout)
00217         except ROSException as e:
00218             rospy.logwarn('get_joint_state timed out after %1.1f s' % timeout)
00219             return None, None
00220         joint_names_list = self.get_joint_names()
00221         joint_state_inds = [js.name.index(joint_name) for joint_name in joint_names_list]
00222         return js, joint_state_inds
00223 
00224     
00225     
00226     
00227     
00228     def get_joint_angles(self, wrapped=False, timeout=1.0):
00229         js, js_inds = self.get_joint_state()
00230         if js is None:
00231             rospy.logwarn("[joint_state_kdl_kin] Joint states haven't been filled yet.")
00232             return None
00233         if len(js.position) < np.max(js_inds):
00234             rospy.logwarn("[joint_state_kdl_kin] Joint positions not fully filled.")
00235             return None
00236         q = np.array(js.position)[js_inds]
00237         if wrapped:
00238             return super(JointKinematicsWait, self).wrap_angles(q)
00239         else:
00240             return q
00241 
00242     
00243     
00244     def get_joint_velocities(self, timeout=1.0):
00245         js, js_inds = self.get_joint_state()
00246         if js is None:
00247             rospy.logwarn("[joint_state_kdl_kin] Joint states haven't been filled yet.")
00248             return None
00249         if len(js.velocity) < np.max(js_inds):
00250             rospy.logwarn("[joint_state_kdl_kin] Joint velocity not fully filled.")
00251             return None
00252         qd = np.array(js.velocity)[js_inds]
00253         if wrapped:
00254             return super(JointKinematicsWait, self).wrap_angles(qd)
00255         else:
00256             return qd
00257 
00258     
00259     
00260     def get_joint_efforts(self, timeout=1.0):
00261         js, js_inds = self.get_joint_state()
00262         if js is None:
00263             rospy.logwarn("[joint_state_kdl_kin] Joint states haven't been filled yet.")
00264             return None
00265         if len(js.effort) < np.max(js_inds):
00266             rospy.logwarn("[joint_state_kdl_kin] Joint efforts not fully filled.")
00267             return None
00268         i = np.array(js.effort)[js_inds]
00269         if wrapped:
00270             return super(JointKinematicsWait, self).wrap_angles(i)
00271         else:
00272             return i
00273 
00274 def main():
00275     rospy.init_node("joint_kinematics")
00276     import sys
00277     def usage():
00278         print("Tests for kdl_parser:\n")
00279         print("kdl_parser <urdf file>")
00280         print("\tLoad the URDF from file.")
00281         print("kdl_parser")
00282         print("\tLoad the URDF from the parameter server.")
00283         sys.exit(1)
00284 
00285     if len(sys.argv) > 2:
00286         usage()
00287     if len(sys.argv) == 2 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
00288         usage()
00289     if (len(sys.argv) == 1):
00290         robot = URDF.from_parameter_server()
00291     else:
00292         f = file(sys.argv[1], 'r')
00293         robot = Robot.from_xml_string(f.read())
00294         f.close()
00295 
00296     if True:
00297         import random
00298         base_link = robot.get_root()
00299         end_link = robot.link_map.keys()[random.randint(0, len(robot.link_map)-1)]
00300         print "Root link: %s; Random end link: %s" % (base_link, end_link)
00301         js_kin = JointKinematics(robot, base_link, end_link)
00302         print "Joint angles:", js_kin.get_joint_angles()
00303         print "Joint angles (wrapped):", js_kin.get_joint_angles(True)
00304         print "Joint velocities:", js_kin.get_joint_velocities()
00305         print "Joint efforts:", js_kin.get_joint_efforts()
00306         print "Jacobian:", js_kin.jacobian()
00307         kdl_pose = js_kin.forward()
00308         print "FK:", kdl_pose
00309         print "End effector force:", js_kin.end_effector_force()
00310 
00311         if True:
00312             import tf
00313             from hrl_geom.pose_converter import PoseConv
00314             tf_list = tf.TransformListener()
00315             rospy.sleep(1)
00316             t = tf_list.getLatestCommonTime(base_link, end_link)
00317             tf_pose = PoseConv.to_homo_mat(tf_list.lookupTransform(base_link, end_link, t))
00318             print "Error with TF:", np.linalg.norm(kdl_pose - tf_pose)
00319 
00320 if __name__ == "__main__":
00321     main()