Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 from __future__ import with_statement
00040 DURATION = 15
00041 PKG = 'diagnostic_aggregator'
00042 import roslib; roslib.load_manifest(PKG)
00043 import rospy, rostest, unittest
00044 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00045 from time import sleep
00046 import sys
00047 import threading
00048
00049 def get_raw_name(agg_name):
00050 return agg_name.split('/')[-1]
00051
00052 class DiagnosticItem:
00053 def __init__(self, msg):
00054 self.name = get_raw_name(msg.name)
00055 self.level = msg.level
00056 self.message = msg.message
00057
00058 self.update_time = rospy.get_time()
00059
00060 def is_stale(self):
00061 return rospy.get_time() - self.update_time > 5
00062
00063 def update(self, msg):
00064 self.level = msg.level
00065 self.message = msg.message
00066
00067 self.update_time = rospy.get_time()
00068
00069 class TestExpectedItemsStale(unittest.TestCase):
00070 def __init__(self, *args):
00071 super(TestExpectedItemsStale, self).__init__(*args)
00072
00073 self._mutex = threading.Lock()
00074
00075 self._expecteds = {}
00076 self._agg_expecteds = {}
00077
00078 rospy.init_node('test_expected_stale')
00079 self._starttime = rospy.get_time()
00080
00081 sub = rospy.Subscriber("/diagnostics", DiagnosticArray, self.diag_cb)
00082 sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diag_agg_cb)
00083
00084 def diag_cb(self, msg):
00085 with self._mutex:
00086 for stat in msg.status:
00087 if stat.name.find('expected') == 0:
00088 self._expecteds[stat.name] = DiagnosticItem(stat)
00089
00090 def diag_agg_cb(self, msg):
00091 with self._mutex:
00092 for stat in msg.status:
00093 if stat.name.find('expected') > 0:
00094 self._agg_expecteds[get_raw_name(stat.name)] = DiagnosticItem(stat)
00095
00096 def test_expecteds_arent_stale(self):
00097 while not rospy.is_shutdown():
00098 sleep(1.0)
00099 if rospy.get_time() - self._starttime > DURATION:
00100 break
00101
00102 with self._mutex:
00103 self.assert_(len(self._expecteds) > 0, "No expected items found in raw data!")
00104
00105 for name, item in self._expecteds.iteritems():
00106 self.assert_(self._agg_expecteds.has_key(name), "Item %s not found in aggregated diagnostics output" % name)
00107 if item.is_stale():
00108 self.assert_(self._agg_expecteds[name].level == 3, "Stale item in diagnostics, but aggregated didn't report as stale. Item: %s, state: %d" %(name, self._agg_expecteds[name].level))
00109 else:
00110 self.assert_(self._agg_expecteds[name].level == item.level, "Diagnostic level of aggregated, raw item don't match for %s" % name)
00111
00112 if __name__ == '__main__':
00113 rostest.run(PKG, sys.argv[0], TestExpectedItemsStale, sys.argv)