linear_controller.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 """
4  linear_controller.py - controller for a linear actuator with analog feedback
5  Copyright (c) 2011 Vanadium Labs LLC. All right reserved.
6 
7  Redistribution and use in source and binary forms, with or without
8  modification, are permitted provided that the following conditions are met:
9  * Redistributions of source code must retain the above copyright
10  notice, this list of conditions and the following disclaimer.
11  * Redistributions in binary form must reproduce the above copyright
12  notice, this list of conditions and the following disclaimer in the
13  documentation and/or other materials provided with the distribution.
14  * Neither the name of Vanadium Labs LLC nor the names of its
15  contributors may be used to endorse or promote products derived
16  from this software without specific prior written permission.
17 
18  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
22  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
24  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
26  OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27  ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 """
29 
30 import rospy, actionlib
31 
32 from joints import *
33 from controllers import *
34 from std_msgs.msg import Float64
35 from diagnostic_msgs.msg import *
36 from std_srvs.srv import *
37 
38 from struct import unpack
39 
41  def __init__(self, device, name):
42  Joint.__init__(self, device, name)
43 
44  self.dirty = False
45  self.position = 0.0 # current position, as returned by feedback (meters)
46  self.desired = 0.0 # desired position (meters)
47  self.velocity = 0.0 # moving speed
48  self.last = rospy.Time.now()
49 
50  # TODO: load these from URDF
51  self.min = rospy.get_param('~joints/'+name+'/min_position',0.0)
52  self.max = rospy.get_param('~joints/'+name+'/max_position',0.5)
53  self.max_speed = rospy.get_param('~joints/'+name+'/max_speed',0.0508)
54 
55  # calibration data {reading: position}
56  self.cal = { -1: -1, 1: 1 }
57  self.cal_raw = rospy.get_param('~joints/'+name+'/calibration_data', self.cal)
58  self.cal = dict()
59  for key, value in self.cal_raw.items():
60  self.cal[int(key)] = value
61  self.keys = sorted(self.cal.keys())
62 
63  rospy.Subscriber(name+'/command', Float64, self.commandCb)
64 
65  def interpolate(self, frame):
66  """ Get new output: 1 = increase position, -1 is decrease position. """
67  if self.dirty:
68  cmd = self.desired - self.position
69  if self.device.fake:
70  self.position = self.desired
71  self.dirty = False
72  return None
73  if cmd > 0.01:
74  return 1
75  elif cmd < -0.01:
76  return -1
77  else:
78  self.dirty = False
79  return 0
80  else:
81  return None
82 
83  def setCurrentFeedback(self, reading):
84  if reading >= self.keys[0] and reading <= self.keys[-1]:
85  last_angle = self.position
86  self.position = self.readingToPosition(reading)
87  # update velocity estimate
88  t = rospy.Time.now()
89  self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
90  self.last = t
91  else:
92  rospy.logerr(self.name + ": feedback reading out of range")
93 
94  def setControlOutput(self, position):
95  """ Set the position that controller is moving to.
96  Returns output value in raw_data format. """
97  if position <= self.max and position >= self.min:
98  self.desired = position
99  self.dirty = True
100  else:
101  rospy.logerr(self.name + ": requested position is out of range: " + str(position))
102  return None # TODO
103 
104  def getDiagnostics(self):
105  """ Get a diagnostics status. """
106  msg = DiagnosticStatus()
107  msg.name = self.name
108  msg.level = DiagnosticStatus.OK
109  if self.dirty:
110  msg.message = "Moving"
111  else:
112  msg.message = "OK"
113  msg.values.append(KeyValue("Position", str(self.position)))
114  return msg
115 
116  def commandCb(self, req):
117  """ Float64 style command input. """
118  if self.device.fake:
119  self.position = req.data
120  else:
121  if req.data <= self.max and req.data >= self.min:
122  self.desired = req.data
123  self.dirty = True
124  else:
125  rospy.logerr(self.name + ": requested position is out of range: " + str(req))
126 
127  def readingToPosition(self, reading):
128  low = 0
129  while reading > self.keys[low+1]:
130  low += 1
131  high = len(self.keys) - 1
132  while reading < self.keys[high-1]:
133  high += -1
134  x = self.keys[high] - self.keys[low]
135  y = self.cal[self.keys[high]] - self.cal[self.keys[low]]
136  x1 = reading - self.keys[low]
137  y1 = y * ( float(x1)/float(x) )
138  return self.cal[self.keys[low]] + y1
139 
140 
142  """ A controller for a linear actuator, with absolute positional feedback. """
143 
144  def __init__(self, device, name):
145  Controller.__init__(self, device, name)
146 
147  self.a = rospy.get_param('~controllers/'+name+'/motor_a',29)
148  self.b = rospy.get_param('~controllers/'+name+'/motor_b',30)
149  self.p = rospy.get_param('~controllers/'+name+'/motor_pwm',31)
150  self.analog = rospy.get_param('~controllers/'+name+'/feedback',0)
151  self.last = 0
152  self.last_reading = 0
153 
154  self.delta = rospy.Duration(1.0/rospy.get_param('~controllers/'+name+'/rate', 10.0))
155  self.next = rospy.Time.now() + self.delta
156 
157  self.joint = device.joints[rospy.get_param('~controllers/'+name+'/joint')]
158 
159  rospy.loginfo("Started LinearController ("+self.name+").")
160 
161  def startup(self):
162  if not self.fake:
163  self.joint.setCurrentFeedback(self.device.getAnalog(self.analog))
164 
165  def update(self):
166  now = rospy.Time.now()
167  if now > self.next:
168  # read current position
169  if self.joint.dirty:
170  if not self.fake:
171  try:
172  self.last_reading = self.getPosition()
173  self.joint.setCurrentFeedback(self.last_reading)
174  except Exception as e:
175  print "linear error: ", e
176  # update movement
177  output = self.joint.interpolate(1.0/self.delta.to_sec())
178  if self.last != output and not self.fake:
179  self.setSpeed(output)
180  self.last = output
181  self.next = now + self.delta
182 
183  def setSpeed(self, speed):
184  """ Set speed of actuator. """
185  if speed > 0:
186  self.device.setDigital(self.a, 0); self.device.setDigital(self.b, 1); # up
187  self.device.setDigital(self.p, 1)
188  elif speed < 0:
189  self.device.setDigital(self.a, 1); self.device.setDigital(self.b, 0); # down
190  self.device.setDigital(self.p, 1)
191  else:
192  self.device.setDigital(self.p, 0)
193 
194  def getPosition(self):
195  return self.device.getAnalog(self.analog)
196 
197  def shutdown(self):
198  if not self.fake:
199  self.device.setDigital(self.p, 0)
200 
201  def getDiagnostics(self):
202  """ Get a diagnostics status. """
203  msg = DiagnosticStatus()
204  msg.name = self.name
205 
206  msg.level = DiagnosticStatus.OK
207  msg.message = "OK"
208  if not self.fake:
209  msg.values.append(KeyValue("Encoder Reading", str(self.last_reading)))
210 
211  return msg
212 
213 
215  """ A controller for a linear actuator, without absolute encoder. """
216  POSITION_L = 100
217  POSITION_H = 101
218  DIRECTION = 102
219 
220  def __init__(self, device, name):
221  Controller.__init__(self, device, name)
222  self.pause = True
223 
224  self.a = rospy.get_param('~controllers/'+name+'/motor_a',29)
225  self.b = rospy.get_param('~controllers/'+name+'/motor_b',30)
226  self.p = rospy.get_param('~controllers/'+name+'/motor_pwm',31)
227  self.last = 0
228  self.last_reading = 0
229 
230  self.delta = rospy.Duration(1.0/rospy.get_param('~controllers/'+name+'/rate', 10.0))
231  self.next = rospy.Time.now() + self.delta
232 
233  self.joint = device.joints[rospy.get_param('~controllers/'+name+'/joint')]
234 
235  rospy.Service(name+'/zero', Empty, self.zeroCb)
236  rospy.loginfo("Started LinearControllerIncremental ("+self.name+").")
237 
238  def startup(self):
239  if not self.fake:
240  self.zeroEncoder()
241 
242  def setSpeed(self, speed):
243  """ Set speed of actuator. We need to set direction for encoder. """
244  if speed > 0:
245  self.device.write(253, self.DIRECTION, [1])
246  self.device.setDigital(self.a, 0); self.device.setDigital(self.b, 1); # up
247  self.device.setDigital(self.p, 1)
248  elif speed < 0:
249  self.device.write(253, self.DIRECTION, [0])
250  self.device.setDigital(self.a, 1); self.device.setDigital(self.b, 0); # down
251  self.device.setDigital(self.p, 1)
252  else:
253  self.device.setDigital(self.p, 0)
254 
255  def getPosition(self):
256  return unpack('=h', "".join([chr(k) for k in self.device.read(253, self.POSITION_L, 2)]) )[0]
257 
258  def zeroEncoder(self, timeout=15.0):
259  rospy.loginfo(self.name + ': zeroing encoder')
260  self.setSpeed(1)
261  last_pos = None
262  for i in range(int(timeout)):
263  if rospy.is_shutdown():
264  return
265  try:
266  new_pos = self.getPosition()
267  except:
268  pass
269  if last_pos == new_pos:
270  break
271  last_pos = new_pos
272  rospy.sleep(1)
273  self.setSpeed(0)
274  self.device.write(253, self.POSITION_L, [0, 0])
275  self.joint.setCurrentFeedback(0)
276 
277  def zeroCb(self, msg):
278  if not self.fake:
279  self.zeroEncoder(15.0)
280  return EmptyResponse()
281 
282  def shutdown(self):
283  if not self.fake:
284  self.setSpeed(0)
285 
Joints hold current values.
Definition: joints.py:32
Controllers interact with ArbotiX hardware.
Definition: controllers.py:32


arbotix_python
Author(s): Michael Ferguson
autogenerated on Mon Feb 28 2022 21:37:42