servo_controller.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 """
00004   servo_controller.py: classes for servo interaction
00005   Copyright (c) 2011 Vanadium Labs LLC.  All right reserved.
00006 
00007   Redistribution and use in source and binary forms, with or without
00008   modification, are permitted provided that the following conditions are met:
00009       * Redistributions of source code must retain the above copyright
00010         notice, this list of conditions and the following disclaimer.
00011       * Redistributions in binary form must reproduce the above copyright
00012         notice, this list of conditions and the following disclaimer in the
00013         documentation and/or other materials provided with the distribution.
00014       * Neither the name of Vanadium Labs LLC nor the names of its 
00015         contributors may be used to endorse or promote products derived 
00016         from this software without specific prior written permission.
00017   
00018   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00019   ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00020   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00021   DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
00022   INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00023   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00024   OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00025   LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
00026   OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00027   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00028 """
00029 
00030 import rospy
00031 
00032 from math import radians
00033 
00034 from std_msgs.msg import Float64
00035 from arbotix_msgs.srv import *
00036 from diagnostic_msgs.msg import *
00037 
00038 from ax12 import *
00039 from joints import *
00040 
00041 class DynamixelServo(Joint):
00042 
00043     def __init__(self, device, name, ns="~joints"):
00044         Joint.__init__(self, device, name)
00045         n = ns+"/"+name+"/"
00046         
00047         self.id = int(rospy.get_param(n+"id"))
00048         self.ticks = rospy.get_param(n+"ticks", 1024)
00049         self.neutral = rospy.get_param(n+"neutral", self.ticks/2)
00050         if self.ticks == 4096:
00051             self.range = 360.0
00052         else:
00053             self.range = 300.0
00054         self.range = rospy.get_param(n+"range", self.range)
00055         self.rad_per_tick = radians(self.range)/self.ticks
00056 
00057         # TODO: load these from URDF
00058         self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
00059         self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
00060         self.max_speed = radians(rospy.get_param(n+"max_speed",684.0)) 
00061                                                 # max speed = 114 rpm - 684 deg/s
00062         self.invert = rospy.get_param(n+"invert",False)
00063         self.readable = rospy.get_param(n+"readable",True)
00064 
00065         self.status = "OK"
00066         self.level = DiagnosticStatus.OK
00067 
00068         self.dirty = False                      # newly updated position?
00069         self.position = 0.0                     # current position, as returned by servo (radians)
00070         self.desired = 0.0                      # desired position (radians)
00071         self.last_cmd = 0.0                     # last position sent (radians)
00072         self.velocity = 0.0                     # moving speed
00073         self.relaxed = True                     # are we not under torque control?
00074         self.last = rospy.Time.now()
00075 
00076         self.reads = 0.0                        # number of reads
00077         self.errors = 0                         # number of failed reads
00078         self.total_reads = 0.0                  
00079         self.total_errors = [0.0]
00080 
00081         self.voltage = 0.0
00082         self.temperature = 0.0
00083         
00084         # ROS interfaces
00085         rospy.Subscriber(name+'/command', Float64, self.commandCb)
00086         rospy.Service(name+'/relax', Relax, self.relaxCb)
00087 
00088     def interpolate(self, frame):
00089         """ Get the new position to move to, in ticks. """
00090         if self.dirty:
00091             # compute command, limit velocity
00092             cmd = self.desired - self.last_cmd
00093             if cmd > self.max_speed/frame:
00094                 cmd = self.max_speed/frame
00095             elif cmd < -self.max_speed/frame:
00096                 cmd = -self.max_speed/frame
00097             # compute angle, apply limits
00098             ticks = self.angleToTicks(self.last_cmd + cmd)
00099             self.last_cmd = self.ticksToAngle(ticks)
00100             self.speed = cmd*frame
00101             # cap movement
00102             if self.last_cmd == self.desired:
00103                 self.dirty = False
00104             if self.device.fake:
00105                 self.position = self.last_cmd
00106                 return None
00107             return int(ticks)
00108         else:
00109             return None
00110 
00111     def setCurrentFeedback(self, reading):
00112         """ Update angle in radians by reading from servo, or by 
00113             using position passed in from a sync read (in ticks). """
00114         if reading > -1 and reading < self.ticks:     # check validity
00115             self.reads += 1
00116             self.total_reads += 1
00117             last_angle = self.position
00118             self.position = self.ticksToAngle(reading)
00119             # update velocity estimate
00120             t = rospy.Time.now()
00121             self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
00122             self.last = t
00123         else:
00124             rospy.logdebug("Invalid read of servo: id " + str(self.id) + ", value " + str(reading))
00125             self.errors += 1
00126             self.total_reads += 1
00127             return
00128         if self.relaxed:
00129             self.last_cmd = self.position
00130 
00131     def setControlOutput(self, position):
00132         """ Set the position that controller is moving to. 
00133             Returns output value in ticks. """
00134         ticks = self.angleToTicks(position)
00135         self.desired = position
00136         self.dirty = True
00137         self.relaxed = False
00138         return int(ticks)
00139 
00140     def getDiagnostics(self):
00141         """ Get a diagnostics status. """
00142         msg = DiagnosticStatus()
00143         msg.name = self.name
00144         if self.temperature > 60:   # TODO: read this value from eeprom
00145             self.status = "OVERHEATED, SHUTDOWN"
00146             self.level = DiagnosticStatus.ERROR
00147         elif self.temperature > 50 and self.status != "OVERHEATED, SHUTDOWN":
00148             self.status = "OVERHEATING"
00149             self.level = DiagnosticStatus.WARN
00150         elif self.status != "OVERHEATED, SHUTDOWN":
00151             self.status = "OK"
00152             self.level = DiagnosticStatus.OK
00153         msg.level = self.level
00154         msg.message = self.status
00155         msg.values.append(KeyValue("Position", str(self.position)))
00156         msg.values.append(KeyValue("Temperature", str(self.temperature)))
00157         msg.values.append(KeyValue("Voltage", str(self.voltage)))
00158         if self.reads + self.errors > 100:
00159             self.total_errors.append((self.errors*100.0)/(self.reads+self.errors))
00160             if len(self.total_errors) > 10:
00161                 self.total_errors = self.total_errors[-10:]
00162             self.reads = 0
00163             self.errors = 0
00164         msg.values.append(KeyValue("Reads", str(self.total_reads)))
00165         msg.values.append(KeyValue("Error Rate", str(sum(self.total_errors)/len(self.total_errors))+"%" ))
00166         if self.relaxed:
00167             msg.values.append(KeyValue("Torque", "OFF"))
00168         else:
00169             msg.values.append(KeyValue("Torque", "ON"))
00170         return msg
00171 
00172     def angleToTicks(self, angle):
00173         """ Convert an angle to ticks, applying limits. """
00174         ticks = self.neutral + (angle/self.rad_per_tick)
00175         if self.invert:
00176             ticks = self.neutral - (angle/self.rad_per_tick)
00177         if ticks >= self.ticks:
00178             return self.ticks-1.0
00179         if ticks < 0:
00180             return 0
00181         return ticks
00182 
00183     def ticksToAngle(self, ticks):
00184         """ Convert an ticks to angle, applying limits. """
00185         angle = (ticks - self.neutral) * self.rad_per_tick
00186         if self.invert:
00187             angle = -1.0 * angle
00188         return angle        
00189 
00190     def relaxCb(self, req):
00191         """ Turn off servo torque, so that it is pose-able. """
00192         if not self.device.fake:
00193             self.device.disableTorque(self.id)
00194         self.dirty = False
00195         self.relaxed = True
00196         return RelaxResponse()
00197 
00198     def commandCb(self, req):
00199         """ Float64 style command input. """
00200         if self.controller and self.controller.active():
00201             # Under and action control, do not interfere
00202             return
00203         elif self.desired != req.data or self.relaxed:
00204             self.dirty = True   
00205             self.relaxed = False
00206             self.desired = req.data
00207 
00208 class HobbyServo(Joint):
00209 
00210     def __init__(self, device, name, ns="~joints"):
00211         Joint.__init__(self, device, name)
00212         n = ns+"/"+name+"/"
00213         
00214         self.id = int(rospy.get_param(n+"id"))
00215         self.ticks = rospy.get_param(n+"ticks", 2000)
00216         self.neutral = rospy.get_param(n+"neutral", 1500)
00217         self.range = rospy.get_param(n+"range", 180)
00218         self.rad_per_tick = radians(self.range)/self.ticks
00219 
00220         # TODO: load these from URDF
00221         self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
00222         self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
00223         self.max_speed = radians(rospy.get_param(n+"max_speed",90.0)) 
00224 
00225         self.invert = rospy.get_param(n+"invert",False)
00226 
00227         self.dirty = False                      # newly updated position?
00228         self.position = 0.0                     # current position, as returned by servo (radians)
00229         self.desired = 0.0                      # desired position (radians)
00230         self.last_cmd = 0.0                     # last position sent (radians)
00231         self.velocity = 0.0                     # moving speed
00232         self.last = rospy.Time.now()
00233         
00234         # ROS interfaces
00235         rospy.Subscriber(name+'/command', Float64, self.commandCb)
00236 
00237     def interpolate(self, frame):
00238         """ Get the new position to move to, in ticks. """
00239         if self.dirty:
00240             # compute command, limit velocity
00241             cmd = self.desired - self.last_cmd
00242             if cmd > self.max_speed/frame:
00243                 cmd = self.max_speed/frame
00244             elif cmd < -self.max_speed/frame:
00245                 cmd = -self.max_speed/frame
00246             # compute angle, apply limits
00247             ticks = self.angleToTicks(self.last_cmd + cmd)
00248             self.last_cmd = self.ticksToAngle(ticks)
00249             self.speed = cmd*frame
00250             # cap movement
00251             if self.last_cmd == self.desired:
00252                 self.dirty = False
00253             if self.device.fake:
00254                 self.position = self.last_cmd
00255                 return None
00256             return int(ticks)
00257         else:
00258             return None
00259 
00260     def setCurrentFeedback(self, raw_data):
00261         """ Update angle in radians by reading from servo, or by 
00262             using position passed in from a sync read (in ticks). """
00263         return None
00264 
00265     def setControlOutput(self, position):
00266         """ Set the position that controller is moving to. 
00267             Returns output value in ticks. """
00268         ticks = self.angleToTicks(position)
00269         self.desired = position
00270         self.dirty = True
00271         return int(ticks)
00272 
00273     def getDiagnostics(self):
00274         """ Get a diagnostics status. """
00275         msg = DiagnosticStatus()
00276         msg.name = self.name
00277         msg.level = DiagnosticStatus.OK
00278         msg.message = "OK"
00279         msg.values.append(KeyValue("Position", str(self.position)))
00280         return msg
00281 
00282     def angleToTicks(self, angle):
00283         """ Convert an angle to ticks, applying limits. """
00284         ticks = self.neutral + (angle/self.rad_per_tick)
00285         if self.invert:
00286             ticks = self.neutral - (angle/self.rad_per_tick)
00287         if ticks >= self.ticks:
00288             return self.ticks-1.0
00289         if ticks < 0:
00290             return 0
00291         return ticks
00292 
00293     def ticksToAngle(self, ticks):
00294         """ Convert an ticks to angle, applying limits. """
00295         angle = (ticks - self.neutral) * self.rad_per_tick
00296         if self.invert:
00297             angle = -1.0 * angle
00298         return angle        
00299 
00300     def commandCb(self, req):
00301         """ Float64 style command input. """
00302         if self.controller and self.controller.active():
00303             # Under and action control, do not interfere
00304             return
00305         else:
00306             self.dirty = True
00307             self.desired = req.data
00308 
00309 
00310 from controllers import *
00311 
00312 class ServoController(Controller):
00313 
00314     def __init__(self, device, name):
00315         Controller.__init__(self, device, name)
00316         self.dynamixels = list()
00317         self.hobbyservos = list()
00318         self.iter = 0
00319 
00320         # steal some servos
00321         for joint in device.joints.values():
00322             if isinstance(joint, DynamixelServo):
00323                 self.dynamixels.append(joint)
00324             elif isinstance(joint, HobbyServo):
00325                 self.hobbyservos.append(joint)
00326 
00327         self.w_delta = rospy.Duration(1.0/rospy.get_param("~write_rate", 10.0))
00328         self.w_next = rospy.Time.now() + self.w_delta
00329 
00330         self.r_delta = rospy.Duration(1.0/rospy.get_param("~read_rate", 10.0))
00331         self.r_next = rospy.Time.now() + self.r_delta
00332 
00333     def update(self):
00334         """ Read servo positions, update them. """
00335         if rospy.Time.now() > self.r_next and not self.fake:
00336             if self.device.use_sync_read:
00337                 # arbotix/servostik/wifi board sync_read
00338                 synclist = list()
00339                 for joint in self.dynamixels:
00340                     if joint.readable:
00341                         synclist.append(joint.id)
00342                 if len(synclist) > 0:
00343                     val = self.device.syncRead(synclist, P_PRESENT_POSITION_L, 2)
00344                     if val: 
00345                         for joint in self.dynamixels:
00346                             try:
00347                                 i = synclist.index(joint.id)*2
00348                                 joint.setCurrentFeedback(val[i]+(val[i+1]<<8))
00349                             except:
00350                                 # not a readable servo
00351                                 continue 
00352             else:
00353                 # direct connection, or other hardware with no sync_read capability
00354                 for joint in self.dynamixels:
00355                     joint.setCurrentFeedback(self.device.getPosition(joint.id))
00356             self.r_next = rospy.Time.now() + self.r_delta
00357 
00358         if rospy.Time.now() > self.w_next:
00359             if self.device.use_sync_write and not self.fake:
00360                 syncpkt = list()
00361                 for joint in self.dynamixels:
00362                     v = joint.interpolate(1.0/self.w_delta.to_sec())
00363                     if v != None:   # if was dirty
00364                         syncpkt.append([joint.id,int(v)%256,int(v)>>8])                         
00365                 if len(syncpkt) > 0:      
00366                     self.device.syncWrite(P_GOAL_POSITION_L,syncpkt)
00367             else:
00368                 for joint in self.dynamixels:
00369                     v = joint.interpolate(1.0/self.w_delta.to_sec())
00370                     if v != None:   # if was dirty      
00371                         self.device.setPosition(joint.id, int(v))
00372             for joint in self.hobbyservos: 
00373                 v = joint.interpolate(1.0/self.w_delta.to_sec())
00374                 if v != None:   # if it was dirty   
00375                     self.device.setServo(joint.id, v)
00376             self.w_next = rospy.Time.now() + self.w_delta
00377 
00378     def getDiagnostics(self):
00379         """ Update status of servos (voltages, temperatures). """
00380         if self.iter % 5 == 0 and not self.fake:
00381             if self.device.use_sync_read:
00382                 # arbotix/servostik/wifi board sync_read
00383                 synclist = list()
00384                 for joint in self.dynamixels:
00385                     if joint.readable:
00386                         synclist.append(joint.id)
00387                 if len(synclist) > 0:
00388                     val = self.device.syncRead(synclist, P_PRESENT_VOLTAGE, 2)
00389                     if val:
00390                         for joint in self.dynamixels:
00391                             try:
00392                                 i = synclist.index(joint.id)*2
00393                                 if val[i] < 250:
00394                                     joint.voltage = val[i]/10.0
00395                                 if val[i+1] < 100:
00396                                     joint.temperature = val[i+1]
00397                             except:
00398                                 # not a readable servo
00399                                 continue 
00400             else:
00401                 # direct connection, or other hardware with no sync_read capability
00402                 for joint in self.dynamixels:
00403                     if joint.readable:
00404                         val = self.device.read(joint.id, P_PRESENT_VOLTAGE, 2)
00405                         try:
00406                             joint.voltage = val[0]
00407                             joint.temperature = val[1]
00408                         except:
00409                             continue
00410         self.iter += 1
00411         return None
00412 


arbotix_python
Author(s): Michael Ferguson
autogenerated on Sat Dec 28 2013 16:46:13