test_throttle_simtime_loop.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2018, JSK Robotics Lab.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11 #
12 # * Redistributions of source code must retain the above copyright
13 # notice, this list of conditions and the following disclaimer.
14 # * Redistributions in binary form must reproduce the above
15 # copyright notice, this list of conditions and the following
16 # disclaimer in the documentation and/or other materials provided
17 # with the distribution.
18 # * Neither the name of Willow Garage, Inc. nor the names of its
19 # contributors may be used to endorse or promote products derived
20 # from this software without specific prior written permission.
21 #
22 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 # POSSIBILITY OF SUCH DAMAGE.
34 #
35 # Author: Yuki Furuta <furushchev@jsk.imi.i.u-tokyo.ac.jp>
36 
37 import threading
38 import rospy
39 from std_msgs.msg import String
40 from rosgraph_msgs.msg import Clock
41 import time
42 from unittest import TestCase
43 
44 
45 class ClockPublisher(threading.Thread):
46  def __init__(self):
47  super(ClockPublisher, self).__init__()
48  self.finished = threading.Event()
49  self.interval = 0.1
50  self.pub_clock = rospy.Publisher("/clock", Clock, queue_size=1)
51  self.reset(time.time())
52 
53  def run(self):
54  while not self.finished.is_set():
55  self.finished.wait(self.interval)
56  self.pub_clock.publish(self.clock)
57  self.clock.clock += rospy.Duration(self.interval)
58  self.finished.set()
59 
60  def stop(self):
61  self.finished.set()
62 
63  def reset(self, seconds=0):
64  self.clock = Clock(
65  clock=rospy.Time.from_seconds(seconds))
66 
67 
68 class TestThrottleSimtimeLoop(TestCase):
69  def setUp(self):
71  self.clock_pub.start()
72  time.sleep(1)
73  self.input_count = 0
74  self.throttle_count = 0
75  self.sub_throttle = rospy.Subscriber(
76  "input_throttle", String, self.callback_throttle, queue_size=1)
77  self.sub_input = rospy.Subscriber(
78  "input", String, self.callback_input, queue_size=1)
79 
80  def tearDown(self):
81  self.clock_pub.stop()
82 
83  def callback_throttle(self, msg):
84  self.throttle_count += 1
85 
86  def callback_input(self, msg):
87  self.input_count += 1
88 
89  def test_throttle_loop(self):
90  # wait for throttled message
91  for i in range(100):
92  if self.throttle_count > 0:
93  break
94  time.sleep(0.1)
95  self.assertGreater(
96  self.input_count, 0,
97  "Input message comes before rostime moves backward")
98  self.assertGreater(
99  self.throttle_count, 0,
100  "Throttle message comes before rostime moves backward")
101 
102  # reset /clock (rostime moves backward)
103  self.clock_pub.reset()
104  time.sleep(0.1)
105 
106  # wait for throttled message
107  self.input_count = 0
108  self.throttle_count = 0
109  for i in range(100):
110  if self.throttle_count > 0:
111  break
112  time.sleep(0.1)
113  self.assertGreater(
114  self.input_count, 0,
115  "Input message comes after rostime moved backward")
116  self.assertGreater(
117  self.throttle_count, 0,
118  "Throttle message comes after rostime moved backward")
119 
120 
121 if __name__ == '__main__':
122  import rostest
123  rospy.init_node("test_throttle_simtime_loop")
124  rostest.rosrun("topic_tools",
125  "test_throttle_simtime_loop",
126  TestThrottleSimtimeLoop)
msg
test_throttle_simtime_loop.ClockPublisher
Definition: test_throttle_simtime_loop.py:45
test_throttle_simtime_loop.ClockPublisher.run
def run(self)
Definition: test_throttle_simtime_loop.py:53
test_throttle_simtime_loop.TestThrottleSimtimeLoop.callback_throttle
def callback_throttle(self, msg)
Definition: test_throttle_simtime_loop.py:83
test_throttle_simtime_loop.TestThrottleSimtimeLoop
Definition: test_throttle_simtime_loop.py:68
test_throttle_simtime_loop.TestThrottleSimtimeLoop.clock_pub
clock_pub
Definition: test_throttle_simtime_loop.py:70
test_throttle_simtime_loop.TestThrottleSimtimeLoop.test_throttle_loop
def test_throttle_loop(self)
Definition: test_throttle_simtime_loop.py:89
test_throttle_simtime_loop.TestThrottleSimtimeLoop.tearDown
def tearDown(self)
Definition: test_throttle_simtime_loop.py:80
test_throttle_simtime_loop.ClockPublisher.stop
def stop(self)
Definition: test_throttle_simtime_loop.py:60
test_throttle_simtime_loop.TestThrottleSimtimeLoop.sub_input
sub_input
Definition: test_throttle_simtime_loop.py:77
test_throttle_simtime_loop.TestThrottleSimtimeLoop.sub_throttle
sub_throttle
Definition: test_throttle_simtime_loop.py:75
test_throttle_simtime_loop.TestThrottleSimtimeLoop.input_count
input_count
Definition: test_throttle_simtime_loop.py:73
test_throttle_simtime_loop.ClockPublisher.finished
finished
Definition: test_throttle_simtime_loop.py:48
test_throttle_simtime_loop.ClockPublisher.clock
clock
Definition: test_throttle_simtime_loop.py:64
test_throttle_simtime_loop.TestThrottleSimtimeLoop.throttle_count
throttle_count
Definition: test_throttle_simtime_loop.py:74
test_throttle_simtime_loop.TestThrottleSimtimeLoop.setUp
def setUp(self)
Definition: test_throttle_simtime_loop.py:69
test_throttle_simtime_loop.ClockPublisher.interval
interval
Definition: test_throttle_simtime_loop.py:49
test_throttle_simtime_loop.TestThrottleSimtimeLoop.callback_input
def callback_input(self, msg)
Definition: test_throttle_simtime_loop.py:86
test_throttle_simtime_loop.ClockPublisher.reset
def reset(self, seconds=0)
Definition: test_throttle_simtime_loop.py:63
test_throttle_simtime_loop.ClockPublisher.__init__
def __init__(self)
Definition: test_throttle_simtime_loop.py:46
test_throttle_simtime_loop.ClockPublisher.pub_clock
pub_clock
Definition: test_throttle_simtime_loop.py:50


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:02:10