39 from __future__
import with_statement
40 PKG =
'diagnostic_aggregator'
42 import roslib; roslib.load_manifest(PKG)
46 from time
import sleep
48 from optparse
import OptionParser
52 from diagnostic_msgs.msg
import DiagnosticArray
59 for start_name
in remove_prefixes:
60 if last.startswith(start_name):
61 last = last[len(start_name):]
62 if last.startswith(
':'):
64 while last.startswith(
' '):
70 fixed =
fix_sub_name(name.replace(
'/',
''), remove_prefixes)
71 return '/'.join([prefix, my_prefix, fixed])
74 return '/'.join([prefix, my_prefix])
78 if type(params)
in (list, tuple):
82 return [ str(params) ]
86 if 'remove_prefix' in value:
89 if 'find_and_remove_prefix' in value:
93 if name.startswith(sw):
98 if 'startswith' in value:
100 if name.startswith(sw):
105 if 'contains' in value:
107 if name.find(con) >= 0:
119 if 'expected' in value:
130 for key, value
in params.items():
131 if not 'path' in value
or not 'type' in value:
133 my_prefix = value[
'path']
134 if value[
'type'] ==
'GenericAnalyzer' or value[
'type'] ==
'diagnostic_aggregator/GenericAnalyzer':
136 if generic_name
is not None:
146 for key, value
in params.items():
147 if not 'path' in value
or not 'type' in value:
149 my_prefix = value[
'path']
150 if value[
'type'] ==
'GenericAnalyzer' or value[
'type'] ==
'diagnostic_aggregator/GenericAnalyzer':
152 if generic_name
is not None:
163 super(TestAggregator, self).
__init__(*args)
164 parser = OptionParser(usage=
"./%prog [options]", prog=
"aggregator_test.py")
165 parser.add_option(
'--gtest_output', action=
"store", dest=
"gtest")
166 parser.add_option(
'--param_name', action=
"store", dest=
"param",
167 default=
'diag_agg', metavar=
"PARAM_NAME",
168 help=
"Name of parameter that defines analyzers")
169 parser.add_option(
'--duration', action=
"store", dest=
"duration",
170 default=10, metavar=
"DURATIION",
171 help=
"Duration of test")
172 parser.add_option(
'--base_path', action=
"store", dest=
"base_path",
173 default=
"", metavar=
"BASE_PATH",
174 help=
"Base path for all output topics")
179 rospy.init_node(
'test_diag_agg')
180 options, args = parser.parse_args(rospy.myargv())
183 prefix = options.base_path
185 self.
params = rospy.get_param(options.param)
187 rospy.Subscriber(
'diagnostics_agg', DiagnosticArray, self.
cb)
188 rospy.Subscriber(
'diagnostics', DiagnosticArray, self.
diag_cb)
194 for stat
in msg.status:
200 for stat
in msg.status:
204 start = rospy.get_time()
205 while not rospy.is_shutdown():
207 if rospy.get_time() - start > self.
duration:
210 self.assert_(
not rospy.is_shutdown(),
"Rospy shutdown")
215 for name, msg
in self.
agg_msgs.items():
216 self.assert_(name.startswith(
'/'),
"Aggregated name %s doesn't start with \"/\"" % name)
222 self.assert_(agg_name
is not None,
'Aggregated name is None for %s' % name)
223 self.assert_(agg_name
in self.
agg_msgs,
'No matching name found for name: %s, aggregated name: %s' % (name, agg_name))
224 self.assert_(msg.level == self.
agg_msgs[agg_name].level,
'Status level of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s.' % (name, agg_name))
225 self.assert_(msg.message == self.
agg_msgs[agg_name].message,
'Status message of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s' % (name, agg_name))
229 if self.
agg_msgs[agg_name].level == 3:
233 if header
in all_headers:
234 all_headers[header] = max(all_headers[header], self.
agg_msgs[agg_name].level)
236 all_headers[header] = self.
agg_msgs[agg_name].level
241 for header, lvl
in all_headers.items():
246 self.assert_(header
in self.
agg_msgs,
"Header %s not found in messages" % header)
247 self.assert_(self.
agg_msgs[header].level == lvl,
"Level of header %s doesn't match expected value." % header)
252 self.assert_(len(self.
agg_msgs) == 1,
"Incorrect number of messages remaining: %d. Messages: %s" % (len(self.
agg_msgs), str(self.
agg_msgs)))
254 self.assert_(prefix
in self.
agg_msgs,
"Global prefix not found in messages: %s. Messages: %s" % (prefix, str(self.
agg_msgs)))
256 self.assert_(len(self.
agg_msgs) == 0,
"Incorrect number of messages remaining: %d. Messages: %s. Expected 0." % (len(self.
agg_msgs), str(self.
agg_msgs)))
260 if __name__ ==
'__main__':
261 print(
'SYS ARGS:', sys.argv)
262 rostest.run(PKG, sys.argv[0], TestAggregator, sys.argv)