test
test_throttle_simtime_loop.py
Go to the documentation of this file.
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
# Software License Agreement (BSD License)
4
#
5
# Copyright (c) 2018, JSK Robotics Lab.
6
# All rights reserved.
7
#
8
# Redistribution and use in source and binary forms, with or without
9
# modification, are permitted provided that the following conditions
10
# are met:
11
#
12
# * Redistributions of source code must retain the above copyright
13
# notice, this list of conditions and the following disclaimer.
14
# * Redistributions in binary form must reproduce the above
15
# copyright notice, this list of conditions and the following
16
# disclaimer in the documentation and/or other materials provided
17
# with the distribution.
18
# * Neither the name of Willow Garage, Inc. nor the names of its
19
# contributors may be used to endorse or promote products derived
20
# from this software without specific prior written permission.
21
#
22
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33
# POSSIBILITY OF SUCH DAMAGE.
34
#
35
# Author: Yuki Furuta <furushchev@jsk.imi.i.u-tokyo.ac.jp>
36
37
import
threading
38
import
rospy
39
from
std_msgs.msg
import
String
40
from
rosgraph_msgs.msg
import
Clock
41
import
time
42
from
unittest
import
TestCase
43
44
45
class
ClockPublisher
(threading.Thread):
46
def
__init__
(self):
47
super(ClockPublisher, self).
__init__
()
48
self.
finished
= threading.Event()
49
self.
interval
= 0.1
50
self.
pub_clock
= rospy.Publisher(
"/clock"
, Clock, queue_size=1)
51
self.
reset
(time.time())
52
53
def
run
(self):
54
while
not
self.
finished
.is_set():
55
self.
finished
.wait(self.
interval
)
56
self.
pub_clock
.publish(self.
clock
)
57
self.
clock
.clock += rospy.Duration(self.
interval
)
58
self.
finished
.set()
59
60
def
stop
(self):
61
self.
finished
.set()
62
63
def
reset
(self, seconds=0):
64
self.
clock
= Clock(
65
clock=rospy.Time.from_seconds(seconds))
66
67
68
class
TestThrottleSimtimeLoop
(TestCase):
69
def
setUp
(self):
70
self.
clock_pub
=
ClockPublisher
()
71
self.
clock_pub
.start()
72
time.sleep(1)
73
self.
input_count
= 0
74
self.
throttle_count
= 0
75
self.
sub_throttle
= rospy.Subscriber(
76
"input_throttle"
, String, self.
callback_throttle
, queue_size=1)
77
self.
sub_input
= rospy.Subscriber(
78
"input"
, String, self.
callback_input
, queue_size=1)
79
80
def
tearDown
(self):
81
self.
clock_pub
.stop()
82
83
def
callback_throttle
(self, msg):
84
self.
throttle_count
+= 1
85
86
def
callback_input
(self, msg):
87
self.
input_count
+= 1
88
89
def
test_throttle_loop
(self):
90
# wait for throttled message
91
for
i
in
range(100):
92
if
self.
throttle_count
> 0:
93
break
94
time.sleep(0.1)
95
self.assertGreater(
96
self.
input_count
, 0,
97
"Input message comes before rostime moves backward"
)
98
self.assertGreater(
99
self.
throttle_count
, 0,
100
"Throttle message comes before rostime moves backward"
)
101
102
# reset /clock (rostime moves backward)
103
self.
clock_pub
.reset()
104
time.sleep(0.1)
105
106
# wait for throttled message
107
self.
input_count
= 0
108
self.
throttle_count
= 0
109
for
i
in
range(100):
110
if
self.
throttle_count
> 0:
111
break
112
time.sleep(0.1)
113
self.assertGreater(
114
self.
input_count
, 0,
115
"Input message comes after rostime moved backward"
)
116
self.assertGreater(
117
self.
throttle_count
, 0,
118
"Throttle message comes after rostime moved backward"
)
119
120
121
if
__name__ ==
'__main__'
:
122
import
rostest
123
rospy.init_node(
"test_throttle_simtime_loop"
)
124
rostest.rosrun(
"topic_tools"
,
125
"test_throttle_simtime_loop"
,
126
TestThrottleSimtimeLoop)
test_throttle_simtime_loop.ClockPublisher.reset
def reset(self, seconds=0)
Definition:
test_throttle_simtime_loop.py:63
test_throttle_simtime_loop.TestThrottleSimtimeLoop.sub_throttle
sub_throttle
Definition:
test_throttle_simtime_loop.py:75
test_throttle_simtime_loop.ClockPublisher.pub_clock
pub_clock
Definition:
test_throttle_simtime_loop.py:50
test_throttle_simtime_loop.TestThrottleSimtimeLoop.input_count
input_count
Definition:
test_throttle_simtime_loop.py:73
test_throttle_simtime_loop.TestThrottleSimtimeLoop.sub_input
sub_input
Definition:
test_throttle_simtime_loop.py:77
test_throttle_simtime_loop.TestThrottleSimtimeLoop.throttle_count
throttle_count
Definition:
test_throttle_simtime_loop.py:74
test_throttle_simtime_loop.TestThrottleSimtimeLoop
Definition:
test_throttle_simtime_loop.py:68
test_throttle_simtime_loop.ClockPublisher.stop
def stop(self)
Definition:
test_throttle_simtime_loop.py:60
test_throttle_simtime_loop.TestThrottleSimtimeLoop.callback_input
def callback_input(self, msg)
Definition:
test_throttle_simtime_loop.py:86
test_throttle_simtime_loop.TestThrottleSimtimeLoop.setUp
def setUp(self)
Definition:
test_throttle_simtime_loop.py:69
test_throttle_simtime_loop.TestThrottleSimtimeLoop.callback_throttle
def callback_throttle(self, msg)
Definition:
test_throttle_simtime_loop.py:83
test_throttle_simtime_loop.TestThrottleSimtimeLoop.test_throttle_loop
def test_throttle_loop(self)
Definition:
test_throttle_simtime_loop.py:89
msg
test_throttle_simtime_loop.ClockPublisher.interval
interval
Definition:
test_throttle_simtime_loop.py:49
test_throttle_simtime_loop.ClockPublisher.clock
clock
Definition:
test_throttle_simtime_loop.py:64
test_throttle_simtime_loop.ClockPublisher.finished
finished
Definition:
test_throttle_simtime_loop.py:48
test_throttle_simtime_loop.ClockPublisher
Definition:
test_throttle_simtime_loop.py:45
test_throttle_simtime_loop.TestThrottleSimtimeLoop.clock_pub
clock_pub
Definition:
test_throttle_simtime_loop.py:70
test_throttle_simtime_loop.TestThrottleSimtimeLoop.tearDown
def tearDown(self)
Definition:
test_throttle_simtime_loop.py:80
test_throttle_simtime_loop.ClockPublisher.__init__
def __init__(self)
Definition:
test_throttle_simtime_loop.py:46
test_throttle_simtime_loop.ClockPublisher.run
def run(self)
Definition:
test_throttle_simtime_loop.py:53
topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:17