39 import rospy, rostest, unittest
40 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus
41 from time
import sleep
48 super(TestDiscardStale, self).
__init__(*args)
55 rospy.init_node(
'test_expected_stale')
66 for stat
in msg.status:
72 for stat
in msg.status:
73 self._agg_expecteds.append(stat)
80 while (len(expecteds.keys()) != 1
and 81 not rospy.is_shutdown()
and 87 self.assert_(len(expecteds.keys()) == 1,
"The expected diagnostics are not of length 1." 88 "Received diagnostics: {}".format(expecteds))
89 self.assert_(expecteds[
'nonexistent2'].level == DiagnosticStatus.WARN)
93 while not rospy.is_shutdown():
100 "There should only be one expected aggregated item left, {} found instead!".
103 "The name of the first aggregated message should be '/Nonexistent2'!")
104 self.assert_(self.
_agg_expecteds[0].level == DiagnosticStatus.STALE,
105 "The level of the first aggregated message should be stale!")
109 while not rospy.is_shutdown():
116 "There should't be any aggregated items left, {} found instead! {}".
120 if __name__ ==
'__main__':
121 rostest.run(
'diagnostic_aggregator', sys.argv[0], TestDiscardStale, sys.argv)
def test_discard_stale(self)
def diag_agg_cb(self, msg)