test_topic_statistics.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 '''
35 Integration test for topic statistics
36 '''
37 
38 from __future__ import print_function
39 import sys
40 import unittest
41 
42 import rospy
43 import rostest
44 from rosgraph_msgs.msg import TopicStatistics
45 
46 PKG = 'test_rospy'
47 
48 
49 class TestTopicStatistics(unittest.TestCase):
50  def setUp(self):
52 
53  def new_msg(self, msg):
54  # need at least two messages to compute the period fields
55  # since messages without period fields aren't useful skip them
56  if msg.delivered_msgs > 1:
57  self.topic_statistic_msg_map[msg.topic] = msg
58 
60  self, cond, timeout=rospy.Duration(5.0), interval=rospy.Duration(0.5)
61  ):
62  started = rospy.Time.now()
63  while rospy.Time.now() - started < timeout:
64  if cond():
65  return True
66  rospy.sleep(interval)
67  self.assertTrue(False)
68 
69  def frequency_acceptable(self, topic, expected, error_margin=0.1):
70  ''' return True if topic message's measured frequency
71  is within some error margin of expected frequency '''
72  msg = self.topic_statistic_msg_map[topic]
73  # need at least two messages to compute the period fields
74  assert msg.delivered_msgs > 1
75  found_freq = 1.0 / msg.period_mean.to_sec()
76  rospy.loginfo(
77  "Testing {}'s found frequency {} against expected {}".format(
78  topic, found_freq, expected))
79  return abs(found_freq - expected) / expected <= error_margin
80 
81  def test_frequencies(self):
82  sub = rospy.Subscriber('/statistics', TopicStatistics, self.new_msg)
83 
84  self.assert_eventually(
85  lambda: '/very_fast_chatter' in self.topic_statistic_msg_map)
86  self.assert_eventually(
87  lambda: '/fast_chatter' in self.topic_statistic_msg_map)
88  self.assert_eventually(
89  lambda: '/slow_chatter' in self.topic_statistic_msg_map)
90  self.assert_eventually(
91  lambda: '/very_slow_chatter' in self.topic_statistic_msg_map,
92  timeout=rospy.Duration(10.0))
93 
94  self.assertTrue(self.frequency_acceptable('/very_fast_chatter', 150))
95  self.assertTrue(self.frequency_acceptable('/fast_chatter', 53))
96  self.assertTrue(self.frequency_acceptable('/slow_chatter', 8))
97  self.assertTrue(self.frequency_acceptable('/very_slow_chatter', 0.5))
98 
99 
100 if __name__ == '__main__':
101  rospy.init_node('test_topic_statistics')
102  rostest.run(PKG, 'rospy_topic_statistics', TestTopicStatistics, sys.argv)
test_topic_statistics.TestTopicStatistics.new_msg
def new_msg(self, msg)
Definition: test_topic_statistics.py:53
test_topic_statistics.TestTopicStatistics.frequency_acceptable
def frequency_acceptable(self, topic, expected, error_margin=0.1)
Definition: test_topic_statistics.py:69
test_topic_statistics.TestTopicStatistics.assert_eventually
def assert_eventually(self, cond, timeout=rospy.Duration(5.0), interval=rospy.Duration(0.5))
Definition: test_topic_statistics.py:59
test_topic_statistics.TestTopicStatistics.topic_statistic_msg_map
topic_statistic_msg_map
Definition: test_topic_statistics.py:51
test_topic_statistics.TestTopicStatistics.test_frequencies
def test_frequencies(self)
Definition: test_topic_statistics.py:81
test_topic_statistics.TestTopicStatistics.setUp
def setUp(self)
Definition: test_topic_statistics.py:50
test_topic_statistics.TestTopicStatistics
Definition: test_topic_statistics.py:49


test_rospy
Author(s): Ken Conley, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:44