$search
00001 #!/usr/bin/env python 00002 00003 """ 00004 servo_controller.py: classes for servo interaction 00005 Copyright (c) 2011 Vanadium Labs LLC. All right reserved. 00006 00007 Redistribution and use in source and binary forms, with or without 00008 modification, are permitted provided that the following conditions are met: 00009 * Redistributions of source code must retain the above copyright 00010 notice, this list of conditions and the following disclaimer. 00011 * Redistributions in binary form must reproduce the above copyright 00012 notice, this list of conditions and the following disclaimer in the 00013 documentation and/or other materials provided with the distribution. 00014 * Neither the name of Vanadium Labs LLC nor the names of its 00015 contributors may be used to endorse or promote products derived 00016 from this software without specific prior written permission. 00017 00018 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 00019 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00020 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00021 DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT, 00022 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 00023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00025 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 00026 OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 00027 ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00028 """ 00029 00030 import rospy 00031 00032 from math import radians 00033 00034 from std_msgs.msg import Float64 00035 from arbotix_msgs.srv import * 00036 from diagnostic_msgs.msg import * 00037 00038 from ax12 import * 00039 from joints import * 00040 00041 class DynamixelServo(Joint): 00042 00043 def __init__(self, device, name, ns="~joints"): 00044 Joint.__init__(self, device, name) 00045 n = ns+"/"+name+"/" 00046 00047 self.id = int(rospy.get_param(n+"id")) 00048 self.ticks = rospy.get_param(n+"ticks", 1024) 00049 self.neutral = rospy.get_param(n+"neutral", self.ticks/2) 00050 if self.ticks == 4096: 00051 self.range = 360.0 00052 else: 00053 self.range = 300.0 00054 self.range = rospy.get_param(n+"range", self.range) 00055 self.rad_per_tick = radians(self.range)/self.ticks 00056 00057 # TODO: load these from URDF 00058 self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0)) 00059 self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0)) 00060 self.max_speed = radians(rospy.get_param(n+"max_speed",684.0)) 00061 # max speed = 114 rpm - 684 deg/s 00062 self.invert = rospy.get_param(n+"invert",False) 00063 self.readable = rospy.get_param(n+"readable",True) 00064 00065 self.status = "OK" 00066 self.level = DiagnosticStatus.OK 00067 00068 self.dirty = False # newly updated position? 00069 self.position = 0.0 # current position, as returned by servo (radians) 00070 self.desired = 0.0 # desired position (radians) 00071 self.last_cmd = 0.0 # last position sent (radians) 00072 self.velocity = 0.0 # moving speed 00073 self.relaxed = True # are we not under torque control? 00074 self.last = rospy.Time.now() 00075 00076 self.reads = 0.0 # number of reads 00077 self.errors = 0 # number of failed reads 00078 self.total_reads = 0.0 00079 self.total_errors = [0.0] 00080 00081 self.voltage = 0.0 00082 self.temperature = 0.0 00083 00084 # ROS interfaces 00085 rospy.Subscriber(name+'/command', Float64, self.commandCb) 00086 rospy.Service(name+'/relax', Relax, self.relaxCb) 00087 00088 def interpolate(self, frame): 00089 """ Get the new position to move to, in ticks. """ 00090 if self.dirty: 00091 # compute command, limit velocity 00092 cmd = self.desired - self.last_cmd 00093 if cmd > self.max_speed/frame: 00094 cmd = self.max_speed/frame 00095 elif cmd < -self.max_speed/frame: 00096 cmd = -self.max_speed/frame 00097 # compute angle, apply limits 00098 ticks = self.angleToTicks(self.last_cmd + cmd) 00099 self.last_cmd = self.ticksToAngle(ticks) 00100 self.speed = cmd*frame 00101 # cap movement 00102 if self.last_cmd == self.desired: 00103 self.dirty = False 00104 if self.device.fake: 00105 self.position = self.last_cmd 00106 return None 00107 return int(ticks) 00108 else: 00109 return None 00110 00111 def setCurrentFeedback(self, reading): 00112 """ Update angle in radians by reading from servo, or by 00113 using position passed in from a sync read (in ticks). """ 00114 if reading > -1 and reading < self.ticks: # check validity 00115 self.reads += 1 00116 self.total_reads += 1 00117 last_angle = self.position 00118 self.position = self.ticksToAngle(reading) 00119 # update velocity estimate 00120 t = rospy.Time.now() 00121 self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0) 00122 self.last = t 00123 else: 00124 rospy.logdebug("Invalid read of servo: id " + str(self.id) + ", value " + str(reading)) 00125 self.errors += 1 00126 self.total_reads += 1 00127 return 00128 if self.relaxed: 00129 self.last_cmd = self.position 00130 00131 def setControlOutput(self, position): 00132 """ Set the position that controller is moving to. 00133 Returns output value in ticks. """ 00134 ticks = self.angleToTicks(position) 00135 self.desired = position 00136 self.dirty = True 00137 self.relaxed = False 00138 return int(ticks) 00139 00140 def getDiagnostics(self): 00141 """ Get a diagnostics status. """ 00142 msg = DiagnosticStatus() 00143 msg.name = self.name 00144 if self.temperature > 60: # TODO: read this value from eeprom 00145 self.status = "OVERHEATED, SHUTDOWN" 00146 self.level = DiagnosticStatus.ERROR 00147 elif self.temperature > 50 and self.status != "OVERHEATED, SHUTDOWN": 00148 self.status = "OVERHEATING" 00149 self.level = DiagnosticStatus.WARN 00150 elif self.status != "OVERHEATED, SHUTDOWN": 00151 self.status = "OK" 00152 self.level = DiagnosticStatus.OK 00153 msg.level = self.level 00154 msg.message = self.status 00155 msg.values.append(KeyValue("Position", str(self.position))) 00156 msg.values.append(KeyValue("Temperature", str(self.temperature))) 00157 msg.values.append(KeyValue("Voltage", str(self.voltage))) 00158 if self.reads + self.errors > 100: 00159 self.total_errors.append((self.errors*100.0)/(self.reads+self.errors)) 00160 if len(self.total_errors) > 10: 00161 self.total_errors = self.total_errors[-10:] 00162 self.reads = 0 00163 self.errors = 0 00164 msg.values.append(KeyValue("Reads", str(self.total_reads))) 00165 msg.values.append(KeyValue("Error Rate", str(sum(self.total_errors)/len(self.total_errors))+"%" )) 00166 if self.relaxed: 00167 msg.values.append(KeyValue("Torque", "OFF")) 00168 else: 00169 msg.values.append(KeyValue("Torque", "ON")) 00170 return msg 00171 00172 def angleToTicks(self, angle): 00173 """ Convert an angle to ticks, applying limits. """ 00174 ticks = self.neutral + (angle/self.rad_per_tick) 00175 if self.invert: 00176 ticks = self.neutral - (angle/self.rad_per_tick) 00177 if ticks >= self.ticks: 00178 return self.ticks-1.0 00179 if ticks < 0: 00180 return 0 00181 return ticks 00182 00183 def ticksToAngle(self, ticks): 00184 """ Convert an ticks to angle, applying limits. """ 00185 angle = (ticks - self.neutral) * self.rad_per_tick 00186 if self.invert: 00187 angle = -1.0 * angle 00188 return angle 00189 00190 def relaxCb(self, req): 00191 """ Turn off servo torque, so that it is pose-able. """ 00192 if not self.device.fake: 00193 self.device.disableTorque(self.id) 00194 self.dirty = False 00195 self.relaxed = True 00196 return RelaxResponse() 00197 00198 def commandCb(self, req): 00199 """ Float64 style command input. """ 00200 if self.controller and self.controller.active(): 00201 # Under and action control, do not interfere 00202 return 00203 elif self.desired != req.data or self.relaxed: 00204 self.dirty = True 00205 self.relaxed = False 00206 self.desired = req.data 00207 00208 class HobbyServo(Joint): 00209 00210 def __init__(self, device, name, ns="~joints"): 00211 Joint.__init__(self, device, name) 00212 n = ns+"/"+name+"/" 00213 00214 self.id = int(rospy.get_param(n+"id")) 00215 self.ticks = rospy.get_param(n+"ticks", 2000) 00216 self.neutral = rospy.get_param(n+"neutral", 1500) 00217 self.range = rospy.get_param(n+"range", 180) 00218 self.rad_per_tick = radians(self.range)/self.ticks 00219 00220 # TODO: load these from URDF 00221 self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0)) 00222 self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0)) 00223 self.max_speed = radians(rospy.get_param(n+"max_speed",90.0)) 00224 00225 self.invert = rospy.get_param(n+"invert",False) 00226 00227 self.dirty = False # newly updated position? 00228 self.position = 0.0 # current position, as returned by servo (radians) 00229 self.desired = 0.0 # desired position (radians) 00230 self.last_cmd = 0.0 # last position sent (radians) 00231 self.velocity = 0.0 # moving speed 00232 self.last = rospy.Time.now() 00233 00234 # ROS interfaces 00235 rospy.Subscriber(name+'/command', Float64, self.commandCb) 00236 00237 def interpolate(self, frame): 00238 """ Get the new position to move to, in ticks. """ 00239 if self.dirty: 00240 # compute command, limit velocity 00241 cmd = self.desired - self.last_cmd 00242 if cmd > self.max_speed/frame: 00243 cmd = self.max_speed/frame 00244 elif cmd < -self.max_speed/frame: 00245 cmd = -self.max_speed/frame 00246 # compute angle, apply limits 00247 ticks = self.angleToTicks(self.last_cmd + cmd) 00248 self.last_cmd = self.ticksToAngle(ticks) 00249 self.speed = cmd*frame 00250 # cap movement 00251 if self.last_cmd == self.desired: 00252 self.dirty = False 00253 if self.device.fake: 00254 self.position = self.last_cmd 00255 return None 00256 return int(ticks) 00257 else: 00258 return None 00259 00260 def setCurrentFeedback(self, raw_data): 00261 """ Update angle in radians by reading from servo, or by 00262 using position passed in from a sync read (in ticks). """ 00263 return None 00264 00265 def setControlOutput(self, position): 00266 """ Set the position that controller is moving to. 00267 Returns output value in ticks. """ 00268 ticks = self.angleToTicks(position) 00269 self.desired = position 00270 self.dirty = True 00271 return int(ticks) 00272 00273 def getDiagnostics(self): 00274 """ Get a diagnostics status. """ 00275 msg = DiagnosticStatus() 00276 msg.name = self.name 00277 msg.level = DiagnosticStatus.OK 00278 msg.message = "OK" 00279 msg.values.append(KeyValue("Position", str(self.position))) 00280 return msg 00281 00282 def angleToTicks(self, angle): 00283 """ Convert an angle to ticks, applying limits. """ 00284 ticks = self.neutral + (angle/self.rad_per_tick) 00285 if self.invert: 00286 ticks = self.neutral - (angle/self.rad_per_tick) 00287 if ticks >= self.ticks: 00288 return self.ticks-1.0 00289 if ticks < 0: 00290 return 0 00291 return ticks 00292 00293 def ticksToAngle(self, ticks): 00294 """ Convert an ticks to angle, applying limits. """ 00295 angle = (ticks - self.neutral) * self.rad_per_tick 00296 if self.invert: 00297 angle = -1.0 * angle 00298 return angle 00299 00300 def commandCb(self, req): 00301 """ Float64 style command input. """ 00302 if self.controller and self.controller.active(): 00303 # Under and action control, do not interfere 00304 return 00305 else: 00306 self.dirty = True 00307 self.desired = req.data 00308 00309 00310 from controllers import * 00311 00312 class ServoController(Controller): 00313 00314 def __init__(self, device, name): 00315 Controller.__init__(self, device, name) 00316 self.dynamixels = list() 00317 self.hobbyservos = list() 00318 self.iter = 0 00319 00320 # steal some servos 00321 for joint in device.joints.values(): 00322 if isinstance(joint, DynamixelServo): 00323 self.dynamixels.append(joint) 00324 elif isinstance(joint, HobbyServo): 00325 self.hobbyservos.append(joint) 00326 00327 self.w_delta = rospy.Duration(1.0/rospy.get_param("~write_rate", 10.0)) 00328 self.w_next = rospy.Time.now() + self.w_delta 00329 00330 self.r_delta = rospy.Duration(1.0/rospy.get_param("~read_rate", 10.0)) 00331 self.r_next = rospy.Time.now() + self.r_delta 00332 00333 def update(self): 00334 """ Read servo positions, update them. """ 00335 if rospy.Time.now() > self.r_next and not self.fake: 00336 if self.device.use_sync_read: 00337 # arbotix/servostik/wifi board sync_read 00338 synclist = list() 00339 for joint in self.dynamixels: 00340 if joint.readable: 00341 synclist.append(joint.id) 00342 if len(synclist) > 0: 00343 val = self.device.syncRead(synclist, P_PRESENT_POSITION_L, 2) 00344 if val: 00345 for joint in self.dynamixels: 00346 try: 00347 i = synclist.index(joint.id)*2 00348 joint.setCurrentFeedback(val[i]+(val[i+1]<<8)) 00349 except: 00350 # not a readable servo 00351 continue 00352 else: 00353 # direct connection, or other hardware with no sync_read capability 00354 for joint in self.dynamixels: 00355 joint.setCurrentFeedback(self.device.getPosition(joint.id)) 00356 self.r_next = rospy.Time.now() + self.r_delta 00357 00358 if rospy.Time.now() > self.w_next: 00359 if self.device.use_sync_write and not self.fake: 00360 syncpkt = list() 00361 for joint in self.dynamixels: 00362 v = joint.interpolate(1.0/self.w_delta.to_sec()) 00363 if v != None: # if was dirty 00364 syncpkt.append([joint.id,int(v)%256,int(v)>>8]) 00365 if len(syncpkt) > 0: 00366 self.device.syncWrite(P_GOAL_POSITION_L,syncpkt) 00367 else: 00368 for joint in self.dynamixels: 00369 v = joint.interpolate(1.0/self.w_delta.to_sec()) 00370 if v != None: # if was dirty 00371 self.device.setPosition(joint.id, int(v)) 00372 for joint in self.hobbyservos: 00373 v = joint.interpolate(1.0/self.w_delta.to_sec()) 00374 if v != None: # if it was dirty 00375 self.device.setServo(joint.id, v) 00376 self.w_next = rospy.Time.now() + self.w_delta 00377 00378 def getDiagnostics(self): 00379 """ Update status of servos (voltages, temperatures). """ 00380 if self.iter % 5 == 0 and not self.fake: 00381 if self.device.use_sync_read: 00382 # arbotix/servostik/wifi board sync_read 00383 synclist = list() 00384 for joint in self.dynamixels: 00385 if joint.readable: 00386 synclist.append(joint.id) 00387 if len(synclist) > 0: 00388 val = self.device.syncRead(synclist, P_PRESENT_VOLTAGE, 2) 00389 if val: 00390 for joint in self.dynamixels: 00391 try: 00392 i = synclist.index(joint.id)*2 00393 if val[i] < 250: 00394 joint.voltage = val[i]/10.0 00395 if val[i+1] < 100: 00396 joint.temperature = val[i+1] 00397 except: 00398 # not a readable servo 00399 continue 00400 else: 00401 # direct connection, or other hardware with no sync_read capability 00402 for joint in self.dynamixels: 00403 if joint.readable: 00404 val = self.device.read(joint.id, P_PRESENT_VOLTAGE, 2) 00405 try: 00406 joint.voltage = val[0] 00407 joint.temperature = val[1] 00408 except: 00409 continue 00410 self.iter += 1 00411 return None 00412