servo_controller.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 """
4  servo_controller.py: classes for servo interaction
5  Copyright (c) 2011-2013 Vanadium Labs LLC. All right reserved.
6 
7  Redistribution and use in source and binary forms, with or without
8  modification, are permitted provided that the following conditions are met:
9  * Redistributions of source code must retain the above copyright
10  notice, this list of conditions and the following disclaimer.
11  * Redistributions in binary form must reproduce the above copyright
12  notice, this list of conditions and the following disclaimer in the
13  documentation and/or other materials provided with the distribution.
14  * Neither the name of Vanadium Labs LLC nor the names of its
15  contributors may be used to endorse or promote products derived
16  from this software without specific prior written permission.
17 
18  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
22  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
24  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
26  OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27  ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 """
29 
30 import rospy
31 
32 from math import radians
33 
34 from std_msgs.msg import Float64
35 from arbotix_msgs.srv import *
36 from diagnostic_msgs.msg import *
37 
38 from ax12 import *
39 from joints import *
40 
42 
43  def __init__(self, device, name, ns="~joints"):
44  Joint.__init__(self, device, name)
45  n = ns+"/"+name+"/"
46 
47  self.id = int(rospy.get_param(n+"id"))
48  self.ticks = rospy.get_param(n+"ticks", 1024)
49  self.neutral = rospy.get_param(n+"neutral", self.ticks/2)
50  if self.ticks == 4096:
51  self.range = 360.0
52  else:
53  self.range = 300.0
54  self.range = rospy.get_param(n+"range", self.range)
55  self.rad_per_tick = radians(self.range)/self.ticks
56 
57  # TODO: load these from URDF
58  self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
59  self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
60  self.max_speed = radians(rospy.get_param(n+"max_speed",684.0))
61  # max speed = 114 rpm - 684 deg/s
62  self.invert = rospy.get_param(n+"invert",False)
63  self.readable = rospy.get_param(n+"readable",True)
64 
65  self.status = "OK"
66  self.level = DiagnosticStatus.OK
67 
68  self.dirty = False # newly updated position?
69  self.position = 0.0 # current position, as returned by servo (radians)
70  self.desired = 0.0 # desired position (radians)
71  self.last_cmd = 0.0 # last position sent (radians)
72  self.velocity = 0.0 # moving speed
73  self.enabled = True # can we take commands?
74  self.active = False # are we under torque control?
75  self.last = rospy.Time.now()
76 
77  self.reads = 0.0 # number of reads
78  self.errors = 0 # number of failed reads
79  self.total_reads = 0.0
80  self.total_errors = [0.0]
81 
82  self.voltage = 0.0
83  self.temperature = 0.0
84 
85  # ROS interfaces
86  rospy.Subscriber(name+'/command', Float64, self.commandCb)
87  rospy.Service(name+'/relax', Relax, self.relaxCb)
88  rospy.Service(name+'/enable', Enable, self.enableCb)
89  rospy.Service(name+'/set_speed', SetSpeed, self.setSpeedCb)
90 
91  def interpolate(self, frame):
92  """ Get the new position to move to, in ticks. """
93  if self.enabled and self.active and self.dirty:
94  # compute command, limit velocity
95  cmd = self.desired - self.last_cmd
96  if cmd > self.max_speed/frame:
97  cmd = self.max_speed/frame
98  elif cmd < -self.max_speed/frame:
99  cmd = -self.max_speed/frame
100  # compute angle, apply limits
101  ticks = self.angleToTicks(self.last_cmd + cmd)
102  self.last_cmd = self.ticksToAngle(ticks)
103  self.speed = cmd*frame
104  # cap movement
105  if self.last_cmd == self.desired:
106  self.dirty = False
107  # when fake, need to set position/velocity here
108  if self.device.fake:
109  last_angle = self.position
110  self.position = self.last_cmd
111  t = rospy.Time.now()
112  self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
113  self.last = t
114  return None
115  return int(ticks)
116  else:
117  # when fake, need to reset velocity to 0 here.
118  if self.device.fake:
119  self.velocity = 0.0
120  self.last = rospy.Time.now()
121  return None
122 
123  def setCurrentFeedback(self, reading):
124  """ Update angle in radians by reading from servo, or by
125  using position passed in from a sync read (in ticks). """
126  if reading > -1 and reading < self.ticks: # check validity
127  self.reads += 1
128  self.total_reads += 1
129  last_angle = self.position
130  self.position = self.ticksToAngle(reading)
131  # update velocity estimate
132  t = rospy.Time.now()
133  self.velocity = (self.position - last_angle)/((t - self.last).to_nsec()/1000000000.0)
134  self.last = t
135  else:
136  rospy.logdebug("Invalid read of servo: id " + str(self.id) + ", value " + str(reading))
137  self.errors += 1
138  self.total_reads += 1
139  return
140  if not self.active:
141  self.last_cmd = self.position
142 
143  def setControlOutput(self, position):
144  """ Set the position that controller is moving to.
145  Returns output value in ticks. """
146  if self.enabled:
147  ticks = self.angleToTicks(position)
148  self.desired = position
149  self.dirty = True
150  self.active = True
151  return int(ticks)
152  return -1
153 
154  def getDiagnostics(self):
155  """ Get a diagnostics status. """
156  msg = DiagnosticStatus()
157  msg.name = self.name
158  if self.temperature > 60: # TODO: read this value from eeprom
159  self.status = "OVERHEATED, SHUTDOWN"
160  self.level = DiagnosticStatus.ERROR
161  elif self.temperature > 50 and self.status != "OVERHEATED, SHUTDOWN":
162  self.status = "OVERHEATING"
163  self.level = DiagnosticStatus.WARN
164  elif self.status != "OVERHEATED, SHUTDOWN":
165  self.status = "OK"
166  self.level = DiagnosticStatus.OK
167  msg.level = self.level
168  msg.message = self.status
169  msg.values.append(KeyValue("Position", str(self.position)))
170  msg.values.append(KeyValue("Temperature", str(self.temperature)))
171  msg.values.append(KeyValue("Voltage", str(self.voltage)))
172  if self.reads + self.errors > 100:
173  self.total_errors.append((self.errors*100.0)/(self.reads+self.errors))
174  if len(self.total_errors) > 10:
175  self.total_errors = self.total_errors[-10:]
176  self.reads = 0
177  self.errors = 0
178  msg.values.append(KeyValue("Reads", str(self.total_reads)))
179  msg.values.append(KeyValue("Error Rate", str(sum(self.total_errors)/len(self.total_errors))+"%" ))
180  if self.active:
181  msg.values.append(KeyValue("Torque", "ON"))
182  else:
183  msg.values.append(KeyValue("Torque", "OFF"))
184  return msg
185 
186  def angleToTicks(self, angle):
187  """ Convert an angle to ticks, applying limits. """
188  ticks = self.neutral + (angle/self.rad_per_tick)
189  if self.invert:
190  ticks = self.neutral - (angle/self.rad_per_tick)
191  if ticks >= self.ticks:
192  return self.ticks-1.0
193  if ticks < 0:
194  return 0
195  return ticks
196 
197  def ticksToAngle(self, ticks):
198  """ Convert an ticks to angle, applying limits. """
199  angle = (ticks - self.neutral) * self.rad_per_tick
200  if self.invert:
201  angle = -1.0 * angle
202  return angle
203 
204  def speedToTicks(self, rads_per_sec):
205  """ Convert speed in radians per second to ticks, applying limits. """
206  ticks = self.ticks * rads_per_sec / self.max_speed
207  if ticks >= self.ticks:
208  return self.ticks-1.0
209  if ticks < 0:
210  return 0
211  return ticks
212 
213  def enableCb(self, req):
214  """ Turn on/off servo torque, so that it is pose-able. """
215  if req.enable:
216  self.enabled = True
217  else:
218  if not self.device.fake:
219  self.device.disableTorque(self.id)
220  self.dirty = False
221  self.enabled = False
222  self.active = False
223  return EnableResponse(self.enabled)
224 
225  def relaxCb(self, req):
226  """ Turn off servo torque, so that it is pose-able. """
227  if not self.device.fake:
228  self.device.disableTorque(self.id)
229  self.dirty = False
230  self.active = False
231  return RelaxResponse()
232 
233  def commandCb(self, req):
234  """ Float64 style command input. """
235  if self.enabled:
236  if self.controller and self.controller.active():
237  # Under and action control, do not interfere
238  return
239  elif self.desired != req.data or not self.active:
240  self.dirty = True
241  self.active = True
242  self.desired = req.data
243 
244  def setSpeedCb(self, req):
245  """ Set servo speed. Requested speed is in radians per second.
246  Don't allow 0 which means "max speed" to a Dynamixel in joint mode. """
247  if not self.device.fake:
248  ticks_per_sec = max(1, int(self.speedToTicks(req.speed)))
249  self.device.setSpeed(self.id, ticks_per_sec)
250  return SetSpeedResponse()
251 
253 
254  def __init__(self, device, name, ns="~joints"):
255  Joint.__init__(self, device, name)
256  n = ns+"/"+name+"/"
257 
258  self.id = int(rospy.get_param(n+"id"))
259  self.ticks = rospy.get_param(n+"ticks", 2000)
260  self.neutral = rospy.get_param(n+"neutral", 1500)
261  self.range = rospy.get_param(n+"range", 180)
262  self.rad_per_tick = radians(self.range)/self.ticks
263 
264  # TODO: load these from URDF
265  self.max_angle = radians(rospy.get_param(n+"max_angle",self.range/2.0))
266  self.min_angle = radians(rospy.get_param(n+"min_angle",-self.range/2.0))
267  self.max_speed = radians(rospy.get_param(n+"max_speed",90.0))
268 
269  self.invert = rospy.get_param(n+"invert",False)
270 
271  self.dirty = False # newly updated position?
272  self.position = 0.0 # current position, as returned by servo (radians)
273  self.desired = 0.0 # desired position (radians)
274  self.last_cmd = 0.0 # last position sent (radians)
275  self.velocity = 0.0 # moving speed
276  self.last = rospy.Time.now()
277 
278  # ROS interfaces
279  rospy.Subscriber(name+'/command', Float64, self.commandCb)
280 
281  def interpolate(self, frame):
282  """ Get the new position to move to, in ticks. """
283  if self.dirty:
284  # compute command, limit velocity
285  cmd = self.desired - self.last_cmd
286  if cmd > self.max_speed/frame:
287  cmd = self.max_speed/frame
288  elif cmd < -self.max_speed/frame:
289  cmd = -self.max_speed/frame
290  # compute angle, apply limits
291  ticks = self.angleToTicks(self.last_cmd + cmd)
292  self.last_cmd = self.ticksToAngle(ticks)
293  self.speed = cmd*frame
294  # cap movement
295  if self.last_cmd == self.desired:
296  self.dirty = False
297  if self.device.fake:
298  self.position = self.last_cmd
299  return None
300  return int(ticks)
301  else:
302  return None
303 
304  def setCurrentFeedback(self, raw_data):
305  """ Update angle in radians by reading from servo, or by
306  using position passed in from a sync read (in ticks). """
307  return None
308 
309  def setControlOutput(self, position):
310  """ Set the position that controller is moving to.
311  Returns output value in ticks. """
312  ticks = self.angleToTicks(position)
313  self.desired = position
314  self.dirty = True
315  return int(ticks)
316 
317  def getDiagnostics(self):
318  """ Get a diagnostics status. """
319  msg = DiagnosticStatus()
320  msg.name = self.name
321  msg.level = DiagnosticStatus.OK
322  msg.message = "OK"
323  msg.values.append(KeyValue("Position", str(self.position)))
324  return msg
325 
326  def angleToTicks(self, angle):
327  """ Convert an angle to ticks, applying limits. """
328  ticks = self.neutral + (angle/self.rad_per_tick)
329  if self.invert:
330  ticks = self.neutral - (angle/self.rad_per_tick)
331  if ticks >= self.ticks:
332  return self.ticks-1.0
333  if ticks < 0:
334  return 0
335  return ticks
336 
337  def ticksToAngle(self, ticks):
338  """ Convert an ticks to angle, applying limits. """
339  angle = (ticks - self.neutral) * self.rad_per_tick
340  if self.invert:
341  angle = -1.0 * angle
342  return angle
343 
344  def commandCb(self, req):
345  """ Float64 style command input. """
346  if self.controller and self.controller.active():
347  # Under and action control, do not interfere
348  return
349  else:
350  self.dirty = True
351  self.desired = req.data
352 
353 
354 from controllers import *
355 
357 
358  def __init__(self, device, name):
359  Controller.__init__(self, device, name)
360  self.dynamixels = list()
361  self.hobbyservos = list()
362  self.iter = 0
363 
364  # steal some servos
365  for joint in device.joints.values():
366  if isinstance(joint, DynamixelServo):
367  self.dynamixels.append(joint)
368  elif isinstance(joint, HobbyServo):
369  self.hobbyservos.append(joint)
370 
371  self.w_delta = rospy.Duration(1.0/rospy.get_param("~write_rate", 10.0))
372  self.w_next = rospy.Time.now() + self.w_delta
373 
374  self.r_delta = rospy.Duration(1.0/rospy.get_param("~read_rate", 10.0))
375  self.r_next = rospy.Time.now() + self.r_delta
376 
377  rospy.Service(name + '/relax_all', Relax, self.relaxCb)
378  rospy.Service(name + '/enable_all', Enable, self.enableCb)
379 
380  def update(self):
381  """ Read servo positions, update them. """
382  if rospy.Time.now() > self.r_next and not self.fake:
383  if self.device.use_sync_read:
384  # arbotix/servostik/wifi board sync_read
385  synclist = list()
386  for joint in self.dynamixels:
387  if joint.readable:
388  synclist.append(joint.id)
389  if len(synclist) > 0:
390  val = self.device.syncRead(synclist, P_PRESENT_POSITION_L, 2)
391  if val:
392  for joint in self.dynamixels:
393  try:
394  i = synclist.index(joint.id)*2
395  joint.setCurrentFeedback(val[i]+(val[i+1]<<8))
396  except:
397  # not a readable servo
398  continue
399  else:
400  # direct connection, or other hardware with no sync_read capability
401  for joint in self.dynamixels:
402  joint.setCurrentFeedback(self.device.getPosition(joint.id))
403  self.r_next = rospy.Time.now() + self.r_delta
404 
405  if rospy.Time.now() > self.w_next:
406  if self.device.use_sync_write and not self.fake:
407  syncpkt = list()
408  for joint in self.dynamixels:
409  v = joint.interpolate(1.0/self.w_delta.to_sec())
410  if v != None: # if was dirty
411  syncpkt.append([joint.id,int(v)%256,int(v)>>8])
412  if len(syncpkt) > 0:
413  self.device.syncWrite(P_GOAL_POSITION_L,syncpkt)
414  else:
415  for joint in self.dynamixels:
416  v = joint.interpolate(1.0/self.w_delta.to_sec())
417  if v != None: # if was dirty
418  self.device.setPosition(joint.id, int(v))
419  for joint in self.hobbyservos:
420  v = joint.interpolate(1.0/self.w_delta.to_sec())
421  if v != None: # if it was dirty
422  self.device.setServo(joint.id, v)
423  self.w_next = rospy.Time.now() + self.w_delta
424 
425  def getDiagnostics(self):
426  """ Update status of servos (voltages, temperatures). """
427  if self.iter % 5 == 0 and not self.fake:
428  if self.device.use_sync_read:
429  # arbotix/servostik/wifi board sync_read
430  synclist = list()
431  for joint in self.dynamixels:
432  if joint.readable:
433  synclist.append(joint.id)
434  if len(synclist) > 0:
435  val = self.device.syncRead(synclist, P_PRESENT_VOLTAGE, 2)
436  if val:
437  for joint in self.dynamixels:
438  try:
439  i = synclist.index(joint.id)*2
440  if val[i] < 250:
441  joint.voltage = val[i]/10.0
442  if val[i+1] < 100:
443  joint.temperature = val[i+1]
444  except:
445  # not a readable servo
446  continue
447  else:
448  # direct connection, or other hardware with no sync_read capability
449  for joint in self.dynamixels:
450  if joint.readable:
451  val = self.device.read(joint.id, P_PRESENT_VOLTAGE, 2)
452  try:
453  joint.voltage = val[0]
454  joint.temperature = val[1]
455  except:
456  continue
457  self.iter += 1
458  return None
459 
460  def enableCb(self, req):
461  """ Turn on/off all servos torque, so that they are pose-able. """
462  for joint in self.dynamixels:
463  resp = joint.enableCb(req)
464  return resp
465 
466  def relaxCb(self, req):
467  """ Turn off all servos torque, so that they are pose-able. """
468  for joint in self.dynamixels:
469  resp = joint.relaxCb(req)
470  return resp
Joints hold current values.
Definition: joints.py:32
def __init__(self, device, name, ns="~joints")
Controllers interact with ArbotiX hardware.
Definition: controllers.py:32
def __init__(self, device, name, ns="~joints")


arbotix_python
Author(s): Michael Ferguson
autogenerated on Mon Jun 10 2019 12:43:36