Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 import unittest
00026 
00027 import rospy
00028 from std_msgs.msg import Bool
00029 from geometry_msgs.msg import Twist
00030 
00031 from rate_publishers import RatePublishers, TimeoutManager
00032 
00033 def twist(x=0.0, r=0.0):
00034     """
00035     Returns a Twist for the given linear and rotation speed.
00036     """
00037     t = Twist()
00038     t.linear.x = x
00039     t.angular.z = r
00040     return t
00041 
00042 class TestTwistMux(unittest.TestCase):
00043 
00044     
00045     
00046     MESSAGE_TIMEOUT = 0.3
00047 
00048     
00049     TOPIC_TIMEOUT = 1.0
00050 
00051     @classmethod
00052     def setUpClass(cls):
00053         cls._publishers = RatePublishers()
00054         cls._vel1 = cls._publishers.add_topic('vel_1', Twist)
00055         cls._vel2 = cls._publishers.add_topic('vel_2', Twist)
00056         cls._vel3 = cls._publishers.add_topic('vel_3', Twist)
00057 
00058         cls._lock1 = cls._publishers.add_topic('lock_1', Bool)
00059         cls._lock2 = cls._publishers.add_topic('lock_2', Bool)
00060 
00061         cls._timeout_manager = TimeoutManager()
00062         cls._timeout_manager.add(cls._publishers)
00063         cls._timeout_manager.spin_thread()
00064 
00065     def tearDown(self):
00066         
00067         t = twist(0, 0)
00068         l = Bool(False)
00069 
00070         self._vel1.pub(t)
00071         self._vel2.pub(t)
00072         self._vel3.pub(t)
00073 
00074         self._lock1.pub(l)
00075         self._lock2.pub(l)
00076 
00077         
00078         
00079         
00080         
00081         
00082         rospy.sleep(self.MESSAGE_TIMEOUT + self.TOPIC_TIMEOUT)
00083 
00084     @classmethod
00085     def _vel_cmd(cls):
00086         rospy.sleep(cls.MESSAGE_TIMEOUT)
00087         return rospy.wait_for_message('cmd_vel_out', Twist,
00088                                       timeout=cls.MESSAGE_TIMEOUT)
00089 
00090     def test_empty(self):
00091         try:
00092             self._vel_cmd()
00093             self.fail('twist_mux should not be publishing without any input')
00094         except rospy.ROSException:
00095             pass
00096 
00097     def test_basic(self):
00098         t = twist(2.0)
00099         self._vel1.pub(t, rate=5)
00100         self.assertEqual(t, self._vel_cmd())
00101 
00102     def test_basic_with_priorities(self):
00103         t1 = twist(2.0)
00104         t2 = twist(0, 1.0)
00105 
00106         
00107         self._vel1.pub(t1, rate=5)
00108         self.assertEqual(t1, self._vel_cmd())
00109 
00110         
00111         
00112         self._vel3.pub(t2, rate=10)
00113         self.assertEqual(t2, self._vel_cmd())
00114 
00115         
00116         
00117         self._vel3.stop()
00118         rospy.sleep(0.5)  
00119         self.assertEqual(t1, self._vel_cmd())
00120 
00121 
00122 
00123 if __name__ == '__main__':
00124     import rostest
00125     PKG_NAME = 'twist_mux'
00126     TEST_NAME = '%s_system_blackbox_test' % PKG_NAME
00127     rospy.init_node(TEST_NAME)
00128     rostest.rosrun(PKG_NAME, TEST_NAME, TestTwistMux)