etherCAT_hand_lib.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2011 Shadow Robot Company Ltd.
00004 #
00005 # This program is free software: you can redistribute it and/or modify it
00006 # under the terms of the GNU General Public License as published by the Free
00007 # Software Foundation, either version 2 of the License, or (at your option)
00008 # any later version.
00009 #
00010 # This program is distributed in the hope that it will be useful, but WITHOUT
00011 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
00013 # more details.
00014 #
00015 # You should have received a copy of the GNU General Public License along
00016 # with this program.  If not, see <http://www.gnu.org/licenses/>.
00017 #
00018 
00019 import roslib; roslib.load_manifest('sr_robot_lib')
00020 import rospy
00021 
00022 import time, math
00023 import re
00024 
00025 from std_msgs.msg import Float64
00026 from sensor_msgs.msg import JointState
00027 from sr_robot_msgs.srv import ForceController
00028 from sr_robot_msgs.msg import EthercatDebug
00029 
00030 class EtherCAT_Hand_Lib(object):
00031     """
00032     Useful python library to communicate with the etherCAT hand.
00033     """
00034     sensors = [ "FFJ1",  "FFJ2",  "FFJ3", "FFJ4",
00035                 "MFJ1",  "MFJ2",  "MFJ3", "MFJ4",
00036                 "RFJ1",  "RFJ2",  "RFJ3", "RFJ4",
00037                 "LFJ1",  "LFJ2",  "LFJ3", "LFJ4", "LFJ5",
00038                 "THJ1",  "THJ2",  "THJ3", "THJ4", "THJ5A", "THJ5B",
00039                 "WRJ1A", "WRJ1B", "WRJ2",
00040                 "ACCX",  "ACCY",  "ACCZ",
00041                 "GYRX",  "GYRY",  "GYRZ",
00042                 "AN0",   "AN1",   "AN2",  "AN3" ]
00043 
00044     def __init__(self):
00045         """
00046         Useful python library to communicate with the etherCAT hand.
00047         """
00048         self.debug_subscriber = None
00049         self.joint_state_subscriber = None
00050         self.record_js_callback = None
00051         self.raw_values = []
00052         self.pid_services = {}
00053         self.publishers = {}
00054         self.positions = {}
00055         self.velocities = {}
00056         self.efforts = {}
00057 
00058         self.msg_to_send = Float64()
00059         self.msg_to_send.data= 0.0
00060 
00061         #TODO: read this from parameter server
00062         self.compounds = {}
00063 
00064         joint_to_sensor_mapping = []
00065         try:
00066             joint_to_sensor_mapping = rospy.get_param("joint_to_sensor_mapping")
00067         except:
00068             rospy.logwarn("The parameter joint_to_sensor_mapping was not found, you won't be able to get the raw values from the the EtherCAT compound sensors.")
00069 
00070         for mapping in joint_to_sensor_mapping:
00071             if mapping[0] is 1:
00072                 if "THJ5" in mapping[1][0]:
00073                     self.compounds["THJ5"] = [["THJ5A", mapping[1][1]],
00074                                               ["THJ5B", mapping[2][1]]]
00075                 elif "WRJ1" in mapping[1][0]:
00076                     self.compounds["WRJ1"] = [["WRJ1A", mapping[1][1]],
00077                                               ["WRJ1B", mapping[2][1]]]
00078 
00079     def sendupdate(self, joint_name, target, controller_type = "effort"):
00080         if not self.publishers.has_key(joint_name):
00081             topic = "sh_"+ joint_name.lower() +"_"+controller_type+"_controller/command"
00082             self.publishers[joint_name] = rospy.Publisher(topic, Float64)
00083 
00084         self.msg_to_send.data = math.radians( float( target ) )
00085         self.publishers[joint_name].publish(self.msg_to_send)
00086 
00087     def get_position(self, joint_name):
00088         value = None
00089         try:
00090             value = self.positions[joint_name]
00091         except:
00092             #We check if the reason to except is that we are trying to access the joint 0
00093             #Position of the J0 is the addition of the positions of J1 and J2
00094             m = re.match("(?P<finger>\w{2})J0", joint_name)
00095             if m is not None:
00096                 value = self.positions[m.group("finger") + "J1"] + self.positions[m.group("finger") + "J2"]
00097             else:
00098                 raise
00099         return value
00100 
00101     def get_velocity(self, joint_name):
00102         value = None
00103         try:
00104             value = self.velocities[joint_name]
00105         except:
00106             #We check if the reason to except is that we are trying to access the joint 0
00107             m = re.match("(?P<finger>\w{2})J0", joint_name)
00108             if m is not None:
00109                 value = self.velocities[m.group("finger") + "J1"] + self.velocities[m.group("finger") + "J2"]
00110             else:
00111                 raise
00112         return value
00113 
00114     def get_effort(self, joint_name):
00115         value = None
00116         try:
00117             value = self.efforts[joint_name]
00118         except:
00119             #We check if the reason to except is that we are trying to access the joint 0
00120             #Effort of the J0 is the same as the effort of J1 and J2, so we pick J1
00121             m = re.match("(?P<finger>\w{2})J0", joint_name)
00122             if m is not None:
00123                 value = self.efforts[m.group("finger") + "J1"]
00124             else:
00125                 raise
00126         return value
00127 
00128     def start_record(self, joint_name, callback):
00129         self.record_js_callback = callback
00130 
00131     def stop_record(self, joint_name):
00132         self.callback = None
00133 
00134 
00135     def set_pid(self, joint_name, pid_parameters):
00136         """
00137         """
00138         if not self.pid_services.has_key(joint_name):
00139             service_name =  "realtime_loop/change_force_PID_"+joint_name
00140             self.pid_services[joint_name] = rospy.ServiceProxy(service_name, ForceController)
00141 
00142         self.pid_services[joint_name]( pid_parameters["max_pwm"],
00143                                        pid_parameters["sgleftref"],
00144                                        pid_parameters["sgrightref"],
00145                                        pid_parameters["f"],
00146                                        pid_parameters["p"],
00147                                        pid_parameters["i"],
00148                                        pid_parameters["d"],
00149                                        pid_parameters["imax"],
00150                                        pid_parameters["deadband"],
00151                                        pid_parameters["sign"] )
00152 
00153     def debug_callback(self, msg):
00154         self.raw_values = msg.sensors
00155 
00156     def joint_state_callback(self, msg):
00157         for name,pos,vel,effort in zip( msg.name, msg.position, msg.velocity, msg.effort ):
00158             self.positions[name] = pos
00159             self.velocities[name] = vel
00160             self.efforts[name] = effort
00161 
00162         if self.record_js_callback is not None:
00163             self.record_js_callback()
00164 
00165 
00166     def get_raw_value(self, sensor_name):
00167         value = 0.0
00168 
00169         if self.raw_values == []:
00170             rospy.logwarn("No values received from the etherCAT hand.")
00171             return -1.0
00172 
00173         try:
00174             if sensor_name in self.compounds.keys():
00175                 for sub_compound in self.compounds[sensor_name]:
00176                     index = self.sensors.index( sub_compound[0] )
00177                     value = value + ( self.raw_values[index] * sub_compound[1] )
00178             else:
00179                 index = self.sensors.index( sensor_name )
00180                 value = self.raw_values[index]
00181         except:
00182             #if the value is not found we're returning 4095
00183             value = 4095
00184         return value
00185 
00186     def get_average_raw_value(self, sensor_name, number_of_samples=10):
00187         """
00188         Get the average raw value for the given sensor, average on
00189         number_of_samples
00190         """
00191         tmp_raw_values = []
00192         for i in range(0, number_of_samples):
00193             tmp_raw_values.append( self.get_raw_value(sensor_name) )
00194             time.sleep(0.002)
00195 
00196         average = float( sum(tmp_raw_values) )/len(tmp_raw_values)
00197         return average
00198 
00199 
00200     def activate(self):
00201         #check if something is being published to those topics, otherwise
00202         # return false (the library is not activated)
00203         try:
00204             rospy.wait_for_message("debug_etherCAT_data", EthercatDebug, timeout = 0.2)
00205             rospy.wait_for_message("joint_states", JointState, timeout = 0.2)
00206         except:
00207             return False
00208 
00209         self.debug_subscriber = rospy.Subscriber("debug_etherCAT_data", EthercatDebug, self.debug_callback)
00210         self.joint_state_subscriber = rospy.Subscriber("joint_states", JointState, self.joint_state_callback)
00211         return True
00212 
00213     def activate_joint_states(self):
00214         #check if something is being published to this topic, otherwise
00215         # return false (the library is not activated)
00216         try:
00217             rospy.wait_for_message("joint_states", JointState, timeout = 0.2)
00218             self.joint_state_subscriber = rospy.Subscriber("joint_states", JointState, self.joint_state_callback)
00219         except:
00220             try:
00221                 rospy.wait_for_message("gazebo/joint_states", JointState, timeout = 0.2)
00222                 self.joint_state_subscriber = rospy.Subscriber("gazebo/joint_states", JointState, self.joint_state_callback)
00223             except:
00224                 return False
00225 
00226         return True
00227 
00228     def on_close(self):
00229         if self.debug_subscriber is not None:
00230             self.debug_subscriber.unregister()
00231             self.debug_subscriber = None
00232 
00233         if self.joint_state_subscriber is not None:
00234             self.joint_state_subscriber.unregister()
00235             self.joint_state_subscriber = None
00236 


sr_robot_lib
Author(s): Ugo Cupcic / ugo@shadowrobot.com , software@shadowrobot.com
autogenerated on Mon Oct 6 2014 07:37:50