00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import curses
00011 import math
00012
00013 import rospy
00014 from geometry_msgs.msg import Twist
00015
00016 class Velocity(object):
00017
00018 def __init__(self, min_velocity, max_velocity, num_steps):
00019 assert min_velocity > 0 and max_velocity > 0 and num_steps > 0
00020 self._min = min_velocity
00021 self._max = max_velocity
00022 self._num_steps = num_steps
00023 if self._num_steps > 1:
00024 self._step_incr = (max_velocity - min_velocity) / (self._num_steps - 1)
00025 else:
00026
00027 self._step_incr = 0
00028
00029 def __call__(self, value, step):
00030 """
00031 Takes a value in the range [0, 1] and the step and returns the
00032 velocity (usually m/s or rad/s).
00033 """
00034 if step == 0:
00035 return 0
00036
00037 assert step > 0 and step <= self._num_steps
00038 max_value = self._min + self._step_incr * (step - 1)
00039 return value * max_value
00040
00041 class TextWindow():
00042
00043 _screen = None
00044 _window = None
00045 _num_lines = None
00046
00047 def __init__(self, stdscr, lines=10):
00048 self._screen = stdscr
00049 self._screen.nodelay(True)
00050 curses.curs_set(0)
00051
00052 self._num_lines = lines
00053
00054 def read_key(self):
00055 keycode = self._screen.getch()
00056 return keycode if keycode != -1 else None
00057
00058 def clear(self):
00059 self._screen.clear()
00060
00061 def write_line(self, lineno, message):
00062 if lineno < 0 or lineno >= self._num_lines:
00063 raise ValueError, 'lineno out of bounds'
00064 height, width = self._screen.getmaxyx()
00065 y = (height / self._num_lines) * lineno
00066 x = 10
00067 for text in message.split('\n'):
00068 text = text.ljust(width)
00069 self._screen.addstr(y, x, text)
00070 y += 1
00071
00072 def refresh(self):
00073 self._screen.refresh()
00074
00075 def beep(self):
00076 curses.flash()
00077
00078 class KeyTeleop():
00079
00080 _interface = None
00081
00082 _linear = None
00083 _angular = None
00084
00085 def __init__(self, interface):
00086 self._interface = interface
00087 self._pub_cmd = rospy.Publisher('key_vel', Twist)
00088
00089 self._hz = rospy.get_param('~hz', 10)
00090
00091 self._num_steps = rospy.get_param('~turbo/steps', 4)
00092
00093 forward_min = rospy.get_param('~turbo/linear_forward_min', 0.5)
00094 forward_max = rospy.get_param('~turbo/linear_forward_max', 1.0)
00095 self._forward = Velocity(forward_min, forward_max, self._num_steps)
00096
00097 backward_min = rospy.get_param('~turbo/linear_backward_min', 0.25)
00098 backward_max = rospy.get_param('~turbo/linear_backward_max', 0.5)
00099 self._backward = Velocity(backward_min, backward_max, self._num_steps)
00100
00101 angular_min = rospy.get_param('~turbo/angular_min', 0.7)
00102 angular_max = rospy.get_param('~turbo/angular_max', 1.2)
00103 self._rotation = Velocity(angular_min, angular_max, self._num_steps)
00104
00105 def run(self):
00106 self._linear = 0
00107 self._angular = 0
00108
00109 rate = rospy.Rate(self._hz)
00110 while True:
00111 keycode = self._interface.read_key()
00112 if keycode:
00113 if self._key_pressed(keycode):
00114 self._publish()
00115 else:
00116 self._publish()
00117 rate.sleep()
00118
00119 def _get_twist(self, linear, angular):
00120 twist = Twist()
00121 if linear >= 0:
00122 twist.linear.x = self._forward(1.0, linear)
00123 else:
00124 twist.linear.x = self._backward(-1.0, -linear)
00125 twist.angular.z = self._rotation(math.copysign(1, angular), abs(angular))
00126 return twist
00127
00128 def _key_pressed(self, keycode):
00129 movement_bindings = {
00130 curses.KEY_UP: ( 1, 0),
00131 curses.KEY_DOWN: (-1, 0),
00132 curses.KEY_LEFT: ( 0, 1),
00133 curses.KEY_RIGHT: ( 0, -1),
00134 }
00135 speed_bindings = {
00136 ord(' '): (0, 0),
00137 }
00138 if keycode in movement_bindings:
00139 acc = movement_bindings[keycode]
00140 ok = False
00141 if acc[0]:
00142 linear = self._linear + acc[0]
00143 if abs(linear) <= self._num_steps:
00144 self._linear = linear
00145 ok = True
00146 if acc[1]:
00147 angular = self._angular + acc[1]
00148 if abs(angular) <= self._num_steps:
00149 self._angular = angular
00150 ok = True
00151 if not ok:
00152 self._interface.beep()
00153 elif keycode in speed_bindings:
00154 acc = speed_bindings[keycode]
00155
00156 if acc[0] is not None:
00157 self._linear = acc[0]
00158 if acc[1] is not None:
00159 self._angular = acc[1]
00160
00161 elif keycode == ord('q'):
00162 rospy.signal_shutdown('Bye')
00163 else:
00164 return False
00165
00166 return True
00167
00168 def _publish(self):
00169 self._interface.clear()
00170 self._interface.write_line(2, 'Linear: %d, Angular: %d' % (self._linear, self._angular))
00171 self._interface.write_line(5, 'Use arrow keys to move, space to stop, q to exit.')
00172 self._interface.refresh()
00173
00174 twist = self._get_twist(self._linear, self._angular)
00175 self._pub_cmd.publish(twist)
00176
00177
00178 class SimpleKeyTeleop():
00179 def __init__(self, interface):
00180 self._interface = interface
00181 self._pub_cmd = rospy.Publisher('key_vel', Twist)
00182
00183 self._hz = rospy.get_param('~hz', 10)
00184
00185 self._forward_rate = rospy.get_param('~forward_rate', 0.8)
00186 self._backward_rate = rospy.get_param('~backward_rate', 0.5)
00187 self._rotation_rate = rospy.get_param('~rotation_rate', 1.0)
00188 self._last_pressed = {}
00189 self._angular = 0
00190 self._linear = 0
00191
00192 movement_bindings = {
00193 curses.KEY_UP: ( 1, 0),
00194 curses.KEY_DOWN: (-1, 0),
00195 curses.KEY_LEFT: ( 0, 1),
00196 curses.KEY_RIGHT: ( 0, -1),
00197 }
00198
00199 def run(self):
00200 rate = rospy.Rate(self._hz)
00201 self._running = True
00202 while self._running:
00203 while True:
00204 keycode = self._interface.read_key()
00205 if keycode is None:
00206 break
00207 self._key_pressed(keycode)
00208 self._set_velocity()
00209 self._publish()
00210 rate.sleep()
00211
00212 def _get_twist(self, linear, angular):
00213 twist = Twist()
00214 twist.linear.x = linear
00215 twist.angular.z = angular
00216 return twist
00217
00218 def _set_velocity(self):
00219 now = rospy.get_time()
00220 keys = []
00221 for a in self._last_pressed:
00222 if now - self._last_pressed[a] < 0.4:
00223 keys.append(a)
00224 linear = 0.0
00225 angular = 0.0
00226 for k in keys:
00227 l, a = self.movement_bindings[k]
00228 linear += l
00229 angular += a
00230 if linear > 0:
00231 linear = linear * self._forward_rate
00232 else:
00233 linear = linear * self._backward_rate
00234 angular = angular * self._rotation_rate
00235 self._angular = angular
00236 self._linear = linear
00237
00238 def _key_pressed(self, keycode):
00239 if keycode == ord('q'):
00240 self._running = False
00241 rospy.signal_shutdown('Bye')
00242 elif keycode in self.movement_bindings:
00243 self._last_pressed[keycode] = rospy.get_time()
00244
00245 def _publish(self):
00246 self._interface.clear()
00247 self._interface.write_line(2, 'Linear: %f, Angular: %f' % (self._linear, self._angular))
00248 self._interface.write_line(5, 'Use arrow keys to move, q to exit.')
00249 self._interface.refresh()
00250
00251 twist = self._get_twist(self._linear, self._angular)
00252 self._pub_cmd.publish(twist)
00253
00254
00255 def main(stdscr):
00256 rospy.init_node('key_teleop')
00257 app = SimpleKeyTeleop(TextWindow(stdscr))
00258 app.run()
00259
00260 if __name__ == '__main__':
00261 try:
00262 curses.wrapper(main)
00263 except rospy.ROSInterruptException:
00264 pass