etherCAT_hand_lib.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2011 Shadow Robot Company Ltd.
00004 #
00005 # This program is free software: you can redistribute it and/or modify it
00006 # under the terms of the GNU General Public License as published by the Free
00007 # Software Foundation, either version 2 of the License, or (at your option)
00008 # any later version.
00009 #
00010 # This program is distributed in the hope that it will be useful, but WITHOUT
00011 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
00013 # more details.
00014 #
00015 # You should have received a copy of the GNU General Public License along
00016 # with this program.  If not, see <http://www.gnu.org/licenses/>.
00017 #
00018 
00019 import rospy
00020 
00021 import time
00022 import math
00023 import re
00024 
00025 from std_msgs.msg import Float64
00026 from sensor_msgs.msg import JointState
00027 from sr_robot_msgs.srv import ForceController
00028 from sr_robot_msgs.msg import EthercatDebug
00029 from sr_utilities.hand_finder import HandFinder
00030 
00031 
00032 class EtherCAT_Hand_Lib(object):
00033 
00034     """
00035     Useful python library to communicate with the etherCAT hand.
00036     """
00037     sensors = ["FFJ1", "FFJ2", "FFJ3", "FFJ4",
00038                "MFJ1", "MFJ2", "MFJ3", "MFJ4",
00039                "RFJ1", "RFJ2", "RFJ3", "RFJ4",
00040                "LFJ1", "LFJ2", "LFJ3", "LFJ4", "LFJ5",
00041                "THJ1", "THJ2", "THJ3", "THJ4", "THJ5A", "THJ5B",
00042                "WRJ1A", "WRJ1B", "WRJ2",
00043                "ACCX", "ACCY", "ACCZ",
00044                "GYRX", "GYRY", "GYRZ",
00045                "AN0", "AN1", "AN2", "AN3"]
00046 
00047     def __init__(self):
00048         """
00049         Useful python library to communicate with the etherCAT hand.
00050         """
00051         self.hand_finder = HandFinder()
00052         self.hand_params = self.hand_finder.get_hand_parameters()
00053         self.hand_id = ''
00054         if len(self.hand_params.mapping) is not 0:
00055             self.hand_id = self.hand_params.mapping.itervalues().next()
00056         self.debug_subscriber = None
00057         self.joint_state_subscriber = None
00058         self.record_js_callback = None
00059         self.raw_values = []
00060         self.pid_services = {}
00061         self.publishers = {}
00062         self.positions = {}
00063         self.velocities = {}
00064         self.efforts = {}
00065 
00066         self.msg_to_send = Float64()
00067         self.msg_to_send.data = 0.0
00068 
00069         # TODO: read this from parameter server
00070         self.compounds = {}
00071 
00072         joint_to_sensor_mapping = []
00073         try:
00074             rospy.wait_for_message(self.hand_id + "/debug_etherCAT_data",
00075                                    EthercatDebug, timeout=0.2)
00076             try:
00077                 joint_to_sensor_mapping \
00078                     = rospy.get_param(self.hand_id + "/joint_to_sensor_mapping")
00079             except:
00080                 rospy.logwarn("The parameter joint_to_sensor_mapping "
00081                               "was not found, you won't be able to get the "
00082                               "raw values from the EtherCAT compound sensors.")
00083         except:
00084             pass
00085 
00086         for mapping in joint_to_sensor_mapping:
00087             if mapping[0] is 1:
00088                 if "THJ5" in mapping[1][0]:
00089                     self.compounds["THJ5"] = [["THJ5A", mapping[1][1]],
00090                                               ["THJ5B", mapping[2][1]]]
00091                 elif "WRJ1" in mapping[1][0]:
00092                     self.compounds["WRJ1"] = [["WRJ1A", mapping[1][1]],
00093                                               ["WRJ1B", mapping[2][1]]]
00094 
00095     def sendupdate(self, joint_name, target, controller_type="effort"):
00096         if joint_name not in self.publishers:
00097             topic = "sh_" + joint_name.lower() + "_" + controller_type \
00098                     + "_controller/command"
00099             self.publishers[joint_name] = rospy.Publisher(topic, Float64)
00100 
00101         self.msg_to_send.data = math.radians(float(target))
00102         self.publishers[joint_name].publish(self.msg_to_send)
00103 
00104     def get_position(self, joint_name):
00105         value = None
00106         try:
00107             value = self.positions[joint_name]
00108         except:
00109             # We check if the reason to except is that we are trying to
00110             # access the joint 0
00111             # Position of the J0 is the addition of the positions of J1 and J2
00112             m = re.match("(?P<finger>\w{2})J0", joint_name)
00113             if m is not None:
00114                 value = self.positions[m.group("finger") + "J1"] +\
00115                     self.positions[m.group("finger") + "J2"]
00116             else:
00117                 raise
00118         return value
00119 
00120     def get_velocity(self, joint_name):
00121         value = None
00122         try:
00123             value = self.velocities[joint_name]
00124         except:
00125             # We check if the reason to except is that we are
00126             #  trying to access the joint 0
00127             m = re.match("(?P<finger>\w{2})J0", joint_name)
00128             if m is not None:
00129                 value = self.velocities[m.group("finger") + "J1"] + \
00130                     self.velocities[m.group("finger") + "J2"]
00131             else:
00132                 raise
00133         return value
00134 
00135     def get_effort(self, joint_name):
00136         value = None
00137         try:
00138             value = self.efforts[joint_name]
00139         except:
00140             # We check if the reason to except is that we are
00141             #  trying to access the joint 0
00142             # Effort of the J0 is the same as the effort of
00143             # J1 and J2, so we pick J1
00144             m = re.match("(?P<finger>\w{2})J0", joint_name)
00145             if m is not None:
00146                 value = self.efforts[m.group("finger") + "J1"]
00147             else:
00148                 raise
00149         return value
00150 
00151     def start_record(self, joint_name, callback):
00152         self.record_js_callback = callback
00153 
00154     def stop_record(self, joint_name):
00155         self.callback = None
00156 
00157     def set_pid(self, joint_name, pid_parameters):
00158         """
00159         """
00160         if joint_name not in self.pid_services:
00161             service_name = "realtime_loop/change_force_PID_" + joint_name
00162             self.pid_services[joint_name] = rospy.ServiceProxy(service_name,
00163                                                                ForceController)
00164 
00165         self.pid_services[joint_name](pid_parameters["max_pwm"],
00166                                       pid_parameters["sgleftref"],
00167                                       pid_parameters["sgrightref"],
00168                                       pid_parameters["f"],
00169                                       pid_parameters["p"],
00170                                       pid_parameters["i"],
00171                                       pid_parameters["d"],
00172                                       pid_parameters["imax"],
00173                                       pid_parameters["deadband"],
00174                                       pid_parameters["sign"])
00175 
00176     def debug_callback(self, msg):
00177         self.raw_values = msg.sensors
00178 
00179     def joint_state_callback(self, msg):
00180         for name, pos, vel, effort in \
00181                 zip(msg.name, msg.position, msg.velocity, msg.effort):
00182             self.positions[name] = pos
00183             self.velocities[name] = vel
00184             self.efforts[name] = effort
00185 
00186         if self.record_js_callback is not None:
00187             self.record_js_callback()
00188 
00189     def get_raw_value(self, sensor_name):
00190         value = 0.0
00191 
00192         if not self.raw_values:
00193             rospy.logwarn("No values received from the etherCAT hand.")
00194             return -1.0
00195 
00196         try:
00197             if sensor_name in self.compounds.keys():
00198                 for sub_compound in self.compounds[sensor_name]:
00199                     index = self.sensors.index(sub_compound[0])
00200                     value = value + (self.raw_values[index] * sub_compound[1])
00201             else:
00202                 index = self.sensors.index(sensor_name)
00203                 value = self.raw_values[index]
00204         except:
00205             # if the value is not found we're returning 4095
00206             value = 4095
00207         return value
00208 
00209     def get_average_raw_value(self, sensor_name, number_of_samples=10):
00210         """
00211         Get the average raw value for the given sensor, average on
00212         number_of_samples
00213         """
00214         tmp_raw_values = []
00215         for i in range(0, number_of_samples):
00216             tmp_raw_values.append(self.get_raw_value(sensor_name))
00217             time.sleep(0.002)
00218 
00219         average = float(sum(tmp_raw_values)) / len(tmp_raw_values)
00220         return average
00221 
00222     def activate(self):
00223         # check if something is being published to those topics, otherwise
00224         # return false (the library is not activated)
00225         try:
00226             rospy.wait_for_message(self.hand_id + "/debug_etherCAT_data",
00227                                    EthercatDebug, timeout=0.2)
00228             rospy.wait_for_message("joint_states", JointState, timeout=0.2)
00229         except:
00230             return False
00231 
00232         self.debug_subscriber =\
00233             rospy.Subscriber(self.hand_id + "/debug_etherCAT_data",
00234                              EthercatDebug, self.debug_callback)
00235         self.joint_state_subscriber =\
00236             rospy.Subscriber("joint_states",
00237                              JointState, self.joint_state_callback)
00238         return True
00239 
00240     def activate_joint_states(self):
00241         # check if something is being published to this topic, otherwise
00242         # return false (the library is not activated)
00243         try:
00244             rospy.wait_for_message("joint_states", JointState, timeout=0.2)
00245             self.joint_state_subscriber = \
00246                 rospy.Subscriber("joint_states", JointState,
00247                                  self.joint_state_callback)
00248         except:
00249             return False
00250 
00251         return True
00252 
00253     def on_close(self):
00254         if self.debug_subscriber is not None:
00255             self.debug_subscriber.unregister()
00256             self.debug_subscriber = None
00257 
00258         if self.joint_state_subscriber is not None:
00259             self.joint_state_subscriber.unregister()
00260             self.joint_state_subscriber = None


sr_robot_lib
Author(s): Ugo Cupcic, Toni Oliver
autogenerated on Mon Jul 1 2019 20:06:26