39 from __future__
import with_statement
41 PKG =
'diagnostic_aggregator' 42 import roslib; roslib.load_manifest(PKG)
43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time
import sleep
def get_raw_name(agg_name):
    """Return the last '/'-separated component of an aggregated status name.

    For example, '/First/multi' yields 'multi'. A name without any '/' is
    returned unchanged, and a name ending in '/' yields the empty string.

    :param agg_name: slash-separated status name (str).
    :returns: the text after the final '/' (str).
    """
    return agg_name.split('/')[-1]
def get_header_name(agg_name):
    """Return the components between the first and last '/'-separated parts.

    The name is split on '/', the first and last pieces are discarded, and
    the remainder is re-joined with '/'. For a name with a leading slash the
    discarded first piece is the empty string, so '/First/multi' yields
    'First' and '/A/B/c' yields 'A/B'.

    Note that the first piece is dropped unconditionally: a name without a
    leading slash ('First/multi') or with fewer than three pieces ('/multi',
    'plain', '') yields the empty string.

    :param agg_name: slash-separated status name (str).
    :returns: the middle path components joined by '/' (str).
    """
    return '/'.join(agg_name.split('/')[1:-1])
72 self.
level = msg.level
79 super(TestMultipleMatch, self).
__init__(*args)
85 rospy.init_node(
'test_multiple_match')
88 sub_agg = rospy.Subscriber(
"/diagnostics_agg", DiagnosticArray, self.
diag_agg_cb)
92 for stat
in msg.status:
93 if stat.name.find(MULTI_NAME) > 0:
97 while not rospy.is_shutdown():
99 if rospy.get_time() - self.
_starttime > DURATION:
102 self.assert_(
not rospy.is_shutdown(),
"Rospy shutdown!")
105 self.assert_(self._multi_items.has_key(HEADER1),
"Didn't have item under %s. Items: %s" % (HEADER1, self.
_multi_items))
106 self.assert_(self.
_multi_items[HEADER1].name == MULTI_NAME,
"Item name under %s didn't match %s" % (HEADER1, MULTI_NAME))
108 self.assert_(self._multi_items.has_key(HEADER2),
"Didn't have item under %s" % HEADER2)
109 self.assert_(self.
_multi_items[HEADER2].name == MULTI_NAME,
"Item name under %s didn't match %s" % (HEADER2, MULTI_NAME))
112 if __name__ ==
'__main__':
114 suite = unittest.TestSuite()
116 unittest.TextTestRunner(verbosity = 2).run(suite)
118 rostest.run(PKG, sys.argv[0], TestMultipleMatch, sys.argv)
def test_multiple_match(self)
def get_header_name(agg_name)
def get_raw_name(agg_name)
def diag_agg_cb(self, msg)