00001
00002
00003 """
00004 servo_controller.py: classes for servo interaction
00005 Copyright (c) 2011 Vanadium Labs LLC. All right reserved.
00006
00007 Redistribution and use in source and binary forms, with or without
00008 modification, are permitted provided that the following conditions are met:
00009 * Redistributions of source code must retain the above copyright
00010 notice, this list of conditions and the following disclaimer.
00011 * Redistributions in binary form must reproduce the above copyright
00012 notice, this list of conditions and the following disclaimer in the
00013 documentation and/or other materials provided with the distribution.
00014 * Neither the name of Vanadium Labs LLC nor the names of its
00015 contributors may be used to endorse or promote products derived
00016 from this software without specific prior written permission.
00017
00018 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00019 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00020 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00021 DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
00022 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00025 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
00026 OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00027 ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00028 """
00029
00030 import rospy
00031
00032 from math import radians
00033
00034 from std_msgs.msg import Float64
00035 from arbotix_msgs.srv import *
00036 from diagnostic_msgs.msg import *
00037
00038 from ax12 import *
00039 from joints import *
00040
00041 class DynamixelServo(Joint):
00042
00043 def __init__(self, device, name, ns="~joints"):
00044 Joint.__init__(self, device, name)
00045 n = ns+"/"+name+"/"
00046
00047 self.id = int(rospy.get_param(n+"id"))
00048 self.ticks = rospy.get_param(n+"ticks", 1024)
00049 self.neutral = rospy.get_param(n+"neutral", self.ticks/2)
00050 if self.ticks == 4096:
00051 self.range = 360.0
00052 else:
00053 self.range = 300.0
00054 self.range = rospy.get_param(n+"range", self.range)
00055 self.rad_per_tick = radians(self.range)/self.ticks
00056
00057
00058 self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
00059 self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
00060 self.max_speed = radians(rospy.get_param(n+"max_speed",684.0))
00061
00062 self.invert = rospy.get_param(n+"invert",False)
00063 self.readable = rospy.get_param(n+"readable",True)
00064
00065 self.status = "OK"
00066 self.level = DiagnosticStatus.OK
00067
00068 self.dirty = False
00069 self.position = 0.0
00070 self.desired = 0.0
00071 self.last_cmd = 0.0
00072 self.velocity = 0.0
00073 self.relaxed = True
00074 self.last = rospy.Time.now()
00075
00076 self.reads = 0.0
00077 self.errors = 0
00078 self.total_reads = 0.0
00079 self.total_errors = [0.0]
00080
00081 self.voltage = 0.0
00082 self.temperature = 0.0
00083
00084
00085 rospy.Subscriber(name+'/command', Float64, self.commandCb)
00086 rospy.Service(name+'/relax', Relax, self.relaxCb)
00087
00088 def interpolate(self, frame):
00089 """ Get the new position to move to, in ticks. """
00090 if self.dirty:
00091
00092 cmd = self.desired - self.last_cmd
00093 if cmd > self.max_speed/frame:
00094 cmd = self.max_speed/frame
00095 elif cmd < -self.max_speed/frame:
00096 cmd = -self.max_speed/frame
00097
00098 ticks = self.angleToTicks(self.last_cmd + cmd)
00099 self.last_cmd = self.ticksToAngle(ticks)
00100 self.speed = cmd*frame
00101
00102 if self.last_cmd == self.desired:
00103 self.dirty = False
00104 if self.device.fake:
00105 self.position = self.last_cmd
00106 return None
00107 return int(ticks)
00108 else:
00109 return None
00110
00111 def setCurrentFeedback(self, reading):
00112 """ Update angle in radians by reading from servo, or by
00113 using position passed in from a sync read (in ticks). """
00114 if reading > -1 and reading < self.ticks:
00115 self.reads += 1
00116 self.total_reads += 1
00117 last_angle = self.position
00118 self.position = self.ticksToAngle(reading)
00119
00120 t = rospy.Time.now()
00121 self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
00122 self.last = t
00123 else:
00124 rospy.logdebug("Invalid read of servo: id " + str(self.id) + ", value " + str(reading))
00125 self.errors += 1
00126 self.total_reads += 1
00127 return
00128 if self.relaxed:
00129 self.last_cmd = self.position
00130
00131 def setControlOutput(self, position):
00132 """ Set the position that controller is moving to.
00133 Returns output value in ticks. """
00134 ticks = self.angleToTicks(position)
00135 self.desired = position
00136 self.dirty = True
00137 self.relaxed = False
00138 return int(ticks)
00139
00140 def getDiagnostics(self):
00141 """ Get a diagnostics status. """
00142 msg = DiagnosticStatus()
00143 msg.name = self.name
00144 if self.temperature > 60:
00145 self.status = "OVERHEATED, SHUTDOWN"
00146 self.level = DiagnosticStatus.ERROR
00147 elif self.temperature > 50 and self.status != "OVERHEATED, SHUTDOWN":
00148 self.status = "OVERHEATING"
00149 self.level = DiagnosticStatus.WARN
00150 elif self.status != "OVERHEATED, SHUTDOWN":
00151 self.status = "OK"
00152 self.level = DiagnosticStatus.OK
00153 msg.level = self.level
00154 msg.message = self.status
00155 msg.values.append(KeyValue("Position", str(self.position)))
00156 msg.values.append(KeyValue("Temperature", str(self.temperature)))
00157 msg.values.append(KeyValue("Voltage", str(self.voltage)))
00158 if self.reads + self.errors > 100:
00159 self.total_errors.append((self.errors*100.0)/(self.reads+self.errors))
00160 if len(self.total_errors) > 10:
00161 self.total_errors = self.total_errors[-10:]
00162 self.reads = 0
00163 self.errors = 0
00164 msg.values.append(KeyValue("Reads", str(self.total_reads)))
00165 msg.values.append(KeyValue("Error Rate", str(sum(self.total_errors)/len(self.total_errors))+"%" ))
00166 if self.relaxed:
00167 msg.values.append(KeyValue("Torque", "OFF"))
00168 else:
00169 msg.values.append(KeyValue("Torque", "ON"))
00170 return msg
00171
00172 def angleToTicks(self, angle):
00173 """ Convert an angle to ticks, applying limits. """
00174 ticks = self.neutral + (angle/self.rad_per_tick)
00175 if self.invert:
00176 ticks = self.neutral - (angle/self.rad_per_tick)
00177 if ticks >= self.ticks:
00178 return self.ticks-1.0
00179 if ticks < 0:
00180 return 0
00181 return ticks
00182
00183 def ticksToAngle(self, ticks):
00184 """ Convert an ticks to angle, applying limits. """
00185 angle = (ticks - self.neutral) * self.rad_per_tick
00186 if self.invert:
00187 angle = -1.0 * angle
00188 return angle
00189
00190 def relaxCb(self, req):
00191 """ Turn off servo torque, so that it is pose-able. """
00192 if not self.device.fake:
00193 self.device.disableTorque(self.id)
00194 self.dirty = False
00195 self.relaxed = True
00196 return RelaxResponse()
00197
00198 def commandCb(self, req):
00199 """ Float64 style command input. """
00200 if self.controller and self.controller.active():
00201
00202 return
00203 elif self.desired != req.data or self.relaxed:
00204 self.dirty = True
00205 self.relaxed = False
00206 self.desired = req.data
00207
00208 class HobbyServo(Joint):
00209
00210 def __init__(self, device, name, ns="~joints"):
00211 Joint.__init__(self, device, name)
00212 n = ns+"/"+name+"/"
00213
00214 self.id = int(rospy.get_param(n+"id"))
00215 self.ticks = rospy.get_param(n+"ticks", 2000)
00216 self.neutral = rospy.get_param(n+"neutral", 1500)
00217 self.range = rospy.get_param(n+"range", 180)
00218 self.rad_per_tick = radians(self.range)/self.ticks
00219
00220
00221 self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
00222 self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
00223 self.max_speed = radians(rospy.get_param(n+"max_speed",90.0))
00224
00225 self.invert = rospy.get_param(n+"invert",False)
00226
00227 self.dirty = False
00228 self.position = 0.0
00229 self.desired = 0.0
00230 self.last_cmd = 0.0
00231 self.velocity = 0.0
00232 self.last = rospy.Time.now()
00233
00234
00235 rospy.Subscriber(name+'/command', Float64, self.commandCb)
00236
00237 def interpolate(self, frame):
00238 """ Get the new position to move to, in ticks. """
00239 if self.dirty:
00240
00241 cmd = self.desired - self.last_cmd
00242 if cmd > self.max_speed/frame:
00243 cmd = self.max_speed/frame
00244 elif cmd < -self.max_speed/frame:
00245 cmd = -self.max_speed/frame
00246
00247 ticks = self.angleToTicks(self.last_cmd + cmd)
00248 self.last_cmd = self.ticksToAngle(ticks)
00249 self.speed = cmd*frame
00250
00251 if self.last_cmd == self.desired:
00252 self.dirty = False
00253 if self.device.fake:
00254 self.position = self.last_cmd
00255 return None
00256 return int(ticks)
00257 else:
00258 return None
00259
00260 def setCurrentFeedback(self, raw_data):
00261 """ Update angle in radians by reading from servo, or by
00262 using position passed in from a sync read (in ticks). """
00263 return None
00264
00265 def setControlOutput(self, position):
00266 """ Set the position that controller is moving to.
00267 Returns output value in ticks. """
00268 ticks = self.angleToTicks(position)
00269 self.desired = position
00270 self.dirty = True
00271 return int(ticks)
00272
00273 def getDiagnostics(self):
00274 """ Get a diagnostics status. """
00275 msg = DiagnosticStatus()
00276 msg.name = self.name
00277 msg.level = DiagnosticStatus.OK
00278 msg.message = "OK"
00279 msg.values.append(KeyValue("Position", str(self.position)))
00280 return msg
00281
00282 def angleToTicks(self, angle):
00283 """ Convert an angle to ticks, applying limits. """
00284 ticks = self.neutral + (angle/self.rad_per_tick)
00285 if self.invert:
00286 ticks = self.neutral - (angle/self.rad_per_tick)
00287 if ticks >= self.ticks:
00288 return self.ticks-1.0
00289 if ticks < 0:
00290 return 0
00291 return ticks
00292
00293 def ticksToAngle(self, ticks):
00294 """ Convert an ticks to angle, applying limits. """
00295 angle = (ticks - self.neutral) * self.rad_per_tick
00296 if self.invert:
00297 angle = -1.0 * angle
00298 return angle
00299
00300 def commandCb(self, req):
00301 """ Float64 style command input. """
00302 if self.controller and self.controller.active():
00303
00304 return
00305 else:
00306 self.dirty = True
00307 self.desired = req.data
00308
00309
00310 from controllers import *
00311
00312 class ServoController(Controller):
00313
00314 def __init__(self, device, name):
00315 Controller.__init__(self, device, name)
00316 self.dynamixels = list()
00317 self.hobbyservos = list()
00318 self.iter = 0
00319
00320
00321 for joint in device.joints.values():
00322 if isinstance(joint, DynamixelServo):
00323 self.dynamixels.append(joint)
00324 elif isinstance(joint, HobbyServo):
00325 self.hobbyservos.append(joint)
00326
00327 self.w_delta = rospy.Duration(1.0/rospy.get_param("~write_rate", 10.0))
00328 self.w_next = rospy.Time.now() + self.w_delta
00329
00330 self.r_delta = rospy.Duration(1.0/rospy.get_param("~read_rate", 10.0))
00331 self.r_next = rospy.Time.now() + self.r_delta
00332
00333 def update(self):
00334 """ Read servo positions, update them. """
00335 if rospy.Time.now() > self.r_next and not self.fake:
00336 if self.device.use_sync_read:
00337
00338 synclist = list()
00339 for joint in self.dynamixels:
00340 if joint.readable:
00341 synclist.append(joint.id)
00342 if len(synclist) > 0:
00343 val = self.device.syncRead(synclist, P_PRESENT_POSITION_L, 2)
00344 if val:
00345 for joint in self.dynamixels:
00346 try:
00347 i = synclist.index(joint.id)*2
00348 joint.setCurrentFeedback(val[i]+(val[i+1]<<8))
00349 except:
00350
00351 continue
00352 else:
00353
00354 for joint in self.dynamixels:
00355 joint.setCurrentFeedback(self.device.getPosition(joint.id))
00356 self.r_next = rospy.Time.now() + self.r_delta
00357
00358 if rospy.Time.now() > self.w_next:
00359 if self.device.use_sync_write and not self.fake:
00360 syncpkt = list()
00361 for joint in self.dynamixels:
00362 v = joint.interpolate(1.0/self.w_delta.to_sec())
00363 if v != None:
00364 syncpkt.append([joint.id,int(v)%256,int(v)>>8])
00365 if len(syncpkt) > 0:
00366 self.device.syncWrite(P_GOAL_POSITION_L,syncpkt)
00367 else:
00368 for joint in self.dynamixels:
00369 v = joint.interpolate(1.0/self.w_delta.to_sec())
00370 if v != None:
00371 self.device.setPosition(joint.id, int(v))
00372 for joint in self.hobbyservos:
00373 v = joint.interpolate(1.0/self.w_delta.to_sec())
00374 if v != None:
00375 self.device.setServo(joint.id, v)
00376 self.w_next = rospy.Time.now() + self.w_delta
00377
00378 def getDiagnostics(self):
00379 """ Update status of servos (voltages, temperatures). """
00380 if self.iter % 5 == 0 and not self.fake:
00381 if self.device.use_sync_read:
00382
00383 synclist = list()
00384 for joint in self.dynamixels:
00385 if joint.readable:
00386 synclist.append(joint.id)
00387 if len(synclist) > 0:
00388 val = self.device.syncRead(synclist, P_PRESENT_VOLTAGE, 2)
00389 if val:
00390 for joint in self.dynamixels:
00391 try:
00392 i = synclist.index(joint.id)*2
00393 if val[i] < 250:
00394 joint.voltage = val[i]/10.0
00395 if val[i+1] < 100:
00396 joint.temperature = val[i+1]
00397 except:
00398
00399 continue
00400 else:
00401
00402 for joint in self.dynamixels:
00403 if joint.readable:
00404 val = self.device.read(joint.id, P_PRESENT_VOLTAGE, 2)
00405 try:
00406 joint.voltage = val[0]
00407 joint.temperature = val[1]
00408 except:
00409 continue
00410 self.iter += 1
00411 return None
00412