servo_controller.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 """
00004   servo_controller.py: classes for servo interaction
00005   Copyright (c) 2011-2013 Vanadium Labs LLC. All right reserved.
00006 
00007   Redistribution and use in source and binary forms, with or without
00008   modification, are permitted provided that the following conditions are met:
00009       * Redistributions of source code must retain the above copyright
00010         notice, this list of conditions and the following disclaimer.
00011       * Redistributions in binary form must reproduce the above copyright
00012         notice, this list of conditions and the following disclaimer in the
00013         documentation and/or other materials provided with the distribution.
00014       * Neither the name of Vanadium Labs LLC nor the names of its 
00015         contributors may be used to endorse or promote products derived 
00016         from this software without specific prior written permission.
00017   
00018   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00019   ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00020   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00021   DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
00022   INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00023   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00024   OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00025   LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
00026   OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00027   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00028 """
00029 
00030 import rospy
00031 
00032 from math import radians
00033 
00034 from std_msgs.msg import Float64
00035 from arbotix_msgs.srv import *
00036 from diagnostic_msgs.msg import *
00037 
00038 from ax12 import *
00039 from joints import *
00040 
00041 class DynamixelServo(Joint):
00042 
00043     def __init__(self, device, name, ns="~joints"):
00044         Joint.__init__(self, device, name)
00045         n = ns+"/"+name+"/"
00046         
00047         self.id = int(rospy.get_param(n+"id"))
00048         self.ticks = rospy.get_param(n+"ticks", 1024)
00049         self.neutral = rospy.get_param(n+"neutral", self.ticks/2)
00050         if self.ticks == 4096:
00051             self.range = 360.0
00052         else:
00053             self.range = 300.0
00054         self.range = rospy.get_param(n+"range", self.range)
00055         self.rad_per_tick = radians(self.range)/self.ticks
00056 
00057         # TODO: load these from URDF
00058         self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
00059         self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
00060         self.max_speed = radians(rospy.get_param(n+"max_speed",684.0)) 
00061                                                 # max speed = 114 rpm - 684 deg/s
00062         self.invert = rospy.get_param(n+"invert",False)
00063         self.readable = rospy.get_param(n+"readable",True)
00064 
00065         self.status = "OK"
00066         self.level = DiagnosticStatus.OK
00067 
00068         self.dirty = False                      # newly updated position?
00069         self.position = 0.0                     # current position, as returned by servo (radians)
00070         self.desired = 0.0                      # desired position (radians)
00071         self.last_cmd = 0.0                     # last position sent (radians)
00072         self.velocity = 0.0                     # moving speed
00073         self.enabled = True                     # can we take commands?
00074         self.active = False                     # are we under torque control?
00075         self.last = rospy.Time.now()
00076 
00077         self.reads = 0.0                        # number of reads
00078         self.errors = 0                         # number of failed reads
00079         self.total_reads = 0.0                  
00080         self.total_errors = [0.0]
00081 
00082         self.voltage = 0.0
00083         self.temperature = 0.0
00084         
00085         # ROS interfaces
00086         rospy.Subscriber(name+'/command', Float64, self.commandCb)
00087         rospy.Service(name+'/relax', Relax, self.relaxCb)
00088         rospy.Service(name+'/enable', Enable, self.enableCb)
00089         rospy.Service(name+'/set_speed', SetSpeed, self.setSpeedCb)
00090 
00091     def interpolate(self, frame):
00092         """ Get the new position to move to, in ticks. """
00093         if self.enabled and self.active and self.dirty:
00094             # compute command, limit velocity
00095             cmd = self.desired - self.last_cmd
00096             if cmd > self.max_speed/frame:
00097                 cmd = self.max_speed/frame
00098             elif cmd < -self.max_speed/frame:
00099                 cmd = -self.max_speed/frame
00100             # compute angle, apply limits
00101             ticks = self.angleToTicks(self.last_cmd + cmd)
00102             self.last_cmd = self.ticksToAngle(ticks)
00103             self.speed = cmd*frame
00104             # cap movement
00105             if self.last_cmd == self.desired:
00106                 self.dirty = False
00107             # when fake, need to set position/velocity here
00108             if self.device.fake:
00109                 last_angle = self.position
00110                 self.position = self.last_cmd
00111                 t = rospy.Time.now()
00112                 self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
00113                 self.last = t
00114                 return None
00115             return int(ticks)
00116         else:
00117             # when fake, need to reset velocity to 0 here.
00118             if self.device.fake:
00119                 self.velocity = 0.0
00120                 self.last = rospy.Time.now()
00121             return None
00122 
00123     def setCurrentFeedback(self, reading):
00124         """ Update angle in radians by reading from servo, or by 
00125             using position passed in from a sync read (in ticks). """
00126         if reading > -1 and reading < self.ticks:     # check validity
00127             self.reads += 1
00128             self.total_reads += 1
00129             last_angle = self.position
00130             self.position = self.ticksToAngle(reading)
00131             # update velocity estimate
00132             t = rospy.Time.now()
00133             self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
00134             self.last = t
00135         else:
00136             rospy.logdebug("Invalid read of servo: id " + str(self.id) + ", value " + str(reading))
00137             self.errors += 1
00138             self.total_reads += 1
00139             return
00140         if not self.active:
00141             self.last_cmd = self.position
00142 
00143     def setControlOutput(self, position):
00144         """ Set the position that controller is moving to. 
00145             Returns output value in ticks. """
00146         if self.enabled:
00147             ticks = self.angleToTicks(position)
00148             self.desired = position
00149             self.dirty = True
00150             self.active = True
00151             return int(ticks)
00152         return -1
00153 
00154     def getDiagnostics(self):
00155         """ Get a diagnostics status. """
00156         msg = DiagnosticStatus()
00157         msg.name = self.name
00158         if self.temperature > 60:   # TODO: read this value from eeprom
00159             self.status = "OVERHEATED, SHUTDOWN"
00160             self.level = DiagnosticStatus.ERROR
00161         elif self.temperature > 50 and self.status != "OVERHEATED, SHUTDOWN":
00162             self.status = "OVERHEATING"
00163             self.level = DiagnosticStatus.WARN
00164         elif self.status != "OVERHEATED, SHUTDOWN":
00165             self.status = "OK"
00166             self.level = DiagnosticStatus.OK
00167         msg.level = self.level
00168         msg.message = self.status
00169         msg.values.append(KeyValue("Position", str(self.position)))
00170         msg.values.append(KeyValue("Temperature", str(self.temperature)))
00171         msg.values.append(KeyValue("Voltage", str(self.voltage)))
00172         if self.reads + self.errors > 100:
00173             self.total_errors.append((self.errors*100.0)/(self.reads+self.errors))
00174             if len(self.total_errors) > 10:
00175                 self.total_errors = self.total_errors[-10:]
00176             self.reads = 0
00177             self.errors = 0
00178         msg.values.append(KeyValue("Reads", str(self.total_reads)))
00179         msg.values.append(KeyValue("Error Rate", str(sum(self.total_errors)/len(self.total_errors))+"%" ))
00180         if self.active:
00181             msg.values.append(KeyValue("Torque", "ON"))
00182         else:
00183             msg.values.append(KeyValue("Torque", "OFF"))
00184         return msg
00185 
00186     def angleToTicks(self, angle):
00187         """ Convert an angle to ticks, applying limits. """
00188         ticks = self.neutral + (angle/self.rad_per_tick)
00189         if self.invert:
00190             ticks = self.neutral - (angle/self.rad_per_tick)
00191         if ticks >= self.ticks:
00192             return self.ticks-1.0
00193         if ticks < 0:
00194             return 0
00195         return ticks
00196 
00197     def ticksToAngle(self, ticks):
00198         """ Convert an ticks to angle, applying limits. """
00199         angle = (ticks - self.neutral) * self.rad_per_tick
00200         if self.invert:
00201             angle = -1.0 * angle
00202         return angle
00203 
00204     def speedToTicks(self, rads_per_sec):
00205         """ Convert speed in radians per second to ticks, applying limits. """
00206         ticks = self.ticks * rads_per_sec / self.max_speed
00207         if ticks >= self.ticks:
00208             return self.ticks-1.0
00209         if ticks < 0:
00210             return 0
00211         return ticks  
00212 
00213     def enableCb(self, req):
00214         """ Turn on/off servo torque, so that it is pose-able. """
00215         if req.enable:
00216             self.enabled = True
00217         else:
00218             if not self.device.fake:
00219                 self.device.disableTorque(self.id)
00220             self.dirty = False
00221             self.enabled = False
00222             self.active = False
00223         return EnableResponse(self.enabled)
00224 
00225     def relaxCb(self, req):
00226         """ Turn off servo torque, so that it is pose-able. """
00227         if not self.device.fake:
00228             self.device.disableTorque(self.id)
00229         self.dirty = False
00230         self.active = False
00231         return RelaxResponse()
00232 
00233     def commandCb(self, req):
00234         """ Float64 style command input. """
00235         if self.enabled:
00236             if self.controller and self.controller.active():
00237                 # Under and action control, do not interfere
00238                 return
00239             elif self.desired != req.data or not self.active:
00240                 self.dirty = True
00241                 self.active = True
00242                 self.desired = req.data
00243                 
00244     def setSpeedCb(self, req):
00245         """ Set servo speed. Requested speed is in radians per second.
00246             Don't allow 0 which means "max speed" to a Dynamixel in joint mode. """
00247         if not self.device.fake:
00248             ticks_per_sec = max(1, int(self.speedToTicks(req.speed)))
00249             self.device.setSpeed(self.id, ticks_per_sec)
00250         return SetSpeedResponse()
00251 
00252 class HobbyServo(Joint):
00253 
00254     def __init__(self, device, name, ns="~joints"):
00255         Joint.__init__(self, device, name)
00256         n = ns+"/"+name+"/"
00257         
00258         self.id = int(rospy.get_param(n+"id"))
00259         self.ticks = rospy.get_param(n+"ticks", 2000)
00260         self.neutral = rospy.get_param(n+"neutral", 1500)
00261         self.range = rospy.get_param(n+"range", 180)
00262         self.rad_per_tick = radians(self.range)/self.ticks
00263 
00264         # TODO: load these from URDF
00265         self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
00266         self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
00267         self.max_speed = radians(rospy.get_param(n+"max_speed",90.0)) 
00268 
00269         self.invert = rospy.get_param(n+"invert",False)
00270 
00271         self.dirty = False                      # newly updated position?
00272         self.position = 0.0                     # current position, as returned by servo (radians)
00273         self.desired = 0.0                      # desired position (radians)
00274         self.last_cmd = 0.0                     # last position sent (radians)
00275         self.velocity = 0.0                     # moving speed
00276         self.last = rospy.Time.now()
00277         
00278         # ROS interfaces
00279         rospy.Subscriber(name+'/command', Float64, self.commandCb)
00280 
00281     def interpolate(self, frame):
00282         """ Get the new position to move to, in ticks. """
00283         if self.dirty:
00284             # compute command, limit velocity
00285             cmd = self.desired - self.last_cmd
00286             if cmd > self.max_speed/frame:
00287                 cmd = self.max_speed/frame
00288             elif cmd < -self.max_speed/frame:
00289                 cmd = -self.max_speed/frame
00290             # compute angle, apply limits
00291             ticks = self.angleToTicks(self.last_cmd + cmd)
00292             self.last_cmd = self.ticksToAngle(ticks)
00293             self.speed = cmd*frame
00294             # cap movement
00295             if self.last_cmd == self.desired:
00296                 self.dirty = False
00297             if self.device.fake:
00298                 self.position = self.last_cmd
00299                 return None
00300             return int(ticks)
00301         else:
00302             return None
00303 
00304     def setCurrentFeedback(self, raw_data):
00305         """ Update angle in radians by reading from servo, or by 
00306             using position passed in from a sync read (in ticks). """
00307         return None
00308 
00309     def setControlOutput(self, position):
00310         """ Set the position that controller is moving to. 
00311             Returns output value in ticks. """
00312         ticks = self.angleToTicks(position)
00313         self.desired = position
00314         self.dirty = True
00315         return int(ticks)
00316 
00317     def getDiagnostics(self):
00318         """ Get a diagnostics status. """
00319         msg = DiagnosticStatus()
00320         msg.name = self.name
00321         msg.level = DiagnosticStatus.OK
00322         msg.message = "OK"
00323         msg.values.append(KeyValue("Position", str(self.position)))
00324         return msg
00325 
00326     def angleToTicks(self, angle):
00327         """ Convert an angle to ticks, applying limits. """
00328         ticks = self.neutral + (angle/self.rad_per_tick)
00329         if self.invert:
00330             ticks = self.neutral - (angle/self.rad_per_tick)
00331         if ticks >= self.ticks:
00332             return self.ticks-1.0
00333         if ticks < 0:
00334             return 0
00335         return ticks
00336 
00337     def ticksToAngle(self, ticks):
00338         """ Convert an ticks to angle, applying limits. """
00339         angle = (ticks - self.neutral) * self.rad_per_tick
00340         if self.invert:
00341             angle = -1.0 * angle
00342         return angle        
00343 
00344     def commandCb(self, req):
00345         """ Float64 style command input. """
00346         if self.controller and self.controller.active():
00347             # Under and action control, do not interfere
00348             return
00349         else:
00350             self.dirty = True
00351             self.desired = req.data
00352 
00353 
00354 from controllers import *
00355 
00356 class ServoController(Controller):
00357 
00358     def __init__(self, device, name):
00359         Controller.__init__(self, device, name)
00360         self.dynamixels = list()
00361         self.hobbyservos = list()
00362         self.iter = 0
00363 
00364         # steal some servos
00365         for joint in device.joints.values():
00366             if isinstance(joint, DynamixelServo):
00367                 self.dynamixels.append(joint)
00368             elif isinstance(joint, HobbyServo):
00369                 self.hobbyservos.append(joint)
00370 
00371         self.w_delta = rospy.Duration(1.0/rospy.get_param("~write_rate", 10.0))
00372         self.w_next = rospy.Time.now() + self.w_delta
00373 
00374         self.r_delta = rospy.Duration(1.0/rospy.get_param("~read_rate", 10.0))
00375         self.r_next = rospy.Time.now() + self.r_delta
00376 
00377         rospy.Service(name + '/relax_all', Relax, self.relaxCb)
00378         rospy.Service(name + '/enable_all', Enable, self.enableCb)
00379 
00380     def update(self):
00381         """ Read servo positions, update them. """
00382         if rospy.Time.now() > self.r_next and not self.fake:
00383             if self.device.use_sync_read:
00384                 # arbotix/servostik/wifi board sync_read
00385                 synclist = list()
00386                 for joint in self.dynamixels:
00387                     if joint.readable:
00388                         synclist.append(joint.id)
00389                 if len(synclist) > 0:
00390                     val = self.device.syncRead(synclist, P_PRESENT_POSITION_L, 2)
00391                     if val: 
00392                         for joint in self.dynamixels:
00393                             try:
00394                                 i = synclist.index(joint.id)*2
00395                                 joint.setCurrentFeedback(val[i]+(val[i+1]<<8))
00396                             except:
00397                                 # not a readable servo
00398                                 continue 
00399             else:
00400                 # direct connection, or other hardware with no sync_read capability
00401                 for joint in self.dynamixels:
00402                     joint.setCurrentFeedback(self.device.getPosition(joint.id))
00403             self.r_next = rospy.Time.now() + self.r_delta
00404 
00405         if rospy.Time.now() > self.w_next:
00406             if self.device.use_sync_write and not self.fake:
00407                 syncpkt = list()
00408                 for joint in self.dynamixels:
00409                     v = joint.interpolate(1.0/self.w_delta.to_sec())
00410                     if v != None:   # if was dirty
00411                         syncpkt.append([joint.id,int(v)%256,int(v)>>8])                         
00412                 if len(syncpkt) > 0:      
00413                     self.device.syncWrite(P_GOAL_POSITION_L,syncpkt)
00414             else:
00415                 for joint in self.dynamixels:
00416                     v = joint.interpolate(1.0/self.w_delta.to_sec())
00417                     if v != None:   # if was dirty      
00418                         self.device.setPosition(joint.id, int(v))
00419             for joint in self.hobbyservos: 
00420                 v = joint.interpolate(1.0/self.w_delta.to_sec())
00421                 if v != None:   # if it was dirty   
00422                     self.device.setServo(joint.id, v)
00423             self.w_next = rospy.Time.now() + self.w_delta
00424 
00425     def getDiagnostics(self):
00426         """ Update status of servos (voltages, temperatures). """
00427         if self.iter % 5 == 0 and not self.fake:
00428             if self.device.use_sync_read:
00429                 # arbotix/servostik/wifi board sync_read
00430                 synclist = list()
00431                 for joint in self.dynamixels:
00432                     if joint.readable:
00433                         synclist.append(joint.id)
00434                 if len(synclist) > 0:
00435                     val = self.device.syncRead(synclist, P_PRESENT_VOLTAGE, 2)
00436                     if val:
00437                         for joint in self.dynamixels:
00438                             try:
00439                                 i = synclist.index(joint.id)*2
00440                                 if val[i] < 250:
00441                                     joint.voltage = val[i]/10.0
00442                                 if val[i+1] < 100:
00443                                     joint.temperature = val[i+1]
00444                             except:
00445                                 # not a readable servo
00446                                 continue 
00447             else:
00448                 # direct connection, or other hardware with no sync_read capability
00449                 for joint in self.dynamixels:
00450                     if joint.readable:
00451                         val = self.device.read(joint.id, P_PRESENT_VOLTAGE, 2)
00452                         try:
00453                             joint.voltage = val[0]
00454                             joint.temperature = val[1]
00455                         except:
00456                             continue
00457         self.iter += 1
00458         return None
00459 
00460     def enableCb(self, req):
00461         """ Turn on/off all servos torque, so that they are pose-able. """
00462         for joint in self.dynamixels:
00463             resp = joint.enableCb(req)
00464         return resp
00465 
00466     def relaxCb(self, req):
00467         """ Turn off all servos torque, so that they are pose-able. """
00468         for joint in self.dynamixels:
00469             resp = joint.relaxCb(req)
00470         return resp


arbotix_python
Author(s): Michael Ferguson
autogenerated on Sat Jun 8 2019 19:34:55