00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 __author__ = 'Antons Rebguns'
00037 __copyright__ = 'Copyright (c) 2010-2011 Antons Rebguns'
00038
00039 __license__ = 'BSD'
00040 __maintainer__ = 'Antons Rebguns'
00041 __email__ = 'anton@email.arizona.edu'
00042
00043
00044 import math
00045
00046 import roslib
00047 roslib.load_manifest('dynamixel_controllers')
00048
00049 import rospy
00050
00051 from dynamixel_driver.dynamixel_const import *
00052
00053 from dynamixel_controllers.srv import SetSpeed
00054 from dynamixel_controllers.srv import TorqueEnable
00055 from dynamixel_controllers.srv import SetComplianceSlope
00056 from dynamixel_controllers.srv import SetComplianceMargin
00057 from dynamixel_controllers.srv import SetCompliancePunch
00058 from dynamixel_controllers.srv import SetTorqueLimit
00059
00060 from std_msgs.msg import Float64
00061 from dynamixel_msgs.msg import MotorStateList
00062 from dynamixel_msgs.msg import JointState
00063
00064 class JointController:
00065 def __init__(self, out_cb, param_path, port_name):
00066 self.running = False
00067 self.send_packet_callback = out_cb
00068 self.topic_name = param_path
00069 self.port_namespace = port_name[port_name.rfind('/') + 1:]
00070 self.joint_name = rospy.get_param(self.topic_name + '/joint_name')
00071 self.joint_speed = rospy.get_param(self.topic_name + '/joint_speed', 1.0)
00072 self.compliance_slope = rospy.get_param(self.topic_name + '/joint_compliance_slope', None)
00073 self.compliance_margin = rospy.get_param(self.topic_name + '/joint_compliance_margin', None)
00074 self.compliance_punch = rospy.get_param(self.topic_name + '/joint_compliance_punch', None)
00075 self.torque_limit = rospy.get_param(self.topic_name + '/joint_torque_limit', None)
00076
00077 self.__ensure_limits()
00078
00079 self.speed_service = rospy.Service(self.topic_name + '/set_speed', SetSpeed, self.process_set_speed)
00080 self.torque_service = rospy.Service(self.topic_name + '/torque_enable', TorqueEnable, self.process_torque_enable)
00081 self.compliance_slope_service = rospy.Service(self.topic_name + '/set_compliance_slope', SetComplianceSlope, self.process_set_compliance_slope)
00082 self.compliance_marigin_service = rospy.Service(self.topic_name + '/set_compliance_margin', SetComplianceMargin, self.process_set_compliance_margin)
00083 self.compliance_punch_service = rospy.Service(self.topic_name + '/set_compliance_punch', SetCompliancePunch, self.process_set_compliance_punch)
00084 self.torque_limit_service = rospy.Service(self.topic_name + '/set_torque_limit', SetTorqueLimit, self.process_set_torque_limit)
00085
00086 def __ensure_limits(self):
00087 if self.compliance_slope is not None:
00088 if self.compliance_slope < DXL_MIN_COMPLIANCE_SLOPE: self.compliance_slope = DXL_MIN_COMPLIANCE_SLOPE
00089 elif self.compliance_slope > DXL_MAX_COMPLIANCE_SLOPE: self.compliance_slope = DXL_MAX_COMPLIANCE_SLOPE
00090 else: self.compliance_slope = int(self.compliance_slope)
00091
00092 if self.compliance_margin is not None:
00093 if self.compliance_margin < DXL_MIN_COMPLIANCE_MARGIN: self.compliance_margin = DXL_MIN_COMPLIANCE_MARGIN
00094 elif self.compliance_margin > DXL_MAX_COMPLIANCE_MARGIN: self.compliance_margin = DXL_MAX_COMPLIANCE_MARGIN
00095 else: self.compliance_margin = int(self.compliance_margin)
00096
00097 if self.compliance_punch is not None:
00098 if self.compliance_punch < DXL_MIN_PUNCH: self.compliance_punch = DXL_MIN_PUNCH
00099 elif self.compliance_punch > DXL_MAX_PUNCH: self.compliance_punch = DXL_MAX_PUNCH
00100 else: self.compliance_punch = int(self.compliance_punch)
00101
00102 if self.torque_limit is not None:
00103 if self.torque_limit < 0: self.torque_limit = 0.0
00104 elif self.torque_limit > 1: self.torque_limit = 1.0
00105
00106 def initialize(self):
00107 raise NotImplementedError
00108
00109 def start(self):
00110 self.running = True
00111 self.joint_state_pub = rospy.Publisher(self.topic_name + '/state', JointState)
00112 self.command_sub = rospy.Subscriber(self.topic_name + '/command', Float64, self.process_command)
00113 self.motor_states_sub = rospy.Subscriber('motor_states/%s' % self.port_namespace, MotorStateList, self.process_motor_states)
00114
00115 def stop(self):
00116 self.running = False
00117 self.joint_state_pub.unregister()
00118 self.motor_states_sub.unregister()
00119 self.command_sub.unregister()
00120 self.speed_service.shutdown('normal shutdown')
00121 self.torque_service.shutdown('normal shutdown')
00122 self.compliance_slope_service.shutdown('normal shutdown')
00123
00124 def set_torque_enable(self, torque_enable):
00125 raise NotImplementedError
00126
00127 def set_speed(self, speed):
00128 raise NotImplementedError
00129
00130 def set_compliance_slope(self, slope):
00131 raise NotImplementedError
00132
00133 def set_compliance_margin(self, margin):
00134 raise NotImplementedError
00135
00136 def set_compliance_punch(self, punch):
00137 raise NotImplementedError
00138
00139 def set_torque_limit(self, max_torque):
00140 raise NotImplementedError
00141
00142 def process_set_speed(self, req):
00143 self.set_speed(req.speed)
00144 return []
00145
00146 def process_torque_enable(self, req):
00147 self.set_torque_enable(req.torque_enable)
00148 return []
00149
00150 def process_set_compliance_slope(self, req):
00151 self.set_compliance_slope(req.slope)
00152 return []
00153
00154 def process_set_compliance_margin(self, req):
00155 self.set_compliance_margin(req.margin)
00156 return []
00157
00158 def process_set_compliance_punch(self, req):
00159 self.set_compliance_punch(req.punch)
00160 return []
00161
00162 def process_set_torque_limit(self, req):
00163 self.set_torque_limit(req.torque_limit)
00164 return []
00165
00166 def process_motor_states(self, state_list):
00167 raise NotImplementedError
00168
00169 def process_command(self, msg):
00170 raise NotImplementedError
00171
00172 def rad_to_raw(self, angle, initial_position_raw, flipped, encoder_ticks_per_radian):
00173 """ angle is in radians """
00174
00175 angle_raw = angle * encoder_ticks_per_radian
00176
00177 return int(round(initial_position_raw - angle_raw if flipped else initial_position_raw + angle_raw))
00178
00179 def raw_to_rad(self, raw, initial_position_raw, flipped, radians_per_encoder_tick):
00180 return (initial_position_raw - raw if flipped else raw - initial_position_raw) * radians_per_encoder_tick
00181