test_rosout.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 
36 
37 from __future__ import print_function
38 
39 PKG = 'test_rospy'
40 NAME = 'test_rosout'
41 
42 import sys
43 import time
44 import unittest
45 
46 import rospy
47 import rostest
48 from rosgraph_msgs.msg import Log
49 
50 SUBTOPIC = '/rosout'
51 MSG_PREFIX = '[test_rosout] '
52 TIMEOUT = 10.0 #seconds
53 
54 
55 class TestRosout(unittest.TestCase):
56 
57  callback_data = None
58 
59  @classmethod
60  def _set_callback_data(cls, data):
61  cls.callback_data = data
62 
63  @classmethod
64  def _subscriber_callback(cls, data):
65  if MSG_PREFIX in data.msg:
66  cls._set_callback_data(data)
67 
68  @classmethod
69  def setUpClass(cls):
70  cls.subscriber = rospy.Subscriber(SUBTOPIC, Log, cls._subscriber_callback)
71  # https://answers.ros.org/question/251194/rospy-subscriber-needs-sleep-some-time-until-the-first-message-is-received/
72  rospy.sleep(0.5)
73 
74  def setUp(self):
75  self._set_callback_data(None)
76 
77  # Test that rosout is outputting debug messages as expected
78  def test_rosout_dbg(self):
79  self.assertIsNone(self.callback_data, 'invalid test fixture')
80 
81  log_msg = MSG_PREFIX + 'logging test debug message'
82  rospy.logdebug(log_msg)
83 
84  # wait for log messages to be received
85  timeout_time = time.time() + TIMEOUT
86  while not self.callback_data and time.time() < timeout_time:
87  time.sleep(0.1)
88  print(self.callback_data)
89 
90  # checking number of messages received on /rosout is expected
91  self.assertIsNotNone(self.callback_data, 'did not receive expected message')
92 
93  # checking contents of message
94  self.assertEqual(rospy.DEBUG, self.callback_data.level)
95  self.assertEqual(SUBTOPIC, self.callback_data.name)
96  self.assertEqual(log_msg, self.callback_data.msg)
97  self.assertEqual(NAME+'.py', self.callback_data.file)
98  self.assertEqual('TestRosout.test_rosout_dbg', self.callback_data.function)
99  self.assertEqual([SUBTOPIC], self.callback_data.topics)
100 
101  # Test that rosout is outputting info messages as expected
102  def test_rosout_info(self):
103  self.assertIsNone(self.callback_data, 'invalid test fixture')
104 
105  log_msg = MSG_PREFIX + 'logging test info message'
106  rospy.loginfo(log_msg)
107 
108  # wait for log messages to be received
109  timeout_time = time.time() + TIMEOUT
110  while not self.callback_data and time.time() < timeout_time:
111  time.sleep(0.1)
112  print(self.callback_data)
113 
114  # checking number of messages received on /rosout is expected
115  self.assertIsNotNone(self.callback_data, 'did not receive expected message')
116 
117  # checking contents of message
118  self.assertEqual(rospy.INFO, self.callback_data.level)
119  self.assertEqual(SUBTOPIC, self.callback_data.name)
120  self.assertEqual(log_msg, self.callback_data.msg)
121  self.assertEqual(NAME+'.py', self.callback_data.file)
122  self.assertEqual('TestRosout.test_rosout_info', self.callback_data.function)
123  self.assertEqual([SUBTOPIC], self.callback_data.topics)
124 
125  # Test that rosout is outputting warning messages as expected
126  def test_rosout_warn(self):
127  self.assertIsNone(self.callback_data, 'invalid test fixture')
128 
129  log_msg = MSG_PREFIX + 'logging test warning message'
130  rospy.logwarn(log_msg)
131 
132  # wait for log messages to be received
133  timeout_time = time.time() + TIMEOUT
134  while not self.callback_data and time.time() < timeout_time:
135  time.sleep(0.1)
136  print(self.callback_data)
137 
138  # checking number of messages received on /rosout is expected
139  self.assertIsNotNone(self.callback_data, 'did not receive expected message')
140 
141  # checking contents of message
142  self.assertEqual(rospy.WARN, self.callback_data.level)
143  self.assertEqual(SUBTOPIC, self.callback_data.name)
144  self.assertEqual(log_msg, self.callback_data.msg)
145  self.assertEqual(NAME+'.py', self.callback_data.file)
146  self.assertEqual('TestRosout.test_rosout_warn', self.callback_data.function)
147  self.assertEqual([SUBTOPIC], self.callback_data.topics)
148 
149  # Test that rosout is outputting error messages as expected
150  def test_rosout_err(self):
151  self.assertIsNone(self.callback_data, 'invalid test fixture')
152 
153  log_msg = MSG_PREFIX + 'logging test error message'
154  rospy.logerr(log_msg)
155 
156  # wait for log messages to be received
157  timeout_time = time.time() + TIMEOUT
158  while not self.callback_data and time.time() < timeout_time:
159  time.sleep(0.1)
160  print(self.callback_data)
161 
162  # checking number of messages received on /rosout is expected
163  self.assertIsNotNone(self.callback_data, 'did not receive expected message')
164 
165  # checking contents of message
166  self.assertEqual(rospy.ERROR, self.callback_data.level)
167  self.assertEqual(SUBTOPIC, self.callback_data.name)
168  self.assertEqual(log_msg, self.callback_data.msg)
169  self.assertEqual(NAME+'.py', self.callback_data.file)
170  self.assertEqual('TestRosout.test_rosout_err', self.callback_data.function)
171  self.assertEqual([SUBTOPIC], self.callback_data.topics)
172 
173  # Test that rosout is outputting fatal messages as expected
174  def test_rosout_fatal(self):
175  self.assertIsNone(self.callback_data, 'invalid test fixture')
176 
177  log_msg = MSG_PREFIX + 'logging test fatal message'
178  rospy.logfatal(log_msg)
179 
180  # wait for log messages to be received
181  timeout_time = time.time() + TIMEOUT
182  while not self.callback_data and time.time() < timeout_time:
183  time.sleep(0.1)
184  print(self.callback_data)
185 
186  # checking number of messages received on /rosout is expected
187  self.assertIsNotNone(self.callback_data, 'did not receive expected message')
188 
189  # checking contents of message
190  self.assertEqual(rospy.FATAL, self.callback_data.level)
191  self.assertEqual(SUBTOPIC, self.callback_data.name)
192  self.assertEqual(log_msg, self.callback_data.msg)
193  self.assertEqual(NAME+'.py', self.callback_data.file)
194  self.assertEqual('TestRosout.test_rosout_fatal', self.callback_data.function)
195  self.assertEqual([SUBTOPIC], self.callback_data.topics)
196 
197 
198 if __name__ == '__main__':
199  rospy.init_node(NAME, log_level=rospy.DEBUG)
200  rostest.run(PKG, NAME, TestRosout, sys.argv)
test_rosout.TestRosout.subscriber
subscriber
Definition: test_rosout.py:70
test_rosout.TestRosout.test_rosout_warn
def test_rosout_warn(self)
Definition: test_rosout.py:126
test_rosout.TestRosout.setUpClass
def setUpClass(cls)
Definition: test_rosout.py:69
test_rosout.TestRosout.test_rosout_err
def test_rosout_err(self)
Definition: test_rosout.py:150
test_rosout.TestRosout.test_rosout_fatal
def test_rosout_fatal(self)
Definition: test_rosout.py:174
test_rosout.TestRosout._subscriber_callback
def _subscriber_callback(cls, data)
Definition: test_rosout.py:64
test_rosout.TestRosout.callback_data
callback_data
Definition: test_rosout.py:57
test_rosout.TestRosout.test_rosout_dbg
def test_rosout_dbg(self)
Definition: test_rosout.py:78
test_rosout.TestRosout.test_rosout_info
def test_rosout_info(self)
Definition: test_rosout.py:102
test_rosout.TestRosout
Definition: test_rosout.py:55
test_rosout.TestRosout._set_callback_data
def _set_callback_data(cls, data)
Definition: test_rosout.py:60
test_rosout.TestRosout.setUp
def setUp(self)
Definition: test_rosout.py:74


test_rospy
Author(s): Ken Conley, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:44