00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 import rospy
00013 import actionlib
00014 from twist_mux_msgs.msg import JoyPriorityAction, JoyTurboAction
00015 from geometry_msgs.msg import Twist
00016 from std_msgs.msg import Bool
00017 from visualization_msgs.msg import Marker
00018
00019 import numpy as np
00020
00021 class Velocity(object):
00022
00023 def __init__(self, min_velocity, max_velocity, num_steps):
00024 assert min_velocity > 0 and max_velocity > 0 and num_steps > 0
00025 self._min = min_velocity
00026 self._max = max_velocity
00027 self._num_steps = num_steps
00028 if self._num_steps > 1:
00029 self._step_incr = (max_velocity - min_velocity) / (self._num_steps - 1)
00030 else:
00031
00032 self._step_incr = 0
00033
00034 def __call__(self, value, step=1):
00035 """
00036 Takes a value in the range [0, 1] and the step and returns the
00037 velocity (usually m/s or rad/s).
00038 """
00039 assert step > 0 and step <= self._num_steps
00040 max_value = self._min + self._step_incr * (step - 1)
00041 return value * max_value
00042
00043 class ServiceLikeActionServer(object):
00044
00045 def __init__(self, action_name, action_type, callback):
00046 self._action_type = action_type
00047 self._callback = callback
00048 self._server = actionlib.SimpleActionServer(action_name, action_type,
00049 self._cb, False)
00050 self._server.start()
00051
00052 def _cb(self, goal):
00053 self._callback()
00054 result = self._action_type().action_result
00055 self._server.set_succeeded(result)
00056
00057 class VelocityControl:
00058
00059 def __init__(self):
00060 self._num_steps = rospy.get_param('~turbo/steps', 1)
00061
00062 forward_min = rospy.get_param('~turbo/linear_forward_min', 1.0)
00063 forward_max = rospy.get_param('~turbo/linear_forward_max', 1.0)
00064 self._forward = Velocity(forward_min, forward_max, self._num_steps)
00065
00066 backward_min = rospy.get_param('~turbo/linear_backward_min', forward_min)
00067 backward_max = rospy.get_param('~turbo/linear_backward_max', forward_max)
00068 self._backward = Velocity(backward_min, backward_max, self._num_steps)
00069
00070 lateral_min = rospy.get_param('~turbo/linear_lateral_min', 1.0)
00071 lateral_max = rospy.get_param('~turbo/linear_lateral_max', 1.0)
00072 self._lateral = Velocity(lateral_min, lateral_max, self._num_steps)
00073
00074 angular_min = rospy.get_param('~turbo/angular_min', 1.0)
00075 angular_max = rospy.get_param('~turbo/angular_max', 1.0)
00076 self._angular = Velocity(angular_min, angular_max, self._num_steps)
00077
00078 default_init_step = np.floor((self._num_steps + 1)/2.0)
00079 init_step = rospy.get_param('~turbo/init_step', default_init_step)
00080 if init_step < 0 or init_step > self._num_steps:
00081 self._init_step = default_init_step
00082 rospy.logwarn('Initial step %d outside range [1, %d]!'
00083 ' Falling back to default %d' %
00084 (init_step, self._num_steps, default_init_step))
00085 else:
00086 self._init_step = init_step
00087 self.reset_turbo()
00088
00089 def validate_twist(self, cmd):
00090 if cmd.linear.z or cmd.angular.x or cmd.angular.y:
00091 rospy.logerr("Joystick provided invalid values, only linear.x, linear.y and angular.z may be non-zero.")
00092 return False
00093 if abs(cmd.linear.x) > 1.0 or abs(cmd.linear.y) > 1.0 or abs(cmd.angular.z) > 1.0:
00094 rospy.logerr("Joystick provided invalid values (%d, %d, %d), not in [-1, 1] range." % (cmd.linear.x, cmd.linear.y, cmd.angular.z))
00095 return False
00096 return True
00097
00098 def scale_twist(self, cmd):
00099 twist = Twist()
00100 if self.validate_twist(cmd):
00101 if cmd.linear.x >= 0:
00102 twist.linear.x = self._forward(cmd.linear.x, self._current_step)
00103 else:
00104 twist.linear.x = self._backward(cmd.linear.x, self._current_step)
00105 twist.linear.y = self._lateral(cmd.linear.y, self._current_step)
00106 twist.angular.z = self._angular(cmd.angular.z, self._current_angular_step)
00107 return twist
00108
00109 def increase_turbo(self):
00110 if self._current_step < self._num_steps:
00111 self._current_step += 1
00112 self.increase_angular_turbo()
00113
00114 def decrease_turbo(self):
00115 if self._current_step > 1:
00116 self._current_step -= 1
00117 self.decrease_angular_turbo()
00118
00119 def increase_angular_turbo(self):
00120 if self._current_angular_step < self._num_steps:
00121 self._current_angular_step += 1
00122
00123 def decrease_angular_turbo(self):
00124 if self._current_angular_step > 1:
00125 self._current_angular_step -= 1
00126
00127 def reset_turbo(self):
00128 self._current_step = self._init_step
00129 self._current_angular_step = self._init_step
00130
00131 class TextMarker(object):
00132
00133 def __init__(self, scale = 1.0, z = 0.0):
00134 self._pub = rospy.Publisher('text_marker', Marker, queue_size=1, latch=True)
00135
00136 self._scale = scale
00137 self._z = z
00138
00139
00140 self._marker = Marker()
00141
00142 self._marker.id = 0
00143 self._marker.type = Marker.TEXT_VIEW_FACING
00144
00145 self._marker.header.frame_id = "base_footprint"
00146
00147 self._marker.pose.position.z = self._z
00148
00149 self._marker.pose.orientation.w = 1.0
00150
00151 self._marker.scale.z = self._scale
00152
00153 self._marker.color.a = 1.0
00154 self._marker.color.r = 1.0
00155 self._marker.color.g = 1.0
00156 self._marker.color.b = 1.0
00157
00158 def update(self, joystick_priority, add = True):
00159 if add:
00160 self._marker.action = Marker.ADD
00161
00162 self._marker.text = "Manual" if joystick_priority else "Autonomous"
00163 else:
00164 self._marker.action = Marker.DELETE
00165
00166 self._pub.publish(self._marker)
00167
00168 class JoystickRelay:
00169
00170 def __init__(self):
00171 self._current_priority = rospy.get_param("~priority", True)
00172 self._velocity_control = VelocityControl()
00173
00174 self._marker = TextMarker(0.5, 2.0)
00175
00176 self._pub_cmd = rospy.Publisher('joy_vel_out', Twist, queue_size=1)
00177 self._subscriber = rospy.Subscriber('joy_vel_in', Twist, self._forward_cmd, queue_size=1)
00178
00179 self._pub_priority = rospy.Publisher('joy_priority', Bool, queue_size=1, latch=True)
00180
00181
00182 self._pub_priority.publish(self._current_priority)
00183 self._marker.update(self._current_priority)
00184
00185
00186 rospy.Timer(rospy.Duration(1.0), self._timer_callback)
00187
00188
00189
00190 self._server_priority = ServiceLikeActionServer('joy_priority_action', JoyPriorityAction,
00191 self._toggle_priority)
00192 self._server_increase = ServiceLikeActionServer('joy_turbo_increase', JoyTurboAction,
00193 self._velocity_control.increase_turbo)
00194 self._server_decrease = ServiceLikeActionServer('joy_turbo_decrease', JoyTurboAction,
00195 self._velocity_control.decrease_turbo)
00196 self._server_angular_increase = ServiceLikeActionServer('joy_turbo_angular_increase', JoyTurboAction,
00197 self._velocity_control.increase_angular_turbo)
00198 self._server_angular_decrease = ServiceLikeActionServer('joy_turbo_angular_decrease', JoyTurboAction,
00199 self._velocity_control.decrease_angular_turbo)
00200 self._server_reset = ServiceLikeActionServer('joy_turbo_reset', JoyTurboAction,
00201 self._velocity_control.reset_turbo)
00202
00203 def _forward_cmd(self, cmd):
00204 if self._current_priority:
00205 self._pub_cmd.publish(self._velocity_control.scale_twist(cmd))
00206
00207 self._marker.update(self._current_priority)
00208
00209 def _toggle_priority(self):
00210 self._current_priority = not self._current_priority
00211 rospy.loginfo("Toggled joy_priority, current status is: %s", self._current_priority)
00212 self._pub_priority.publish(self._current_priority)
00213 self._marker.update(self._current_priority)
00214
00215
00216 if self._current_priority:
00217 self._pub_cmd.publish(Twist())
00218
00219 def _timer_callback(self, event):
00220 self._marker.update(self._current_priority)
00221
00222 if __name__ == '__main__':
00223 rospy.init_node('joystick_relay')
00224
00225 server = JoystickRelay()
00226
00227 rospy.spin()