joy.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import math
00004 import numpy
00005 
00006 import rospy
00007 import os
00008 import sys
00009 
00010 import imp
00011 try:
00012   imp.find_module("sensor_msgs")
00013 except:
00014   import roslib; roslib.load_manifest('jsk_teleop_joy')
00015 
00016 
00017 from sensor_msgs.msg import Joy
00018 from diagnostic_msgs.msg import DiagnosticStatus, DiagnosticArray
00019 import tf.transformations
00020 from joy_status import IpegaStatus, XBoxStatus, PS3Status, PS3WiredStatus
00021 from jsk_rviz_plugins.msg import OverlayMenu
00022 from plugin_manager import PluginManager
00023 from status_history import StatusHistory
00024 from diagnostic_updater import Updater as DiagnosticUpdater
00025 
00026 AUTO_DETECTED_CLASS = None
00027 
00028 def autoJoyDetect(msg):
00029   global AUTO_DETECTED_CLASS
00030   if AUTO_DETECTED_CLASS:
00031     return
00032   if len(msg.axes) == 27 and len(msg.buttons) == 19:
00033     rospy.loginfo("auto detected as ps3wired")
00034     AUTO_DETECTED_CLASS = PS3WiredStatus
00035   elif len(msg.axes) == 8 and len(msg.buttons) == 11:
00036     rospy.loginfo("auto detected as xbox")
00037     AUTO_DETECTED_CLASS = XBoxStatus
00038   elif len(msg.axes) == 20 and len(msg.buttons) == 17:
00039     rospy.loginfo("auto detected as ps3")
00040     AUTO_DETECTED_CLASS = PS3Status
00041   elif len(msg.axes) == 8 and len(msg.buttons) == 19:
00042     rospy.loginfo("auto detected as ipega")
00043     AUTO_DETECTED_CLASS = IpegaStatus
00044   else:
00045     AUTO_DETECTED_CLASS = "UNKNOWN"
00046     
00047 class JoyManager():
00048   STATE_INITIALIZATION = 1
00049   STATE_RUNNING = 2
00050   STATE_WAIT_FOR_JOY = 3
00051 
00052   MODE_PLUGIN = 0
00053   MODE_MENU = 1
00054   mode = 0
00055   
00056   plugin_instances = []
00057   def stateDiagnostic(self, stat):
00058     if self.state == self.STATE_INITIALIZATION:
00059       stat.summary(DiagnosticStatus.WARN,
00060                    "initializing JoyManager")
00061     elif self.state == self.STATE_RUNNING:
00062       stat.summary(DiagnosticStatus.OK,
00063                    "running")
00064       stat.add("Joy stick type", str(self.JoyStatus))
00065     elif self.state == self.STATE_WAIT_FOR_JOY:
00066       stat.summary(DiagnosticStatus.WARN,
00067                    "waiting for joy message to detect joy stick type")
00068     return stat
00069   def pluginStatusDiagnostic(self, stat):
00070     if len(self.plugin_instances) == 0:
00071         stat.summary(DiagnosticStatus.ERROR, "no plugin is loaded")
00072     else:
00073         stat.summary(DiagnosticStatus.OK, 
00074                      "%d plugins are loaded" % (len(self.plugin_instances)))
00075         stat.add("instances", ", ".join([p.name for p in self.plugin_instances]))
00076     return stat
00077   def __init__(self, plugin_package="jsk_teleop_joy"):
00078     self.state = self.STATE_INITIALIZATION
00079     self.pre_status = None
00080     self.history = StatusHistory(max_length=10)
00081     self.menu_pub = rospy.Publisher("/overlay_menu", OverlayMenu)
00082     self.controller_type = rospy.get_param('~controller_type', 'auto')
00083     self.plugins = rospy.get_param('~plugins', {})
00084     self.current_plugin_index = 0
00085     self.selecting_plugin_index = 0
00086     #you can specify the limit of the rate via ~diagnostic_period
00087     self.diagnostic_updater = DiagnosticUpdater()
00088     self.diagnostic_updater.setHardwareID("teleop_manager")
00089     self.diagnostic_updater.add("State", self.stateDiagnostic)
00090     self.diagnostic_updater.add("Plugin Status", self.pluginStatusDiagnostic)
00091     #self.diagnostic_updater.add("Joy Input", self.joyInputDiagnostic)
00092     self.diagnostic_updater.update()
00093     if self.controller_type == 'xbox':
00094       self.JoyStatus = XBoxStatus
00095     elif self.controller_type == 'ps3':
00096       self.JoyStatus = PS3Status
00097     elif self.controller_type == 'ps3wired':
00098       self.JoyStatus = PS3WiredStatus
00099     elif self.controller_type == 'ipega':
00100       self.JoyStatus = IpegaStatus
00101     elif self.controller_type == 'auto':
00102       s = rospy.Subscriber('/joy', Joy, autoJoyDetect)
00103       self.state = self.STATE_WAIT_FOR_JOY
00104       error_message_published = False
00105       r = rospy.Rate(1)
00106       while not rospy.is_shutdown():
00107         self.diagnostic_updater.update()
00108         if AUTO_DETECTED_CLASS == "UNKNOWN":
00109           if not error_message_published:
00110             rospy.logfatal("unknown joy type")
00111             error_message_published = True
00112           r.sleep()
00113         elif AUTO_DETECTED_CLASS:
00114           self.JoyStatus = AUTO_DETECTED_CLASS
00115           s.unregister()
00116           break
00117         else:
00118           r.sleep()
00119     self.diagnostic_updater.update()
00120     self.plugin_manager = PluginManager(plugin_package)
00121     self.loadPlugins()
00122   def loadPlugins(self):
00123     self.plugin_manager.loadPlugins()
00124     self.plugin_instances = self.plugin_manager.loadPluginInstances(self.plugins, self)
00125   def switchPlugin(self, index):
00126     self.current_plugin_index = index
00127     if len(self.plugin_instances) <= self.current_plugin_index:
00128       self.current_plugin_index = 0
00129     elif self.current_plugin_index < 0:
00130       self.current_plugin_index = len(self.plugin_instances)
00131     self.current_plugin.disable()
00132     self.current_plugin = self.plugin_instances[self.current_plugin_index]
00133     self.current_plugin.enable()
00134   def nextPlugin(self):
00135     rospy.loginfo('switching to next plugin')
00136     self.switchPlugin(self, self.current_plugin_index + 1)
00137   def start(self):
00138     self.publishMenu(0, close=True) # close menu anyway
00139     self.diagnostic_updater.force_update()
00140     if len(self.plugin_instances) == 0:
00141       rospy.logfatal('no valid plugins are loaded')
00142       return False
00143     self.current_plugin = self.plugin_instances[0]
00144     self.current_plugin.enable()
00145     self.joy_subscriber = rospy.Subscriber('/joy', Joy, self.joyCB)
00146     self.state = self.STATE_RUNNING
00147     return True
00148   def publishMenu(self, index, close=False):
00149     menu = OverlayMenu()
00150     menu.menus = [p.name for p in self.plugin_instances]
00151     menu.current_index = index
00152     menu.title = "Joystick"
00153     if close:
00154       menu.action = OverlayMenu.ACTION_CLOSE
00155     self.menu_pub.publish(menu)
00156   def forceToPluginMenu(self, publish_menu=False):
00157     self.selecting_plugin_index = self.current_plugin_index
00158     if publish_menu:
00159       self.publishMenu(self.current_plugin_index)
00160     self.mode = self.MODE_MENU
00161   def processMenuMode(self, status, history):
00162     if history.new(status, "down") or history.new(status, "left_analog_down"):
00163       self.selecting_plugin_index = self.selecting_plugin_index + 1
00164       if self.selecting_plugin_index >= len(self.plugin_instances):
00165         self.selecting_plugin_index = 0
00166       self.publishMenu(self.selecting_plugin_index)
00167     elif history.new(status, "up") or history.new(status, "left_analog_up"):
00168       self.selecting_plugin_index = self.selecting_plugin_index - 1
00169       if self.selecting_plugin_index < 0:
00170         self.selecting_plugin_index = len(self.plugin_instances) - 1
00171       self.publishMenu(self.selecting_plugin_index)
00172     elif history.new(status, "cross") or history.new(status, "center"):
00173       self.publishMenu(self.selecting_plugin_index, close=True)
00174       self.mode = self.MODE_PLUGIN
00175     elif history.new(status, "circle"):
00176       self.publishMenu(self.selecting_plugin_index, close=True)
00177       self.mode = self.MODE_PLUGIN
00178       self.switchPlugin(self.selecting_plugin_index)
00179     else:
00180       self.publishMenu(self.selecting_plugin_index)
00181   def joyCB(self, msg):
00182     status = self.JoyStatus(msg)
00183     if self.mode == self.MODE_MENU:
00184       self.processMenuMode(status, self.history)
00185     else:
00186       if self.history.new(status, "center"):
00187         self.forceToPluginMenu()
00188       else:
00189         self.current_plugin.joyCB(status, self.history)
00190     self.pre_status = status
00191     self.history.add(status)
00192     self.diagnostic_updater.update()
00193     
00194 def main():
00195   global g_manager
00196   rospy.sleep(1)
00197   rospy.init_node('jsk_teleop_joy')
00198   g_manager = JoyManager()
00199   result = g_manager.start()
00200   if not result:
00201     rospy.logfatal("Fatal Error")
00202     return False
00203   else:
00204     rospy.spin()
00205   
00206 if __name__ == '__main__':
00207   main()
00208   


jsk_teleop_joy
Author(s): Ryohei Ueda
autogenerated on Fri Apr 19 2019 03:45:43