aggregator_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of the Willow Garage nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 ##\author Kevin Watts
36 
37 ##\brief Tests receipt of /diagnostics_agg from diagnostic aggregator
38 
39 from __future__ import with_statement
40 PKG = 'diagnostic_aggregator'
41 
42 import roslib; roslib.load_manifest(PKG)
43 
44 import unittest
45 import rospy, rostest
46 from time import sleep
47 import sys
48 from optparse import OptionParser
49 import threading
50 import types
51 
52 from diagnostic_msgs.msg import DiagnosticArray
53 
# Base path placed in front of every aggregated name built by
# combine_name_prefix() and header_name(). TestAggregator.__init__
# overwrites it with the value of the --base_path option.
prefix = ""
55 
56 ##\brief Removes name chaff (ex: 'tilt_hokuyo_node: Frequency' to 'Frequency')
def fix_sub_name(name, remove_prefixes):
    """Strip leading chaff from a diagnostic status name.

    For every entry of ``remove_prefixes``, in order: if the name currently
    starts with that entry it is cut off, then one leading ':' is dropped,
    then all leading blanks are dropped.

    name -- status name; converted with str() before processing.
    remove_prefixes -- iterable of prefix strings to remove.

    Returns the cleaned name as a str.
    """
    trimmed = str(name)
    for chaff in remove_prefixes:
        if trimmed.startswith(chaff):
            trimmed = trimmed[len(chaff):]
        # Only a single separator colon is removed, never a run of them.
        if trimmed[:1] == ':':
            trimmed = trimmed[1:]
        trimmed = trimmed.lstrip(' ')
    return trimmed
68 
def combine_name_prefix(my_prefix, name, remove_prefixes):
    """Build the aggregated name of a single diagnostic item.

    my_prefix -- path of the analyzer that owns the item.
    name -- raw status name; every '/' is removed from it before cleaning.
    remove_prefixes -- prefixes handed on to fix_sub_name().

    Returns the global ``prefix``, ``my_prefix`` and the cleaned name joined
    with '/'.
    """
    slashless = name.replace('/', '')
    leaf = fix_sub_name(slashless, remove_prefixes)
    return '/'.join((prefix, my_prefix, leaf))
72 
def header_name(my_prefix):
    """Return the aggregated name of an analyzer's header item.

    The result is the global ``prefix`` and ``my_prefix`` joined with '/'.
    """
    path_parts = (prefix, my_prefix)
    return '/'.join(path_parts)
75 
76 def _get_params_list(params):
77  out = []
78  if type(params) in (list, tuple):
79  for p in params:
80  out.append(str(p))
81  return out
82  return [ str(params) ]
83 
def name_to_full_generic(name, my_prefix, value, header=False):
    """Map a raw diagnostic name to its aggregated name for one analyzer.

    name -- raw diagnostic status name.
    my_prefix -- path of the analyzer described by ``value``.
    value -- parameter dict of the analyzer. The keys consulted are
             'remove_prefix', 'find_and_remove_prefix', 'startswith',
             'contains', 'name' and 'expected'; each may hold one value or
             a list of values.
    header -- when True, return the analyzer's header name instead of the
              item's own aggregated name.

    Returns the aggregated (or header) name if any matching rule accepts
    ``name``, otherwise None.
    """
    # Prefixes stripped from the item name: 'remove_prefix' entries first,
    # then 'find_and_remove_prefix' entries, each listed exactly once.
    remove_prefixes = []
    for prefix_key in ('remove_prefix', 'find_and_remove_prefix'):
        if prefix_key in value:
            remove_prefixes.extend(_get_params_list(value[prefix_key]))

    # Matching rules in the order they are tried; the first hit wins.
    rules = (
        ('find_and_remove_prefix', name.startswith),
        ('startswith', name.startswith),
        ('contains', lambda fragment: fragment in name),
        ('name', lambda exact: exact == name),
        ('expected', lambda exact: exact == name),
    )
    for rule_key, is_match in rules:
        if rule_key not in value:
            continue
        if any(is_match(candidate) for candidate in _get_params_list(value[rule_key])):
            if header:
                return header_name(my_prefix)
            return combine_name_prefix(my_prefix, name, remove_prefixes)

    return None
128 
def name_to_agg_name(name, params):
    """Return the aggregated name expected for a raw diagnostic name.

    name -- raw diagnostic status name.
    params -- dict mapping analyzer keys to analyzer parameter dicts.

    Returns None as soon as an analyzer lacks 'path' or 'type', or is not a
    GenericAnalyzer. Otherwise returns the name produced by the first
    analyzer that matches, or the name under 'Other' when none matches.
    """
    generic_types = ('GenericAnalyzer', 'diagnostic_aggregator/GenericAnalyzer')
    # .values() works on both Python 2 and 3; the analyzer key is not needed.
    for value in params.values():
        if 'path' not in value or 'type' not in value:
            return None
        if value['type'] not in generic_types:
            return None
        generic_name = name_to_full_generic(name, value['path'], value)
        if generic_name is not None:
            return generic_name

    # If we don't have it...
    return combine_name_prefix('Other', name, [])
143 
144 # Returns header name for particular item
def name_to_agg_header(name, params):
    """Return the header name expected for a raw diagnostic name.

    name -- raw diagnostic status name.
    params -- dict mapping analyzer keys to analyzer parameter dicts.

    Returns None as soon as an analyzer lacks 'path' or 'type', or is not a
    GenericAnalyzer. Otherwise returns the header name of the first analyzer
    that matches, or the 'Other' header when none matches.
    """
    generic_types = ('GenericAnalyzer', 'diagnostic_aggregator/GenericAnalyzer')
    # .values() works on both Python 2 and 3; the analyzer key is not needed.
    for value in params.values():
        if 'path' not in value or 'type' not in value:
            return None
        if value['type'] not in generic_types:
            return None
        generic_name = name_to_full_generic(name, value['path'], value, header=True)
        if generic_name is not None:
            return generic_name

    # If we don't have it...
    return header_name('Other')
159 
160 ##\brief Uses aggregator parameters to compare diagnostics with aggregated output
class TestAggregator(unittest.TestCase):
    """Compare raw diagnostics with the aggregator's output.

    The test case subscribes to 'diagnostics' and 'diagnostics_agg', records
    the latest status per name for a fixed duration, and then checks that
    every raw status shows up under the aggregated name derived from the
    analyzer parameters, with matching level and message.
    """

    def __init__(self, *args):
        """Parse command-line options, start the node and subscribe.

        Side effects: initializes the 'test_diag_agg' ROS node, sets the
        module-level ``prefix`` from --base_path and reads the analyzer
        parameters named by --param_name.
        """
        super(TestAggregator, self).__init__(*args)
        parser = OptionParser(usage="./%prog [options]", prog="aggregator_test.py")
        parser.add_option('--gtest_output', action="store", dest="gtest")
        parser.add_option('--param_name', action="store", dest="param",
                          default='diag_agg', metavar="PARAM_NAME",
                          help="Name of parameter that defines analyzers")
        # type="float" so a value given on the command line is numeric and
        # can be compared with elapsed time in test_agg().
        parser.add_option('--duration', action="store", dest="duration",
                          type="float", default=10, metavar="DURATIION",
                          help="Duration of test")
        parser.add_option('--base_path', action="store", dest="base_path",
                          default="", metavar="BASE_PATH",
                          help="Base path for all output topics")

        # Latest status per name, filled by the subscriber callbacks.
        self.diag_msgs = {}
        self.agg_msgs = {}
        # The lock must exist before any subscriber is created, because a
        # callback may fire as soon as the subscription is registered.
        self._mutex = threading.Lock()

        rospy.init_node('test_diag_agg')
        options, _ = parser.parse_args(rospy.myargv())

        global prefix
        prefix = options.base_path

        self.params = rospy.get_param(options.param)
        self.duration = options.duration
        rospy.Subscriber('diagnostics_agg', DiagnosticArray, self.cb)
        rospy.Subscriber('diagnostics', DiagnosticArray, self.diag_cb)

    def diag_cb(self, msg):
        """Record every raw status in ``msg`` under its name."""
        with self._mutex:
            for stat in msg.status:
                self.diag_msgs[stat.name] = stat

    def cb(self, msg):
        """Record every aggregated status in ``msg`` under its name."""
        with self._mutex:
            for stat in msg.status:
                self.agg_msgs[stat.name] = stat

    def test_agg(self):
        """Collect messages for ``self.duration`` seconds, then verify them."""
        start = rospy.get_time()
        while not rospy.is_shutdown():
            sleep(1.0)
            if rospy.get_time() - start > self.duration:
                break

        self.assertTrue(not rospy.is_shutdown(), "Rospy shutdown")

        with self._mutex:
            # Highest item level seen under each analyzer header.
            all_headers = {}

            for name in self.agg_msgs:
                self.assertTrue(name.startswith('/'), "Aggregated name %s doesn't start with \"/\"" % name)

            # Go through all messages and check that we have them in aggregate
            for name, msg in self.diag_msgs.items():
                agg_name = name_to_agg_name(name, self.params)

                self.assertTrue(agg_name is not None, 'Aggregated name is None for %s' % name)
                self.assertTrue(agg_name in self.agg_msgs, 'No matching name found for name: %s, aggregated name: %s' % (name, agg_name))
                agg_msg = self.agg_msgs[agg_name]
                self.assertTrue(msg.level == agg_msg.level, 'Status level of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s.' % (name, agg_name))
                self.assertTrue(msg.message == agg_msg.message, 'Status message of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s' % (name, agg_name))

                # An analyzer only reports stale if all messages underneath
                # it are stale, so stale (3) is ranked below every other
                # level while the per-header maximum is computed.
                if agg_msg.level == 3:  # Stale
                    agg_msg.level = -1

                header = name_to_agg_header(name, self.params)
                if header in all_headers:
                    all_headers[header] = max(all_headers[header], agg_msg.level)
                else:
                    all_headers[header] = agg_msg.level

                # Consumed; whatever is left at the end must be headers.
                del self.agg_msgs[agg_name]

            # Check that we have all_headers
            for header, lvl in all_headers.items():
                # If everything is stale, report stale. Otherwise, it should report an error
                if lvl == -1:
                    lvl = 3

                self.assertTrue(header in self.agg_msgs, "Header %s not found in messages" % header)
                self.assertTrue(self.agg_msgs[header].level == lvl, "Level of header %s doesn't match expected value." % header)
                del self.agg_msgs[header]

            # Check that we have the main header message
            if len(prefix) > 0:
                self.assertTrue(len(self.agg_msgs) == 1, "Incorrect number of messages remaining: %d. Messages: %s" % (len(self.agg_msgs), str(self.agg_msgs)))

                self.assertTrue(prefix in self.agg_msgs, "Global prefix not found in messages: %s. Messages: %s" % (prefix, str(self.agg_msgs)))
            else:
                self.assertTrue(len(self.agg_msgs) == 0, "Incorrect number of messages remaining: %d. Messages: %s. Expected 0." % (len(self.agg_msgs), str(self.agg_msgs)))
257 
258 
259 
if __name__ == '__main__':
    # A single pre-formatted argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('SYS ARGS: %s' % (sys.argv,))
    rostest.run(PKG, sys.argv[0], TestAggregator, sys.argv)
def header_name(my_prefix)
def fix_sub_name(name, remove_prefixes)
Removes name chaff (ex: 'tilt_hokuyo_node: Frequency' to 'Frequency')
def _get_params_list(params)
def name_to_agg_name(name, params)
def combine_name_prefix(my_prefix, name, remove_prefixes)
def name_to_agg_header(name, params)
def name_to_full_generic(name, my_prefix, value, header=False)
Uses aggregator parameters to compare diagnostics with aggregated output.


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Thu Oct 8 2020 03:17:16