Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 from __future__ import with_statement
00040
00041 DURATION = 20
00042
00043 PKG = 'pr2_hardware_test_monitor'
00044 import roslib; roslib.load_manifest(PKG)
00045
00046 import unittest
00047 import rospy, rostest
00048 from time import sleep
00049 import sys
00050 from optparse import OptionParser
00051
00052 from pr2_self_test_msgs.msg import TestStatus
00053 import threading
00054
00055 class TestMonitorUnit(unittest.TestCase):
00056 def __init__(self, *args):
00057 super(TestMonitorUnit, self).__init__(*args)
00058
00059 parser = OptionParser(usage="usage ./%prog [options]",
00060 prog="monitor_listen_test.py")
00061
00062 parser.add_option('--gtest_output', action="store",
00063 dest="gtest")
00064
00065 self._mutex = threading.Lock()
00066 rospy.init_node('test_monitor_listener')
00067 self._ignore_time = 5
00068 self._start_time = rospy.get_time()
00069 self._ok = True
00070 self._message = None
00071 self._level = 0
00072
00073 self._start = rospy.get_time()
00074
00075 options, args = parser.parse_args(rospy.myargv())
00076
00077 rospy.Subscriber('test_status', TestStatus, self.cb)
00078
00079 def cb(self, msg):
00080 if rospy.get_time() - self._start_time < self._ignore_time:
00081 return
00082
00083 with self._mutex:
00084 self._ok = self._ok and (msg.test_ok == 0)
00085 if self._ok or (not self._ok and msg.test_ok != 0):
00086 self._message = msg.message
00087 self._level = msg.test_ok
00088
00089 def test_monitor(self):
00090 while not rospy.is_shutdown():
00091 sleep(1.0)
00092 if rospy.get_time() - self._start > DURATION:
00093 break
00094
00095 with self._mutex:
00096 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00097 self.assert_(self._message is not None, "No data from test monitor")
00098 self.assert_(self._ok, "Test Monitor reports error. Level: %d, Message: %s" % (self._level, self._message))
00099
00100
00101 if __name__ == '__main__':
00102 print 'SYS ARGS:', sys.argv
00103 rostest.run(PKG, sys.argv[0], TestMonitorUnit, sys.argv)