match_analyze_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of the Willow Garage nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 ##\author Kevin Watts
36 
37 ##\brief Tests that analyzer that matches item will not affect item going into Other
38 
39 from __future__ import with_statement
40 
41 DURATION = 10
42 PKG = 'test_diagnostic_aggregator'
43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time import sleep
46 import sys
47 import threading
48 
49 from optparse import OptionParser
50 
51 MATCH_NAME = 'Match Item'
52 
53 def get_raw_name(agg_name):
54  return agg_name.split('/')[-1]
55 
56 def get_header_name(agg_name):
57  return '/'.join(agg_name.split('/')[1:-1])
58 
59 class TestMatchAnalyze(unittest.TestCase):
60  def __init__(self, *args):
61  super(TestMatchAnalyze, self).__init__(*args)
62 
63  parser = OptionParser(usage="usage ./%prog [options]", prog="match_analyze_test.py")
64  parser.add_option('--header', action="store", default=None,
65  dest="header", metavar="HEADER",
66  help="Expected header that \"Match Item\" will be under")
67  # Option comes with rostest, will fail w/o this line
68  parser.add_option('--gtest_output', action="store",
69  dest="gtest")
70 
71  options, args = parser.parse_args(rospy.myargv())
72 
73  if not options.header:
74  parser.error("Option --header is mandatory. Unable to parse args")
75 
76  self.header = options.header
77 
78  self._mutex = threading.Lock()
79 
80  self.match_headers = []
81 
82  rospy.init_node('test_match_analyze')
83  self._starttime = rospy.get_time()
84 
85  sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diag_agg_cb)
86 
87  def diag_agg_cb(self, msg):
88  with self._mutex:
89  for stat in msg.status:
90  if stat.name.find(MATCH_NAME) > 0:
91  if self.match_headers.count(get_header_name(stat.name)) == 0:
92  self.match_headers.append(get_header_name(stat.name))
93 
94  def test_match_analyze(self):
95  while not rospy.is_shutdown():
96  sleep(1.0)
97  if rospy.get_time() - self._starttime > DURATION:
98  break
99 
100  self.assert_(not rospy.is_shutdown(), "Rospy shutdown!")
101 
102  with self._mutex:
103  self.assert_(self.header, "Header is none. Option --header not given")
104  self.assert_(len(self.match_headers) == 1, "Multiple analyzers reported our item! Headers: %s" % self.match_headers)
105  self.assert_(self.match_headers.count(self.header) > 0, "Didn't have item under header \"%s\". Header: \"%s\"" % (self.header, self.match_headers[0]))
106 
107 
108 if __name__ == '__main__':
109  if False:
110  suite = unittest.TestSuite()
111  suite.addTest(TestMatchAnalyze('test_match_analyze'))
112  unittest.TextTestRunner(verbosity = 2).run(suite)
113  else:
114  rostest.run(PKG, sys.argv[0], TestMatchAnalyze, sys.argv)
def get_header_name(agg_name)
def get_raw_name(agg_name)


test_diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen , Kevin Watts
autogenerated on Thu Oct 8 2020 03:21:52