linear_controller.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 """
00004   linear_controller.py - controller for a linear actuator with analog feedback
00005   Copyright (c) 2011 Vanadium Labs LLC.  All right reserved.
00006 
00007   Redistribution and use in source and binary forms, with or without
00008   modification, are permitted provided that the following conditions are met:
00009       * Redistributions of source code must retain the above copyright
00010         notice, this list of conditions and the following disclaimer.
00011       * Redistributions in binary form must reproduce the above copyright
00012         notice, this list of conditions and the following disclaimer in the
00013         documentation and/or other materials provided with the distribution.
00014       * Neither the name of Vanadium Labs LLC nor the names of its 
00015         contributors may be used to endorse or promote products derived 
00016         from this software without specific prior written permission.
00017   
00018   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00019   ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00020   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00021   DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
00022   INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00023   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00024   OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00025   LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
00026   OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00027   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00028 """
00029 
00030 import rospy, actionlib
00031 
00032 from joints import *
00033 from controllers import *
00034 from std_msgs.msg import Float64
00035 from diagnostic_msgs.msg import *
00036 from std_srvs.srv import *
00037 
00038 from struct import unpack
00039 
00040 class LinearJoint(Joint):
00041     def __init__(self, device, name):
00042         Joint.__init__(self, device, name)
00043 
00044         self.dirty = False
00045         self.position = 0.0                     # current position, as returned by feedback (meters)
00046         self.desired = 0.0                      # desired position (meters)
00047         self.velocity = 0.0                     # moving speed
00048         self.last = rospy.Time.now()
00049 
00050         # TODO: load these from URDF
00051         self.min = rospy.get_param('~joints/'+name+'/min_position',0.0)
00052         self.max = rospy.get_param('~joints/'+name+'/max_position',0.5)
00053         self.max_speed = rospy.get_param('~joints/'+name+'/max_speed',0.0508)
00054 
00055         # calibration data {reading: position}
00056         self.cal = { -1: -1, 1: 1 }
00057         self.cal_raw = rospy.get_param('~joints/'+name+'/calibration_data', self.cal)
00058         self.cal = dict()
00059         for key, value in self.cal_raw.items():
00060             self.cal[int(key)] = value
00061         self.keys = sorted(self.cal.keys())
00062 
00063         rospy.Subscriber(name+'/command', Float64, self.commandCb)
00064         
00065     def interpolate(self, frame):
00066         """ Get new output: 1 = increase position, -1 is decrease position. """
00067         if self.dirty:
00068             cmd = self.desired - self.position
00069             if self.device.fake: 
00070                 self.position = self.desired
00071                 self.dirty = False
00072                 return None
00073             if cmd > 0.01:
00074                 return 1
00075             elif cmd < -0.01:
00076                 return -1
00077             else:
00078                 self.dirty = False
00079                 return 0
00080         else:
00081             return None
00082 
00083     def setCurrentFeedback(self, reading):
00084         if reading >= self.keys[0] and reading <= self.keys[-1]:
00085             last_angle = self.position
00086             self.position = self.readingToPosition(reading)
00087             # update velocity estimate
00088             t = rospy.Time.now()
00089             self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
00090             self.last = t
00091         else:
00092             rospy.logerr(self.name + ": feedback reading out of range")
00093 
00094     def setControlOutput(self, position):
00095         """ Set the position that controller is moving to. 
00096             Returns output value in raw_data format. """
00097         if position <= self.max and position >= self.min:
00098             self.desired = position
00099             self.dirty = True
00100         else:
00101             rospy.logerr(self.name + ": requested position is out of range: " + str(position))
00102         return None # TODO
00103     
00104     def getDiagnostics(self):
00105         """ Get a diagnostics status. """
00106         msg = DiagnosticStatus()
00107         msg.name = self.name
00108         msg.level = DiagnosticStatus.OK
00109         if self.dirty:
00110             msg.message = "Moving"
00111         else:
00112             msg.message = "OK"
00113         msg.values.append(KeyValue("Position", str(self.position)))
00114         return msg
00115 
00116     def commandCb(self, req):
00117         """ Float64 style command input. """
00118         if self.device.fake:
00119             self.position = req.data
00120         else:
00121             if req.data <= self.max and req.data >= self.min:
00122                 self.desired = req.data
00123                 self.dirty = True
00124             else:
00125                 rospy.logerr(self.name + ": requested position is out of range: " + str(req))
00126 
00127     def readingToPosition(self, reading):
00128         low = 0
00129         while reading > self.keys[low+1]:
00130             low += 1
00131         high = len(self.keys) - 1
00132         while reading < self.keys[high-1]:
00133             high += -1
00134         x = self.keys[high] - self.keys[low]
00135         y = self.cal[self.keys[high]] - self.cal[self.keys[low]]
00136         x1 = reading - self.keys[low]
00137         y1 = y * ( float(x1)/float(x) )
00138         return self.cal[self.keys[low]] + y1
00139 
00140 
00141 class LinearControllerAbsolute(Controller):
00142     """ A controller for a linear actuator, with absolute positional feedback. """
00143 
00144     def __init__(self, device, name):
00145         Controller.__init__(self, device, name)
00146 
00147         self.a = rospy.get_param('~controllers/'+name+'/motor_a',29)
00148         self.b = rospy.get_param('~controllers/'+name+'/motor_b',30)
00149         self.p = rospy.get_param('~controllers/'+name+'/motor_pwm',31)
00150         self.analog = rospy.get_param('~controllers/'+name+'/feedback',0)
00151         self.last = 0
00152         self.last_reading = 0
00153 
00154         self.delta = rospy.Duration(1.0/rospy.get_param('~controllers/'+name+'/rate', 10.0))
00155         self.next = rospy.Time.now() + self.delta
00156 
00157         self.joint = device.joints[rospy.get_param('~controllers/'+name+'/joint')]
00158 
00159         rospy.loginfo("Started LinearController ("+self.name+").")
00160 
00161     def startup(self):
00162         if not self.fake:
00163             self.joint.setCurrentFeedback(self.device.getAnalog(self.analog))
00164 
00165     def update(self):
00166         now = rospy.Time.now()
00167         if now > self.next:
00168             # read current position
00169             if self.joint.dirty:
00170                 if not self.fake:
00171                     try:
00172                         self.last_reading = self.getPosition()
00173                         self.joint.setCurrentFeedback(self.last_reading)
00174                     except Exception as e:
00175                         print "linear error: ", e
00176                 # update movement
00177                 output = self.joint.interpolate(1.0/self.delta.to_sec())
00178                 if self.last != output and not self.fake: 
00179                     self.setSpeed(output)
00180                     self.last = output
00181             self.next = now + self.delta
00182     
00183     def setSpeed(self, speed):
00184         """ Set speed of actuator. """
00185         if speed > 0:
00186             self.device.setDigital(self.a, 0); self.device.setDigital(self.b, 1);   # up
00187             self.device.setDigital(self.p, 1)
00188         elif speed < 0:
00189             self.device.setDigital(self.a, 1); self.device.setDigital(self.b, 0);   # down
00190             self.device.setDigital(self.p, 1)
00191         else:
00192             self.device.setDigital(self.p, 0)
00193 
00194     def getPosition(self):
00195         return self.device.getAnalog(self.analog)
00196 
00197     def shutdown(self):
00198         if not self.fake:
00199             self.device.setDigital(self.p, 0)
00200 
00201     def getDiagnostics(self):
00202         """ Get a diagnostics status. """
00203         msg = DiagnosticStatus()
00204         msg.name = self.name
00205 
00206         msg.level = DiagnosticStatus.OK
00207         msg.message = "OK"
00208         if not self.fake:
00209             msg.values.append(KeyValue("Encoder Reading", str(self.last_reading)))
00210         
00211         return msg
00212 
00213 
00214 class LinearControllerIncremental(LinearControllerAbsolute):
00215     """ A controller for a linear actuator, without absolute encoder. """
00216     POSITION_L  = 100
00217     POSITION_H  = 101
00218     DIRECTION   = 102
00219 
00220     def __init__(self, device, name):
00221         Controller.__init__(self, device, name)
00222         self.pause = True
00223 
00224         self.a = rospy.get_param('~controllers/'+name+'/motor_a',29)
00225         self.b = rospy.get_param('~controllers/'+name+'/motor_b',30)
00226         self.p = rospy.get_param('~controllers/'+name+'/motor_pwm',31)
00227         self.last = 0
00228         self.last_reading = 0
00229 
00230         self.delta = rospy.Duration(1.0/rospy.get_param('~controllers/'+name+'/rate', 10.0))
00231         self.next = rospy.Time.now() + self.delta
00232 
00233         self.joint = device.joints[rospy.get_param('~controllers/'+name+'/joint')]
00234 
00235         rospy.Service(name+'/zero', Empty, self.zeroCb)
00236         rospy.loginfo("Started LinearControllerIncremental ("+self.name+").")
00237 
00238     def startup(self):
00239         if not self.fake:
00240             self.zeroEncoder()
00241 
00242     def setSpeed(self, speed):
00243         """ Set speed of actuator. We need to set direction for encoder. """
00244         if speed > 0:
00245             self.device.write(253, self.DIRECTION, [1])
00246             self.device.setDigital(self.a, 0); self.device.setDigital(self.b, 1);   # up
00247             self.device.setDigital(self.p, 1)
00248         elif speed < 0:
00249             self.device.write(253, self.DIRECTION, [0])
00250             self.device.setDigital(self.a, 1); self.device.setDigital(self.b, 0);   # down
00251             self.device.setDigital(self.p, 1)
00252         else:
00253             self.device.setDigital(self.p, 0)
00254 
00255     def getPosition(self):
00256         return unpack('=h', "".join([chr(k) for k in self.device.read(253, self.POSITION_L, 2)]) )[0]
00257 
00258     def zeroEncoder(self, timeout=15.0):
00259         rospy.loginfo(self.name + ': zeroing encoder')
00260         self.setSpeed(1)
00261         last_pos = None
00262         for i in range(int(timeout)):
00263             if rospy.is_shutdown():
00264                 return
00265             try:
00266                 new_pos = self.getPosition()
00267             except:
00268                 pass
00269             if last_pos == new_pos:
00270                 break
00271             last_pos = new_pos
00272             rospy.sleep(1)
00273         self.setSpeed(0)
00274         self.device.write(253, self.POSITION_L, [0, 0])
00275         self.joint.setCurrentFeedback(0)
00276 
00277     def zeroCb(self, msg):
00278         if not self.fake:
00279             self.zeroEncoder(15.0)
00280         return EmptyResponse()
00281 
00282     def shutdown(self):
00283         if not self.fake:
00284             self.setSpeed(0)
00285 


arbotix_python
Author(s): Michael Ferguson
autogenerated on Sat Jun 8 2019 19:34:55