shadowhand_ros.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2011 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 2 of the License, or (at your option)
8 # any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18 import time
19 import os
20 import math
21 import rospy
22 import rospkg
23 import threading
24 import rosgraph.masterapi
25 import control_msgs.msg
26 from sr_robot_msgs.msg import sendupdate, joint, joints_data, JointControllerState
27 from sensor_msgs.msg import JointState
28 from std_msgs.msg import Float64
29 from sr_hand.grasps_parser import GraspParser
30 from sr_hand.grasps_interpoler import GraspInterpoler
31 from sr_hand.tactile_receiver import TactileReceiver
32 # this is only used to detect if the hand is a Gazebo hand, not to
33 # actually reconfigure anything
34 from dynamic_reconfigure.msg import Config
35 
36 
37 class Joint(object):
38 
39  def __init__(self, name="", motor="", j_min=0, j_max=90):
40  self.name = name
41  self.motor = motor
42  self.min = j_min
43  self.max = j_max
44 
45 
46 class ShadowHand_ROS(object):
47 
48  """
49  This is a python library used to easily access the shadow hand ROS interface.
50  """
51 
52  def __init__(self):
53  """
54  Builds the library, creates the communication node in ROS and
55  initializes the hand publisher and subscriber to the default
56  values of shadowhand_data and sendupdate
57  """
58  self.allJoints = [Joint("THJ1", "smart_motor_th1", 0, 90),
59  Joint("THJ2", "smart_motor_th2", -40, 40),
60  Joint("THJ3", "smart_motor_th3", -15, 15),
61  Joint("THJ4", "smart_motor_th4", 0, 75),
62  Joint("THJ5", "smart_motor_th5", -60, 60),
63  Joint("FFJ0", "smart_motor_ff2", 0, 180),
64  Joint("FFJ3", "smart_motor_ff3"),
65  Joint("FFJ4", "smart_motor_ff4", -20, 20),
66  Joint("MFJ0", "smart_motor_mf2", 0, 180),
67  Joint("MFJ3", "smart_motor_mf3"),
68  Joint("MFJ4", "smart_motor_mf4", -20, 20),
69  Joint("RFJ0", "smart_motor_rf2", 0, 180),
70  Joint("RFJ3", "smart_motor_rf3"),
71  Joint("RFJ4", "smart_motor_rf4", -20, 20),
72  Joint("LFJ0", "smart_motor_lf2", 0, 180),
73  Joint("LFJ3", "smart_motor_lf3"),
74  Joint("LFJ4", "smart_motor_lf4", -20, 20),
75  Joint("LFJ5", "smart_motor_lf5", 0, 45),
76  Joint("WRJ1", "smart_motor_wr1", -45, 30),
77  Joint("WRJ2", "smart_motor_wr2", -30, 10),
78  ]
79  self.handJoints = []
80  self.armJoints = [Joint("ShoulderJRotate", "", -45, 60),
81  Joint("ShoulderJSwing", "", 0, 80),
82  Joint("ElbowJSwing", "", 0, 120),
83  Joint("ElbowJRotate", "", -80, 80)
84  ]
85  self.lastMsg = joints_data()
86  self.lastArmMsg = joints_data()
87  self.cyberglove_pub = 0
88  self.cyberglove_sub = 0
89  self.cybergrasp_pub = 0
90  self.cyberglove_sub = 0
91  self.isFirstMessage = True
92  self.isFirstMessageArm = True
93  self.isReady = False
94  self.liste = 0
95  self.hasarm = 0
96  self.dict_pos = {}
97  self.dict_tar = {}
98  self.dict_arm_pos = {}
99  self.dict_arm_tar = {}
101  self.eth_publishers = {}
102  self.eth_subscribers = {}
103  # rospy.init_node('python_hand_library')
104  self.sendupdate_lock = threading.Lock()
105 
106  self.joint_states_lock = threading.Lock()
107 
108  # contains the ending for the topic depending on which controllers are
109  # loaded
110  self.topic_ending = ""
111 
112  # EtherCAT hand
114 
115  #
116  # Grasps
118 
119  self.rootPath = rospkg.RosPack().get_path('sr_hand')
120  self.grasp_parser.parse_tree(
121  self.rootPath + "/scripts/sr_hand/grasps.xml")
122 
124  self.pub = rospy.Publisher(
125  'srh/sendupdate', sendupdate, queue_size=1, latch=True)
126  self.pub_arm = rospy.Publisher(
127  'sr_arm/sendupdate', sendupdate, queue_size=1, latch=True)
128 
129  self.sub_arm = rospy.Subscriber(
130  'sr_arm/shadowhand_data', joints_data, self.callback_arm)
131  self.sub = rospy.Subscriber(
132  'srh/shadowhand_data', joints_data, self.callback)
133 
135 
136  self.hand_velocity = {}
137  self.hand_effort = {}
138 
139  if (self.hand_type == "etherCAT") or (self.hand_type == "gazebo"):
140  self.joint_states_listener = rospy.Subscriber(
141  "joint_states", JointState, self.joint_states_callback)
142  # Initialize the command publishers here, to avoid the delay caused
143  # when initializing them in the sendupdate
144  for jnt in self.allJoints:
145  if jnt.name not in self.eth_publishers:
146  topic = "sh_" + \
147  jnt.name.lower() + self.topic_ending + "/command"
148  self.eth_publishers[jnt.name] = rospy.Publisher(
149  topic, Float64, queue_size=1, latch=True)
151 
152  threading.Thread(None, rospy.spin)
153 
154  def create_grasp_interpoler(self, current_step, next_step):
155  self.grasp_interpoler = GraspInterpoler(current_step, next_step)
156 
157  def callback_ethercat_states(self, data, jointName):
158  """
159  @param data: The ROS message received which called the callback
160  If the message is the first received, initializes the dictionnaries
161  Else, it updates the lastMsg
162  @param joint: a Joint object that contains the name of the joint that we receive data from
163  """
164  joint_data = joint(joint_name=jointName, joint_target=math.degrees(
165  float(data.set_point)), joint_position=math.degrees(float(data.process_value)))
166 
167  # update the dictionary of joints
168  self.dict_ethercat_joints[joint_data.joint_name] = joint_data
169 
170  # Build a CAN hand style lastMsg with the latest info in
171  # self.dict_ethercat_joints
172  self.lastMsg = joints_data()
173 
174  for joint_name in self.dict_ethercat_joints.keys():
175  self.lastMsg.joints_list.append(
176  self.dict_ethercat_joints[joint_name])
177  # self.lastMsg.joints_list_length++
178 
179  # TODO This is not the right way to do it. We should check that messages from all the joints have been
180  # received (but we don't know if we have all the joints, e.g three
181  # finger hand)
182  self.isReady = True
183 
184  # if self.isFirstMessage :
185  # self.init_actual_joints()
186  # for joint in self.lastMsg.joints_list :
187  # self.dict_pos[joint.joint_name]=joint.joint_position
188  # self.dict_tar[joint.joint_name]=joint.joint_target
189  # self.isFirstMessage = False
190  # self.isReady = True
191 
192  def callback(self, data):
193  """
194  @param data: The ROS message received which called the callback
195  If the message is the first received, initializes the dictionnaries
196  Else, it updates the lastMsg
197  """
198  self.lastMsg = data
199  if self.isFirstMessage:
200  self.init_actual_joints()
201  for jnt in self.lastMsg.joints_list:
202  self.dict_pos[jnt.joint_name] = jnt.joint_position
203  self.dict_tar[jnt.joint_name] = jnt.joint_target
204  self.isFirstMessage = False
205  self.isReady = True
206 
207  def callback_arm(self, data):
208  """
209  @param data: The ROS message received which called the callback
210  If the message is the first received, initializes the dictionnaries
211  Else, it updates the lastMsg
212  """
213  self.lastArmMsg = data
214  if self.isFirstMessageArm:
215  for jnt in self.lastArmMsg.joints_list:
216  self.dict_arm_pos[jnt.joint_name] = jnt.joint_position
217  self.dict_arm_tar[jnt.joint_name] = jnt.joint_target
218 
220  """
221  Initializes the library with just the fingers actually connected
222  """
223  for joint_all in self.allJoints:
224  for joint_msg in self.lastMsg.joints_list:
225  if joint_msg.joint_name == joint_all.name:
226  self.handJoints.append(joint_all)
227 
228  def check_hand_type(self):
229  """
230  @return : true if some hand is detected
231  """
232  if self.check_gazebo_hand_presence():
233  return "gazebo"
234  elif self.check_etherCAT_hand_presence():
235  return "etherCAT"
236  elif self.check_CAN_hand_presence():
237  return "CANhand"
238  return None
239 
241  """
242  @return : true if the CAN hand is detected
243  """
244  t = 0.0
245  while not self.isReady:
246  time.sleep(1.0)
247  t = t + 1.0
248  rospy.loginfo(
249  "Waiting for service since " + str(t) + " seconds...")
250  if t >= 5.0:
251  rospy.logerr(
252  "No hand found. Are you sure the ROS hand is running ?")
253  return False
254  return True
255 
256  def set_shadowhand_data_topic(self, topic):
257  """
258  @param topic: The new topic to be set as the hand publishing topic
259  Set the library to listen to a new topic
260  """
261  print 'Changing subscriber to ' + topic
262  self.sub = rospy.Subscriber(topic, joints_data, self.callback)
263 
264  def set_sendupdate_topic(self, topic):
265  """
266  @param topic: The new topic to be set as the hand subscribing topic
267  Set the library to publish to a new topic
268  """
269  print 'Changing publisher to ' + topic
270  self.pub = rospy.Publisher(topic, sendupdate, latch=True)
271 
272  def sendupdate_from_dict(self, dicti):
273  """
274  @param dicti: Dictionnary containing all the targets to send, mapping
275  the name of the joint to the value of its target.
276  Sends new targets to the hand from a dictionnary
277  """
278  self.sendupdate_lock.acquire()
279 
280  if (self.hand_type == "etherCAT") or (self.hand_type == "gazebo"):
281  for join in dicti.keys():
282 
283  if join not in self.eth_publishers:
284  topic = "sh_" + \
285  join.lower() + self.topic_ending + "/command"
286  self.eth_publishers[join] = rospy.Publisher(
287  topic, Float64, latch=True)
288 
289  msg_to_send = Float64()
290  msg_to_send.data = math.radians(float(dicti[join]))
291  self.eth_publishers[join].publish(msg_to_send)
292  elif self.hand_type == "CANhand":
293  message = []
294  for join in dicti.keys():
295  message.append(
296  joint(joint_name=join, joint_target=dicti[join]))
297  self.pub.publish(sendupdate(len(message), message))
298 
299  self.sendupdate_lock.release()
300 
301  def sendupdate(self, jointName, angle=0):
302  """
303  @param jointName: Name of the joint to update
304  @param angle: Target of the joint, 0 if not set
305  Sends a new target for the specified joint
306  """
307  self.sendupdate_lock.acquire()
308 
309  if (self.hand_type == "etherCAT") or (self.hand_type == "gazebo"):
310  if jointName not in self.eth_publishers:
311  topic = "sh_" + \
312  jointName.lower() + self.topic_ending + "/command"
313  self.eth_publishers[jointName] = rospy.Publisher(
314  topic, Float64, latch=True)
315 
316  msg_to_send = Float64()
317  msg_to_send.data = math.radians(float(angle))
318  self.eth_publishers[jointName].publish(msg_to_send)
319  elif self.hand_type == "CANhand":
320  message = [joint(joint_name=jointName, joint_target=angle)]
321  self.pub.publish(sendupdate(len(message), message))
322 
323  self.sendupdate_lock.release()
324 
325  def sendupdate_arm_from_dict(self, dicti):
326  """
327  @param dicti: Dictionnary containing all the targets to send, mapping
328  the name of the joint to the value of its target.
329  Sends new targets to the hand from a dictionnary
330  """
331  message = []
332  for join in dicti.keys():
333  message.append(joint(joint_name=join, joint_target=dicti[join]))
334  self.pub_arm.publish(sendupdate(len(message), message))
335 
336  def sendupdate_arm(self, jointName, angle=0):
337  """
338  @param jointName: Name of the joint to update
339  @param angle: Target of the joint, 0 if not set
340  Sends a new target for the specified joint
341  """
342  message = [joint(joint_name=jointName, joint_target=angle)]
343  self.pub_arm.publish(sendupdate(len(message), message))
344 
345  def valueof(self, jointName):
346  """
347  @param jointName: Name of the joint to read the value
348  @return: 'NaN' if the value is not correct, the actual position of the joint else
349  """
350  for jnt in self.lastMsg.joints_list:
351  if jnt.joint_name == jointName:
352  return float(jnt.joint_position)
353  for jnt in self.lastArmMsg.joints_list:
354  if jnt.joint_name == jointName:
355  return float(jnt.joint_position)
356  return 'NaN'
357 
358  def has_arm(self):
359  """
360  @return : True if an arm is detected on the roscore
361  """
362  if not self.hasarm == 0:
363  return self.hasarm
364  self.hasarm = False
365  if self.liste == 0:
366  master = rosgraph.masterapi.Master('/rostopic')
367  self.liste = master.getPublishedTopics('/')
368  for topic_typ in self.liste:
369  for topic in topic_typ:
370  if 'sr_arm/shadowhand_data' in topic:
371  self.hasarm = True
372  return self.hasarm
373 
374  def record_step_to_file(self, filename, grasp_as_xml):
375  """
376  @param filename: name (or path) of the file to save to
377  @param grasp_as_xml: xml-formatted grasp
378  Write the grasp at the end of the file, creates the file if does not exist
379  """
380  if os.path.exists(filename):
381  obj = open(filename, 'r')
382  text = obj.readlines()
383  obj.close()
384  objWrite = open(filename, 'w')
385  for index in range(0, len(text) - 1):
386  objWrite.write(text[index])
387  objWrite.write(grasp_as_xml)
388  objWrite.write('</root>')
389  objWrite.close()
390  else:
391  objWrite = open(filename, 'w')
392  objWrite.write('<root>\n')
393  objWrite.write(grasp_as_xml)
394  objWrite.write('</root>')
395  objWrite.close()
396 
397  def save_hand_position_to_file(self, filename):
398  objFile = open(filename, 'w')
399  for key, value in self.dict_pos:
400  objFile.write(key + ' ' + value + '\n')
401  objFile.close()
402 
404  """
405  @return: dictionnary mapping joint names to actual positions
406  Read all the positions in the lastMsg
407  """
408  if not self.isReady:
409  return
410  for jnt in self.lastMsg.joints_list:
411  self.dict_pos[jnt.joint_name] = jnt.joint_position
412  return self.dict_pos
413 
415  """
416  @return: dictionnary mapping joint names to current targets
417  Read all the targets in the lastMsg
418  """
419  for jnt in self.lastMsg.joints_list:
420  self.dict_tar[jnt.joint_name] = jnt.joint_target
421  return self.dict_tar
422 
424  """
425  @return: dictionary mapping joint names to current velocities
426  """
427  with self.joint_states_lock:
428  return self.hand_velocity
429 
431  """
432  @return: dictionary mapping joint names to current efforts
433  """
434  with self.joint_states_lock:
435  return self.hand_effort
436 
438  """
439  @return: dictionnary mapping joint names to actual positions
440  Read all the positions in the lastMsg
441  """
442  for jnt in self.lastArmMsg.joints_list:
443  self.dict_arm_pos[jnt.joint_name] = jnt.joint_position
444  return self.dict_arm_pos
445 
447  """
448  @return: dictionnary mapping joint names to actual targets
449  Read all the targets in the lastMsg
450  """
451  for jnt in self.lastArmMsg.joints_list:
452  self.dict_arm_tar[jnt.joint_name] = jnt.joint_target
453  return self.dict_arm_tar
454 
455  def resend_targets(self):
456  """
457  Resend the targets read in the lastMsg to the hand
458  """
459  for key, value in self.dict_tar.items():
460  self.sendupdate(jointName=key, angle=value)
461 
462  def callVisualisationService(self, callList=0, reset=0):
463  """
464  @param callList: dictionnary mapping joint names to information that should be displayed
465  @param reset: flag used to tell if the parameters should be replaced by the new ones or
466  just added to the previous ones.
467  Calls a ROS service to display various information in Rviz
468  """
469  if reset == 0:
470  print 'no reset'
471 
472  if reset == 1:
473  print 'reset'
474 
476  """
477  Only used to check if a real etherCAT hand is detected in the system
478  check if something is being published to this topic, otherwise
479  return false
480  Bear in mind that the gazebo hand also publishes the joint_states topic,
481  so we need to check for the gazebo hand first
482  """
483 
484  try:
485  rospy.wait_for_message("joint_states", JointState, timeout=0.2)
486  except rospy.ROSException:
487  rospy.logwarn("no message received from joint_states")
488  return False
489 
490  return True
491 
493  """
494  Only used to check if a Gazebo simulated (etherCAT protocol) hand is detected in the system
495  check if something is being published to this topic, otherwise
496  return false
497  """
498 
499  try:
500  rospy.wait_for_message(
501  "gazebo/parameter_updates", Config, timeout=0.2)
502  except rospy.ROSException:
503  return False
504 
505  return True
506 
508  """
509  At the moment we just try to use the mixed position velocity controllers
510  """
511  success = True
512  for joint_all in self.allJoints:
513  self.topic_ending = "_position_controller"
514  topic = "sh_" + \
515  joint_all.name.lower() + self.topic_ending + "/state"
516  success = True
517  try:
518  rospy.wait_for_message(
519  topic, control_msgs.msg.JointControllerState, timeout=0.2)
520  except:
521  try:
522  self.topic_ending = "_mixed_position_velocity_controller"
523  topic = "sh_" + \
524  joint_all.name.lower() + self.topic_ending + "/state"
525  rospy.wait_for_message(
526  topic, JointControllerState, timeout=0.2)
527  except rospy.ROSException:
528  success = False
529 
530  if success:
531  if self.topic_ending == "_mixed_position_velocity_controller":
532  self.eth_subscribers[joint_all.name] = rospy.Subscriber(
533  topic, JointControllerState, self.callback_ethercat_states, joint_all.name)
534  else:
535  self.eth_subscribers[joint_all.name] = rospy.Subscriber(
536  topic,
537  control_msgs.msg.JointControllerState,
539  joint_all.name)
540 
541  if len(self.eth_subscribers) > 0:
542  return True
543 
544  return False
545 
546  def joint_states_callback(self, joint_state):
547  """
548  The callback function for the topic joint_states.
549  It will store the received joint velocity and effort information in two dictionaries
550  Velocity will be converted to degrees/s.
551  Effort units are kept as they are (currently ADC units, as no calibration is performed on the strain gauges)
552 
553  @param joint_state: the message containing the joints data.
554  """
555  with self.joint_states_lock:
556  self.hand_velocity = {n: math.degrees(v)
557  for n, v in zip(joint_state.name, joint_state.velocity)}
558  self.hand_effort = {
559  n: e for n, e in zip(joint_state.name, joint_state.effort)}
560 
561  for finger in ['FF', 'MF', 'RF', 'LF']:
562  for dic in [self.hand_velocity, self.hand_effort]:
563  if (finger + 'J1') in dic and (finger + 'J2') in dic:
564  dic[finger + 'J0'] = dic[
565  finger + 'J1'] + dic[finger + 'J2']
566  del dic[finger + 'J1']
567  del dic[finger + 'J2']
568 
569  def get_tactile_type(self):
570  return self.tactile_receiver.get_tactile_type()
571 
572  def get_tactile_state(self):
573  return self.tactile_receiver.get_tactile_state()
def joint_states_callback(self, joint_state)
def __init__(self, name="", motor="", j_min=0, j_max=90)
def sendupdate_arm(self, jointName, angle=0)
def record_step_to_file(self, filename, grasp_as_xml)
def save_hand_position_to_file(self, filename)
def callback_ethercat_states(self, data, jointName)
def create_grasp_interpoler(self, current_step, next_step)
def callVisualisationService(self, callList=0, reset=0)
def sendupdate(self, jointName, angle=0)


sr_hand
Author(s): Ugo Cupcic
autogenerated on Tue Oct 13 2020 03:55:53