joystick_relay.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # twist_mux: joystick_relay.py
5 #
6 # Copyright (c) 2013 PAL Robotics SL. All Rights Reserved
7 #
8 # Authors:
9 # * Enrique Fernandez
10 # * Siegfried-A. Gevatter
11 
12 import rospy
13 import actionlib
14 from twist_mux_msgs.msg import JoyPriorityAction, JoyTurboAction
15 from geometry_msgs.msg import Twist
16 from std_msgs.msg import Bool
17 from visualization_msgs.msg import Marker
18 
19 import numpy as np
20 
21 class Velocity(object):
22 
23  def __init__(self, min_velocity, max_velocity, num_steps):
24  assert min_velocity > 0 and max_velocity > 0 and num_steps > 0
25  self._min = min_velocity
26  self._max = max_velocity
27  self._num_steps = num_steps
28  if self._num_steps > 1:
29  self._step_incr = (max_velocity - min_velocity) / (self._num_steps - 1)
30  else:
31  # If num_steps is one, we always use the minimum velocity.
32  self._step_incr = 0
33 
34  def __call__(self, value, step=1):
35  """
36  Takes a value in the range [0, 1] and the step and returns the
37  velocity (usually m/s or rad/s).
38  """
39  assert step > 0 and step <= self._num_steps
40  max_value = self._min + self._step_incr * (step - 1)
41  return value * max_value
42 
44 
45  def __init__(self, action_name, action_type, callback):
46  self._action_type = action_type
47  self._callback = callback
48  self._server = actionlib.SimpleActionServer(action_name, action_type,
49  self._cb, False)
50  self._server.start()
51 
52  def _cb(self, goal):
53  self._callback()
54  result = self._action_type().action_result
55  self._server.set_succeeded(result)
56 
58 
59  def __init__(self):
60  self._num_steps = rospy.get_param('~turbo/steps', 1)
61 
62  forward_min = rospy.get_param('~turbo/linear_forward_min', 1.0)
63  forward_max = rospy.get_param('~turbo/linear_forward_max', 1.0)
64  self._forward = Velocity(forward_min, forward_max, self._num_steps)
65 
66  backward_min = rospy.get_param('~turbo/linear_backward_min', forward_min)
67  backward_max = rospy.get_param('~turbo/linear_backward_max', forward_max)
68  self._backward = Velocity(backward_min, backward_max, self._num_steps)
69 
70  lateral_min = rospy.get_param('~turbo/linear_lateral_min', 1.0)
71  lateral_max = rospy.get_param('~turbo/linear_lateral_max', 1.0)
72  self._lateral = Velocity(lateral_min, lateral_max, self._num_steps)
73 
74  angular_min = rospy.get_param('~turbo/angular_min', 1.0)
75  angular_max = rospy.get_param('~turbo/angular_max', 1.0)
76  self._angular = Velocity(angular_min, angular_max, self._num_steps)
77 
78  default_init_step = np.floor((self._num_steps + 1)/2.0)
79  init_step = rospy.get_param('~turbo/init_step', default_init_step)
80  if init_step < 0 or init_step > self._num_steps:
81  self._init_step = default_init_step
82  rospy.logwarn('Initial step %d outside range [1, %d]!'
83  ' Falling back to default %d' %
84  (init_step, self._num_steps, default_init_step))
85  else:
86  self._init_step = init_step
87  self.reset_turbo()
88 
89  def validate_twist(self, cmd):
90  if cmd.linear.z or cmd.angular.x or cmd.angular.y:
91  rospy.logerr("Joystick provided invalid values, only linear.x, linear.y and angular.z may be non-zero.")
92  return False
93  if abs(cmd.linear.x) > 1.0 or abs(cmd.linear.y) > 1.0 or abs(cmd.angular.z) > 1.0:
94  rospy.logerr("Joystick provided invalid values (%d, %d, %d), not in [-1, 1] range." % (cmd.linear.x, cmd.linear.y, cmd.angular.z))
95  return False
96  return True
97 
98  def scale_twist(self, cmd):
99  twist = Twist()
100  if self.validate_twist(cmd):
101  if cmd.linear.x >= 0:
102  twist.linear.x = self._forward(cmd.linear.x, self._current_step)
103  else:
104  twist.linear.x = self._backward(cmd.linear.x, self._current_step)
105  twist.linear.y = self._lateral(cmd.linear.y, self._current_step)
106  twist.angular.z = self._angular(cmd.angular.z, self._current_angular_step)
107  return twist
108 
109  def increase_turbo(self):
110  if self._current_step < self._num_steps:
111  self._current_step += 1
113 
114  def decrease_turbo(self):
115  if self._current_step > 1:
116  self._current_step -= 1
118 
120  if self._current_angular_step < self._num_steps:
121  self._current_angular_step += 1
122 
124  if self._current_angular_step > 1:
125  self._current_angular_step -= 1
126 
127  def reset_turbo(self):
130 
131 class TextMarker(object):
132 
133  def __init__(self, scale = 1.0, z = 0.0):
134  self._pub = rospy.Publisher('text_marker', Marker, queue_size=1, latch=True)
135 
136  self._scale = scale
137  self._z = z
138 
139  # Build marker
140  self._marker = Marker()
141 
142  self._marker.id = 0
143  self._marker.type = Marker.TEXT_VIEW_FACING
144 
145  self._marker.header.frame_id = "base_footprint"
146 
147  self._marker.pose.position.z = self._z
148 
149  self._marker.pose.orientation.w = 1.0
150 
151  self._marker.scale.z = self._scale
152 
153  self._marker.color.a = 1.0
154  self._marker.color.r = 1.0
155  self._marker.color.g = 1.0
156  self._marker.color.b = 1.0
157 
158  def update(self, joystick_priority, add = True):
159  if add:
160  self._marker.action = Marker.ADD
161 
162  self._marker.text = "Manual" if joystick_priority else "Autonomous"
163  else:
164  self._marker.action = Marker.DELETE
165 
166  self._pub.publish(self._marker)
167 
169 
170  def __init__(self):
171  self._current_priority = rospy.get_param("~priority", True)
173 
174  self._marker = TextMarker(0.5, 2.0)
175 
176  self._pub_cmd = rospy.Publisher('joy_vel_out', Twist, queue_size=1)
177  self._subscriber = rospy.Subscriber('joy_vel_in', Twist, self._forward_cmd, queue_size=1)
178 
179  self._pub_priority = rospy.Publisher('joy_priority', Bool, queue_size=1, latch=True)
180 
181  # Wait for subscribers and publish initial joy_priority:
182  self._pub_priority.publish(self._current_priority)
183  self._marker.update(self._current_priority)
184 
185  # Marker timer (required to update the marker when the robot doesn't receive velocities):
186  rospy.Timer(rospy.Duration(1.0), self._timer_callback)
187 
188  # Action servers to change priority & the currently active turbo step.
189  # We aren't using services because they aren't supported by joy_teleop.
190  self._server_priority = ServiceLikeActionServer('joy_priority_action', JoyPriorityAction,
191  self._toggle_priority)
192  self._server_increase = ServiceLikeActionServer('joy_turbo_increase', JoyTurboAction,
193  self._velocity_control.increase_turbo)
194  self._server_decrease = ServiceLikeActionServer('joy_turbo_decrease', JoyTurboAction,
195  self._velocity_control.decrease_turbo)
196  self._server_angular_increase = ServiceLikeActionServer('joy_turbo_angular_increase', JoyTurboAction,
197  self._velocity_control.increase_angular_turbo)
198  self._server_angular_decrease = ServiceLikeActionServer('joy_turbo_angular_decrease', JoyTurboAction,
199  self._velocity_control.decrease_angular_turbo)
200  self._server_reset = ServiceLikeActionServer('joy_turbo_reset', JoyTurboAction,
201  self._velocity_control.reset_turbo)
202 
203  def _forward_cmd(self, cmd):
204  if self._current_priority:
205  self._pub_cmd.publish(self._velocity_control.scale_twist(cmd))
206 
207  self._marker.update(self._current_priority)
208 
209  def _toggle_priority(self):
210  self._current_priority = not self._current_priority
211  rospy.loginfo("Toggled joy_priority, current status is: %s", self._current_priority)
212  self._pub_priority.publish(self._current_priority)
213  self._marker.update(self._current_priority)
214 
215  # Reset velocity to 0:
216  if self._current_priority:
217  self._pub_cmd.publish(Twist())
218 
219  def _timer_callback(self, event):
220  self._marker.update(self._current_priority)
221 
222 if __name__ == '__main__':
223  rospy.init_node('joystick_relay')
224 
225  server = JoystickRelay()
226 
227  rospy.spin()
joystick_relay.VelocityControl._backward
_backward
Definition: joystick_relay.py:68
joystick_relay.JoystickRelay._server_angular_decrease
_server_angular_decrease
Definition: joystick_relay.py:198
joystick_relay.VelocityControl._num_steps
_num_steps
Definition: joystick_relay.py:60
joystick_relay.JoystickRelay._marker
_marker
Definition: joystick_relay.py:174
joystick_relay.JoystickRelay._toggle_priority
def _toggle_priority(self)
Definition: joystick_relay.py:209
joystick_relay.VelocityControl._forward
_forward
Definition: joystick_relay.py:64
joystick_relay.TextMarker._z
_z
Definition: joystick_relay.py:137
joystick_relay.JoystickRelay._server_priority
_server_priority
Definition: joystick_relay.py:190
joystick_relay.VelocityControl._lateral
_lateral
Definition: joystick_relay.py:72
joystick_relay.Velocity._num_steps
_num_steps
Definition: joystick_relay.py:27
joystick_relay.VelocityControl._current_angular_step
_current_angular_step
Definition: joystick_relay.py:129
joystick_relay.VelocityControl.decrease_angular_turbo
def decrease_angular_turbo(self)
Definition: joystick_relay.py:123
joystick_relay.VelocityControl._init_step
_init_step
Definition: joystick_relay.py:81
joystick_relay.VelocityControl._current_step
_current_step
Definition: joystick_relay.py:128
joystick_relay.JoystickRelay._subscriber
_subscriber
Definition: joystick_relay.py:177
joystick_relay.Velocity.__init__
def __init__(self, min_velocity, max_velocity, num_steps)
Definition: joystick_relay.py:23
joystick_relay.Velocity._min
_min
Definition: joystick_relay.py:25
joystick_relay.TextMarker._marker
_marker
Definition: joystick_relay.py:140
joystick_relay.ServiceLikeActionServer._server
_server
Definition: joystick_relay.py:48
joystick_relay.VelocityControl
Definition: joystick_relay.py:57
joystick_relay.JoystickRelay._server_angular_increase
_server_angular_increase
Definition: joystick_relay.py:196
joystick_relay.TextMarker
Definition: joystick_relay.py:131
joystick_relay.Velocity._max
_max
Definition: joystick_relay.py:26
joystick_relay.VelocityControl.validate_twist
def validate_twist(self, cmd)
Definition: joystick_relay.py:89
joystick_relay.JoystickRelay._server_reset
_server_reset
Definition: joystick_relay.py:200
joystick_relay.Velocity._step_incr
_step_incr
Definition: joystick_relay.py:29
joystick_relay.ServiceLikeActionServer._action_type
_action_type
Definition: joystick_relay.py:46
joystick_relay.JoystickRelay._pub_priority
_pub_priority
Definition: joystick_relay.py:179
joystick_relay.JoystickRelay.__init__
def __init__(self)
Definition: joystick_relay.py:170
joystick_relay.ServiceLikeActionServer.__init__
def __init__(self, action_name, action_type, callback)
Definition: joystick_relay.py:45
joystick_relay.JoystickRelay._current_priority
_current_priority
Definition: joystick_relay.py:171
joystick_relay.VelocityControl.increase_angular_turbo
def increase_angular_turbo(self)
Definition: joystick_relay.py:119
joystick_relay.TextMarker._scale
_scale
Definition: joystick_relay.py:136
joystick_relay.VelocityControl._angular
_angular
Definition: joystick_relay.py:76
joystick_relay.TextMarker.update
def update(self, joystick_priority, add=True)
Definition: joystick_relay.py:158
joystick_relay.ServiceLikeActionServer._callback
_callback
Definition: joystick_relay.py:47
joystick_relay.VelocityControl.increase_turbo
def increase_turbo(self)
Definition: joystick_relay.py:109
joystick_relay.Velocity.__call__
def __call__(self, value, step=1)
Definition: joystick_relay.py:34
joystick_relay.Velocity
Definition: joystick_relay.py:21
joystick_relay.TextMarker.__init__
def __init__(self, scale=1.0, z=0.0)
Definition: joystick_relay.py:133
joystick_relay.ServiceLikeActionServer._cb
def _cb(self, goal)
Definition: joystick_relay.py:52
joystick_relay.JoystickRelay._server_decrease
_server_decrease
Definition: joystick_relay.py:194
joystick_relay.ServiceLikeActionServer
Definition: joystick_relay.py:43
joystick_relay.JoystickRelay
Definition: joystick_relay.py:168
actionlib::SimpleActionServer
joystick_relay.TextMarker._pub
_pub
Definition: joystick_relay.py:134
joystick_relay.VelocityControl.__init__
def __init__(self)
Definition: joystick_relay.py:59
joystick_relay.JoystickRelay._forward_cmd
def _forward_cmd(self, cmd)
Definition: joystick_relay.py:203
joystick_relay.JoystickRelay._timer_callback
def _timer_callback(self, event)
Definition: joystick_relay.py:219
joystick_relay.VelocityControl.reset_turbo
def reset_turbo(self)
Definition: joystick_relay.py:127
joystick_relay.JoystickRelay._velocity_control
_velocity_control
Definition: joystick_relay.py:172
joystick_relay.VelocityControl.decrease_turbo
def decrease_turbo(self)
Definition: joystick_relay.py:114
joystick_relay.VelocityControl.scale_twist
def scale_twist(self, cmd)
Definition: joystick_relay.py:98
joystick_relay.JoystickRelay._server_increase
_server_increase
Definition: joystick_relay.py:192
joystick_relay.JoystickRelay._pub_cmd
_pub_cmd
Definition: joystick_relay.py:176


twist_mux
Author(s): Enrique Fernandez , Siegfried-A. Gevatter Pujals
autogenerated on Wed Oct 26 2022 02:18:09