39 from __future__
import with_statement
42 PKG =
'test_diagnostic_aggregator' 43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time
import sleep
49 from optparse
import OptionParser
def get_raw_name(agg_name):
    """Return the last '/'-separated component of *agg_name*.

    A name containing no '/' is returned unchanged, and a name that ends
    with '/' yields the empty string.

    NOTE(review): the ``def`` line was missing next to this body; the name
    and parameter come from the signature list at the end of the file and
    were matched to this body by meaning -- confirm the pairing.
    """
    return agg_name.split('/')[-1]
def get_header_name(agg_name):
    """Return *agg_name* without its first and last '/'-separated components.

    The remaining components are re-joined with '/'.  For a name with a
    leading '/', the dropped first component is the empty string before
    that slash, e.g. '/Robot/Sensors/imu' -> 'Robot/Sensors'.  Names with
    fewer than three components yield the empty string.

    NOTE(review): the ``def`` line was missing next to this body; the name
    and parameter come from the signature list at the end of the file and
    were matched to this body by meaning -- confirm the pairing.
    """
    return '/'.join(agg_name.split('/')[1:-1])
class TestFailInit(unittest.TestCase):
    """Check that the item published under a given namespace is stale.

    The namespace is taken from the mandatory ``--ns`` command-line option.
    The test subscribes to ``/diagnostics_agg``, remembers a status whose
    name contains that namespace, and after ``DURATION`` seconds asserts
    that the remembered status has level 3.

    NOTE(review): this class was rebuilt from a listing with missing lines.
    The class header, the method headers and the statements marked
    "reconstructed" below were inferred from the surviving statements --
    confirm them against the real file.
    """

    def __init__(self, *args):
        super(TestFailInit, self).__init__(*args)

        parser = OptionParser(usage="usage ./%prog [options]",
                              prog="fail_init_test.py")
        parser.add_option('--ns', action="store", default=None,
                          dest="ns", metavar="NAMESPACE",
                          help="Expected namespace that analyzer will fail in")
        # Registered so optparse does not reject the flag; presumably it is
        # supplied by the rostest runner.
        # NOTE(review): the continuation line of this call was missing; any
        # further keyword arguments (e.g. dest=) need to be restored.
        parser.add_option('--gtest_output', action="store")

        options, _ = parser.parse_args(rospy.myargv())

        # Reconstructed guard: the surviving error message says --ns is
        # mandatory, so the error is raised only when it was not given.
        if not options.ns:
            parser.error("Option --ns is mandatory. Unable to parse args")

        # Reconstructed: both attributes are read by the methods below.
        self._ns = options.ns
        self._item = None

        rospy.init_node('test_fail_init')
        # Reconstructed: the wait loop measures elapsed time from here.
        self._starttime = rospy.get_time()

        self._sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray,
                                         self.diag_agg_cb)

    def diag_agg_cb(self, msg):
        """Remember the first status in *msg* whose name contains the namespace.

        NOTE(review): the file imports ``with_statement``, so the original
        may have guarded this with a lock -- confirm.
        """
        for stat in msg.status:
            # NOTE(review): "> 0" skips a match at index 0 of the name; kept
            # exactly as in the surviving code -- confirm this is intended.
            if stat.name.find(self._ns) > 0:
                # Reconstructed: the body of this branch was missing.
                self._item = stat
                break

    def test_fail_init(self):
        """Wait DURATION seconds, then assert the remembered item is stale.

        NOTE(review): the method name and the sleep interval are
        reconstructed; DURATION is defined outside the visible code.
        """
        while not rospy.is_shutdown():
            sleep(1.0)
            if rospy.get_time() - self._starttime > DURATION:
                break

        # assertTrue/assertFalse replace the deprecated assert_ alias, which
        # was removed in Python 3.12.
        self.assertFalse(rospy.is_shutdown(), "Rospy shutdown!")

        self.assertTrue(self._ns, "Namespace is none. Option --ns not given")
        self.assertTrue(self._item,
                        "No item with name %s found in diag_agg" % self._ns)
        # Level 3 is what the failure message below calls "stale".
        self.assertTrue(self._item.level == 3,
                        "Item failed to initialize, but was not stale. Level: %d" % self._item.level)
if __name__ == '__main__':
    # Imported here because no `import sys` is visible in this part of the
    # file; a duplicate import is harmless if one exists elsewhere.
    import sys

    # NOTE(review): the lines between the surviving statements were missing.
    # Running an empty TestSuite and then rostest unconditionally makes no
    # sense, so the two runners are presumably alternatives -- confirm.
    # Set run_standalone to True to run the test case directly with unittest
    # instead of through rostest.
    run_standalone = False
    if run_standalone:
        # Load every test method of the case, so this does not depend on
        # the name of any individual test method.
        suite = unittest.TestLoader().loadTestsFromTestCase(TestFailInit)
        unittest.TextTestRunner(verbosity=2).run(suite)
    else:
        rostest.run(PKG, sys.argv[0], TestFailInit, sys.argv)
def get_header_name(agg_name)
def diag_agg_cb(self, msg)
def get_raw_name(agg_name)