arm_client.py
Go to the documentation of this file.
00001 #
00002 # subscribe to thw joint angles and raw forces topics,  and provide FK
00003 # etc.
00004 #
00005 # Copyright (c) 2009, Georgia Tech Research Corporation
00006 # All rights reserved.
00007 # 
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions are met:
00010 #     * Redistributions of source code must retain the above copyright
00011 #       notice, this list of conditions and the following disclaimer.
00012 #     * Redistributions in binary form must reproduce the above copyright
00013 #       notice, this list of conditions and the following disclaimer in the
00014 #       documentation and/or other materials provided with the distribution.
00015 #     * Neither the name of the Georgia Tech Research Corporation nor the
00016 #       names of its contributors may be used to endorse or promote products
00017 #       derived from this software without specific prior written permission.
00018 # 
00019 # THIS SOFTWARE IS PROVIDED BY GEORGIA TECH RESEARCH CORPORATION ''AS IS'' AND
00020 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022 # DISCLAIMED. IN NO EVENT SHALL GEORGIA TECH BE LIABLE FOR ANY DIRECT, INDIRECT,
00023 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00024 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00025 # OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00026 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
00027 # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00028 # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029 #
00030 
00031 # Author: Advait Jain
00032 
00033 import math
00034 import numpy as np
00035 import copy
00036 import sys, time, os
00037 from threading import RLock
00038 
00039 import roslib; roslib.load_manifest('hrl_cody_arms')
00040 import rospy
00041 
00042 import hrl_lib.viz as hv
00043 
00044 from hrl_msgs.msg import FloatArray
00045 from std_msgs.msg import Header, Bool, Empty
00046 
00047 from std_srvs.srv import Empty as Empty_srv
00048 from visualization_msgs.msg import Marker
00049 
00050 
00051 # used in client and server.
00052 def get_joint_name_dict():
00053     joint_names_list = {}
00054     joint_names_list['right_arm'] = ['m3joint_ma1_j%d'%i for i in range(7)]
00055     joint_names_list['left_arm'] = ['m3joint_ma2_j%d'%i for i in range(7)]
00056     return joint_names_list
00057 
00058 
00059 class MekaArmClient():
00060     ##
00061     # @param arms: object of the ArmKinematics class.
00062     def __init__(self, arms):
00063         self.cb_lock = RLock()
00064         self.r_arm_jep = None
00065         self.l_arm_jep = None
00066         self.r_arm_alpha = None
00067         self.l_arm_alpha = None
00068         self.r_arm_q = None
00069         self.l_arm_q = None
00070         self.r_arm_force = None
00071         self.r_arm_raw_force = None
00072         self.l_arm_force = None
00073         self.l_arm_raw_force = None
00074         self.pwr_state = False
00075 
00076         self.left_arm_ft = {'force': np.matrix(np.zeros((3,1),dtype='float32')),
00077                              'torque': np.matrix(np.zeros((3,1),dtype='float32'))}
00078         self.right_arm_ft = {'force': np.matrix(np.zeros((3,1),dtype='float32')),
00079                             'torque': np.matrix(np.zeros((3,1),dtype='float32'))}
00080         self.fts_bias = {'left_arm': self.left_arm_ft, 'right_arm': self.right_arm_ft}
00081         self.arms = arms
00082 
00083         self.joint_names_list = get_joint_name_dict()
00084 
00085         self.r_jep_cmd_pub = rospy.Publisher('/r_arm/command/jep', FloatArray)
00086         self.l_jep_cmd_pub = rospy.Publisher('/l_arm/command/jep', FloatArray)
00087         self.r_alph_cmd_pub = rospy.Publisher('/r_arm/command/joint_impedance_scale', FloatArray)
00088         self.l_alph_cmd_pub = rospy.Publisher('/l_arm/command/joint_impedance_scale', FloatArray)
00089         self.stop_pub = rospy.Publisher('/arms/stop', Empty)
00090         self.motors_off_pub = rospy.Publisher('/arms/command/motors_off', Empty)
00091 
00092         self.cep_marker_pub = rospy.Publisher('/arms/viz/cep', Marker)
00093 
00094         rospy.Subscriber('/r_arm/jep', FloatArray, self.r_arm_jep_cb)
00095         rospy.Subscriber('/l_arm/jep', FloatArray, self.l_arm_jep_cb)
00096         rospy.Subscriber('/r_arm/joint_impedance_scale', FloatArray, self.r_arm_alpha_cb)
00097         rospy.Subscriber('/l_arm/joint_impedance_scale', FloatArray, self.l_arm_alpha_cb)
00098 
00099         rospy.Subscriber('/r_arm/q', FloatArray, self.r_arm_q_cb)
00100         rospy.Subscriber('/l_arm/q', FloatArray, self.l_arm_q_cb)
00101 
00102         rospy.Subscriber('/r_arm/force', FloatArray, self.r_arm_force_cb)
00103         rospy.Subscriber('/l_arm/force', FloatArray, self.l_arm_force_cb)
00104         rospy.Subscriber('/r_arm/force_raw', FloatArray, self.r_arm_raw_force_cb)
00105         rospy.Subscriber('/l_arm/force_raw', FloatArray, self.l_arm_raw_force_cb)
00106 
00107         rospy.Subscriber('/arms/pwr_state', Bool, self.pwr_state_cb)
00108 
00109         rospy.wait_for_service('toggle_floating_arms')
00110         self.toggle_floating_arms = rospy.ServiceProxy('toggle_floating_arms', Empty_srv)
00111 
00112 
00113         try:
00114             rospy.init_node('cody_arm_client', anonymous=True)
00115         except rospy.ROSException:
00116             pass
00117 
00118     #---------- ROS callbacks -----------------
00119     def pwr_state_cb(self, msg):
00120         self.cb_lock.acquire()
00121         self.pwr_state = msg.data
00122         self.cb_lock.release()
00123 
00124     def r_arm_jep_cb(self, msg):
00125         self.cb_lock.acquire()
00126         self.r_arm_jep = list(msg.data)
00127         self.cb_lock.release()
00128 
00129         # publish the CEP marker.
00130         cep, r = self.arms.FK_all('right_arm', self.r_arm_jep)
00131         o = np.matrix([0.,0.,0.,1.]).T
00132         cep_marker = hv.single_marker(cep, o, 'sphere',
00133                         '/torso_lift_link', color=(0., 0., 1., 1.),
00134                         scale = (0.02, 0.02, 0.02), duration=0.)
00135 
00136         cep_marker.header.stamp = msg.header.stamp
00137         self.cep_marker_pub.publish(cep_marker)
00138 
00139     def l_arm_jep_cb(self, msg):
00140         self.cb_lock.acquire()
00141         self.l_arm_jep = list(msg.data)
00142         self.cb_lock.release()
00143 
00144     def r_arm_alpha_cb(self, msg):
00145         self.cb_lock.acquire()
00146         self.r_arm_alpha = list(msg.data)
00147         self.cb_lock.release()
00148 
00149     def l_arm_alpha_cb(self, msg):
00150         self.cb_lock.acquire()
00151         self.l_arm_alpha = list(msg.data)
00152         self.cb_lock.release()
00153 
00154     def r_arm_q_cb(self, msg):
00155         self.cb_lock.acquire()
00156         self.r_arm_q = list(msg.data)
00157         self.cb_lock.release()
00158 
00159     def l_arm_q_cb(self, msg):
00160         self.cb_lock.acquire()
00161         self.l_arm_q = list(msg.data)
00162         self.cb_lock.release()
00163 
00164     def r_arm_force_cb(self, msg):
00165         self.cb_lock.acquire()
00166         self.r_arm_force = msg.data
00167         self.cb_lock.release()
00168 
00169     def l_arm_force_cb(self, msg):
00170         self.cb_lock.acquire()
00171         self.l_arm_force = msg.data
00172         self.cb_lock.release()
00173 
00174     def r_arm_raw_force_cb(self, msg):
00175         self.cb_lock.acquire()
00176         self.r_arm_raw_force = msg.data
00177         self.cb_lock.release()
00178 
00179     def l_arm_raw_force_cb(self, msg):
00180         self.cb_lock.acquire()
00181         self.l_arm_raw_force = msg.data
00182         self.cb_lock.release()
00183 
00184     #--------- functions to use -----------------
00185 
00186     ## Returns the current position, rotation of the arm.
00187     # @param arm 0 for right, 1 for left
00188     # @return position, rotation
00189     def end_effector_pos(self, arm):
00190         q = self.get_joint_angles(arm)
00191         return self.arms.FK_all(arm, q)
00192 
00193     ##
00194     # @return list of 7 joint angles.
00195     def get_joint_angles(self, arm):
00196         self.cb_lock.acquire()
00197         if arm == 'right_arm':
00198             q = copy.copy(self.r_arm_q)
00199         elif arm == 'left_arm':
00200             q = copy.copy(self.l_arm_q)
00201         else:
00202             self.cb_lock.release()
00203             raise RuntimeError('Undefined arm: %s'%(arm))
00204         self.cb_lock.release()
00205         return q
00206 
00207     def get_wrist_force(self, arm, bias=True, base_frame=False,
00208                         filtered = True):
00209         self.cb_lock.acquire()
00210         if arm == 'right_arm':
00211             if filtered:
00212                 f = copy.copy(self.r_arm_force)
00213             else:
00214                 f = copy.copy(self.r_arm_raw_force)
00215         elif arm == 'left_arm':
00216             if filtered:
00217                 f = copy.copy(self.l_arm_force)
00218             else:
00219                 f = copy.copy(self.l_arm_raw_force)
00220         else:
00221             self.cb_lock.release()
00222             raise RuntimeError('Undefined arm: %s'%(arm))
00223         self.cb_lock.release()
00224 
00225         f_mat = np.matrix(f).T
00226         if bias:
00227             f_mat = f_mat - self.fts_bias[arm]['force']
00228         
00229         if base_frame:
00230             q = self.get_joint_angles(arm)
00231             rot = self.arms.FK_rot(arm, q)
00232             f_mat = rot*f_mat
00233         return f_mat
00234             
00235     def bias_wrist_ft(self, arm):
00236         f_list = []
00237         t_list = []
00238         print 'Starting biasing...'
00239         for i in range(20):
00240             f_list.append(self.get_wrist_force(arm, bias=False))
00241             rospy.sleep(0.02)
00242 
00243         f_b = np.mean(np.column_stack(f_list), 1)
00244         # torque is unimplemented.
00245         t_b = self.get_wrist_torque(arm, bias=False)
00246         self.fts_bias[arm]['force'] = f_b
00247         self.fts_bias[arm]['torque'] = t_b
00248         print 'self.fts_bias[arm][\'force\']', self.fts_bias[arm]['force']
00249         print 'arm:', arm
00250         print '...done'
00251 
00252     ##
00253     # @return list of floats b/w 0 and 1.
00254     def get_impedance_scale(self, arm):
00255         self.cb_lock.acquire()
00256         if arm == 'right_arm':
00257             sc = copy.copy(self.r_arm_alpha)
00258         elif arm == 'left_arm':
00259             sc = copy.copy(self.l_arm_alpha)
00260         else:
00261             self.cb_lock.release()
00262             raise RuntimeError('Undefined arm: %s'%(arm))
00263         self.cb_lock.release()
00264         return sc
00265 
00266     ##
00267     # @param s - list of floats b/w 0 and 1.
00268     def set_impedance_scale(self, arm, s):
00269         if arm == 'right_arm': 
00270             pub = self.r_alph_cmd_pub
00271         elif arm == 'left_arm':
00272             pub = self.l_alph_cmd_pub
00273         else:
00274             raise RuntimeError('Undefined arm: %s'%(arm))
00275         time_stamp = rospy.Time.now()
00276         h = Header()
00277         h.stamp = time_stamp
00278         pub.publish(FloatArray(h, s))
00279 
00280     def get_jep(self, arm):
00281         self.cb_lock.acquire()
00282         if arm == 'right_arm':
00283             jep = copy.copy(self.r_arm_jep)
00284         elif arm == 'left_arm':
00285             jep = copy.copy(self.l_arm_jep)
00286         else:
00287             self.cb_lock.release()
00288             raise RuntimeError('Undefined arm: %s'%(arm))
00289         self.cb_lock.release()
00290         return jep
00291 
00292     ##
00293     # @param q - list of 7 joint angles in RADIANS. according to meka's coordinate frames.
00294     # @param duration - for compatibility with the PR2 class.
00295     def set_jep(self, arm, q, duration=None):
00296         if arm == 'right_arm': 
00297             pub = self.r_jep_cmd_pub
00298         elif arm == 'left_arm':
00299             pub = self.l_jep_cmd_pub
00300         else:
00301             raise RuntimeError('Undefined arm: %s'%(arm))
00302         time_stamp = rospy.Time.now()
00303         h = Header()
00304         h.stamp = time_stamp
00305         pub.publish(FloatArray(h, q))
00306 
00307 
00308     ##
00309     #Function that commands the arm(s) to incrementally move to a jep
00310     #@param speed the max angular speed (in radians per second)
00311     #@return 'reach'
00312     def go_jep(self, arm, q, stopping_function=None, speed=math.radians(30)):
00313         if speed>math.radians(90.):
00314             speed = math.radians(90.)
00315 
00316         qs_list,qe_list,ns_list,qstep_list = [],[],[],[]
00317         done_list = []
00318         time_between_cmds = 0.025
00319         
00320         #qs = np.matrix(self.get_joint_angles(arm))
00321         qs = np.matrix(self.get_jep(arm))
00322         qe = np.matrix(q)
00323         max_change = np.max(np.abs(qe-qs))
00324 
00325         total_time = max_change/speed
00326         n_steps = int(total_time/time_between_cmds+0.5)
00327 
00328         qstep = (qe-qs)/n_steps
00329 
00330         if stopping_function != None:
00331             done = stopping_function()
00332         else:
00333             done = False
00334 
00335         step_number = 0
00336         t0 = rospy.Time.now().to_time()
00337         t_end = t0
00338         while done==False:
00339             t_end += time_between_cmds
00340             t1 = rospy.Time.now().to_time()
00341 
00342             if stopping_function != None:
00343                 done = stopping_function()
00344             if step_number < n_steps and done == False:
00345                 q = (qs + step_number*qstep).A1.tolist()
00346                 self.set_jep(arm, q)
00347             else:
00348                 done = True
00349 
00350             while t1 < t_end:
00351                 if stopping_function != None:
00352                     done = done or stopping_function()
00353                 rospy.sleep(time_between_cmds/5)
00354                 t1 = rospy.Time.now().to_time()
00355             step_number += 1
00356 
00357         rospy.sleep(time_between_cmds)
00358         return 'reach'
00359 
00360     # Expect this to crash the program because sending a stop crashes
00361     # the meka server
00362     def stop(self):
00363         self.stop_pub.publish()
00364 
00365     def is_motor_power_on(self):
00366         return self.pwr_state
00367 
00368     def go_cep(self, arm, p, rot, speed = 0.10,
00369                      stopping_function = None, q_guess = None):
00370         q = self.arms.IK(arm, p, rot, q_guess)
00371         if q == None:
00372             print 'IK soln NOT found.'
00373             print 'trying to reach p= ', p
00374             return 'fail'
00375         else:
00376             q_start = np.matrix(self.get_joint_angles(arm))
00377             p_start = self.arms.FK(arm, q_start.A1.tolist())
00378             q_end = np.matrix(q)
00379     
00380             dist = np.linalg.norm(p-p_start)
00381             total_time = dist/speed
00382             max_change = np.max(np.abs(q_end-q_start))
00383             ang_speed = max_change/total_time
00384             return self.go_jep(arm, q, stopping_function, speed=ang_speed)
00385 
00386     ##
00387     # linearly interpolates the commanded cep.
00388     # @param arm - 'left_arm' or 'right_arm'
00389     # @param p - 3x1 np matrix
00390     # @param rot - rotation matrix
00391     # @param speed - linear speed (m/s)
00392     # @param stopping_function - returns True or False
00393     # @return string (reason for stopping)
00394     def go_cep_interpolate(self, arm, p, rot=None, speed=0.10,
00395                                  stopping_function=None):
00396         rot = None # Rotational interpolation not implemented right now.
00397         time_between_cmds = 0.025
00398 
00399         q_guess = self.get_jep(arm)
00400         cep = self.arms.FK(arm, q_guess)
00401         if rot == None:
00402             rot = self.arms.FK_rot(arm, q_guess)
00403 
00404         vec = p-cep
00405         dist = np.linalg.norm(vec)
00406         total_time = dist/speed
00407         n_steps = int(total_time/time_between_cmds + 0.5)
00408         vec = vec/dist
00409         vec = vec * speed * time_between_cmds
00410         
00411         pt = cep
00412         all_done = False
00413         i = 0 
00414         t0 = rospy.Time.now().to_time()
00415         t_end = t0
00416         while all_done==False:
00417             t_end += time_between_cmds
00418             t1 = rospy.Time.now().to_time()
00419             pt = pt + vec
00420             q = self.arms.IK(arm, pt, rot, q_guess)
00421 
00422             if q == None:
00423                 all_done = True
00424                 stop = 'IK fail'
00425                 continue
00426             self.set_jep(arm, q)
00427             q_guess = q
00428             while t1<t_end:
00429                 if stopping_function != None:
00430                     all_done = stopping_function()
00431                 if all_done:
00432                     stop = 'Stopping Condition'
00433                     break
00434                 rospy.sleep(time_between_cmds/5)
00435                 t1 = rospy.Time.now().to_time()
00436 
00437             i+=1
00438             if i == n_steps:
00439                 all_done = True
00440                 stop = ''
00441         return stop
00442 
00443     ##  
00444     # @param vec - displacement vector (base frame)
00445     # @param q_guess - previous JEP?
00446     # @return string
00447     def move_till_hit(self, arm, vec=np.matrix([0.3,0.,0.]).T, force_threshold=3.0,
00448                       speed=0.1, bias_FT=True):
00449         unit_vec =  vec/np.linalg.norm(vec)
00450         def stopping_function():
00451             force = self.get_wrist_force(arm, base_frame = True)
00452             force_projection = force.T*unit_vec *-1 # projection in direction opposite to motion.
00453             if force_projection>force_threshold:
00454                 return True
00455             elif np.linalg.norm(force)>45.:
00456                 return True
00457             else:
00458                 return False
00459 
00460         jep = self.get_jep(arm)
00461         cep, rot = self.arms.FK_all(arm, jep)
00462 
00463         if bias_FT:
00464             self.bias_wrist_ft(arm)
00465         time.sleep(0.5)
00466 
00467         p = cep + vec
00468         return self.go_cep_interpolate(arm, p, rot, speed,
00469                                        stopping_function)
00470 
00471     def motors_off(self):
00472         self.motors_off_pub.publish()
00473 
00474 #    def step(self):
00475 #        rospy.sleep(0.01)
00476 
00477     #-------- unimplemented functions -----------------
00478 
00479     # leaving this unimplemented for now. Advait Nov 14, 2010.
00480     def get_joint_velocities(self, arm):
00481         pass
00482 
00483     # leaving this unimplemented for now. Advait Nov 14, 2010.
00484     def get_joint_accelerations(self, arm):
00485         pass
00486 
00487     # leaving this unimplemented for now. Advait Nov 14, 2010.
00488     def get_joint_torques(self, arm):
00489         pass
00490 
00491     # leaving this unimplemented for now. Advait Nov 14, 2010.
00492     def get_wrist_torque(self, arm, bias=True):
00493         pass
00494 
00495     # leaving this unimplemented for now. Advait Nov 14, 2010.
00496     def power_on(self):
00497         pass
00498 
00499     # leaving this unimplemented for now. Advait Nov 14, 2010.
00500     # something to change and get arm_settings.
00501 
00502 
00503 if __name__ == '__main__':
00504     import arms as ar
00505     import m3.toolbox as m3t
00506     import hrl_lib.transforms as tr
00507 
00508     r_arm = 'right_arm'
00509     l_arm = 'left_arm'
00510 
00511     arms = ar.M3HrlRobot(0.16)
00512     ac = MekaArmClient(arms)
00513 
00514     # print FT sensor readings.
00515     if False:
00516         ac.bias_wrist_ft(r_arm)
00517         while not rospy.is_shutdown():
00518             f = ac.get_wrist_force(r_arm)
00519             print 'f:', f.A1
00520             rospy.sleep(0.05)
00521 
00522     # move the arms.
00523     if False:
00524         print 'hit a key to move the arms.'
00525         k=m3t.get_keystroke()
00526 
00527         rot_mat = tr.rotY(math.radians(-90))
00528         p = np.matrix([0.3, -0.24, -0.3]).T
00529     #    q = arms.IK(l_arm, p, rot_mat)
00530     #    ac.go_jep(l_arm, q)
00531     #    ac.go_cep(l_arm, p, rot_mat)
00532         ac.go_cep(r_arm, p, rot_mat)
00533 
00534     #    jep = ac.get_jep(r_arm)
00535     #    ac.go_jep(r_arm, jep)
00536 
00537         rospy.sleep(0.5)
00538         raw_input('Hit ENTER to print ee position')
00539         q = ac.get_joint_angles(r_arm)
00540         ee = ac.arms.FK(r_arm, q)
00541         print 'ee:', ee.A1
00542         print 'desired ee:', p.A1
00543 
00544     if False:
00545         print 'hit a key to float arms.'
00546         k=m3t.get_keystroke()
00547         ac.toggle_floating_arms()
00548 
00549         print 'hit a key to UNfloat arms.'
00550         k=m3t.get_keystroke()
00551         ac.toggle_floating_arms()
00552 
00553         #ac.move_till_hit(l_arm)
00554         #ac.motors_off()
00555     #    ac.stop()
00556 
00557     if False:
00558         while not rospy.is_shutdown():
00559             jep = ac.get_jep(r_arm)
00560             print 'jep:', jep
00561             rospy.sleep(0.05)
00562 
00563     if True:
00564         rospy.sleep(1.)
00565         isc =  ac.get_impedance_scale(r_arm)
00566         print isc
00567         isc[1] = 0.3
00568         ac.set_impedance_scale(r_arm, isc)
00569         rospy.sleep(1.)
00570         isc =  ac.get_impedance_scale(r_arm)
00571         print isc
00572 
00573 
00574 
00575 


hrl_cody_arms
Author(s): Advait Jain, Advisor: Prof. Charlie Kemp, Lab: Healthcare Robotics Lab
autogenerated on Wed Nov 27 2013 12:11:49