39 from __future__ 
import with_statement
 
   40 PKG = 
'diagnostic_aggregator' 
   42 import roslib; roslib.load_manifest(PKG)
 
   46 from time 
import sleep
 
   48 from optparse 
import OptionParser
 
   52 from diagnostic_msgs.msg 
import DiagnosticArray
 
   59     for start_name 
in remove_prefixes:
 
   60         if last.startswith(start_name):
 
   61             last = last[len(start_name):]
 
   62         if last.startswith(
':'):
 
   64         while last.startswith(
' '):
 
   70     fixed = 
fix_sub_name(name.replace(
'/', 
''), remove_prefixes)
 
   71     return '/'.join([prefix, my_prefix, fixed])
 
   74     return '/'.join([prefix, my_prefix])
 
   78     if type(params) 
in (list, tuple):
 
   82     return [ str(params) ]
 
   86     if 'remove_prefix' in value:
 
   89     if 'find_and_remove_prefix' in value:
 
   93             if name.startswith(sw):
 
   98     if 'startswith' in value:
 
  100             if name.startswith(sw):
 
  105     if 'contains' in value:
 
  107             if name.find(con) >= 0:
 
  119     if 'expected' in value:
 
  130     for key, value 
in params.items():
 
  131         if not 'path' in value 
or not 'type' in value:
 
  133         my_prefix = value[
'path']
 
  134         if value[
'type'] == 
'GenericAnalyzer' or value[
'type'] == 
'diagnostic_aggregator/GenericAnalyzer':
 
  136             if generic_name 
is not None:
 
  146     for key, value 
in params.items():
 
  147         if not 'path' in value 
or not 'type' in value:
 
  149         my_prefix = value[
'path']
 
  150         if value[
'type'] == 
'GenericAnalyzer' or value[
'type'] == 
'diagnostic_aggregator/GenericAnalyzer':
 
  152             if generic_name 
is not None:
 
  163         super(TestAggregator, self).
__init__(*args)
 
  164         parser = OptionParser(usage=
"./%prog [options]", prog=
"aggregator_test.py")
 
  165         parser.add_option(
'--gtest_output', action=
"store", dest=
"gtest")
 
  166         parser.add_option(
'--param_name', action=
"store", dest=
"param",
 
  167                           default=
'diag_agg', metavar=
"PARAM_NAME",
 
  168                           help=
"Name of parameter that defines analyzers")
 
  169         parser.add_option(
'--duration', action=
"store", dest=
"duration",
 
  170                           default=10, metavar=
"DURATIION",
 
  171                           help=
"Duration of test")
 
  172         parser.add_option(
'--base_path', action=
"store", dest=
"base_path",
 
  173                           default=
"", metavar=
"BASE_PATH",
 
  174                           help=
"Base path for all output topics")
 
  179         rospy.init_node(
'test_diag_agg')
 
  180         options, args = parser.parse_args(rospy.myargv())
 
  183         prefix = options.base_path
 
  185         self.
params = rospy.get_param(options.param)
 
  187         rospy.Subscriber(
'diagnostics_agg', DiagnosticArray, self.
cb)
 
  188         rospy.Subscriber(
'diagnostics', DiagnosticArray, self.
diag_cb)
 
  194             for stat 
in msg.status:
 
  200             for stat 
in msg.status:
 
  204         start = rospy.get_time()
 
  205         while not rospy.is_shutdown():
 
  207             if rospy.get_time() - start > self.
duration:
 
  210         self.assert_(
not rospy.is_shutdown(), 
"Rospy shutdown")
 
  215             for name, msg 
in self.
agg_msgs.items():
 
  216                 self.assert_(name.startswith(
'/'), 
"Aggregated name %s doesn't start with \"/\"" % name)
 
  222                 self.assert_(agg_name 
is not None, 
'Aggregated name is None for %s' % name)
 
  223                 self.assert_(agg_name 
in self.
agg_msgs, 
'No matching name found for name: %s, aggregated name: %s' % (name, agg_name))
 
  224                 self.assert_(msg.level == self.
agg_msgs[agg_name].level, 
'Status level of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s.' % (name, agg_name))
 
  225                 self.assert_(msg.message == self.
agg_msgs[agg_name].message, 
'Status message of original, aggregated messages doesn\'t match. Name: %s, aggregated name: %s' % (name, agg_name))
 
  229                 if self.
agg_msgs[agg_name].level == 3: 
 
  233                 if header 
in all_headers:
 
  234                     all_headers[header] = max(all_headers[header], self.
agg_msgs[agg_name].level)
 
  236                     all_headers[header] = self.
agg_msgs[agg_name].level
 
  241             for header, lvl 
in all_headers.items():
 
  246                 self.assert_(header 
in self.
agg_msgs, 
"Header %s not found in messages" % header)
 
  247                 self.assert_(self.
agg_msgs[header].level == lvl, 
"Level of header %s doesn't match expected value." % header)
 
  252                 self.assert_(len(self.
agg_msgs) == 1, 
"Incorrect number of messages remaining: %d. Messages: %s" % (len(self.
agg_msgs), str(self.
agg_msgs)))
 
  254                 self.assert_(prefix 
in self.
agg_msgs, 
"Global prefix not found in messages: %s. Messages: %s" % (prefix, str(self.
agg_msgs)))
 
  256                 self.assert_(len(self.
agg_msgs) == 0, 
"Incorrect number of messages remaining: %d. Messages: %s. Expected 0." % (len(self.
agg_msgs), str(self.
agg_msgs)))
 
  260 if __name__ == 
'__main__':
 
  261     print(
'SYS ARGS:', sys.argv)
 
  262     rostest.run(PKG, sys.argv[0], TestAggregator, sys.argv)