shadowhand_ros.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright 2011 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation version 2 of the License.
8 #
9 # This program is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 # more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program. If not, see <http://www.gnu.org/licenses/>.
16 
17 import time
18 import os
19 import math
20 import rospy
21 import rospkg
22 import threading
23 import rosgraph.masterapi
24 import control_msgs.msg
25 from sr_robot_msgs.msg import sendupdate, joint, joints_data, JointControllerState
26 from sensor_msgs.msg import JointState
27 from std_msgs.msg import Float64
28 from sr_hand.grasps_parser import GraspParser
29 from sr_hand.grasps_interpoler import GraspInterpoler
30 from sr_hand.tactile_receiver import TactileReceiver
31 # this is only used to detect if the hand is a Gazebo hand, not to
32 # actually reconfigure anything
33 from dynamic_reconfigure.msg import Config
34 
35 
36 class Joint(object):
37 
38  def __init__(self, name="", motor="", j_min=0, j_max=90):
39  self.name = name
40  self.motor = motor
41  self.min = j_min
42  self.max = j_max
43 
44 
45 class ShadowHand_ROS(object):
46 
47  """
48  This is a python library used to easily access the shadow hand ROS interface.
49  """
50 
51  def __init__(self):
52  """
53  Builds the library, creates the communication node in ROS and
54  initializes the hand publisher and subscriber to the default
55  values of shadowhand_data and sendupdate
56  """
57  self.allJoints = [Joint("THJ1", "smart_motor_th1", 0, 90),
58  Joint("THJ2", "smart_motor_th2", -40, 40),
59  Joint("THJ3", "smart_motor_th3", -15, 15),
60  Joint("THJ4", "smart_motor_th4", 0, 75),
61  Joint("THJ5", "smart_motor_th5", -60, 60),
62  Joint("FFJ0", "smart_motor_ff2", 0, 180),
63  Joint("FFJ3", "smart_motor_ff3"),
64  Joint("FFJ4", "smart_motor_ff4", -20, 20),
65  Joint("MFJ0", "smart_motor_mf2", 0, 180),
66  Joint("MFJ3", "smart_motor_mf3"),
67  Joint("MFJ4", "smart_motor_mf4", -20, 20),
68  Joint("RFJ0", "smart_motor_rf2", 0, 180),
69  Joint("RFJ3", "smart_motor_rf3"),
70  Joint("RFJ4", "smart_motor_rf4", -20, 20),
71  Joint("LFJ0", "smart_motor_lf2", 0, 180),
72  Joint("LFJ3", "smart_motor_lf3"),
73  Joint("LFJ4", "smart_motor_lf4", -20, 20),
74  Joint("LFJ5", "smart_motor_lf5", 0, 45),
75  Joint("WRJ1", "smart_motor_wr1", -45, 30),
76  Joint("WRJ2", "smart_motor_wr2", -30, 10),
77  ]
78  self.handJoints = []
79  self.armJoints = [Joint("ShoulderJRotate", "", -45, 60),
80  Joint("ShoulderJSwing", "", 0, 80),
81  Joint("ElbowJSwing", "", 0, 120),
82  Joint("ElbowJRotate", "", -80, 80)
83  ]
84  self.lastMsg = joints_data()
85  self.lastArmMsg = joints_data()
86  self.cyberglove_pub = 0
87  self.cyberglove_sub = 0
88  self.cybergrasp_pub = 0
89  self.cyberglove_sub = 0
90  self.isFirstMessage = True
91  self.isFirstMessageArm = True
92  self.isReady = False
93  self.liste = 0
94  self.hasarm = 0
95  self.dict_pos = {}
96  self.dict_tar = {}
97  self.dict_arm_pos = {}
98  self.dict_arm_tar = {}
100  self.eth_publishers = {}
101  self.eth_subscribers = {}
102  # rospy.init_node('python_hand_library')
103  self.sendupdate_lock = threading.Lock()
104 
105  self.joint_states_lock = threading.Lock()
106 
107  # contains the ending for the topic depending on which controllers are
108  # loaded
109  self.topic_ending = ""
110 
111  # EtherCAT hand
113 
114  #
115  # Grasps
117 
118  self.rootPath = rospkg.RosPack().get_path('sr_hand')
119  self.grasp_parser.parse_tree(
120  self.rootPath + "/scripts/sr_hand/grasps.xml")
121 
123  self.pub = rospy.Publisher(
124  'srh/sendupdate', sendupdate, queue_size=1, latch=True)
125  self.pub_arm = rospy.Publisher(
126  'sr_arm/sendupdate', sendupdate, queue_size=1, latch=True)
127 
128  self.sub_arm = rospy.Subscriber(
129  'sr_arm/shadowhand_data', joints_data, self.callback_arm)
130  self.sub = rospy.Subscriber(
131  'srh/shadowhand_data', joints_data, self.callback)
132 
134 
135  self.hand_velocity = {}
136  self.hand_effort = {}
137 
138  if (self.hand_type == "etherCAT") or (self.hand_type == "gazebo"):
139  self.joint_states_listener = rospy.Subscriber(
140  "joint_states", JointState, self.joint_states_callback)
141  # Initialize the command publishers here, to avoid the delay caused
142  # when initializing them in the sendupdate
143  for jnt in self.allJoints:
144  if jnt.name not in self.eth_publishers:
145  topic = "sh_" + \
146  jnt.name.lower() + self.topic_ending + "/command"
147  self.eth_publishers[jnt.name] = rospy.Publisher(
148  topic, Float64, queue_size=1, latch=True)
150 
151  threading.Thread(None, rospy.spin)
152 
153  def create_grasp_interpoler(self, current_step, next_step):
154  self.grasp_interpoler = GraspInterpoler(current_step, next_step)
155 
156  def callback_ethercat_states(self, data, jointName):
157  """
158  @param data: The ROS message received which called the callback
159  If the message is the first received, initializes the dictionnaries
160  Else, it updates the lastMsg
161  @param joint: a Joint object that contains the name of the joint that we receive data from
162  """
163  joint_data = joint(joint_name=jointName, joint_target=math.degrees(
164  float(data.set_point)), joint_position=math.degrees(float(data.process_value)))
165 
166  # update the dictionary of joints
167  self.dict_ethercat_joints[joint_data.joint_name] = joint_data
168 
169  # Build a CAN hand style lastMsg with the latest info in
170  # self.dict_ethercat_joints
171  self.lastMsg = joints_data()
172 
173  for joint_name in self.dict_ethercat_joints.keys():
174  self.lastMsg.joints_list.append(
175  self.dict_ethercat_joints[joint_name])
176  # self.lastMsg.joints_list_length++
177 
178  # TODO This is not the right way to do it. We should check that messages from all the joints have been
179  # received (but we don't know if we have all the joints, e.g three
180  # finger hand)
181  self.isReady = True
182 
183  # if self.isFirstMessage :
184  # self.init_actual_joints()
185  # for joint in self.lastMsg.joints_list :
186  # self.dict_pos[joint.joint_name]=joint.joint_position
187  # self.dict_tar[joint.joint_name]=joint.joint_target
188  # self.isFirstMessage = False
189  # self.isReady = True
190 
191  def callback(self, data):
192  """
193  @param data: The ROS message received which called the callback
194  If the message is the first received, initializes the dictionnaries
195  Else, it updates the lastMsg
196  """
197  self.lastMsg = data
198  if self.isFirstMessage:
199  self.init_actual_joints()
200  for jnt in self.lastMsg.joints_list:
201  self.dict_pos[jnt.joint_name] = jnt.joint_position
202  self.dict_tar[jnt.joint_name] = jnt.joint_target
203  self.isFirstMessage = False
204  self.isReady = True
205 
206  def callback_arm(self, data):
207  """
208  @param data: The ROS message received which called the callback
209  If the message is the first received, initializes the dictionnaries
210  Else, it updates the lastMsg
211  """
212  self.lastArmMsg = data
213  if self.isFirstMessageArm:
214  for jnt in self.lastArmMsg.joints_list:
215  self.dict_arm_pos[jnt.joint_name] = jnt.joint_position
216  self.dict_arm_tar[jnt.joint_name] = jnt.joint_target
217 
219  """
220  Initializes the library with just the fingers actually connected
221  """
222  for joint_all in self.allJoints:
223  for joint_msg in self.lastMsg.joints_list:
224  if joint_msg.joint_name == joint_all.name:
225  self.handJoints.append(joint_all)
226 
227  def check_hand_type(self):
228  """
229  @return : true if some hand is detected
230  """
231  if self.check_gazebo_hand_presence():
232  return "gazebo"
233  elif self.check_etherCAT_hand_presence():
234  return "etherCAT"
235  elif self.check_CAN_hand_presence():
236  return "CANhand"
237  return None
238 
240  """
241  @return : true if the CAN hand is detected
242  """
243  t = 0.0
244  while not self.isReady:
245  time.sleep(1.0)
246  t = t + 1.0
247  rospy.loginfo(
248  "Waiting for service since " + str(t) + " seconds...")
249  if t >= 5.0:
250  rospy.logerr(
251  "No hand found. Are you sure the ROS hand is running ?")
252  return False
253  return True
254 
255  def set_shadowhand_data_topic(self, topic):
256  """
257  @param topic: The new topic to be set as the hand publishing topic
258  Set the library to listen to a new topic
259  """
260  print 'Changing subscriber to ' + topic
261  self.sub = rospy.Subscriber(topic, joints_data, self.callback)
262 
263  def set_sendupdate_topic(self, topic):
264  """
265  @param topic: The new topic to be set as the hand subscribing topic
266  Set the library to publish to a new topic
267  """
268  print 'Changing publisher to ' + topic
269  self.pub = rospy.Publisher(topic, sendupdate, latch=True)
270 
271  def sendupdate_from_dict(self, dicti):
272  """
273  @param dicti: Dictionnary containing all the targets to send, mapping
274  the name of the joint to the value of its target.
275  Sends new targets to the hand from a dictionnary
276  """
277  self.sendupdate_lock.acquire()
278 
279  if (self.hand_type == "etherCAT") or (self.hand_type == "gazebo"):
280  for join in dicti.keys():
281 
282  if join not in self.eth_publishers:
283  topic = "sh_" + \
284  join.lower() + self.topic_ending + "/command"
285  self.eth_publishers[join] = rospy.Publisher(
286  topic, Float64, latch=True)
287 
288  msg_to_send = Float64()
289  msg_to_send.data = math.radians(float(dicti[join]))
290  self.eth_publishers[join].publish(msg_to_send)
291  elif self.hand_type == "CANhand":
292  message = []
293  for join in dicti.keys():
294  message.append(
295  joint(joint_name=join, joint_target=dicti[join]))
296  self.pub.publish(sendupdate(len(message), message))
297 
298  self.sendupdate_lock.release()
299 
300  def sendupdate(self, jointName, angle=0):
301  """
302  @param jointName: Name of the joint to update
303  @param angle: Target of the joint, 0 if not set
304  Sends a new target for the specified joint
305  """
306  self.sendupdate_lock.acquire()
307 
308  if (self.hand_type == "etherCAT") or (self.hand_type == "gazebo"):
309  if jointName not in self.eth_publishers:
310  topic = "sh_" + \
311  jointName.lower() + self.topic_ending + "/command"
312  self.eth_publishers[jointName] = rospy.Publisher(
313  topic, Float64, latch=True)
314 
315  msg_to_send = Float64()
316  msg_to_send.data = math.radians(float(angle))
317  self.eth_publishers[jointName].publish(msg_to_send)
318  elif self.hand_type == "CANhand":
319  message = [joint(joint_name=jointName, joint_target=angle)]
320  self.pub.publish(sendupdate(len(message), message))
321 
322  self.sendupdate_lock.release()
323 
324  def sendupdate_arm_from_dict(self, dicti):
325  """
326  @param dicti: Dictionnary containing all the targets to send, mapping
327  the name of the joint to the value of its target.
328  Sends new targets to the hand from a dictionnary
329  """
330  message = []
331  for join in dicti.keys():
332  message.append(joint(joint_name=join, joint_target=dicti[join]))
333  self.pub_arm.publish(sendupdate(len(message), message))
334 
335  def sendupdate_arm(self, jointName, angle=0):
336  """
337  @param jointName: Name of the joint to update
338  @param angle: Target of the joint, 0 if not set
339  Sends a new target for the specified joint
340  """
341  message = [joint(joint_name=jointName, joint_target=angle)]
342  self.pub_arm.publish(sendupdate(len(message), message))
343 
344  def valueof(self, jointName):
345  """
346  @param jointName: Name of the joint to read the value
347  @return: 'NaN' if the value is not correct, the actual position of the joint else
348  """
349  for jnt in self.lastMsg.joints_list:
350  if jnt.joint_name == jointName:
351  return float(jnt.joint_position)
352  for jnt in self.lastArmMsg.joints_list:
353  if jnt.joint_name == jointName:
354  return float(jnt.joint_position)
355  return 'NaN'
356 
357  def has_arm(self):
358  """
359  @return : True if an arm is detected on the roscore
360  """
361  if not self.hasarm == 0:
362  return self.hasarm
363  self.hasarm = False
364  if self.liste == 0:
365  master = rosgraph.masterapi.Master('/rostopic')
366  self.liste = master.getPublishedTopics('/')
367  for topic_typ in self.liste:
368  for topic in topic_typ:
369  if 'sr_arm/shadowhand_data' in topic:
370  self.hasarm = True
371  return self.hasarm
372 
373  def record_step_to_file(self, filename, grasp_as_xml):
374  """
375  @param filename: name (or path) of the file to save to
376  @param grasp_as_xml: xml-formatted grasp
377  Write the grasp at the end of the file, creates the file if does not exist
378  """
379  if os.path.exists(filename):
380  obj = open(filename, 'r')
381  text = obj.readlines()
382  obj.close()
383  objWrite = open(filename, 'w')
384  for index in range(0, len(text) - 1):
385  objWrite.write(text[index])
386  objWrite.write(grasp_as_xml)
387  objWrite.write('</root>')
388  objWrite.close()
389  else:
390  objWrite = open(filename, 'w')
391  objWrite.write('<root>\n')
392  objWrite.write(grasp_as_xml)
393  objWrite.write('</root>')
394  objWrite.close()
395 
396  def save_hand_position_to_file(self, filename):
397  objFile = open(filename, 'w')
398  for key, value in self.dict_pos:
399  objFile.write(key + ' ' + value + '\n')
400  objFile.close()
401 
403  """
404  @return: dictionnary mapping joint names to actual positions
405  Read all the positions in the lastMsg
406  """
407  if not self.isReady:
408  return
409  for jnt in self.lastMsg.joints_list:
410  self.dict_pos[jnt.joint_name] = jnt.joint_position
411  return self.dict_pos
412 
414  """
415  @return: dictionnary mapping joint names to current targets
416  Read all the targets in the lastMsg
417  """
418  for jnt in self.lastMsg.joints_list:
419  self.dict_tar[jnt.joint_name] = jnt.joint_target
420  return self.dict_tar
421 
423  """
424  @return: dictionary mapping joint names to current velocities
425  """
426  with self.joint_states_lock:
427  return self.hand_velocity
428 
430  """
431  @return: dictionary mapping joint names to current efforts
432  """
433  with self.joint_states_lock:
434  return self.hand_effort
435 
437  """
438  @return: dictionnary mapping joint names to actual positions
439  Read all the positions in the lastMsg
440  """
441  for jnt in self.lastArmMsg.joints_list:
442  self.dict_arm_pos[jnt.joint_name] = jnt.joint_position
443  return self.dict_arm_pos
444 
446  """
447  @return: dictionnary mapping joint names to actual targets
448  Read all the targets in the lastMsg
449  """
450  for jnt in self.lastArmMsg.joints_list:
451  self.dict_arm_tar[jnt.joint_name] = jnt.joint_target
452  return self.dict_arm_tar
453 
454  def resend_targets(self):
455  """
456  Resend the targets read in the lastMsg to the hand
457  """
458  for key, value in self.dict_tar.items():
459  self.sendupdate(jointName=key, angle=value)
460 
461  def callVisualisationService(self, callList=0, reset=0):
462  """
463  @param callList: dictionnary mapping joint names to information that should be displayed
464  @param reset: flag used to tell if the parameters should be replaced by the new ones or
465  just added to the previous ones.
466  Calls a ROS service to display various information in Rviz
467  """
468  if reset == 0:
469  print 'no reset'
470 
471  if reset == 1:
472  print 'reset'
473 
475  """
476  Only used to check if a real etherCAT hand is detected in the system
477  check if something is being published to this topic, otherwise
478  return false
479  Bear in mind that the gazebo hand also publishes the joint_states topic,
480  so we need to check for the gazebo hand first
481  """
482 
483  try:
484  rospy.wait_for_message("joint_states", JointState, timeout=0.2)
485  except rospy.ROSException:
486  rospy.logwarn("no message received from joint_states")
487  return False
488 
489  return True
490 
492  """
493  Only used to check if a Gazebo simulated (etherCAT protocol) hand is detected in the system
494  check if something is being published to this topic, otherwise
495  return false
496  """
497 
498  try:
499  rospy.wait_for_message(
500  "gazebo/parameter_updates", Config, timeout=0.2)
501  except rospy.ROSException:
502  return False
503 
504  return True
505 
507  """
508  At the moment we just try to use the mixed position velocity controllers
509  """
510  success = True
511  for joint_all in self.allJoints:
512  self.topic_ending = "_position_controller"
513  topic = "sh_" + \
514  joint_all.name.lower() + self.topic_ending + "/state"
515  success = True
516  try:
517  rospy.wait_for_message(
518  topic, control_msgs.msg.JointControllerState, timeout=0.2)
519  except:
520  try:
521  self.topic_ending = "_mixed_position_velocity_controller"
522  topic = "sh_" + \
523  joint_all.name.lower() + self.topic_ending + "/state"
524  rospy.wait_for_message(
525  topic, JointControllerState, timeout=0.2)
526  except rospy.ROSException:
527  success = False
528 
529  if success:
530  if self.topic_ending == "_mixed_position_velocity_controller":
531  self.eth_subscribers[joint_all.name] = rospy.Subscriber(
532  topic, JointControllerState, self.callback_ethercat_states, joint_all.name)
533  else:
534  self.eth_subscribers[joint_all.name] = rospy.Subscriber(
535  topic,
536  control_msgs.msg.JointControllerState,
538  joint_all.name)
539 
540  if len(self.eth_subscribers) > 0:
541  return True
542 
543  return False
544 
545  def joint_states_callback(self, joint_state):
546  """
547  The callback function for the topic joint_states.
548  It will store the received joint velocity and effort information in two dictionaries
549  Velocity will be converted to degrees/s.
550  Effort units are kept as they are (currently ADC units, as no calibration is performed on the strain gauges)
551 
552  @param joint_state: the message containing the joints data.
553  """
554  with self.joint_states_lock:
555  self.hand_velocity = {n: math.degrees(v)
556  for n, v in zip(joint_state.name, joint_state.velocity)}
557  self.hand_effort = {
558  n: e for n, e in zip(joint_state.name, joint_state.effort)}
559 
560  for finger in ['FF', 'MF', 'RF', 'LF']:
561  for dic in [self.hand_velocity, self.hand_effort]:
562  if (finger + 'J1') in dic and (finger + 'J2') in dic:
563  dic[finger + 'J0'] = dic[
564  finger + 'J1'] + dic[finger + 'J2']
565  del dic[finger + 'J1']
566  del dic[finger + 'J2']
567 
568  def get_tactile_type(self):
569  return self.tactile_receiver.get_tactile_type()
570 
571  def get_tactile_state(self):
572  return self.tactile_receiver.get_tactile_state()
def joint_states_callback(self, joint_state)
def __init__(self, name="", motor="", j_min=0, j_max=90)
def sendupdate_arm(self, jointName, angle=0)
def record_step_to_file(self, filename, grasp_as_xml)
def save_hand_position_to_file(self, filename)
def callback_ethercat_states(self, data, jointName)
def create_grasp_interpoler(self, current_step, next_step)
def callVisualisationService(self, callList=0, reset=0)
def sendupdate(self, jointName, angle=0)


sr_hand
Author(s): Ugo Cupcic
autogenerated on Mon Feb 28 2022 23:52:24