Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 PKG = 'runtime_monitor'
00040 import roslib; roslib.load_manifest(PKG)
00041 
00042 import wx
00043 import unittest
00044 import rospy, rostest
00045 from time import sleep
00046 import sys
00047 from optparse import OptionParser
00048 
00049 from runtime_monitor.monitor_panel import *
00050 
00051 import threading
00052 
00053 DURATION = 10
00054 
00055 
00056 class MainWindow(wx.Frame):
00057     def __init__(self, parent, title):
00058         wx.Frame.__init__(self, parent, wx.ID_ANY, title)
00059 
00060         self.panel = MonitorPanel(self)
00061         self.SetSize(wx.Size(750, 450))
00062 
00063     def on_exit(self, e):
00064         self.Close(True)
00065 
00066 class MonitorApp(wx.App):
00067     def OnInit(self):
00068         self._frame = MainWindow(None, 'Runtime Monitor')
00069         self._frame.SetSize(wx.Size(500, 700))
00070         self._frame.Layout()
00071         self._frame.Show(True)
00072         return True
00073 
00074 class RuntimeRunner(threading.Thread):
00075     def __init__(self):
00076         threading.Thread.__init__(self)
00077         self.app = MonitorApp()
00078         
00079     def run(self):
00080         self.app.MainLoop()
00081         
00082 
00083 class TestRuntimeMonitor(unittest.TestCase):
00084     def __init__(self, *args):
00085         super(TestRuntimeMonitor, self).__init__(*args)
00086         
00087         rospy.init_node('test_runtime_monitor', anonymous=True)
00088         parser = OptionParser(usage="usage ./%prog [options]", prog="test_monitor.py")
00089 
00090         
00091         parser.add_option('--gtest_output', action="store",
00092                           dest="gtest")
00093 
00094         options, args = parser.parse_args(rospy.myargv())
00095 
00096         self.runner = RuntimeRunner()
00097         self.runner.start()
00098         
00099     def test_runtime_monitor(self):
00100         start = rospy.get_time()
00101         while not rospy.is_shutdown():
00102             if rospy.get_time() - start > DURATION:
00103                 break
00104             self.assert_(self.runner.isAlive(), "Thread running runtime monitor isn't running.")
00105             sleep(1.0)
00106             
00107         try:
00108             wx.CallAfter(self.runner.app._frame.on_exit(None))
00109             self.runner.join(5)
00110         except:
00111             import traceback
00112             self.assert_(False, "Caught exception closing runtime monitor: %s" % traceback.format_exc())
00113         
00114         self.assert_(not self.runner.isAlive(), "Thread running runtime monitor didn't join up.")
00115         self.assert_(not rospy.is_shutdown(), "Rospy shut down during test.")
00116 
00117 
00118 if __name__ == '__main__':
00119     rostest.run(PKG, sys.argv[0], TestRuntimeMonitor, sys.argv)
00120