Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 import rospy, rostest, unittest
00040 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00041 from time import sleep
00042 import sys
00043 import threading
00044
00045
00046 class TestDiscardStale(unittest.TestCase):
00047 def __init__(self, *args):
00048 super(TestDiscardStale, self).__init__(*args)
00049
00050 self._mutex = threading.Lock()
00051
00052 self._agg_expecteds = []
00053 self._expecteds = {}
00054
00055 rospy.init_node('test_expected_stale')
00056 self._diag_agg_sub = rospy.Subscriber("/diagnostics_agg",
00057 DiagnosticArray,
00058 self.diag_agg_cb)
00059 self._diag_sub = rospy.Subscriber("/diagnostics",
00060 DiagnosticArray,
00061 self.diag_cb)
00062 self._start_time = rospy.get_time()
00063
00064 def diag_cb(self, msg):
00065 with self._mutex:
00066 for stat in msg.status:
00067 self._expecteds[stat.name] = stat
00068
00069 def diag_agg_cb(self, msg):
00070 with self._mutex:
00071 self._agg_expecteds = []
00072 for stat in msg.status:
00073 self._agg_expecteds.append(stat)
00074
00075 def test_discard_stale(self):
00076 expecteds = {}
00077 timeout = 10
00078
00079
00080 while (len(expecteds.keys()) != 1 and
00081 not rospy.is_shutdown() and
00082 (rospy.get_time() - self._start_time < timeout)):
00083 sleep(1.0)
00084 with self._mutex:
00085 expecteds = self._expecteds
00086
00087 self.assert_(len(expecteds.keys()) == 1, "The expected diagnostics are not of length 1."
00088 "Received diagnostics: {}".format(expecteds))
00089 self.assert_(expecteds['nonexistent2'].level == DiagnosticStatus.WARN)
00090
00091 duration = 8
00092
00093 while not rospy.is_shutdown():
00094 sleep(1.0)
00095 if rospy.get_time() - self._start_time > duration:
00096 break
00097
00098 with self._mutex:
00099 self.assert_(len(self._agg_expecteds) == 1,
00100 "There should only be one expected aggregated item left, {} found instead!".
00101 format(len(self._agg_expecteds)))
00102 self.assert_(self._agg_expecteds[0].name == "/Nonexistent2",
00103 "The name of the first aggregated message should be '/Nonexistent2'!")
00104 self.assert_(self._agg_expecteds[0].level == DiagnosticStatus.STALE,
00105 "The level of the first aggregated message should be stale!")
00106
00107 duration = 15
00108
00109 while not rospy.is_shutdown():
00110 sleep(1.0)
00111 if rospy.get_time() - self._start_time > duration:
00112 break
00113
00114 with self._mutex:
00115 self.assert_(len(self._agg_expecteds) == 0,
00116 "There should't be any aggregated items left, {} found instead! {}".
00117 format(len(self._agg_expecteds), self._agg_expecteds))
00118
00119
00120 if __name__ == '__main__':
00121 rostest.run('diagnostic_aggregator', sys.argv[0], TestDiscardStale, sys.argv)
00122