Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 import unittest
00026
00027 import rospy
00028 from std_msgs.msg import Bool
00029 from geometry_msgs.msg import Twist
00030
00031 from rate_publishers import RatePublishers, TimeoutManager
00032
00033 def twist(x=0.0, r=0.0):
00034 """
00035 Returns a Twist for the given linear and rotation speed.
00036 """
00037 t = Twist()
00038 t.linear.x = x
00039 t.angular.z = r
00040 return t
00041
00042 class TestTwistMux(unittest.TestCase):
00043
00044
00045
00046 MESSAGE_TIMEOUT = 0.3
00047
00048
00049 TOPIC_TIMEOUT = 1.0
00050
00051 @classmethod
00052 def setUpClass(cls):
00053 cls._publishers = RatePublishers()
00054 cls._vel1 = cls._publishers.add_topic('vel_1', Twist)
00055 cls._vel2 = cls._publishers.add_topic('vel_2', Twist)
00056 cls._vel3 = cls._publishers.add_topic('vel_3', Twist)
00057
00058 cls._lock1 = cls._publishers.add_topic('lock_1', Bool)
00059 cls._lock2 = cls._publishers.add_topic('lock_2', Bool)
00060
00061 cls._timeout_manager = TimeoutManager()
00062 cls._timeout_manager.add(cls._publishers)
00063 cls._timeout_manager.spin_thread()
00064
00065 def tearDown(self):
00066
00067 t = twist(0, 0)
00068 l = Bool(False)
00069
00070 self._vel1.pub(t)
00071 self._vel2.pub(t)
00072 self._vel3.pub(t)
00073
00074 self._lock1.pub(l)
00075 self._lock2.pub(l)
00076
00077
00078
00079
00080
00081
00082 rospy.sleep(self.MESSAGE_TIMEOUT + self.TOPIC_TIMEOUT)
00083
00084 @classmethod
00085 def _vel_cmd(cls):
00086 rospy.sleep(cls.MESSAGE_TIMEOUT)
00087 return rospy.wait_for_message('cmd_vel_out', Twist,
00088 timeout=cls.MESSAGE_TIMEOUT)
00089
00090 def test_empty(self):
00091 try:
00092 self._vel_cmd()
00093 self.fail('twist_mux should not be publishing without any input')
00094 except rospy.ROSException:
00095 pass
00096
00097 def test_basic(self):
00098 t = twist(2.0)
00099 self._vel1.pub(t, rate=5)
00100 self.assertEqual(t, self._vel_cmd())
00101
00102 def test_basic_with_priorities(self):
00103 t1 = twist(2.0)
00104 t2 = twist(0, 1.0)
00105
00106
00107 self._vel1.pub(t1, rate=5)
00108 self.assertEqual(t1, self._vel_cmd())
00109
00110
00111
00112 self._vel3.pub(t2, rate=10)
00113 self.assertEqual(t2, self._vel_cmd())
00114
00115
00116
00117 self._vel3.stop()
00118 rospy.sleep(0.5)
00119 self.assertEqual(t1, self._vel_cmd())
00120
00121
00122
00123 if __name__ == '__main__':
00124 import rostest
00125 PKG_NAME = 'twist_mux'
00126 TEST_NAME = '%s_system_blackbox_test' % PKG_NAME
00127 rospy.init_node(TEST_NAME)
00128 rostest.rosrun(PKG_NAME, TEST_NAME, TestTwistMux)