DockDriveControl.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2012, Yujin Robot
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11 #
12 # * Redistributions of source code must retain the above copyright
13 # notice, this list of conditions and the following disclaimer.
14 # * Redistributions in binary form must reproduce the above
15 # copyright notice, this list of conditions and the following
16 # disclaimer in the documentation and/or other materials provided
17 # with the distribution.
18 # * Neither the name of the Yujin Robot nor the names of its
19 # contributors may be used to endorse or promote products derived
20 # from this software without specific prior written permission.
21 #
22 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 # POSSIBILITY OF SUCH DAMAGE.
34 
35 # Author: Younghun Ju <yhju@yujinrobot.com> <yhju83@gmail.com>
36 
37 import roslib; roslib.load_manifest('kobuki_auto_docking')
38 import rospy
39 import sys, select, termios, tty, os
40 import time
41 from datetime import datetime
42 import types
43 import commands
44 
45 from std_msgs.msg import String, Empty
46 from geometry_msgs.msg import Twist
47 from kobuki_msgs.msg import DigitalOutput, Led, Sound, SensorState, DockInfraRed, ExternalPower, MotorPower
48 
49 
50 
51 colorBindings = {
52  Led.GREEN:Led.ORANGE,
53  Led.ORANGE:Led.RED,
54  Led.RED:Led.BLACK,
55  Led.BLACK:Led.GREEN,
56 }
57 
58 textBindings = {
59  Led.GREEN:' Green',
60  Led.ORANGE:'Orange',
61  Led.RED:' Red',
62  Led.BLACK:' Black',
63 }
64 
65 
66 settings = termios.tcgetattr(sys.stdin.fileno())
67 
68 class Controller(object):
69 
70  def __init__(self):
71  #rospy initial setup
72  rospy.init_node("dock_drive_control")
73  rospy.on_shutdown(self.clearing)
74  rate = rospy.Rate(10)
75  self.message = "Idle"
76  self.publish_cmd_vel=False
77  self.cmd_vel=Twist()
78  self.sensors = SensorState()
79  self.dock_ir = DockInfraRed()
80 
81  self.bat_name = self.getBatteryName()
82  self.state = "N/A"
83  self.percentage = 0.0
84 
85  self.pub = {
86 # 'enable':rospy.Publisher('/enable', String),
87 # 'disable':rospy.Publisher('/disable', String),
88  'motor_power':rospy.Publisher('/mobile_base/commands/motor_power',MotorPower),
89 # 'do_dock':rospy.Publisher('/dock_drive/commands/do_dock', Empty),
90 # 'cancel_dock':rospy.Publisher('/dock_drive/commands/cancel_dock', Empty),
91  'debug':rospy.Publisher('/dock_drive/debug/mode_shift', String),
92  'external_power':rospy.Publisher('/mobile_base/commands/external_power',ExternalPower),
93  'digital_output':rospy.Publisher('/mobile_base/commands/digital_output',DigitalOutput),
94  'led1':rospy.Publisher('/mobile_base/commands/led1',Led),
95  'led2':rospy.Publisher('/mobile_base/commands/led2',Led),
96  'sound':rospy.Publisher('/mobile_base/commands/sound',Sound),
97  'cmd_vel':rospy.Publisher('/mobile_base/commands/velocity',Twist),
98  }
99  self.sub = {
100  'core':rospy.Subscriber('/mobile_base/sensors/core', SensorState, self.sensorsCallback),
101  'dock_ir':rospy.Subscriber('/mobile_base/sensors/dock_ir', DockInfraRed, self.dockIRCallback),
102  }
103  self.keyBindings = {
104  '1':(self.pub['debug'].publish, String('enable') ,'enable'),
105  '2':(self.pub['debug'].publish, String('run') ,'run'),
106  '3':(self.pub['debug'].publish, String('stop') ,'stop'),
107  '4':(self.pub['debug'].publish, String('disable'),'disable'),
108  '5':5,
109  '6':6,
110  '7':7,
111  '8':'eight',
112  '9':'nine',
113  '0':'null',
114 # 'e':(self.pub['motor_power'].publish,MotorPower(MotorPower.ON),'enabled'),
115 # 'r':(self.pub['motor_power'].publish,MotorPower(MotorPower.OFF),'disabled'),
116  'e':(self.toggleMotor,True,'enabled'),
117  'r':(self.toggleMotor,False,'disabled'),
118  ' ':(self.resetVel,'','resetted'),
119  'a':(self.pub['sound'].publish,Sound(Sound.ON),'sound.on'),
120  's':(self.pub['sound'].publish,Sound(Sound.OFF),'sound.off'),
121  'd':(self.pub['sound'].publish,Sound(Sound.RECHARGE),'sound.recharge'),
122  'f':(self.pub['sound'].publish,Sound(Sound.BUTTON),'sound.button'),
123  'z':(self.pub['sound'].publish,Sound(Sound.ERROR),'sound.error'),
124  'x':(self.pub['sound'].publish,Sound(Sound.CLEANINGSTART),'sound.cleaningstart'),
125  'c':(self.pub['sound'].publish,Sound(Sound.CLEANINGEND),'sound.cleaningend'),
126  'q':(rospy.signal_shutdown,'user reuest','quit'),
127  'Q':(rospy.signal_shutdown,'user reuest','quit'),
128  }
129  rospy.Timer(rospy.Duration(0.1), self.keyopCallback)
130  if len(self.bat_name) > 0:
131  rospy.Timer(rospy.Duration(1.0), self.batteryCallback)
132  rospy.Timer(rospy.Duration(1.0), self.stateCallback) # to check status of rostopics
133  self.printFront()
134  '''
135  # initial values
136  external_power = DigitalOutput()
137  external_power.values = [ True, True, True, True ]
138  external_power.mask = [ True, True, True, True ]
139 
140  digital_output = DigitalOutput()
141  digital_output.values = [ True, True, True, True ]
142  digital_output.mask = [ True, True, True, True ]
143 
144  leds = []
145  leds.append(Led())
146  leds.append(Led())
147  leds[0].value = Led.GREEN
148  leds[1].value = Led.GREEN
149 
150  # initialize outputs
151  while not pub_ext_pwr.get_num_connections():
152  rate.sleep()
153  pub_ext_pwr.publish(external_power)
154 
155  while not pub_dgt_out.get_num_connections():
156  rate.sleep()
157  pub_dgt_out.publish(digital_output)
158 
159  while not pub[0].get_num_connections():
160  rate.sleep()
161  pub[0].publish(leds[0])
162 
163  while not pub[1].get_num_connections():
164  rate.sleep()
165  pub[1].publish(leds[1])
166  '''
167 
168  def spin(self):
169  while not rospy.is_shutdown():
170  self.printStatus()
171  self.processKeys()
172 
173  def getKey(self):
174  fd = sys.stdin.fileno()
175  new = termios.tcgetattr(fd)
176  new[3] = new[3] & ~termios.ICANON & ~termios.ECHO
177  new[6][termios.VMIN] = 0
178  new[6][termios.VTIME] = 1
179  termios.tcsetattr(fd, termios.TCSANOW, new)
180  return sys.stdin.read(1)
181  #return os.read(fd,1) #equivalent
182  #return key
183 
184  def processKeys(self):
185  key = self.getKey()
186  if key == '':
187  #self.message = "non." #debug
188  return
189  if ord(key) == 27:
190  akey = self.getKey()
191  if len(akey) and ord(akey)==91:
192  key = self.getKey()
193  if self.publish_cmd_vel:
194  if key == 'A': self.message = 'Up Arrow' ; self.cmd_vel.linear.x += 0.05
195  if key == 'B': self.message = 'Down Arrow' ; self.cmd_vel.linear.x -= 0.05
196  if key == 'C': self.message = 'Right Arrow' ; self.cmd_vel.angular.z -= 0.33
197  if key == 'D': self.message = 'Left Arrow' ; self.cmd_vel.angular.z += 0.33
198  return
199  if key in self.keyBindings.keys():
200  if type(self.keyBindings[key]) is types.TupleType:
201  f = self.keyBindings[key][0]
202  v = self.keyBindings[key][1]
203  m = self.keyBindings[key][2]
204  if type(f) is types.InstanceType or type(f) is types.MethodType or type(f) is types.FunctionType : f(v)
205  else:
206  print type(f)
207  self.message = str(m)
208  else:
209  self.message = str(self.keyBindings[key])
210  else:
211  self.message = "unknown key: " + str(len(key)) + ": " + str(ord(key))
212  return
213  '''
214  elif key in keyBindings1.keys():
215  external_power.values[keyBindings1[key]] ^= True
216  printStatus(external_power.values, digital_output.values, leds)
217  pub_ext_pwr.publish(external_power)
218 
219  elif key in keyBindings3.keys():
220  digital_output.values[keyBindings3[key]] ^= True
221  printStatus(external_power.values, digital_output.values, leds)
222  pub_dgt_out.publish(digital_output)
223 
224  elif key in keyBindings2.keys():
225  leds[keyBindings2[key]].value = colorBindings[ leds[keyBindings2[key]].value ]
226  printStatus(external_power.values, digital_output.values, leds)
227  pub[keyBindings2[key]].publish(leds[keyBindings2[key]])
228  elif key in keyBindings4.keys():
229  pub_sounds.publish(keyBindings4[key])
230  #rate.sleep()
231  '''
232 
233  def clearing(self):
234  sys.stdout.write('\n\r')
235  sys.stdout.flush()
236  termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
237 
238  def toggleMotor(self, on_off):
239  if on_off:
240  self.pub['motor_power'].publish(MotorPower(MotorPower.ON))
241  self.publish_cmd_vel = True
242  else:
243  self.pub['motor_power'].publish(MotorPower(MotorPower.OFF))
244  self.publish_cmd_vel = False
245  self.cmd_vel = Twist()
246 
247  def keyopCallback(self, event):
248  if not rospy.is_shutdown():
249  if self.publish_cmd_vel:
250  self.pub['cmd_vel'].publish(self.cmd_vel)
251 
252  def sensorsCallback(self, data):
253  if not rospy.is_shutdown():
254  self.sensors = data
255 
256  def dockIRCallback(self, data):
257  if not rospy.is_shutdown():
258  self.dock_ir = data
259 
260  def batteryCallback(self, event):
261  rem = float(commands.getoutput("grep \"^remaining capacity\" /proc/acpi/battery/" + self.bat_name + "/state | awk '{ print $3 }'"))
262  full = float(commands.getoutput("grep \"^last full capacity\" /proc/acpi/battery/" + self.bat_name + "/info | awk '{ print $4 }'"))
263  self.state = commands.getoutput("grep \"^charging state\" /proc/acpi/battery/" + self.bat_name + "/state | awk '{ print $3 }'")
264  self.percentage = float((rem/full) * 100.)
265 
266  def stateCallback(self, event):
267  # do check
268  num = self.sub['core'].get_num_connections()
269  if num == 0:
270  self.messages = 'core is disconnected({0:d})'.format(num)
271  else:
272  self.messages = 'core is connected({0:d})'.format(num)
273  # do more handling and managing
274  # get_num_connections are not reliable yet.
275 
276  def resetVel(self, x):
277  self.cmd_vel = Twist()
278 
279  def printFront(self):
280  # statement
281  print ""
282  print " Dock Drive Controller"
283  print "----------------------"
284  print "1: do_dock"
285  print "2: run"
286  print "3: stop"
287  print "4: cancel_dock"
288  print ""
289  print "q: quit"
290  print ""
291  #print " 3.3V 5.0V 12V5A 12V1A | Led #1 Led #2 | DO_0 DO_1 DO_2 DO_3"
292  #print " [ On] [Off] [ On] [Off] | [Orange] [Orange] | [ On] [Off] [Off] [ On]"
293 
294  def printStatus(self):
295  sys.stdout.write(' \r')
296  sys.stdout.write('[ \033[1m' + self.message + '\033[0m ]')
297 
298  if len(self.bat_name) > 0:
299  sys.stdout.write('[Laptop: ' + "{0:2.2f}".format(self.percentage) + '% - ' + self.state + ']')
300 
301  src_str = ''
302  if self.sensors.charger:
303  src_str = '(adaptor)' if self.sensors.charger&16 else '(dock)'
304 
305  chg = self.sensors.charger&6
306  if chg==0: chg_str = 'discharging'
307  elif chg==2: chg_str = 'fully charged'
308  else: chg_str = 'charging'
309  sys.stdout.write('[Robot: ' + "{0:2.1f}".format(self.sensors.battery/10.) + 'V - ' + chg_str + src_str + ']')
310  '''
311  for idx in range(0,4):
312  if ext_values[idx]:
313  sys.stdout.write('[ \033[1mOn\033[0m] ')
314  else:
315  sys.stdout.write('[Off] ')
316  sys.stdout.write('| [\033[1m'+textBindings[leds[0].value]+'\033[0m] ')
317  sys.stdout.write('[\033[1m'+textBindings[leds[1].value]+'\033[0m] | ')
318 
319  for idx in range(0,4):
320  if dgt_values[idx]:
321  sys.stdout.write('[ \033[1mOn\033[0m] ')
322  else:
323  sys.stdout.write('[Off] ')
324  '''
325  sys.stdout.write('\r')
326  sys.stdout.flush()
327 
328  def getBatteryName(self):
329  PATH = '/proc/acpi/battery'
330  bat_name = ''
331  if os.path.exists(PATH):
332  bat = os.walk(PATH).next()
333  if len(bat[1]) > 0:
334  bat_name = bat[1][0]
335  return bat_name
336 
337 
338 if __name__ == '__main__':
339  try:
340  instance = Controller()
341  instance.spin()
342  except rospy.ROSInterruptException: pass
def batteryCallback(self, event)


kobuki_auto_docking
Author(s): Younghun Ju
autogenerated on Mon Jun 10 2019 13:44:57