39 from __future__
import with_statement
40 PKG =
'diagnostic_aggregator' 42 import roslib; roslib.load_manifest(PKG)
46 from time
import sleep
48 from optparse
import OptionParser
52 from diagnostic_msgs.msg
import DiagnosticArray
59 for start_name
in remove_prefixes:
60 if last.startswith(start_name):
61 last = last[len(start_name):]
62 if last.startswith(
':'):
64 while last.startswith(
' '):
70 fixed =
fix_sub_name(name.replace(
'/',
''), remove_prefixes)
71 return '/'.join([prefix, my_prefix, fixed])
74 return '/'.join([prefix, my_prefix])
78 if type(params)
in (list, tuple):
82 return [ str(params) ]
86 if value.has_key(
'remove_prefix'):
89 if value.has_key(
'find_and_remove_prefix'):
93 if name.startswith(sw):
98 if value.has_key(
'startswith'):
100 if name.startswith(sw):
105 if value.has_key(
'contains'):
107 if name.find(con) >= 0:
112 if value.has_key(
'name'):
119 if value.has_key(
'expected'):
130 for key, value
in params.iteritems():
131 if not value.has_key(
'path')
or not value.has_key(
'type'):
133 my_prefix = value[
'path']
134 if value[
'type'] ==
'GenericAnalyzer' or value[
'type'] ==
'diagnostic_aggregator/GenericAnalyzer':
136 if generic_name
is not None:
146 for key, value
in params.iteritems():
147 if not value.has_key(
'path')
or not value.has_key(
'type'):
149 my_prefix = value[
'path']
150 if value[
'type'] ==
'GenericAnalyzer' or value[
'type'] ==
'diagnostic_aggregator/GenericAnalyzer':
152 if generic_name
is not None:
163 super(TestAggregator, self).
__init__(*args)
164 parser = OptionParser(usage=
"./%prog [options]", prog=
"aggregator_test.py")
165 parser.add_option(
'--gtest_output', action=
"store", dest=
"gtest")
166 parser.add_option(
'--param_name', action=
"store", dest=
"param",
167 default=
'diag_agg', metavar=
"PARAM_NAME",
168 help=
"Name of parameter that defines analyzers")
169 parser.add_option(
'--duration', action=
"store", dest=
"duration",
170 default=10, metavar=
"DURATIION",
171 help=
"Duration of test")
172 parser.add_option(
'--base_path', action=
"store", dest=
"base_path",
173 default=
"", metavar=
"BASE_PATH",
174 help=
"Base path for all output topics")
179 rospy.init_node(
'test_diag_agg')
180 options, args = parser.parse_args(rospy.myargv())
183 prefix = options.base_path
185 self.
params = rospy.get_param(options.param)
187 rospy.Subscriber(
'diagnostics_agg', DiagnosticArray, self.
cb)
188 rospy.Subscriber(
'diagnostics', DiagnosticArray, self.
diag_cb)
194 for stat
in msg.status:
200 for stat
in msg.status:
204 start = rospy.get_time()
205 while not rospy.is_shutdown():
207 if rospy.get_time() - start > self.
duration:
210 self.assert_(
not rospy.is_shutdown(),
"Rospy shutdown")
215 for name, msg
in self.
agg_msgs.iteritems():
216 self.assert_(name.startswith(
'/'),
"Aggregated name %s doesn't start with \"/\"" % name)
219 for name, msg
in self.
diag_msgs.iteritems():
222 self.assert_(agg_name
is not None,
'Aggregated name is None for %s' % name)
223 self.assert_(self.
agg_msgs.has_key(agg_name),
'No matching name found for name: %s, aggregated name: %s' % (name, agg_name))
224 self.assert_(msg.level == self.
agg_msgs[agg_name].level,
'Status level of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s.' % (name, agg_name))
225 self.assert_(msg.message == self.
agg_msgs[agg_name].message,
'Status message of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s' % (name, agg_name))
229 if self.
agg_msgs[agg_name].level == 3:
233 if all_headers.has_key(header):
234 all_headers[header] = max(all_headers[header], self.
agg_msgs[agg_name].level)
236 all_headers[header] = self.
agg_msgs[agg_name].level
241 for header, lvl
in all_headers.iteritems():
246 self.assert_(self.
agg_msgs.has_key(header),
"Header %s not found in messages" % header)
247 self.assert_(self.
agg_msgs[header].level == lvl,
"Level of header %s doesn't match expected value." % header)
252 self.assert_(len(self.
agg_msgs) == 1,
"Incorrect number of messages remaining: %d. Messages: %s" % (len(self.
agg_msgs), str(self.
agg_msgs)))
254 self.assert_(self.
agg_msgs.has_key(prefix),
"Global prefix not found in messages: %s. Messages: %s" % (prefix, str(self.
agg_msgs)))
256 self.assert_(len(self.
agg_msgs) == 0,
"Incorrect number of messages remaining: %d. Messages: %s. Expected 0." % (len(self.
agg_msgs), str(self.
agg_msgs)))
260 if __name__ ==
'__main__':
261 print 'SYS ARGS:', sys.argv
262 rostest.run(PKG, sys.argv[0], TestAggregator, sys.argv)
def header_name(my_prefix)
def fix_sub_name(name, remove_prefixes)
Removes name chaff (ex: 'tilt_hokuyo_node: Frequency' to 'Frequency')
def _get_params_list(params)
def name_to_agg_name(name, params)
def combine_name_prefix(my_prefix, name, remove_prefixes)
def name_to_agg_header(name, params)
def name_to_full_generic(name, my_prefix, value, header=False)
Uses aggregator parameters to compare diagnostics with aggregated output.