00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 PKG = 'runtime_monitor'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import wx
00043 import unittest
00044 import rospy, rostest
00045 from time import sleep
00046 import sys
00047 from optparse import OptionParser
00048
00049 from runtime_monitor.monitor_panel import *
00050
00051 import threading
00052
00053 DURATION = 10
00054
00055
00056 class MainWindow(wx.Frame):
00057 def __init__(self, parent, title):
00058 wx.Frame.__init__(self, parent, wx.ID_ANY, title)
00059
00060 self.panel = MonitorPanel(self)
00061 self.SetSize(wx.Size(750, 450))
00062
00063 def on_exit(self, e):
00064 self.Close(True)
00065
00066 class MonitorApp(wx.App):
00067 def OnInit(self):
00068 self._frame = MainWindow(None, 'Runtime Monitor')
00069 self._frame.SetSize(wx.Size(500, 700))
00070 self._frame.Layout()
00071 self._frame.Show(True)
00072 return True
00073
00074 class RuntimeRunner(threading.Thread):
00075 def __init__(self):
00076 threading.Thread.__init__(self)
00077 self.app = MonitorApp()
00078
00079 def run(self):
00080 self.app.MainLoop()
00081
00082
00083 class TestRuntimeMonitor(unittest.TestCase):
00084 def __init__(self, *args):
00085 super(TestRuntimeMonitor, self).__init__(*args)
00086
00087 rospy.init_node('test_runtime_monitor', anonymous=True)
00088 parser = OptionParser(usage="usage ./%prog [options]", prog="test_monitor.py")
00089
00090
00091 parser.add_option('--gtest_output', action="store",
00092 dest="gtest")
00093
00094 options, args = parser.parse_args(rospy.myargv())
00095
00096 self.runner = RuntimeRunner()
00097 self.runner.start()
00098
00099 def test_runtime_monitor(self):
00100 start = rospy.get_time()
00101 while not rospy.is_shutdown():
00102 if rospy.get_time() - start > DURATION:
00103 break
00104 self.assert_(self.runner.isAlive(), "Thread running runtime monitor isn't running.")
00105 sleep(1.0)
00106
00107 try:
00108 wx.CallAfter(self.runner.app._frame.on_exit(None))
00109 self.runner.join(5)
00110 except:
00111 import traceback
00112 self.assert_(False, "Caught exception closing runtime monitor: %s" % traceback.format_exc())
00113
00114 self.assert_(not self.runner.isAlive(), "Thread running runtime monitor didn't join up.")
00115 self.assert_(not rospy.is_shutdown(), "Rospy shut down during test.")
00116
00117
00118 if __name__ == '__main__':
00119 rostest.run(PKG, sys.argv[0], TestRuntimeMonitor, sys.argv)
00120