00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 from __future__ import with_statement
00040 PKG = 'diagnostic_aggregator'
00041
00042 import roslib; roslib.load_manifest(PKG)
00043
00044 import unittest
00045 import rospy, rostest
00046 from time import sleep
00047 import sys
00048 from optparse import OptionParser
00049 import threading
00050 import types
00051
00052 from diagnostic_msgs.msg import DiagnosticArray
00053
00054 prefix = ""
00055
00056
00057 def fix_sub_name(name, remove_prefixes):
00058 last = str(name)
00059 for start_name in remove_prefixes:
00060 if last.startswith(start_name):
00061 last = last[len(start_name):]
00062 if last.startswith(':'):
00063 last = last[1:]
00064 while last.startswith(' '):
00065 last = last[1:]
00066
00067 return last
00068
00069 def combine_name_prefix(my_prefix, name, remove_prefixes):
00070 fixed = fix_sub_name(name.replace('/', ''), remove_prefixes)
00071 return '/'.join([prefix, my_prefix, fixed])
00072
00073 def header_name(my_prefix):
00074 return '/'.join([prefix, my_prefix])
00075
00076 def _get_params_list(params):
00077 out = []
00078 if type(params) in (list, tuple):
00079 for p in params:
00080 out.append(str(p))
00081 return out
00082 return [ str(params) ]
00083
00084 def name_to_full_generic(name, my_prefix, value, header=False):
00085 remove_prefixes = []
00086 if value.has_key('remove_prefix'):
00087 remove_prefixes = _get_params_list(value['remove_prefix'])
00088
00089 if value.has_key('find_and_remove_prefix'):
00090 for rp in _get_params_list(value['find_and_remove_prefix']):
00091 remove_prefixes.extend(_get_params_list(value['find_and_remove_prefix']))
00092 for sw in _get_params_list(value['find_and_remove_prefix']):
00093 if name.startswith(sw):
00094 if header:
00095 return header_name(my_prefix)
00096 return combine_name_prefix(my_prefix, name, remove_prefixes)
00097
00098 if value.has_key('startswith'):
00099 for sw in _get_params_list(value['startswith']):
00100 if name.startswith(sw):
00101 if header:
00102 return header_name(my_prefix)
00103 return combine_name_prefix(my_prefix, name, remove_prefixes)
00104
00105 if value.has_key('contains'):
00106 for con in _get_params_list(value['contains']):
00107 if name.find(con) >= 0:
00108 if header:
00109 return header_name(my_prefix)
00110 return combine_name_prefix(my_prefix, name, remove_prefixes)
00111
00112 if value.has_key('name'):
00113 for nm in _get_params_list(value['name']):
00114 if name == nm:
00115 if header:
00116 return header_name(my_prefix)
00117 return combine_name_prefix(my_prefix, name, remove_prefixes)
00118
00119 if value.has_key('expected'):
00120 for nm in _get_params_list(value['expected']):
00121 if name == nm:
00122 if header:
00123 return header_name(my_prefix)
00124 return combine_name_prefix(my_prefix, name, remove_prefixes)
00125
00126
00127 return None
00128
00129 def name_to_agg_name(name, params):
00130 for key, value in params.iteritems():
00131 if not value.has_key('path') or not value.has_key('type'):
00132 return None
00133 my_prefix = value['path']
00134 if value['type'] == 'GenericAnalyzer' or value['type'] == 'diagnostic_aggregator/GenericAnalyzer':
00135 generic_name = name_to_full_generic(name, my_prefix, value)
00136 if generic_name is not None:
00137 return generic_name
00138 else:
00139 return None
00140
00141
00142 return combine_name_prefix('Other', name, [])
00143
00144
00145 def name_to_agg_header(name, params):
00146 for key, value in params.iteritems():
00147 if not value.has_key('path') or not value.has_key('type'):
00148 return None
00149 my_prefix = value['path']
00150 if value['type'] == 'GenericAnalyzer' or value['type'] == 'diagnostic_aggregator/GenericAnalyzer':
00151 generic_name = name_to_full_generic(name, my_prefix, value, header=True)
00152 if generic_name is not None:
00153 return generic_name
00154 else:
00155 return None
00156
00157
00158 return header_name('Other')
00159
00160
00161 class TestAggregator(unittest.TestCase):
00162 def __init__(self, *args):
00163 super(TestAggregator, self).__init__(*args)
00164 parser = OptionParser(usage="./%prog [options]", prog="aggregator_test.py")
00165 parser.add_option('--gtest_output', action="store", dest="gtest")
00166 parser.add_option('--param_name', action="store", dest="param",
00167 default='diag_agg', metavar="PARAM_NAME",
00168 help="Name of parameter that defines analyzers")
00169 parser.add_option('--duration', action="store", dest="duration",
00170 default=10, metavar="DURATIION",
00171 help="Duration of test")
00172 parser.add_option('--base_path', action="store", dest="base_path",
00173 default="", metavar="BASE_PATH",
00174 help="Base path for all output topics")
00175
00176 self.diag_msgs = {}
00177 self.agg_msgs = {}
00178
00179 rospy.init_node('test_diag_agg')
00180 options, args = parser.parse_args(rospy.myargv())
00181
00182 global prefix
00183 prefix = options.base_path
00184
00185 self.params = rospy.get_param(options.param)
00186 self.duration = options.duration
00187 rospy.Subscriber('diagnostics_agg', DiagnosticArray, self.cb)
00188 rospy.Subscriber('diagnostics', DiagnosticArray, self.diag_cb)
00189
00190 self._mutex = threading.Lock()
00191
00192 def diag_cb(self, msg):
00193 with self._mutex:
00194 for stat in msg.status:
00195 self.diag_msgs[stat.name] = stat
00196
00197
00198 def cb(self, msg):
00199 with self._mutex:
00200 for stat in msg.status:
00201 self.agg_msgs[stat.name] = stat
00202
00203 def test_agg(self):
00204 start = rospy.get_time()
00205 while not rospy.is_shutdown():
00206 sleep(1.0)
00207 if rospy.get_time() - start > self.duration:
00208 break
00209
00210 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00211
00212 with self._mutex:
00213 all_headers = {}
00214
00215 for name, msg in self.agg_msgs.iteritems():
00216 self.assert_(name.startswith('/'), "Aggregated name %s doesn't start with \"/\"" % name)
00217
00218
00219 for name, msg in self.diag_msgs.iteritems():
00220 agg_name = name_to_agg_name(name, self.params)
00221
00222 self.assert_(agg_name is not None, 'Aggregated name is None for %s' % name)
00223 self.assert_(self.agg_msgs.has_key(agg_name), 'No matching name found for name: %s, aggregated name: %s' % (name, agg_name))
00224 self.assert_(msg.level == self.agg_msgs[agg_name].level, 'Status level of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s.' % (name, agg_name))
00225 self.assert_(msg.message == self.agg_msgs[agg_name].message, 'Status message of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s' % (name, agg_name))
00226
00227
00228
00229 if self.agg_msgs[agg_name].level == 3:
00230 self.agg_msgs[agg_name].level = -1
00231
00232 header = name_to_agg_header(name, self.params)
00233 if all_headers.has_key(header):
00234 all_headers[header] = max(all_headers[header], self.agg_msgs[agg_name].level)
00235 else:
00236 all_headers[header] = self.agg_msgs[agg_name].level
00237
00238 del self.agg_msgs[agg_name]
00239
00240
00241 for header, lvl in all_headers.iteritems():
00242
00243 if lvl == -1:
00244 lvl = 3
00245
00246 self.assert_(self.agg_msgs.has_key(header), "Header %s not found in messages" % header)
00247 self.assert_(self.agg_msgs[header].level == lvl, "Level of header %s doesn't match expected value." % header)
00248 del self.agg_msgs[header]
00249
00250
00251 if len(prefix) > 0:
00252 self.assert_(len(self.agg_msgs) == 1, "Incorrect number of messages remaining: %d. Messages: %s" % (len(self.agg_msgs), str(self.agg_msgs)))
00253
00254 self.assert_(self.agg_msgs.has_key(prefix), "Global prefix not found in messages: %s. Messages: %s" % (prefix, str(self.agg_msgs)))
00255 else:
00256 self.assert_(len(self.agg_msgs) == 0, "Incorrect number of messages remaining: %d. Messages: %s. Expected 0." % (len(self.agg_msgs), str(self.agg_msgs)))
00257
00258
00259
00260 if __name__ == '__main__':
00261 print 'SYS ARGS:', sys.argv
00262 rostest.run(PKG, sys.argv[0], TestAggregator, sys.argv)