multiple_match_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of the Willow Garage nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 
36 
37 
38 
39 from __future__ import with_statement
40 DURATION = 10
41 PKG = 'diagnostic_aggregator'
42 import roslib; roslib.load_manifest(PKG)
43 import rospy, rostest, unittest
44 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
45 from time import sleep
46 import sys
47 import threading
48 
49 MULTI_NAME = 'multi'
50 HEADER1 = 'Header1'
51 HEADER2 = 'Header2'
52 
53 def get_raw_name(agg_name):
54  return agg_name.split('/')[-1]
55 
56 def get_header_name(agg_name):
57  return '/'.join(agg_name.split('/')[1:-1])
58 
60  def __init__(self, msg):
61  self.name = get_raw_name(msg.name)
62  self.header = get_header_name(msg.name)
63  self.level = msg.level
64  self.message = msg.message
65 
66  self.update_time = rospy.get_time()
67 
68  def is_stale(self):
69  return rospy.get_time() - self.update_time > 5
70 
71  def update(self, msg):
72  self.level = msg.level
73  self.message = msg.message
74 
75  self.update_time = rospy.get_time()
76 
77 class TestMultipleMatch(unittest.TestCase):
78  def __init__(self, *args):
79  super(TestMultipleMatch, self).__init__(*args)
80 
81  self._mutex = threading.Lock()
82 
83  self._multi_items = {}
84 
85  rospy.init_node('test_multiple_match')
86  self._starttime = rospy.get_time()
87 
88  sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diag_agg_cb)
89 
90  def diag_agg_cb(self, msg):
91  with self._mutex:
92  for stat in msg.status:
93  if stat.name.find(MULTI_NAME) > 0:
94  self._multi_items[get_header_name(stat.name)] = DiagnosticItem(stat)
95 
97  while not rospy.is_shutdown():
98  sleep(1.0)
99  if rospy.get_time() - self._starttime > DURATION:
100  break
101 
102  self.assert_(not rospy.is_shutdown(), "Rospy shutdown!")
103 
104  with self._mutex:
105  self.assert_(HEADER1 in self._multi_items, "Didn't have item under %s. Items: %s" % (HEADER1, self._multi_items))
106  self.assert_(self._multi_items[HEADER1].name == MULTI_NAME, "Item name under %s didn't match %s" % (HEADER1, MULTI_NAME))
107 
108  self.assert_(HEADER2 in self._multi_items, "Didn't have item under %s" % HEADER2)
109  self.assert_(self._multi_items[HEADER2].name == MULTI_NAME, "Item name under %s didn't match %s" % (HEADER2, MULTI_NAME))
110 
111 
112 if __name__ == '__main__':
113  if False:
114  suite = unittest.TestSuite()
115  suite.addTest(TestMultipleMatch('test_multiple_match'))
116  unittest.TextTestRunner(verbosity = 2).run(suite)
117  else:
118  rostest.run(PKG, sys.argv[0], TestMultipleMatch, sys.argv)
multiple_match_test.DiagnosticItem
Definition: multiple_match_test.py:59
multiple_match_test.DiagnosticItem.header
header
Definition: multiple_match_test.py:62
multiple_match_test.DiagnosticItem.level
level
Definition: multiple_match_test.py:63
multiple_match_test.DiagnosticItem.update_time
update_time
Definition: multiple_match_test.py:66
multiple_match_test.DiagnosticItem.message
message
Definition: multiple_match_test.py:64
multiple_match_test.TestMultipleMatch
Definition: multiple_match_test.py:77
multiple_match_test.DiagnosticItem.name
name
Definition: multiple_match_test.py:61
multiple_match_test.DiagnosticItem.__init__
def __init__(self, msg)
Definition: multiple_match_test.py:60
multiple_match_test.DiagnosticItem.is_stale
def is_stale(self)
Definition: multiple_match_test.py:68
multiple_match_test.get_header_name
def get_header_name(agg_name)
Definition: multiple_match_test.py:56
multiple_match_test.get_raw_name
def get_raw_name(agg_name)
Definition: multiple_match_test.py:53
multiple_match_test.TestMultipleMatch._mutex
_mutex
Definition: multiple_match_test.py:81
multiple_match_test.DiagnosticItem.update
def update(self, msg)
Definition: multiple_match_test.py:71
multiple_match_test.TestMultipleMatch._starttime
_starttime
Definition: multiple_match_test.py:86
multiple_match_test.TestMultipleMatch.diag_agg_cb
def diag_agg_cb(self, msg)
Definition: multiple_match_test.py:90
multiple_match_test.TestMultipleMatch._multi_items
_multi_items
Definition: multiple_match_test.py:83
multiple_match_test.TestMultipleMatch.test_multiple_match
def test_multiple_match(self)
Definition: multiple_match_test.py:96
multiple_match_test.TestMultipleMatch.__init__
def __init__(self, *args)
Definition: multiple_match_test.py:78


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Tue Nov 15 2022 03:17:13