00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 __author__ = 'Antons Rebguns'
00037 __copyright__ = 'Copyright (c) 2010-2011 Antons Rebguns'
00038
00039 __license__ = 'BSD'
00040 __maintainer__ = 'Antons Rebguns'
00041 __email__ = 'anton@email.arizona.edu'
00042
00043
00044 import math
00045
00046 import rospy
00047
00048 from dynamixel_driver.dynamixel_const import *
00049
00050 from dynamixel_controllers.srv import SetSpeed
00051 from dynamixel_controllers.srv import TorqueEnable
00052 from dynamixel_controllers.srv import SetComplianceSlope
00053 from dynamixel_controllers.srv import SetComplianceMargin
00054 from dynamixel_controllers.srv import SetCompliancePunch
00055 from dynamixel_controllers.srv import SetTorqueLimit
00056
00057 from std_msgs.msg import Float64
00058 from dynamixel_msgs.msg import MotorStateList
00059 from dynamixel_msgs.msg import JointState
00060
00061 class JointController:
00062 def __init__(self, dxl_io, controller_namespace, port_namespace):
00063 self.running = False
00064 self.dxl_io = dxl_io
00065 self.controller_namespace = controller_namespace
00066 self.port_namespace = port_namespace
00067 self.joint_name = rospy.get_param(self.controller_namespace + '/joint_name')
00068 self.joint_speed = rospy.get_param(self.controller_namespace + '/joint_speed', 1.0)
00069 self.compliance_slope = rospy.get_param(self.controller_namespace + '/joint_compliance_slope', None)
00070 self.compliance_margin = rospy.get_param(self.controller_namespace + '/joint_compliance_margin', None)
00071 self.compliance_punch = rospy.get_param(self.controller_namespace + '/joint_compliance_punch', None)
00072 self.torque_limit = rospy.get_param(self.controller_namespace + '/joint_torque_limit', None)
00073
00074 self.__ensure_limits()
00075
00076 self.speed_service = rospy.Service(self.controller_namespace + '/set_speed', SetSpeed, self.process_set_speed)
00077 self.torque_service = rospy.Service(self.controller_namespace + '/torque_enable', TorqueEnable, self.process_torque_enable)
00078 self.compliance_slope_service = rospy.Service(self.controller_namespace + '/set_compliance_slope', SetComplianceSlope, self.process_set_compliance_slope)
00079 self.compliance_marigin_service = rospy.Service(self.controller_namespace + '/set_compliance_margin', SetComplianceMargin, self.process_set_compliance_margin)
00080 self.compliance_punch_service = rospy.Service(self.controller_namespace + '/set_compliance_punch', SetCompliancePunch, self.process_set_compliance_punch)
00081 self.torque_limit_service = rospy.Service(self.controller_namespace + '/set_torque_limit', SetTorqueLimit, self.process_set_torque_limit)
00082
00083 def __ensure_limits(self):
00084 if self.compliance_slope is not None:
00085 if self.compliance_slope < DXL_MIN_COMPLIANCE_SLOPE: self.compliance_slope = DXL_MIN_COMPLIANCE_SLOPE
00086 elif self.compliance_slope > DXL_MAX_COMPLIANCE_SLOPE: self.compliance_slope = DXL_MAX_COMPLIANCE_SLOPE
00087 else: self.compliance_slope = int(self.compliance_slope)
00088
00089 if self.compliance_margin is not None:
00090 if self.compliance_margin < DXL_MIN_COMPLIANCE_MARGIN: self.compliance_margin = DXL_MIN_COMPLIANCE_MARGIN
00091 elif self.compliance_margin > DXL_MAX_COMPLIANCE_MARGIN: self.compliance_margin = DXL_MAX_COMPLIANCE_MARGIN
00092 else: self.compliance_margin = int(self.compliance_margin)
00093
00094 if self.compliance_punch is not None:
00095 if self.compliance_punch < DXL_MIN_PUNCH: self.compliance_punch = DXL_MIN_PUNCH
00096 elif self.compliance_punch > DXL_MAX_PUNCH: self.compliance_punch = DXL_MAX_PUNCH
00097 else: self.compliance_punch = int(self.compliance_punch)
00098
00099 if self.torque_limit is not None:
00100 if self.torque_limit < 0: self.torque_limit = 0.0
00101 elif self.torque_limit > 1: self.torque_limit = 1.0
00102
00103 def initialize(self):
00104 raise NotImplementedError
00105
00106 def start(self):
00107 self.running = True
00108 self.joint_state_pub = rospy.Publisher(self.controller_namespace + '/state', JointState, queue_size=1)
00109 self.command_sub = rospy.Subscriber(self.controller_namespace + '/command', Float64, self.process_command)
00110 self.motor_states_sub = rospy.Subscriber('motor_states/%s' % self.port_namespace, MotorStateList, self.process_motor_states)
00111
00112 def stop(self):
00113 self.running = False
00114 self.joint_state_pub.unregister()
00115 self.motor_states_sub.unregister()
00116 self.command_sub.unregister()
00117 self.speed_service.shutdown('normal shutdown')
00118 self.torque_service.shutdown('normal shutdown')
00119 self.compliance_slope_service.shutdown('normal shutdown')
00120
00121 def set_torque_enable(self, torque_enable):
00122 raise NotImplementedError
00123
00124 def set_speed(self, speed):
00125 raise NotImplementedError
00126
00127 def set_compliance_slope(self, slope):
00128 raise NotImplementedError
00129
00130 def set_compliance_margin(self, margin):
00131 raise NotImplementedError
00132
00133 def set_compliance_punch(self, punch):
00134 raise NotImplementedError
00135
00136 def set_torque_limit(self, max_torque):
00137 raise NotImplementedError
00138
00139 def process_set_speed(self, req):
00140 self.set_speed(req.speed)
00141 return []
00142
00143 def process_torque_enable(self, req):
00144 self.set_torque_enable(req.torque_enable)
00145 return []
00146
00147 def process_set_compliance_slope(self, req):
00148 self.set_compliance_slope(req.slope)
00149 return []
00150
00151 def process_set_compliance_margin(self, req):
00152 self.set_compliance_margin(req.margin)
00153 return []
00154
00155 def process_set_compliance_punch(self, req):
00156 self.set_compliance_punch(req.punch)
00157 return []
00158
00159 def process_set_torque_limit(self, req):
00160 self.set_torque_limit(req.torque_limit)
00161 return []
00162
00163 def process_motor_states(self, state_list):
00164 raise NotImplementedError
00165
00166 def process_command(self, msg):
00167 raise NotImplementedError
00168
00169 def rad_to_raw(self, angle, initial_position_raw, flipped, encoder_ticks_per_radian):
00170 """ angle is in radians """
00171
00172 angle_raw = angle * encoder_ticks_per_radian
00173
00174 return int(round(initial_position_raw - angle_raw if flipped else initial_position_raw + angle_raw))
00175
00176 def raw_to_rad(self, raw, initial_position_raw, flipped, radians_per_encoder_tick):
00177 return (initial_position_raw - raw if flipped else raw - initial_position_raw) * radians_per_encoder_tick
00178