00001 import curses
00002 import math
00003
00004 import rospy
00005 from geometry_msgs.msg import Twist
00006
00007 class Velocity(object):
00008
00009 def __init__(self, min_velocity, max_velocity, num_steps):
00010 assert min_velocity > 0 and max_velocity > 0 and num_steps > 0
00011 self._min = min_velocity
00012 self._max = max_velocity
00013 self._num_steps = num_steps
00014 if self._num_steps > 1:
00015 self._step_incr = (max_velocity - min_velocity) / (self._num_steps - 1)
00016 else:
00017
00018 self._step_incr = 0
00019
00020 def __call__(self, value, step):
00021 """
00022 Takes a value in the range [0, 1] and the step and returns the
00023 velocity (usually m/s or rad/s).
00024 """
00025 if step == 0:
00026 return 0
00027
00028 assert step > 0 and step <= self._num_steps
00029 max_value = self._min + self._step_incr * (step - 1)
00030 return value * max_value
00031
00032 class TextWindow():
00033
00034 _screen = None
00035 _window = None
00036 _num_lines = None
00037
00038 def __init__(self, stdscr, lines=10):
00039 self._screen = stdscr
00040 self._screen.nodelay(True)
00041 curses.curs_set(0)
00042
00043 self._num_lines = lines
00044
00045 def read_key(self):
00046 keycode = self._screen.getch()
00047 return keycode if keycode != -1 else None
00048
00049 def clear(self):
00050 self._screen.clear()
00051
00052 def write_line(self, lineno, message):
00053 if lineno < 0 or lineno >= self._num_lines:
00054 raise ValueError, 'lineno out of bounds'
00055 height, width = self._screen.getmaxyx()
00056 y = (height / self._num_lines) * lineno
00057 x = 10
00058 for text in message.split('\n'):
00059 text = text.ljust(width)
00060 self._screen.addstr(y, x, text)
00061 y += 1
00062
00063 def refresh(self):
00064 self._screen.refresh()
00065
00066 def beep(self):
00067 curses.flash()
00068
00069 class KeyTeleop():
00070
00071 _interface = None
00072
00073 _linear = None
00074 _angular = None
00075
00076 def __init__(self, interface):
00077 self._interface = interface
00078 self._pub_cmd = rospy.Publisher('key_vel', Twist)
00079
00080 self._hz = rospy.get_param('~hz', 10)
00081
00082 self._num_steps = rospy.get_param('~turbo/steps', 4)
00083
00084 forward_min = rospy.get_param('~turbo/linear_forward_min', 0.5)
00085 forward_max = rospy.get_param('~turbo/linear_forward_max', 1.0)
00086 self._forward = Velocity(forward_min, forward_max, self._num_steps)
00087
00088 backward_min = rospy.get_param('~turbo/linear_backward_min', 0.25)
00089 backward_max = rospy.get_param('~turbo/linear_backward_max', 0.5)
00090 self._backward = Velocity(backward_min, backward_max, self._num_steps)
00091
00092 angular_min = rospy.get_param('~turbo/angular_min', 0.7)
00093 angular_max = rospy.get_param('~turbo/angular_max', 1.2)
00094 self._rotation = Velocity(angular_min, angular_max, self._num_steps)
00095
00096 def run(self):
00097 self._linear = 0
00098 self._angular = 0
00099
00100 rate = rospy.Rate(self._hz)
00101 while True:
00102 keycode = self._interface.read_key()
00103 if keycode:
00104 if self._key_pressed(keycode):
00105 self._publish()
00106 else:
00107 self._publish()
00108 rate.sleep()
00109
00110 def _get_twist(self, linear, angular):
00111 twist = Twist()
00112 if linear >= 0:
00113 twist.linear.x = self._forward(1.0, linear)
00114 else:
00115 twist.linear.x = self._backward(-1.0, -linear)
00116 twist.angular.z = self._rotation(math.copysign(1, angular), abs(angular))
00117 return twist
00118
00119 def _key_pressed(self, keycode):
00120 movement_bindings = {
00121 curses.KEY_UP: ( 1, 0),
00122 curses.KEY_DOWN: (-1, 0),
00123 curses.KEY_LEFT: ( 0, 1),
00124 curses.KEY_RIGHT: ( 0, -1),
00125 }
00126 speed_bindings = {
00127 ord(' '): (0, 0),
00128 }
00129 if keycode in movement_bindings:
00130 acc = movement_bindings[keycode]
00131 ok = False
00132 if acc[0]:
00133 linear = self._linear + acc[0]
00134 if abs(linear) <= self._num_steps:
00135 self._linear = linear
00136 ok = True
00137 if acc[1]:
00138 angular = self._angular + acc[1]
00139 if abs(angular) <= self._num_steps:
00140 self._angular = angular
00141 ok = True
00142 if not ok:
00143 self._interface.beep()
00144 elif keycode in speed_bindings:
00145 acc = speed_bindings[keycode]
00146
00147 if acc[0] is not None:
00148 self._linear = acc[0]
00149 if acc[1] is not None:
00150 self._angular = acc[1]
00151
00152 elif keycode == ord('q'):
00153 rospy.signal_shutdown('Bye')
00154 else:
00155 return False
00156
00157 return True
00158
00159 def _publish(self):
00160 self._interface.clear()
00161 self._interface.write_line(2, 'Linear: %d, Angular: %d' % (self._linear, self._angular))
00162 self._interface.write_line(5, 'Use arrow keys to move, space to stop, q to exit.')
00163 self._interface.refresh()
00164
00165 twist = self._get_twist(self._linear, self._angular)
00166 self._pub_cmd.publish(twist)
00167
00168
00169 class SimpleKeyTeleop():
00170 def __init__(self, interface):
00171 self._interface = interface
00172 self._pub_cmd = rospy.Publisher('key_vel', Twist)
00173
00174 self._hz = rospy.get_param('~hz', 10)
00175
00176 self._forward_rate = rospy.get_param('~forward_rate', 0.8)
00177 self._backward_rate = rospy.get_param('~backward_rate', 0.5)
00178 self._rotation_rate = rospy.get_param('~rotation_rate', 1.0)
00179 self._last_pressed = {}
00180 self._angular = 0
00181 self._linear = 0
00182
00183 movement_bindings = {
00184 curses.KEY_UP: ( 1, 0),
00185 curses.KEY_DOWN: (-1, 0),
00186 curses.KEY_LEFT: ( 0, 1),
00187 curses.KEY_RIGHT: ( 0, -1),
00188 }
00189
00190 def run(self):
00191 rate = rospy.Rate(self._hz)
00192 self._running = True
00193 while self._running:
00194 while True:
00195 keycode = self._interface.read_key()
00196 if keycode is None:
00197 break
00198 self._key_pressed(keycode)
00199 self._set_velocity()
00200 self._publish()
00201 rate.sleep()
00202
00203 def _get_twist(self, linear, angular):
00204 twist = Twist()
00205 twist.linear.x = linear
00206 twist.angular.z = angular
00207 return twist
00208
00209 def _set_velocity(self):
00210 now = rospy.get_time()
00211 keys = []
00212 for a in self._last_pressed:
00213 if now - self._last_pressed[a] < 0.4:
00214 keys.append(a)
00215 linear = 0.0
00216 angular = 0.0
00217 for k in keys:
00218 l, a = self.movement_bindings[k]
00219 linear += l
00220 angular += a
00221 if linear > 0:
00222 linear = linear * self._forward_rate
00223 else:
00224 linear = linear * self._backward_rate
00225 angular = angular * self._rotation_rate
00226 self._angular = angular
00227 self._linear = linear
00228
00229 def _key_pressed(self, keycode):
00230 if keycode == ord('q'):
00231 self._running = False
00232 rospy.signal_shutdown('Bye')
00233 elif keycode in self.movement_bindings:
00234 self._last_pressed[keycode] = rospy.get_time()
00235
00236 def _publish(self):
00237 self._interface.clear()
00238 self._interface.write_line(2, 'Linear: %f, Angular: %f' % (self._linear, self._angular))
00239 self._interface.write_line(5, 'Use arrow keys to move, q to exit.')
00240 self._interface.refresh()
00241
00242 twist = self._get_twist(self._linear, self._angular)
00243 self._pub_cmd.publish(twist)
00244
00245
00246 def main(stdscr):
00247 rospy.init_node('key_teleop')
00248 app = SimpleKeyTeleop(TextWindow(stdscr))
00249 app.run()
00250
00251 if __name__ == '__main__':
00252 try:
00253 curses.wrapper(main)
00254 except rospy.ROSInterruptException:
00255 pass