switcher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/stonier/rqt_wrapper/license/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import sys
11 from .ros_masters import RosMasterMonitor
12 import rocon_python_utils.system
13 import signal
14 import tempfile
15 
16 try: # indigo
17  from python_qt_binding.QtGui import QApplication, QMessageBox
18 except ImportError: # kinetic+ (pyqt5)
19  from python_qt_binding.QtWidgets import QApplication, QMessageBox
20 
21 from python_qt_binding.QtCore import Qt, QCoreApplication, QObject, QTimer, Signal, Slot
22 
23 ##############################################################################
24 # Classes
25 ##############################################################################
26 
27 
28 class Switcher(QObject):
29  # PySide signals are always defined as class attributes (GPL Pyqt4 Signals use pyqtSignal)
30  signal_master_changed_state = Signal(bool)
31 
32  def __init__(self, rqt_plugin_name):
33  super(Switcher, self).__init__()
34  self._rqt_plugin_name = rqt_plugin_name
35  self._rqt_subprocess = None
36  self._no_ros_master_dialog = QMessageBox(QMessageBox.Warning, self.tr(self._rqt_plugin_name), self.tr("Waiting for the ROS Master to become available."))
37  self._no_ros_master_abort_button = self._no_ros_master_dialog.addButton(QMessageBox.Abort)
38  self._no_ros_master_abort_button.clicked.connect(self.shutdown)
39  self.signal_master_changed_state.connect(self.switch_state, type=Qt.QueuedConnection)
40  self._ros_master_monitor = RosMasterMonitor(period=1, callback_function=self._master_changed_state)
42 
43  @Slot(bool)
44  def switch_state(self, new_master_found):
45  '''
46  :param bool new_master_found: true when a new master is detected, false when a master has disappeared.
47  '''
48  if new_master_found:
49  self._no_ros_master_dialog.hide()
51  else:
52  self._no_ros_master_dialog.show()
53  self._no_ros_master_dialog.raise_()
54  if self._rqt_subprocess is not None:
56  self._rqt_subprocess.send_signal(signal.SIGINT)
57  # self._rqt_subprocess.terminate() # this is SIGTERM
58  self._rqt_subprocess = None
59 
60  def shutdown(self):
61  '''
62  Shutdowns from external signals or the abort button while waiting for a master.
63  '''
64  print("Shutting down the ros master monitor")
65  self._ros_master_monitor.shutdown()
66  if self._rqt_subprocess is not None:
67  print("Terminating subprocess")
68  self._rqt_subprocess.send_signal(signal.SIGINT)
69  # self._rqt_subprocess.terminate() # this is SIGTERM
70  self._rqt_subprocess = None
71  QCoreApplication.instance().quit()
72 
73  def _rqt_shutdown(self):
74  '''
75  Shutdown handler coming from the rqt subprocess shutting down. This needs
76  to be handled differently depending on whether it's us shutting it down
77  because a ros master shut down (in which case we need to stay alive)
78  or otherwise.
79  '''
81  self._ros_master_monitor.shutdown()
82  QCoreApplication.instance().quit()
83  else:
85 
86  def _master_changed_state(self, available):
87  '''
88  :param bool available: whether the ros master is available or not.
89  '''
90  self.signal_master_changed_state.emit(available)
91 
93  script = '''
94 #!/usr/bin/env python
95 import sys
96 from rqt_gui.main import Main
97 main = Main()
98 sys.exit(main.main(sys.argv, standalone='%s'))
99  ''' % self._rqt_plugin_name
100  executable_file = tempfile.NamedTemporaryFile(mode='w+t', delete=False)
101  executable_file.write(script)
102  executable_file.flush()
103  self._rqt_subprocess = rocon_python_utils.system.Popen(['python', executable_file.name], postexec_fn=self._rqt_shutdown)
104 
105 
106 class RQTWrapper(object):
107  def __init__(self, rqt_plugin_name):
108  signal.signal(signal.SIGINT, self._shutdown)
109  self._app = QApplication(sys.argv)
110  self._rqt_wrapper = Switcher(rqt_plugin_name)
111  # Let the interpreter run each 200 ms, this gives a chance for CTRL-C signals to come through
113  self._give_signals_a_chance_timer.timeout.connect(lambda: None)
114  self._give_signals_a_chance_timer.start(200)
115 
116  def exec_(self):
117  return self._app.exec_()
118 
119  def _shutdown(self, signum, f):
120  if signum == signal.SIGINT:
121  self._rqt_wrapper.shutdown()
122 
123 ##############################################################################
124 # Example Usage - Rocon Remocon
125 ##############################################################################
126 
127 # if __name__ == '__main__':
128 # rqt_wrapper = RQTWrapper('rocon_remocon')
129 # sys.exit(rqt_wrapper.exec_())
def _shutdown(self, signum, f)
Definition: switcher.py:119
def _master_changed_state(self, available)
Definition: switcher.py:86
def __init__(self, rqt_plugin_name)
Definition: switcher.py:107
def __init__(self, rqt_plugin_name)
Definition: switcher.py:32
def switch_state(self, new_master_found)
Definition: switcher.py:44


rqt_wrapper
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 14:59:48