$search
00001 #!/usr/bin/env python 00002 # 00003 # Copyright 2011 Shadow Robot Company Ltd. 00004 # 00005 # This program is free software: you can redistribute it and/or modify it 00006 # under the terms of the GNU General Public License as published by the Free 00007 # Software Foundation, either version 2 of the License, or (at your option) 00008 # any later version. 00009 # 00010 # This program is distributed in the hope that it will be useful, but WITHOUT 00011 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 00013 # more details. 00014 # 00015 # You should have received a copy of the GNU General Public License along 00016 # with this program. If not, see <http://www.gnu.org/licenses/>. 00017 # 00018 import roslib; roslib.load_manifest('sr_hand') 00019 import time 00020 import os 00021 import math 00022 import rospy 00023 import subprocess 00024 import threading 00025 import rosgraph.masterapi 00026 import pr2_controllers_msgs.msg 00027 from sr_robot_msgs.msg import sendupdate, joint, joints_data, JointControllerState 00028 from sensor_msgs.msg import * 00029 from std_msgs.msg import Float64 00030 from grasps_parser import GraspParser 00031 from grasps_interpoler import GraspInterpoler 00032 00033 class Joint(): 00034 def __init__(self, name="", motor="", min=0, max=90): 00035 self.name = name 00036 self.motor = motor 00037 self.min = min 00038 self.max = max 00039 00040 class ShadowHand_ROS(): 00041 """ 00042 This is a python library used to easily access the shadow hand ROS interface. 00043 """ 00044 def __init__(self): 00045 """ 00046 Builds the library, creates the communication node in ROS and 00047 initializes the hand publisher and subscriber to the default 00048 values of shadowhand_data and sendupdate 00049 """ 00050 self.allJoints = [Joint("THJ1", "smart_motor_th1"), 00051 Joint("THJ2", "smart_motor_th2", -30, 30), 00052 Joint("THJ3", "smart_motor_th3",-15, 15), 00053 Joint("THJ4", "smart_motor_th4",0, 75), 00054 Joint("THJ5", "smart_motor_th5",-60, 60), 00055 Joint("FFJ0", "smart_motor_ff2", 0, 180), 00056 Joint("FFJ3", "smart_motor_ff3"), 00057 Joint("FFJ4", "smart_motor_ff4", -25, 25), 00058 Joint("MFJ0", "smart_motor_mf2", 0, 180), 00059 Joint("MFJ3", "smart_motor_mf3"), 00060 Joint("MFJ4", "smart_motor_mf4", -25, 25), 00061 Joint("RFJ0", "smart_motor_rf2", 0, 180), 00062 Joint("RFJ3", "smart_motor_rf3"), 00063 Joint("RFJ4", "smart_motor_rf4", -25,25), 00064 Joint("LFJ0", "smart_motor_lf2", 0, 180), 00065 Joint("LFJ3", "smart_motor_lf3"), 00066 Joint("LFJ4", "smart_motor_lf4", -25, 25), 00067 Joint("LFJ5", "smart_motor_lf5", 0, 45), 00068 Joint("WRJ1", "smart_motor_wr1", -30, 40), 00069 Joint("WRJ2", "smart_motor_wr2", -30, 10), 00070 ] 00071 self.handJoints = [] 00072 self.armJoints = [Joint("ShoulderJRotate", "", -45, 60), 00073 Joint("ShoulderJSwing", "", 0, 80), 00074 Joint("ElbowJSwing", "", 0,120), 00075 Joint("ElbowJRotate", "", -80,80) 00076 ] 00077 self.lastMsg = joints_data() 00078 self.lastArmMsg = joints_data() 00079 self.cyberglove_pub = 0 00080 self.cyberglove_sub = 0 00081 self.cybergrasp_pub = 0 00082 self.cyberglove_sub = 0 00083 self.isFirstMessage = True 00084 self.isFirstMessageArm = True 00085 self.isReady = False 00086 self.liste = 0 00087 self.hasarm = 0 00088 self.dict_pos = {} 00089 self.dict_tar = {} 00090 self.dict_arm_pos = {} 00091 self.dict_arm_tar = {} 00092 self.dict_ethercat_joints = {} 00093 self.eth_publishers = {} 00094 self.eth_subscribers = {} 00095 #rospy.init_node('python_hand_library') 00096 self.sendupdate_lock = threading.Lock() 00097 00098 #contains the ending for the topic depending on which controllers are loaded 00099 self.topic_ending = "" 00100 00101 ##EtherCAT hand 00102 self.activate_etherCAT_hand() 00103 00104 ### 00105 # Grasps 00106 self.grasp_parser = GraspParser() 00107 process = subprocess.Popen("rospack find sr_hand".split(), stdout=subprocess.PIPE) 00108 self.rootPath = process.communicate()[0] 00109 self.rootPath = self.rootPath.split('\n') 00110 self.rootPath = self.rootPath[0] 00111 #print "path : "+self.rootPath 00112 self.grasp_parser.parse_tree(self.rootPath+"/python_lib/grasp/grasps.xml") 00113 00114 self.grasp_interpoler = 0 00115 self.pub = rospy.Publisher('srh/sendupdate',sendupdate) 00116 self.pub_arm = rospy.Publisher('sr_arm/sendupdate',sendupdate) 00117 00118 self.sub_arm = rospy.Subscriber('sr_arm/shadowhand_data', joints_data,self.callback_arm) 00119 self.sub = rospy.Subscriber('srh/shadowhand_data', joints_data ,self.callback) 00120 00121 threading.Thread(None, rospy.spin) 00122 00123 def create_grasp_interpoler(self, current_step, next_step): 00124 self.grasp_interpoler = GraspInterpoler(current_step, next_step) 00125 00126 def callback_ethercat_states(self, data, jointName): 00127 """ 00128 @param data: The ROS message received which called the callback 00129 If the message is the first received, initializes the dictionnaries 00130 Else, it updates the lastMsg 00131 @param joint: a Joint object that contains the name of the joint that we receive data from 00132 """ 00133 joint_data = joint(joint_name=jointName, joint_target=math.degrees(float(data.set_point)), joint_position=math.degrees(float(data.process_value))) 00134 00135 # update the dictionary of joints 00136 self.dict_ethercat_joints[joint_data.joint_name]=joint_data 00137 00138 # Build a CAN hand style lastMsg with the latest info in self.dict_ethercat_joints 00139 self.lastMsg = joints_data() 00140 00141 for joint_name in self.dict_ethercat_joints.keys() : 00142 self.lastMsg.joints_list.append(self.dict_ethercat_joints[joint_name]) 00143 #self.lastMsg.joints_list_length++ 00144 00145 #TODO This is not the right way to do it. We should check that messages from all the joints have been received (but we don't know if we have all the joints, e.g three finger hand) 00146 self.isReady = True 00147 00148 ''' 00149 if self.isFirstMessage : 00150 self.init_actual_joints() 00151 for joint in self.lastMsg.joints_list : 00152 self.dict_pos[joint.joint_name]=joint.joint_position 00153 self.dict_tar[joint.joint_name]=joint.joint_target 00154 self.isFirstMessage = False 00155 self.isReady = True 00156 ''' 00157 00158 def callback(self, data): 00159 """ 00160 @param data: The ROS message received which called the callback 00161 If the message is the first received, initializes the dictionnaries 00162 Else, it updates the lastMsg 00163 """ 00164 self.lastMsg = data; 00165 if self.isFirstMessage : 00166 self.init_actual_joints() 00167 for joint in self.lastMsg.joints_list : 00168 self.dict_pos[joint.joint_name]=joint.joint_position 00169 self.dict_tar[joint.joint_name]=joint.joint_target 00170 self.isFirstMessage = False 00171 self.isReady = True 00172 00173 def callback_arm(self, data): 00174 """ 00175 @param data: The ROS message received which called the callback 00176 If the message is the first received, initializes the dictionnaries 00177 Else, it updates the lastMsg 00178 """ 00179 self.lastArmMsg = data 00180 if self.isFirstMessageArm : 00181 for joint in self.lastArmMsg.joints_list : 00182 self.dict_arm_pos[joint.joint_name]=joint.joint_position 00183 self.dict_arm_tar[joint.joint_name]=joint.joint_target 00184 00185 def init_actual_joints(self): 00186 """ 00187 Initializes the library with just the fingers actually connected 00188 """ 00189 for joint_all in self.allJoints : 00190 for joint_msg in self.lastMsg.joints_list : 00191 if joint_msg.joint_name == joint_all.name: 00192 self.handJoints.append(joint_all) 00193 00194 def check_hand_type(self): 00195 """ 00196 @return : true if some hand is detected 00197 """ 00198 if self.check_etherCAT_hand_presence(): 00199 return "etherCAT" 00200 elif self.check_gazebo_hand_presence(): 00201 return "gazebo" 00202 elif self.check_CAN_hand_presence(): 00203 return "CANhand" 00204 return None 00205 00206 def check_CAN_hand_presence(self): 00207 """ 00208 @return : true if the CAN hand is detected 00209 """ 00210 t = 0.0 00211 while not self.isReady: 00212 time.sleep(1.0) 00213 t = t+1.0 00214 rospy.loginfo("Waiting for service since "+str(t)+" seconds...") 00215 if t >= 5.0: 00216 rospy.logerr("No hand found. Are you sure the ROS hand is running ?") 00217 return False 00218 return True 00219 00220 def set_shadowhand_data_topic(self, topic): 00221 """ 00222 @param topic: The new topic to be set as the hand publishing topic 00223 Set the library to listen to a new topic 00224 """ 00225 print 'Changing subscriber to ' + topic 00226 self.sub = rospy.Subscriber(topic, joints_data ,self.callback) 00227 00228 def set_sendupdate_topic(self, topic): 00229 """ 00230 @param topic: The new topic to be set as the hand subscribing topic 00231 Set the library to publish to a new topic 00232 """ 00233 print 'Changing publisher to ' + topic 00234 self.pub = rospy.Publisher(topic,sendupdate) 00235 00236 def sendupdate_from_dict(self, dicti): 00237 """ 00238 @param dicti: Dictionnary containing all the targets to send, mapping the name of the joint to the value of its target 00239 Sends new targets to the hand from a dictionnary 00240 """ 00241 #print(dicti) 00242 if (self.check_hand_type() == "etherCAT") or (self.check_hand_type() == "gazebo"): 00243 for join in dicti.keys(): 00244 if not self.eth_publishers.has_key(join): 00245 topic = "/sh_"+ join.lower() + self.topic_ending+"/command" 00246 self.eth_publishers[join] = rospy.Publisher(topic, Float64) 00247 00248 msg_to_send = Float64() 00249 msg_to_send.data = math.radians( float( dicti[join] ) ) 00250 self.eth_publishers[join].publish(msg_to_send) 00251 elif self.check_hand_type() == "CANhand": 00252 message = [] 00253 for join in dicti.keys(): 00254 message.append(joint(joint_name=join, joint_target=dicti[join])) 00255 self.pub.publish(sendupdate(len(message), message)) 00256 00257 def sendupdate(self, jointName, angle=0): 00258 """ 00259 @param jointName: Name of the joint to update 00260 @param angle: Target of the joint, 0 if not set 00261 Sends a new target for the specified joint 00262 """ 00263 self.sendupdate_lock.acquire() 00264 00265 if (self.check_hand_type() == "etherCAT") or (self.check_hand_type() == "gazebo"): 00266 if not self.eth_publishers.has_key(jointName): 00267 topic = "/sh_"+ jointName.lower() + self.topic_ending + "/command" 00268 self.eth_publishers[jointName] = rospy.Publisher(topic, Float64) 00269 00270 msg_to_send = Float64() 00271 msg_to_send.data = math.radians( float( angle ) ) 00272 self.eth_publishers[jointName].publish(msg_to_send) 00273 elif self.check_hand_type() == "CANhand": 00274 message = [joint(joint_name=jointName, joint_target=angle)] 00275 self.pub.publish(sendupdate(len(message), message)) 00276 00277 self.sendupdate_lock.release() 00278 00279 def sendupdate_arm_from_dict(self, dicti): 00280 """ 00281 @param dicti: Dictionnary containing all the targets to send, mapping the name of the joint to the value of its target 00282 Sends new targets to the hand from a dictionnary 00283 """ 00284 message = [] 00285 for join in dicti.keys(): 00286 message.append(joint(joint_name=join, joint_target=dicti[join])) 00287 self.pub_arm.publish(sendupdate(len(message), message)) 00288 00289 def sendupdate_arm(self, jointName, angle=0): 00290 """ 00291 @param jointName: Name of the joint to update 00292 @param angle: Target of the joint, 0 if not set 00293 Sends a new target for the specified joint 00294 """ 00295 message = [joint(joint_name=jointName, joint_target=angle)] 00296 self.pub_arm.publish(sendupdate_arm(len(message), message)) 00297 00298 def valueof(self, jointName): 00299 """ 00300 @param jointName: Name of the joint to read the value 00301 @return: 'NaN' if the value is not correct, the actual position of the joint else 00302 """ 00303 for joint in self.lastMsg.joints_list: 00304 if joint.joint_name == jointName: 00305 return float(joint.joint_position) 00306 for joint in self.lastArmMsg.joints_list: 00307 if joint.joint_name == jointName: 00308 return float(joint.joint_position) 00309 return 'NaN' 00310 00311 def has_arm(self): 00312 """ 00313 @return : True if an arm is detected on the roscore 00314 """ 00315 if not self.hasarm == 0: 00316 return self.hasarm 00317 self.hasarm = False 00318 if self.liste == 0: 00319 master = rosgraph.masterapi.Master('/rostopic') 00320 self.liste = master.getPublishedTopics('/') 00321 for topic_typ in self.liste : 00322 for topic in topic_typ: 00323 if 'sr_arm/shadowhand_data' in topic : 00324 self.hasarm = True 00325 return self.hasarm 00326 00327 def record_step_to_file(self, filename, grasp_as_xml): 00328 """ 00329 @param filename: name (or path) of the file to save to 00330 @param grasp_as_xml: xml-formatted grasp 00331 Write the grasp at the end of the file, creates the file if does not exist 00332 """ 00333 if os.path.exists(filename) : 00334 obj = open(filename, 'r') 00335 text = obj.readlines() 00336 obj.close() 00337 objWrite = open(filename,'w') 00338 for index in range(0,len(text)-1): 00339 objWrite.write(text[index]) 00340 objWrite.write(grasp_as_xml) 00341 objWrite.write('</root>') 00342 objWrite.close() 00343 else : 00344 objWrite = open(filename,'w') 00345 objWrite.write('<root>\n') 00346 objWrite.write(grasp_as_xml) 00347 objWrite.write('</root>') 00348 objWrite.close() 00349 00350 def save_hand_position_to_file(self, filename): 00351 objFile=open(filename,'w') 00352 for key, value in self.dict_pos: 00353 objFile.write(key + ' ' + value + '\n') 00354 objFile.close() 00355 00356 def read_all_current_positions(self): 00357 """ 00358 @return: dictionnary mapping joint names to actual positions 00359 Read all the positions in the lastMsg 00360 """ 00361 if not self.isReady: 00362 return 00363 for joint in self.lastMsg.joints_list: 00364 self.dict_pos[joint.joint_name] = joint.joint_position 00365 return self.dict_pos 00366 00367 def read_all_current_targets(self): 00368 """ 00369 @return: dictionnary mapping joint names to current targets 00370 Read all the targets in the lastMsg 00371 """ 00372 for joint in self.lastMsg.joints_list: 00373 self.dict_tar[joint.joint_name] = joint.joint_target 00374 return self.dict_tar 00375 00376 def read_all_current_arm_positions(self): 00377 """ 00378 @return: dictionnary mapping joint names to actual positions 00379 Read all the positions in the lastMsg 00380 """ 00381 for joint in self.lastArmMsg.joints_list: 00382 self.dict_arm_pos[joint.joint_name] = joint.joint_position 00383 return self.dict_arm_pos 00384 00385 def read_all_current_arm_targets(self): 00386 """ 00387 @return: dictionnary mapping joint names to actual targets 00388 Read all the targets in the lastMsg 00389 """ 00390 for joint in self.lastArmMsg.joints_list: 00391 self.dict_arm_tar[joint.joint_name] = joint.joint_target 00392 return self.dict_arm_tar 00393 00394 def resend_targets(self): 00395 """ 00396 Resend the targets read in the lastMsg to the hand 00397 """ 00398 for key, value in self.dict_tar.items(): 00399 self.sendupdate(jointName=key, angle=value) 00400 00401 def callVisualisationService(self, callList=0, reset = 0): 00402 """ 00403 @param callList: dictionnary mapping joint names to information that should be displayed 00404 @param reset: flag used to tell if the parameters should be replaced by the new ones or just added to the previous ones 00405 Calls a ROS service to display various information in Rviz 00406 """ 00407 if reset == 0: 00408 print 'no reset' 00409 00410 if reset == 1: 00411 print 'reset' 00412 00413 def check_etherCAT_hand_presence(self): 00414 """ 00415 Only used to check if a real etherCAT hand is detected in the system 00416 check if something is being published to this topic, otherwise 00417 return false 00418 """ 00419 try: 00420 rospy.wait_for_message("/joint_states", JointState, timeout = 0.2) 00421 except: 00422 rospy.logwarn("no message received from /joint_states") 00423 return False 00424 00425 return True 00426 00427 def check_gazebo_hand_presence(self): 00428 """ 00429 Only used to check if a Gazebo simulated (etherCAT protocol) hand is detected in the system 00430 check if something is being published to this topic, otherwise 00431 return false 00432 """ 00433 try: 00434 rospy.wait_for_message("/gazebo/joint_states", JointState, timeout = 0.2) 00435 except: 00436 return False 00437 00438 return True 00439 00440 def activate_etherCAT_hand(self): 00441 """ 00442 At the moment we just try to use the mixed position velocity controllers 00443 """ 00444 success = True 00445 for joint_all in self.allJoints : 00446 self.topic_ending = "_mixed_position_velocity_controller" 00447 topic = "/sh_"+ joint_all.name.lower() + self.topic_ending + "/state" 00448 success = True 00449 try: 00450 rospy.wait_for_message(topic, JointControllerState, timeout = 0.2) 00451 except: 00452 try: 00453 self.topic_ending = "_position_controller" 00454 topic = "/sh_"+ joint_all.name.lower() + self.topic_ending + "/state" 00455 rospy.wait_for_message(topic, pr2_controllers_msgs.msg.JointControllerState, timeout = 0.2) 00456 except: 00457 success = False 00458 00459 if success: 00460 if self.topic_ending == "_mixed_position_velocity_controller": 00461 self.eth_subscribers[joint_all.name] = rospy.Subscriber(topic, JointControllerState, self.callback_ethercat_states, joint_all.name) 00462 else: 00463 self.eth_subscribers[joint_all.name] = rospy.Subscriber(topic, pr2_controllers_msgs.msg.JointControllerState, self.callback_ethercat_states, joint_all.name) 00464 00465 if len(self.eth_subscribers) > 0: 00466 return True 00467 00468 return False