add_analyzers_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of the Willow Garage nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 import unittest
36 import rospy, rostest
37 import rosparam
38 import optparse
39 import sys
40 import threading
41 from bondpy import bondpy
42 from diagnostic_msgs.srv import AddDiagnostics
43 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
44 
45 PKG = 'diagnostic_aggregator'
46 
47 class TestAddAnalyzer(unittest.TestCase):
48  def __init__(self, *args):
49  super(TestAddAnalyzer, self).__init__(*args)
50  rospy.init_node('test_add_analyzer')
51  self.namespace = rospy.get_name()
52  paramlist = rosparam.load_file(rospy.myargv()[1])
53  # expect to receive these paths in the added analyzers
54  self.expected = [paramlist[0][1] + analyzer['path'] for name, analyzer in paramlist[0][0]['analyzers'].items()]
55 
56  self._mutex = threading.Lock()
57  self.agg_msgs = {}
58 
59  # put parameters in the node namespace so they can be read by the aggregator
60  for params, ns in paramlist:
61  rosparam.upload_params(rospy.get_name() + '/' + ns, params)
62 
63  rospy.Subscriber('/diagnostics_agg', DiagnosticArray, self.agg_cb)
64  self.pub = rospy.Publisher('/diagnostics', DiagnosticArray, queue_size=1)
65 
66  def agg_cb(self, msg):
67  with self._mutex:
68  for stat in msg.status:
69  self.agg_msgs[stat.name] = stat
70 
71  def add_analyzer(self):
72  """Start a bond to the aggregator
73  """
74  namespace = rospy.resolve_name(rospy.get_name())
75  self.bond = bondpy.Bond("/diagnostics_agg/bond" + namespace, namespace)
76  self.bond.start()
77  rospy.wait_for_service('/diagnostics_agg/add_diagnostics', timeout=10)
78  add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics)
79  print(self.namespace)
80  resp = add_diagnostics(load_namespace=self.namespace)
81  self.assert_(resp.success, 'Service call was unsuccessful: {0}'.format(resp.message))
82 
83  def wait_for_agg(self):
84  self.agg_msgs = {}
85  while not self.agg_msgs and not rospy.is_shutdown():
86  rospy.sleep(rospy.Duration(3))
87 
88  def test_add_agg(self):
89  self.wait_for_agg()
90 
91  # confirm that the things we're going to add aren't there already
92  with self._mutex:
93  agg_paths = [msg.name for name, msg in self.agg_msgs.items()]
94  self.assert_(not any(expected in agg_paths for expected in self.expected))
95 
96  # add the new groups
97  self.add_analyzer()
98 
99  arr = DiagnosticArray()
100  arr.header.stamp = rospy.get_rostime()
101  arr.status = [
102  DiagnosticStatus(name='primary', message='hello-primary'),
103  DiagnosticStatus(name='secondary', message='hello-secondary')
104  ]
105  self.pub.publish(arr)
106  self.wait_for_agg()
107  # the new aggregator data should contain the extra paths. At this point
108  # the paths are probably still in the 'Other' group because the bond
109  # hasn't been fully formed
110  with self._mutex:
111  agg_paths = [msg.name for name, msg in self.agg_msgs.items()]
112  self.assert_(all(expected in agg_paths for expected in self.expected))
113 
114  rospy.sleep(rospy.Duration(5)) # wait a bit for the new items to move to the right group
115  arr.header.stamp = rospy.get_rostime()
116  self.pub.publish(arr) # publish again to get the correct groups to show OK
117  self.wait_for_agg()
118 
119  for name, msg in self.agg_msgs.items():
120  if name in self.expected: # should have just received messages on the analyzer
121  self.assert_(msg.message == 'OK')
122 
123  agg_paths = [msg.name for name, msg in self.agg_msgs.items()]
124  self.assert_(all(expected in agg_paths for expected in self.expected))
125 
126 
127  self.bond.shutdown()
128  rospy.sleep(rospy.Duration(5)) # wait a bit for the analyzers to unload
129  self.wait_for_agg()
130  # the aggregator data should no longer contain the paths once the bond is shut down
131  with self._mutex:
132  agg_paths = [msg.name for name, msg in self.agg_msgs.items()]
133  self.assert_(not any(expected in agg_paths for expected in self.expected))
134 
135 if __name__ == '__main__':
136  print('SYS ARGS:', sys.argv)
137  rostest.run(PKG, sys.argv[0], TestAddAnalyzer, sys.argv)
add_analyzers_test.TestAddAnalyzer.namespace
namespace
Definition: add_analyzers_test.py:51
add_analyzers_test.TestAddAnalyzer._mutex
_mutex
Definition: add_analyzers_test.py:56
add_analyzers_test.TestAddAnalyzer.expected
expected
Definition: add_analyzers_test.py:54
add_analyzers_test.TestAddAnalyzer.wait_for_agg
def wait_for_agg(self)
Definition: add_analyzers_test.py:83
add_analyzers_test.TestAddAnalyzer.add_analyzer
def add_analyzer(self)
Definition: add_analyzers_test.py:71
add_analyzers_test.TestAddAnalyzer.bond
bond
Definition: add_analyzers_test.py:75
add_analyzers_test.TestAddAnalyzer.agg_msgs
agg_msgs
Definition: add_analyzers_test.py:57
add_analyzers_test.TestAddAnalyzer.test_add_agg
def test_add_agg(self)
Definition: add_analyzers_test.py:88
add_analyzers_test.TestAddAnalyzer.agg_cb
def agg_cb(self, msg)
Definition: add_analyzers_test.py:66
add_analyzers_test.TestAddAnalyzer.__init__
def __init__(self, *args)
Definition: add_analyzers_test.py:48
add_analyzers_test.TestAddAnalyzer.pub
pub
Definition: add_analyzers_test.py:64
add_analyzers_test.TestAddAnalyzer
Definition: add_analyzers_test.py:47


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Tue Nov 15 2022 03:17:12