39 from __future__
import with_statement
42 PKG =
'test_diagnostic_aggregator' 43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time
import sleep
49 from optparse
import OptionParser
51 MATCH_NAME =
'Match Item' 54 return agg_name.split(
'/')[-1]
57 return '/'.join(agg_name.split(
'/')[1:-1])
61 super(TestMatchAnalyze, self).
__init__(*args)
63 parser = OptionParser(usage=
"usage ./%prog [options]", prog=
"match_analyze_test.py")
64 parser.add_option(
'--header', action=
"store", default=
None,
65 dest=
"header", metavar=
"HEADER",
66 help=
"Expected header that \"Match Item\" will be under")
68 parser.add_option(
'--gtest_output', action=
"store",
71 options, args = parser.parse_args(rospy.myargv())
73 if not options.header:
74 parser.error(
"Option --header is mandatory. Unable to parse args")
82 rospy.init_node(
'test_match_analyze')
85 sub_agg = rospy.Subscriber(
"/diagnostics_agg", DiagnosticArray, self.
diag_agg_cb)
89 for stat
in msg.status:
90 if stat.name.find(MATCH_NAME) > 0:
95 while not rospy.is_shutdown():
97 if rospy.get_time() - self.
_starttime > DURATION:
100 self.assert_(
not rospy.is_shutdown(),
"Rospy shutdown!")
103 self.assert_(self.
header,
"Header is none. Option --header not given")
105 self.assert_(self.match_headers.count(self.
header) > 0,
"Didn't have item under header \"%s\". Header: \"%s\"" % (self.
header, self.
match_headers[0]))
108 if __name__ ==
'__main__':
110 suite = unittest.TestSuite()
112 unittest.TextTestRunner(verbosity = 2).run(suite)
114 rostest.run(PKG, sys.argv[0], TestMatchAnalyze, sys.argv)
def get_header_name(agg_name)
def diag_agg_cb(self, msg)
def test_match_analyze(self)
def get_raw_name(agg_name)