39 import rospy, rostest, unittest
40 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus
41 from time
import sleep
48 super(TestDiscardStale, self).
__init__(*args)
55 rospy.init_node(
'test_expected_stale')
66 for stat
in msg.status:
72 for stat
in msg.status:
80 while (len(expecteds.keys()) != 1
and
81 not rospy.is_shutdown()
and
87 self.assert_(len(expecteds.keys()) == 1,
"The expected diagnostics are not of length 1."
88 "Received diagnostics: {}".format(expecteds))
89 self.assert_(expecteds[
'nonexistent2'].level == DiagnosticStatus.WARN)
94 while not rospy.is_shutdown():
101 "There should only be one expected aggregated item left, {} found instead!".
104 "The name of the first aggregated message should be '/Nonexistent2'!")
105 self.assert_(self.
_agg_expecteds[0].level == DiagnosticStatus.STALE,
106 "The level of the first aggregated message should be stale!")
111 while not rospy.is_shutdown():
118 "There should't be any aggregated items left, {} found instead! {}".
122 if __name__ ==
'__main__':
123 rostest.run(
'diagnostic_aggregator', sys.argv[0], TestDiscardStale, sys.argv)