expected_stale_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of the Willow Garage nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 
36 
37 
38 
39 from __future__ import with_statement
40 DURATION = 15
41 PKG = 'diagnostic_aggregator'
42 import roslib; roslib.load_manifest(PKG)
43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time import sleep
46 import sys
47 import threading
48 
49 def get_raw_name(agg_name):
50  return agg_name.split('/')[-1]
51 
53  def __init__(self, msg):
54  self.name = get_raw_name(msg.name)
55  self.level = msg.level
56  self.message = msg.message
57 
58  self.update_time = rospy.get_time()
59 
60  def is_stale(self):
61  return rospy.get_time() - self.update_time > 5
62 
63  def update(self, msg):
64  self.level = msg.level
65  self.message = msg.message
66 
67  self.update_time = rospy.get_time()
68 
69 class TestExpectedItemsStale(unittest.TestCase):
70  def __init__(self, *args):
71  super(TestExpectedItemsStale, self).__init__(*args)
72 
73  self._mutex = threading.Lock()
74 
75  self._expecteds = {}
76  self._agg_expecteds = {}
77 
78  rospy.init_node('test_expected_stale')
79  self._starttime = rospy.get_time()
80 
81  sub = rospy.Subscriber("/diagnostics", DiagnosticArray, self.diag_cb)
82  sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diag_agg_cb)
83 
84  def diag_cb(self, msg):
85  with self._mutex:
86  for stat in msg.status:
87  if stat.name.find('expected') == 0:
88  self._expecteds[stat.name] = DiagnosticItem(stat)
89 
90  def diag_agg_cb(self, msg):
91  with self._mutex:
92  for stat in msg.status:
93  if stat.name.find('expected') > 0:
94  self._agg_expecteds[get_raw_name(stat.name)] = DiagnosticItem(stat)
95 
97  while not rospy.is_shutdown():
98  sleep(1.0)
99  if rospy.get_time() - self._starttime > DURATION:
100  break
101 
102  with self._mutex:
103  self.assert_(len(self._expecteds) > 0, "No expected items found in raw data!")
104 
105  for name, item in self._expecteds.iteritems():
106  self.assert_(self._agg_expecteds.has_key(name), "Item %s not found in aggregated diagnostics output" % name)
107  if item.is_stale():
108  self.assert_(self._agg_expecteds[name].level == 3, "Stale item in diagnostics, but aggregated didn't report as stale. Item: %s, state: %d" %(name, self._agg_expecteds[name].level))
109  else:
110  self.assert_(self._agg_expecteds[name].level == item.level, "Diagnostic level of aggregated, raw item don't match for %s" % name)
111 
112 if __name__ == '__main__':
113  rostest.run(PKG, sys.argv[0], TestExpectedItemsStale, sys.argv)
def get_raw_name(agg_name)


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Mon Feb 28 2022 22:16:34