joy.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import math
4 import numpy
5 
6 import rospy
7 import os
8 import sys
9 
10 import imp
11 try:
12  imp.find_module("sensor_msgs")
13 except:
14  import roslib; roslib.load_manifest('jsk_teleop_joy')
15 
16 
17 from sensor_msgs.msg import Joy
18 from diagnostic_msgs.msg import DiagnosticStatus, DiagnosticArray
19 import tf.transformations
20 from joy_status import IpegaStatus, XBoxStatus, PS3Status, PS3WiredStatus
21 from jsk_rviz_plugins.msg import OverlayMenu
22 from plugin_manager import PluginManager
23 from status_history import StatusHistory
24 from diagnostic_updater import Updater as DiagnosticUpdater
25 
26 AUTO_DETECTED_CLASS = None
27 
28 def autoJoyDetect(msg):
29  global AUTO_DETECTED_CLASS
30  if AUTO_DETECTED_CLASS:
31  return
32  if len(msg.axes) == 27 and len(msg.buttons) == 19:
33  rospy.loginfo("auto detected as ps3wired")
34  AUTO_DETECTED_CLASS = PS3WiredStatus
35  elif len(msg.axes) == 8 and len(msg.buttons) == 11:
36  rospy.loginfo("auto detected as xbox")
37  AUTO_DETECTED_CLASS = XBoxStatus
38  elif len(msg.axes) == 20 and len(msg.buttons) == 17:
39  rospy.loginfo("auto detected as ps3")
40  AUTO_DETECTED_CLASS = PS3Status
41  elif len(msg.axes) == 8 and len(msg.buttons) == 19:
42  rospy.loginfo("auto detected as ipega")
43  AUTO_DETECTED_CLASS = IpegaStatus
44  else:
45  AUTO_DETECTED_CLASS = "UNKNOWN"
46 
47 class JoyManager():
48  STATE_INITIALIZATION = 1
49  STATE_RUNNING = 2
50  STATE_WAIT_FOR_JOY = 3
51 
52  MODE_PLUGIN = 0
53  MODE_MENU = 1
54  mode = 0
55 
56  plugin_instances = []
57  def stateDiagnostic(self, stat):
58  if self.state == self.STATE_INITIALIZATION:
59  stat.summary(DiagnosticStatus.WARN,
60  "initializing JoyManager")
61  elif self.state == self.STATE_RUNNING:
62  stat.summary(DiagnosticStatus.OK,
63  "running")
64  stat.add("Joy stick type", str(self.JoyStatus))
65  elif self.state == self.STATE_WAIT_FOR_JOY:
66  stat.summary(DiagnosticStatus.WARN,
67  "waiting for joy message to detect joy stick type")
68  return stat
69  def pluginStatusDiagnostic(self, stat):
70  if len(self.plugin_instances) == 0:
71  stat.summary(DiagnosticStatus.ERROR, "no plugin is loaded")
72  else:
73  stat.summary(DiagnosticStatus.OK,
74  "%d plugins are loaded" % (len(self.plugin_instances)))
75  stat.add("instances", ", ".join([p.name for p in self.plugin_instances]))
76  return stat
77  def __init__(self, plugin_package="jsk_teleop_joy"):
78  self.state = self.STATE_INITIALIZATION
79  self.pre_status = None
80  self.history = StatusHistory(max_length=10)
81  self.menu_pub = rospy.Publisher("/overlay_menu", OverlayMenu)
82  self.controller_type = rospy.get_param('~controller_type', 'auto')
83  self.plugins = rospy.get_param('~plugins', {})
86  #you can specify the limit of the rate via ~diagnostic_period
87  self.diagnostic_updater = DiagnosticUpdater()
88  self.diagnostic_updater.setHardwareID("teleop_manager")
89  self.diagnostic_updater.add("State", self.stateDiagnostic)
90  self.diagnostic_updater.add("Plugin Status", self.pluginStatusDiagnostic)
91  #self.diagnostic_updater.add("Joy Input", self.joyInputDiagnostic)
92  self.diagnostic_updater.update()
93  if self.controller_type == 'xbox':
94  self.JoyStatus = XBoxStatus
95  elif self.controller_type == 'ps3':
96  self.JoyStatus = PS3Status
97  elif self.controller_type == 'ps3wired':
98  self.JoyStatus = PS3WiredStatus
99  elif self.controller_type == 'ipega':
100  self.JoyStatus = IpegaStatus
101  elif self.controller_type == 'auto':
102  s = rospy.Subscriber('/joy', Joy, autoJoyDetect)
103  self.state = self.STATE_WAIT_FOR_JOY
104  error_message_published = False
105  r = rospy.Rate(1)
106  while not rospy.is_shutdown():
107  self.diagnostic_updater.update()
108  if AUTO_DETECTED_CLASS == "UNKNOWN":
109  if not error_message_published:
110  rospy.logfatal("unknown joy type")
111  error_message_published = True
112  r.sleep()
113  elif AUTO_DETECTED_CLASS:
114  self.JoyStatus = AUTO_DETECTED_CLASS
115  s.unregister()
116  break
117  else:
118  r.sleep()
119  self.diagnostic_updater.update()
120  self.plugin_manager = PluginManager(plugin_package)
121  self.loadPlugins()
122  def loadPlugins(self):
123  self.plugin_manager.loadPlugins()
124  self.plugin_instances = self.plugin_manager.loadPluginInstances(self.plugins, self)
125  def switchPlugin(self, index):
126  self.current_plugin_index = index
127  if len(self.plugin_instances) <= self.current_plugin_index:
128  self.current_plugin_index = 0
129  elif self.current_plugin_index < 0:
130  self.current_plugin_index = len(self.plugin_instances)
131  self.current_plugin.disable()
133  self.current_plugin.enable()
134  def nextPlugin(self):
135  rospy.loginfo('switching to next plugin')
136  self.switchPlugin(self, self.current_plugin_index + 1)
137  def start(self):
138  self.publishMenu(0, close=True) # close menu anyway
139  self.diagnostic_updater.force_update()
140  if len(self.plugin_instances) == 0:
141  rospy.logfatal('no valid plugins are loaded')
142  return False
143  self.current_plugin = self.plugin_instances[0]
144  self.current_plugin.enable()
145  self.joy_subscriber = rospy.Subscriber('/joy', Joy, self.joyCB)
146  self.state = self.STATE_RUNNING
147  return True
148  def publishMenu(self, index, close=False):
149  menu = OverlayMenu()
150  menu.menus = [p.name for p in self.plugin_instances]
151  menu.current_index = index
152  menu.title = "Joystick"
153  if close:
154  menu.action = OverlayMenu.ACTION_CLOSE
155  self.menu_pub.publish(menu)
156  def forceToPluginMenu(self, publish_menu=False):
158  if publish_menu:
160  self.mode = self.MODE_MENU
161  def processMenuMode(self, status, history):
162  if history.new(status, "down") or history.new(status, "left_analog_down"):
164  if self.selecting_plugin_index >= len(self.plugin_instances):
165  self.selecting_plugin_index = 0
167  elif history.new(status, "up") or history.new(status, "left_analog_up"):
169  if self.selecting_plugin_index < 0:
170  self.selecting_plugin_index = len(self.plugin_instances) - 1
172  elif history.new(status, "cross") or history.new(status, "center"):
173  self.publishMenu(self.selecting_plugin_index, close=True)
174  self.mode = self.MODE_PLUGIN
175  elif history.new(status, "circle"):
176  self.publishMenu(self.selecting_plugin_index, close=True)
177  self.mode = self.MODE_PLUGIN
179  else:
181  def joyCB(self, msg):
182  status = self.JoyStatus(msg)
183  if self.mode == self.MODE_MENU:
184  self.processMenuMode(status, self.history)
185  else:
186  if self.history.new(status, "center"):
187  self.forceToPluginMenu()
188  else:
189  self.current_plugin.joyCB(status, self.history)
190  self.pre_status = status
191  self.history.add(status)
192  self.diagnostic_updater.update()
193 
194 def main():
195  global g_manager
196  rospy.sleep(1)
197  rospy.init_node('jsk_teleop_joy')
198  g_manager = JoyManager()
199  result = g_manager.start()
200  if not result:
201  rospy.logfatal("Fatal Error")
202  return False
203  else:
204  rospy.spin()
205 
206 if __name__ == '__main__':
207  main()
208 
def forceToPluginMenu(self, publish_menu=False)
Definition: joy.py:156
def publishMenu(self, index, close=False)
Definition: joy.py:148
def pluginStatusDiagnostic(self, stat)
Definition: joy.py:69
def autoJoyDetect(msg)
Definition: joy.py:28
def processMenuMode(self, status, history)
Definition: joy.py:161
def joyCB(self, msg)
Definition: joy.py:181
def switchPlugin(self, index)
Definition: joy.py:125
def stateDiagnostic(self, stat)
Definition: joy.py:57
def main()
Definition: joy.py:194
def __init__(self, plugin_package="jsk_teleop_joy")
Definition: joy.py:77


jsk_teleop_joy
Author(s): Ryohei Ueda
autogenerated on Fri May 14 2021 02:52:11