etherCAT_hand_lib.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2011 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation version 2 of the License.
8 #
9 # This program is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 # more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 
18 import rospy
19 
20 import time
21 import math
22 import re
23 
24 from std_msgs.msg import Float64
25 from sensor_msgs.msg import JointState
26 from sr_robot_msgs.srv import ForceController
27 from sr_robot_msgs.msg import EthercatDebug
28 from sr_utilities.hand_finder import HandFinder
29 
30 
31 class EtherCAT_Hand_Lib(object):
32 
33  """
34  Useful python library to communicate with the etherCAT hand.
35  """
36  sensors = ["FFJ1", "FFJ2", "FFJ3", "FFJ4",
37  "MFJ1", "MFJ2", "MFJ3", "MFJ4",
38  "RFJ1", "RFJ2", "RFJ3", "RFJ4",
39  "LFJ1", "LFJ2", "LFJ3", "LFJ4", "LFJ5",
40  "THJ1", "THJ2", "THJ3", "THJ4", "THJ5A", "THJ5B",
41  "WRJ1A", "WRJ1B", "WRJ2",
42  "ACCX", "ACCY", "ACCZ",
43  "GYRX", "GYRY", "GYRZ",
44  "AN0", "AN1", "AN2", "AN3"]
45 
46  def __init__(self):
47  """
48  Useful python library to communicate with the etherCAT hand.
49  """
50  self.hand_finder = HandFinder()
51  self.hand_params = self.hand_finder.get_hand_parameters()
52  self.hand_id = ''
53  if len(self.hand_params.mapping) is not 0:
54  self.hand_id = self.hand_params.mapping.itervalues().next()
55  self.debug_subscriber = None
57  self.record_js_callback = None
58  self.raw_values = []
59  self.pid_services = {}
60  self.publishers = {}
61  self.positions = {}
62  self.velocities = {}
63  self.efforts = {}
64 
65  self.msg_to_send = Float64()
66  self.msg_to_send.data = 0.0
67 
68  # TODO: read this from parameter server
69  self.compounds = {}
70 
71  joint_to_sensor_mapping = []
72  try:
73  rospy.wait_for_message(self.hand_id + "/debug_etherCAT_data",
74  EthercatDebug, timeout=0.2)
75  try:
76  joint_to_sensor_mapping \
77  = rospy.get_param(self.hand_id + "/joint_to_sensor_mapping")
78  except:
79  rospy.logwarn("The parameter joint_to_sensor_mapping "
80  "was not found, you won't be able to get the "
81  "raw values from the EtherCAT compound sensors.")
82  except:
83  pass
84 
85  for mapping in joint_to_sensor_mapping:
86  if mapping[0] is 1:
87  if "THJ5" in mapping[1][0]:
88  self.compounds["THJ5"] = [["THJ5A", mapping[1][1]],
89  ["THJ5B", mapping[2][1]]]
90  elif "WRJ1" in mapping[1][0]:
91  self.compounds["WRJ1"] = [["WRJ1A", mapping[1][1]],
92  ["WRJ1B", mapping[2][1]]]
93 
94  def sendupdate(self, joint_name, target, controller_type="effort"):
95  if joint_name not in self.publishers:
96  topic = "sh_" + joint_name.lower() + "_" + controller_type \
97  + "_controller/command"
98  self.publishers[joint_name] = rospy.Publisher(topic, Float64)
99 
100  self.msg_to_send.data = math.radians(float(target))
101  self.publishers[joint_name].publish(self.msg_to_send)
102 
103  def get_position(self, joint_name):
104  value = None
105  try:
106  value = self.positions[joint_name]
107  except:
108  # We check if the reason to except is that we are trying to
109  # access the joint 0
110  # Position of the J0 is the addition of the positions of J1 and J2
111  m = re.match("(?P<finger>\w{2})J0", joint_name)
112  if m is not None:
113  value = self.positions[m.group("finger") + "J1"] +\
114  self.positions[m.group("finger") + "J2"]
115  else:
116  raise
117  return value
118 
119  def get_velocity(self, joint_name):
120  value = None
121  try:
122  value = self.velocities[joint_name]
123  except:
124  # We check if the reason to except is that we are
125  # trying to access the joint 0
126  m = re.match("(?P<finger>\w{2})J0", joint_name)
127  if m is not None:
128  value = self.velocities[m.group("finger") + "J1"] + \
129  self.velocities[m.group("finger") + "J2"]
130  else:
131  raise
132  return value
133 
134  def get_effort(self, joint_name):
135  value = None
136  try:
137  value = self.efforts[joint_name]
138  except:
139  # We check if the reason to except is that we are
140  # trying to access the joint 0
141  # Effort of the J0 is the same as the effort of
142  # J1 and J2, so we pick J1
143  m = re.match("(?P<finger>\w{2})J0", joint_name)
144  if m is not None:
145  value = self.efforts[m.group("finger") + "J1"]
146  else:
147  raise
148  return value
149 
150  def start_record(self, joint_name, callback):
151  self.record_js_callback = callback
152 
153  def stop_record(self, joint_name):
154  self.callback = None
155 
156  def set_pid(self, joint_name, pid_parameters):
157  """
158  """
159  if joint_name not in self.pid_services:
160  service_name = "sr_hand_robot/change_force_PID_" + joint_name
161  self.pid_services[joint_name] = rospy.ServiceProxy(service_name,
162  ForceController)
163 
164  self.pid_services[joint_name](pid_parameters["max_pwm"],
165  pid_parameters["sgleftref"],
166  pid_parameters["sgrightref"],
167  pid_parameters["f"],
168  pid_parameters["p"],
169  pid_parameters["i"],
170  pid_parameters["d"],
171  pid_parameters["imax"],
172  pid_parameters["deadband"],
173  pid_parameters["sign"])
174 
175  def debug_callback(self, msg):
176  if not(all(v == 0 for v in msg.sensors)):
177  self.raw_values = msg.sensors
178 
179  def joint_state_callback(self, msg):
180  for name, pos, vel, effort in \
181  zip(msg.name, msg.position, msg.velocity, msg.effort):
182  self.positions[name] = pos
183  self.velocities[name] = vel
184  self.efforts[name] = effort
185 
186  if self.record_js_callback is not None:
187  self.record_js_callback()
188 
189  def get_raw_value(self, sensor_name):
190  value = 0.0
191 
192  if not self.raw_values:
193  rospy.logwarn("No values received from the etherCAT hand.")
194  return -1.0
195 
196  try:
197  if sensor_name in self.compounds.keys():
198  for sub_compound in self.compounds[sensor_name]:
199  index = self.sensors.index(sub_compound[0])
200  value = value + (self.raw_values[index] * sub_compound[1])
201  else:
202  index = self.sensors.index(sensor_name)
203  value = self.raw_values[index]
204  except:
205  # if the value is not found we're returning 4095
206  value = 4095
207  return value
208 
209  def get_average_raw_value(self, sensor_name, number_of_samples=10, accept_zeros=True):
210  """
211  Get the average raw value for the given sensor, average on
212  number_of_samples
213  """
214  tmp_raw_values = []
215  while len(tmp_raw_values) < number_of_samples:
216  value = self.get_raw_value(sensor_name)
217  if accept_zeros or value > 0.0:
218  tmp_raw_values.append(value)
219  time.sleep(0.002)
220  average = float(sum(tmp_raw_values)) / len(tmp_raw_values)
221  return average
222 
223  def activate(self):
224  # check if something is being published to those topics, otherwise
225  # return false (the library is not activated)
226  try:
227  rospy.wait_for_message(self.hand_id + "/debug_etherCAT_data",
228  EthercatDebug, timeout=0.2)
229  rospy.wait_for_message("joint_states", JointState, timeout=0.2)
230  except:
231  return False
232 
233  self.debug_subscriber =\
234  rospy.Subscriber(self.hand_id + "/debug_etherCAT_data",
235  EthercatDebug, self.debug_callback)
236  self.joint_state_subscriber =\
237  rospy.Subscriber("joint_states",
238  JointState, self.joint_state_callback)
239  return True
240 
242  # check if something is being published to this topic, otherwise
243  # return false (the library is not activated)
244  try:
245  rospy.wait_for_message("joint_states", JointState, timeout=0.2)
246  self.joint_state_subscriber = \
247  rospy.Subscriber("joint_states", JointState,
249  except:
250  return False
251 
252  return True
253 
254  def on_close(self):
255  if self.debug_subscriber is not None:
256  self.debug_subscriber.unregister()
257  self.debug_subscriber = None
258 
259  if self.joint_state_subscriber is not None:
260  self.joint_state_subscriber.unregister()
261  self.joint_state_subscriber = None
def set_pid(self, joint_name, pid_parameters)
def get_average_raw_value(self, sensor_name, number_of_samples=10, accept_zeros=True)
def sendupdate(self, joint_name, target, controller_type="effort")


sr_robot_lib
Author(s): Ugo Cupcic, Toni Oliver
autogenerated on Tue Oct 13 2020 04:01:57