system_blackbox.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 #
00004 # twist_mux: system_blackbox.py
00005 #
00006 # Copyright (c) 2014 PAL Robotics SL.
00007 # All Rights Reserved
00008 #
00009 # Permission to use, copy, modify, and/or distribute this software for
00010 # any purpose with or without fee is hereby granted, provided that the
00011 # above copyright notice and this permission notice appear in all
00012 # copies.
00013 # 
00014 # THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00015 # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00016 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY
00017 # SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00018 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00019 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
00020 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00021 #
00022 # Authors:
00023 #   * Siegfried-A. Gevatter
00024 
00025 import unittest
00026 
00027 import rospy
00028 from std_msgs.msg import Bool
00029 from geometry_msgs.msg import Twist
00030 
00031 from rate_publishers import RatePublishers, TimeoutManager
00032 
00033 def twist(x=0.0, r=0.0):
00034     """
00035     Returns a Twist for the given linear and rotation speed.
00036     """
00037     t = Twist()
00038     t.linear.x = x
00039     t.angular.z = r
00040     return t
00041 
00042 class TestTwistMux(unittest.TestCase):
00043 
00044     # Maximum time (in seconds) that it may take for a message
00045     # to be received by the target node.
00046     MESSAGE_TIMEOUT = 0.3
00047 
00048     # Value (in seconds) >= the highest topic/lock timeout.
00049     TOPIC_TIMEOUT = 1.0
00050 
00051     @classmethod
00052     def setUpClass(cls):
00053         cls._publishers = RatePublishers()
00054         cls._vel1 = cls._publishers.add_topic('vel_1', Twist)
00055         cls._vel2 = cls._publishers.add_topic('vel_2', Twist)
00056         cls._vel3 = cls._publishers.add_topic('vel_3', Twist)
00057 
00058         cls._lock1 = cls._publishers.add_topic('lock_1', Bool)
00059         cls._lock2 = cls._publishers.add_topic('lock_2', Bool)
00060 
00061         cls._timeout_manager = TimeoutManager()
00062         cls._timeout_manager.add(cls._publishers)
00063         cls._timeout_manager.spin_thread()
00064 
00065     def tearDown(self):
00066         # Reset all topics.
00067         t = twist(0, 0)
00068         l = Bool(False)
00069 
00070         self._vel1.pub(t)
00071         self._vel2.pub(t)
00072         self._vel3.pub(t)
00073 
00074         self._lock1.pub(l)
00075         self._lock2.pub(l)
00076 
00077         # Wait for previously published messages to time out,
00078         # since we aren't restarting twist_mux.
00079         #
00080         # This sleeping time must be higher than any of the
00081         # timeouts in system_test_config.yaml.
00082         rospy.sleep(self.MESSAGE_TIMEOUT + self.TOPIC_TIMEOUT)
00083 
00084     @classmethod
00085     def _vel_cmd(cls):
00086         rospy.sleep(cls.MESSAGE_TIMEOUT)
00087         return rospy.wait_for_message('cmd_vel_out', Twist,
00088                                       timeout=cls.MESSAGE_TIMEOUT)
00089 
00090     def test_empty(self):
00091         try:
00092             self._vel_cmd()
00093             self.fail('twist_mux should not be publishing without any input')
00094         except rospy.ROSException:
00095             pass
00096 
00097     def test_basic(self):
00098         t = twist(2.0)
00099         self._vel1.pub(t, rate=5)
00100         self.assertEqual(t, self._vel_cmd())
00101 
00102     def test_basic_with_priorities(self):
00103         t1 = twist(2.0)
00104         t2 = twist(0, 1.0)
00105 
00106         # Publish twist from input1 @ 3Hz, it should be used.
00107         self._vel1.pub(t1, rate=5)
00108         self.assertEqual(t1, self._vel_cmd())
00109 
00110         # Publish twist from input3, it should have priority
00111         # over the one from input1.
00112         self._vel3.pub(t2, rate=10)
00113         self.assertEqual(t2, self._vel_cmd())
00114 
00115         # Stop publishing input 3 and wait for it to timeout.
00116         # Speed should fall back to input 1.
00117         self._vel3.stop()
00118         rospy.sleep(0.5)  # input is 0.3 in .yaml file
00119         self.assertEqual(t1, self._vel_cmd())
00120 
00121 # TODO: test limits, test timeouts, etc.
00122 
00123 if __name__ == '__main__':
00124     import rostest
00125     PKG_NAME = 'twist_mux'
00126     TEST_NAME = '%s_system_blackbox_test' % PKG_NAME
00127     rospy.init_node(TEST_NAME)
00128     rostest.rosrun(PKG_NAME, TEST_NAME, TestTwistMux)


twist_mux
Author(s): Enrique Fernandez , Siegfried-A. Gevatter Pujals
autogenerated on Sat Jun 8 2019 20:13:46