system_blackbox.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # twist_mux: system_blackbox.py
5 #
6 # Copyright (c) 2014 PAL Robotics SL.
7 # All Rights Reserved
8 #
9 # Permission to use, copy, modify, and/or distribute this software for
10 # any purpose with or without fee is hereby granted, provided that the
11 # above copyright notice and this permission notice appear in all
12 # copies.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
15 # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
16 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY
17 # SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
18 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
19 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
20 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
21 #
22 # Authors:
23 # * Siegfried-A. Gevatter
24 
25 import unittest
26 
27 import rospy
28 from std_msgs.msg import Bool
29 from geometry_msgs.msg import Twist
30 
31 from rate_publishers import RatePublishers, TimeoutManager
32 
33 def twist(x=0.0, r=0.0):
34  """
35  Returns a Twist for the given linear and rotation speed.
36  """
37  t = Twist()
38  t.linear.x = x
39  t.angular.z = r
40  return t
41 
42 class TestTwistMux(unittest.TestCase):
43 
44  # Maximum time (in seconds) that it may take for a message
45  # to be received by the target node.
46  MESSAGE_TIMEOUT = 0.3
47 
48  # Value (in seconds) >= the highest topic/lock timeout.
49  TOPIC_TIMEOUT = 1.0
50 
51  @classmethod
52  def setUpClass(cls):
54  cls._vel1 = cls._publishers.add_topic('vel_1', Twist)
55  cls._vel2 = cls._publishers.add_topic('vel_2', Twist)
56  cls._vel3 = cls._publishers.add_topic('vel_3', Twist)
57 
58  cls._lock1 = cls._publishers.add_topic('lock_1', Bool)
59  cls._lock2 = cls._publishers.add_topic('lock_2', Bool)
60 
62  cls._timeout_manager.add(cls._publishers)
63  cls._timeout_manager.spin_thread()
64 
65  def tearDown(self):
66  # Reset all topics.
67  t = twist(0, 0)
68  l = Bool(False)
69 
70  self._vel1.pub(t)
71  self._vel2.pub(t)
72  self._vel3.pub(t)
73 
74  self._lock1.pub(l)
75  self._lock2.pub(l)
76 
77  # Wait for previously published messages to time out,
78  # since we aren't restarting twist_mux.
79  #
80  # This sleeping time must be higher than any of the
81  # timeouts in system_test_config.yaml.
82  rospy.sleep(self.MESSAGE_TIMEOUT + self.TOPIC_TIMEOUT)
83 
84  @classmethod
85  def _vel_cmd(cls):
86  rospy.sleep(cls.MESSAGE_TIMEOUT)
87  return rospy.wait_for_message('cmd_vel_out', Twist,
88  timeout=cls.MESSAGE_TIMEOUT)
89 
90  def test_empty(self):
91  try:
92  self._vel_cmd()
93  self.fail('twist_mux should not be publishing without any input')
94  except rospy.ROSException:
95  pass
96 
97  def test_basic(self):
98  t = twist(2.0)
99  self._vel1.pub(t, rate=5)
100  self.assertEqual(t, self._vel_cmd())
101 
103  t1 = twist(2.0)
104  t2 = twist(0, 1.0)
105 
106  # Publish twist from input1 @ 3Hz, it should be used.
107  self._vel1.pub(t1, rate=5)
108  self.assertEqual(t1, self._vel_cmd())
109 
110  # Publish twist from input3, it should have priority
111  # over the one from input1.
112  self._vel3.pub(t2, rate=10)
113  self.assertEqual(t2, self._vel_cmd())
114 
115  # Stop publishing input 3 and wait for it to timeout.
116  # Speed should fall back to input 1.
117  self._vel3.stop()
118  rospy.sleep(0.5) # input is 0.3 in .yaml file
119  self.assertEqual(t1, self._vel_cmd())
120 
121 # TODO: test limits, test timeouts, etc.
122 
123 if __name__ == '__main__':
124  import rostest
125  PKG_NAME = 'twist_mux'
126  TEST_NAME = '%s_system_blackbox_test' % PKG_NAME
127  rospy.init_node(TEST_NAME)
128  rostest.rosrun(PKG_NAME, TEST_NAME, TestTwistMux)
def twist(x=0.0, r=0.0)


twist_mux
Author(s): Enrique Fernandez , Siegfried-A. Gevatter Pujals
autogenerated on Wed Oct 26 2022 02:14:17