00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 PKG = 'robot_monitor'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 WXVER = '2.8'
00043 import wxversion
00044 if wxversion.checkInstalled(WXVER):
00045 wxversion.select(WXVER)
00046 else:
00047 print >> sys.stderr, "This application requires wxPython version %s"%(WXVER)
00048 sys.exit(1)
00049
00050 from robot_monitor.robot_monitor_panel import RobotMonitorPanel
00051 import wx
00052
00053 import threading
00054 import unittest
00055 import rospy, rostest
00056 from time import sleep
00057 import sys
00058 from optparse import OptionParser
00059
00060 DURATION = 10
00061
00062
00063 class RobotMonitorFrame(wx.Frame):
00064 def __init__(self, parent, title):
00065 wx.Frame.__init__(self, parent, wx.ID_ANY, title)
00066
00067 self.panel = RobotMonitorPanel(self)
00068
00069 def on_exit(self, e):
00070 self.Close(True)
00071
00072
00073 class RobotMonitorApp(wx.App):
00074 def OnInit(self):
00075 self._frame = RobotMonitorFrame(None, 'Robot Monitor')
00076 self._frame.SetSize(wx.Size(500, 700))
00077 self._frame.Layout()
00078
00079 self._frame.Show(True)
00080 return True
00081
00082 class RoMonRunner(threading.Thread):
00083 def __init__(self):
00084 threading.Thread.__init__(self)
00085 self.app = RobotMonitorApp()
00086
00087 def run(self):
00088 self.app.MainLoop()
00089
00090
00091 class TestRobotMonitor(unittest.TestCase):
00092 def __init__(self, *args):
00093 super(TestRobotMonitor, self).__init__(*args)
00094
00095 rospy.init_node('test_robot_monitor', anonymous=True)
00096 parser = OptionParser(usage="usage ./%prog [options]", prog="test_robot_monitor.py")
00097
00098
00099 parser.add_option('--gtest_output', action="store",
00100 dest="gtest")
00101
00102 options, args = parser.parse_args(rospy.myargv())
00103
00104 self.runner = RoMonRunner()
00105 self.runner.start()
00106
00107 def test_robot_monitor(self):
00108 start = rospy.get_time()
00109 while not rospy.is_shutdown():
00110 if rospy.get_time() - start > DURATION:
00111 break
00112 self.assert_(self.runner.isAlive(), "Thread running robot monitor isn't running")
00113 sleep(1.0)
00114
00115
00116 try:
00117 wx.CallAfter(self.runner.app._frame.on_exit(None))
00118 self.runner.join(5)
00119 except:
00120 import traceback
00121 self.assert_(False, "Caught exception closing robot monitor: %s" % traceback.format_exc())
00122
00123 self.assert_(not self.runner.isAlive(), "Thread running robot monitor didn't join up")
00124 self.assert_(not rospy.is_shutdown(), "Rospy shut down")
00125
00126
00127 if __name__ == '__main__':
00128 rostest.run(PKG, sys.argv[0], TestRobotMonitor, sys.argv)
00129