39 from __future__
import with_statement
41 PKG =
'diagnostic_aggregator' 42 import roslib; roslib.load_manifest(PKG)
43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time
import sleep
50 return agg_name.split(
'/')[-1]
64 self.
level = msg.level
71 super(TestExpectedItemsStale, self).
__init__(*args)
78 rospy.init_node(
'test_expected_stale')
81 sub = rospy.Subscriber(
"/diagnostics", DiagnosticArray, self.
diag_cb)
82 sub_agg = rospy.Subscriber(
"/diagnostics_agg", DiagnosticArray, self.
diag_agg_cb)
86 for stat
in msg.status:
87 if stat.name.find(
'expected') == 0:
92 for stat
in msg.status:
93 if stat.name.find(
'expected') > 0:
97 while not rospy.is_shutdown():
99 if rospy.get_time() - self.
_starttime > DURATION:
103 self.assert_(len(self.
_expecteds) > 0,
"No expected items found in raw data!")
105 for name, item
in self.
_expecteds.iteritems():
106 self.assert_(self.
_agg_expecteds.has_key(name),
"Item %s not found in aggregated diagnostics output" % name)
108 self.assert_(self.
_agg_expecteds[name].level == 3,
"Stale item in diagnostics, but aggregated didn't report as stale. Item: %s, state: %d" %(name, self.
_agg_expecteds[name].level))
110 self.assert_(self.
_agg_expecteds[name].level == item.level,
"Diagnostic level of aggregated, raw item don't match for %s" % name)
112 if __name__ ==
'__main__':
113 rostest.run(PKG, sys.argv[0], TestExpectedItemsStale, sys.argv)
def test_expecteds_arent_stale(self)
def get_raw_name(agg_name)
def diag_agg_cb(self, msg)