discard_stale_not_published_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2019, Magazino GmbH.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Magazino GmbH nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 # \author Guglielmo Gemignani
36 
37 # \brief Tests that expected items from GenericAnalyzer will be removed after the timeout if discard_stale is set to true
38 
39 import rospy, rostest, unittest
40 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
41 from time import sleep
42 import sys
43 import threading
44 
45 
46 class TestDiscardStale(unittest.TestCase):
47  def __init__(self, *args):
48  super(TestDiscardStale, self).__init__(*args)
49 
50  self._mutex = threading.Lock()
51 
52  self._agg_expecteds = []
53  self._expecteds = {}
54 
55  rospy.init_node('test_expected_stale')
56  self._diag_agg_sub = rospy.Subscriber("/diagnostics_agg",
57  DiagnosticArray,
58  self.diag_agg_cb)
59  self._diag_sub = rospy.Subscriber("/diagnostics",
60  DiagnosticArray,
61  self.diag_cb)
62  self._start_time = rospy.get_time()
63 
64  def diag_cb(self, msg):
65  with self._mutex:
66  for stat in msg.status:
67  self._expecteds[stat.name] = stat
68 
69  def diag_agg_cb(self, msg):
70  with self._mutex:
71  self._agg_expecteds = []
72  for stat in msg.status:
73  self._agg_expecteds.append(stat)
74 
75  def test_discard_stale(self):
76  expecteds = {}
77  timeout = 10
78 
79  # wait for expecteds to be published
80  while (len(expecteds.keys()) != 1 and
81  not rospy.is_shutdown() and
82  (rospy.get_time() - self._start_time < timeout)):
83  sleep(1.0)
84  with self._mutex:
85  expecteds = self._expecteds
86 
87  self.assert_(len(expecteds.keys()) == 1, "The expected diagnostics are not of length 1."
88  "Received diagnostics: {}".format(expecteds))
89  self.assert_(expecteds['nonexistent2'].level == DiagnosticStatus.WARN)
90 
91  self._start_time = rospy.get_time()
92  duration = 8
93  # waiting a bit more than 5 seconds for the messages to become stale
94  while not rospy.is_shutdown():
95  sleep(1.0)
96  if rospy.get_time() - self._start_time > duration:
97  break
98 
99  with self._mutex:
100  self.assert_(len(self._agg_expecteds) == 1,
101  "There should only be one expected aggregated item left, {} found instead!".
102  format(len(self._agg_expecteds)))
103  self.assert_(self._agg_expecteds[0].name == "/Nonexistent2",
104  "The name of the first aggregated message should be '/Nonexistent2'!")
105  self.assert_(self._agg_expecteds[0].level == DiagnosticStatus.STALE,
106  "The level of the first aggregated message should be stale!")
107 
108  self._start_time = rospy.get_time()
109  duration = 8
110  # waiting a bit more than 5 seconds for the timeout of the aggregator to kick in
111  while not rospy.is_shutdown():
112  sleep(1.0)
113  if rospy.get_time() - self._start_time > duration:
114  break
115 
116  with self._mutex:
117  self.assert_(len(self._agg_expecteds) == 0,
118  "There should't be any aggregated items left, {} found instead! {}".
119  format(len(self._agg_expecteds), self._agg_expecteds))
120 
121 
if __name__ == '__main__':
    # Run TestDiscardStale through rostest, using the script path as the
    # test name and forwarding the full command line.
    package_name = 'diagnostic_aggregator'
    test_name = sys.argv[0]
    rostest.run(package_name, test_name, TestDiscardStale, sys.argv)
124 
discard_stale_not_published_test.TestDiscardStale._agg_expecteds
_agg_expecteds
Definition: discard_stale_not_published_test.py:52
discard_stale_not_published_test.TestDiscardStale.diag_agg_cb
def diag_agg_cb(self, msg)
Definition: discard_stale_not_published_test.py:69
discard_stale_not_published_test.TestDiscardStale
Definition: discard_stale_not_published_test.py:46
discard_stale_not_published_test.TestDiscardStale.__init__
def __init__(self, *args)
Definition: discard_stale_not_published_test.py:47
discard_stale_not_published_test.TestDiscardStale._start_time
_start_time
Definition: discard_stale_not_published_test.py:62
discard_stale_not_published_test.TestDiscardStale._mutex
_mutex
Definition: discard_stale_not_published_test.py:50
discard_stale_not_published_test.TestDiscardStale.diag_cb
def diag_cb(self, msg)
Definition: discard_stale_not_published_test.py:64
discard_stale_not_published_test.TestDiscardStale._expecteds
_expecteds
Definition: discard_stale_not_published_test.py:53
discard_stale_not_published_test.TestDiscardStale._diag_agg_sub
_diag_agg_sub
Definition: discard_stale_not_published_test.py:56
discard_stale_not_published_test.TestDiscardStale.test_discard_stale
def test_discard_stale(self)
Definition: discard_stale_not_published_test.py:75
discard_stale_not_published_test.TestDiscardStale._diag_sub
_diag_sub
Definition: discard_stale_not_published_test.py:59


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Tue Nov 15 2022 03:17:12