Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 from __future__ import with_statement
00040
00041 DURATION = 5
00042 PKG = 'test_diagnostic_aggregator'
00043 import rospy, rostest, unittest
00044 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00045 from time import sleep
00046 import sys
00047 import threading
00048
00049 from optparse import OptionParser
00050
00051
00052 def get_raw_name(agg_name):
00053 return agg_name.split('/')[-1]
00054
00055 def get_header_name(agg_name):
00056 return '/'.join(agg_name.split('/')[1:-1])
00057
00058 class TestFailInit(unittest.TestCase):
00059 def __init__(self, *args):
00060 super(TestFailInit, self).__init__(*args)
00061
00062 parser = OptionParser(usage="usage ./%prog [options]", prog="fail_init_test.py")
00063 parser.add_option('--ns', action="store", default=None,
00064 dest="ns", metavar="NAMESPACE",
00065 help="Expected namespace that analyzer will fail in")
00066
00067 parser.add_option('--gtest_output', action="store",
00068 dest="gtest")
00069
00070 options, args = parser.parse_args(rospy.myargv())
00071
00072 if not options.ns:
00073 parser.error("Option --ns is mandatory. Unable to parse args")
00074
00075 self._ns = options.ns
00076 self._item = None
00077
00078 self._mutex = threading.Lock()
00079
00080
00081 rospy.init_node('test_fail_init')
00082 self._starttime = rospy.get_time()
00083
00084 sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diag_agg_cb)
00085
00086 def diag_agg_cb(self, msg):
00087 with self._mutex:
00088 for stat in msg.status:
00089 if stat.name.find(self._ns) > 0:
00090 self._item = stat
00091 break
00092
00093 def test_fail_init(self):
00094 while not rospy.is_shutdown():
00095 sleep(1.0)
00096 if rospy.get_time() - self._starttime > DURATION:
00097 break
00098
00099 self.assert_(not rospy.is_shutdown(), "Rospy shutdown!")
00100
00101 with self._mutex:
00102 self.assert_(self._ns, "Namespace is none. Option --ns not given")
00103 self.assert_(self._item, "No item with name %s found in diag_agg" % self._ns)
00104 self.assert_(self._item.level == 3, "Item failed to initialize, but was not stale. Level: %d" % self._item.level)
00105
00106
00107 if __name__ == '__main__':
00108 if False:
00109 suite = unittest.TestSuite()
00110 suite.addTest(TestFailInit('test_fail_init'))
00111 unittest.TextTestRunner(verbosity = 2).run(suite)
00112 else:
00113 rostest.run(PKG, sys.argv[0], TestFailInit, sys.argv)