Go to the documentation of this file.00001
00002
00003
00004
00005 import rospy
00006 import unittest
00007 from std_msgs.msg import String
00008
00009
class TestStealthRelay(unittest.TestCase):
    """Count messages on ``/stealth_relay/output`` in three phases.

    The test listens on the relay output alone, then together with a second
    subscriber on ``/original_topic/relay``, then alone again, and checks the
    output message count in each phase.

    NOTE(review): presumably the node under test forwards messages only while
    ``/original_topic/relay`` has another subscriber -- confirm against the
    launch file that starts the relay.
    """

    def setUp(self):
        """Reset both message counters before the test runs."""
        self.out_msg_count = 0
        self.monitor_msg_count = 0

    def out_callback(self, msg):
        """Count one message received on ``/stealth_relay/output``."""
        self.out_msg_count += 1

    def monitor_callback(self, msg):
        """Count one message received on ``/original_topic/relay``."""
        self.monitor_msg_count += 1

    def test_stealth_relay(self):
        """Verify the output count before, during and after monitoring."""
        sub_out = rospy.Subscriber("/stealth_relay/output", String,
                                   self.out_callback, queue_size=1)
        # Release the subscription even when an assertion below fails.
        self.addCleanup(sub_out.unregister)

        # Give the publisher up to 5 seconds to connect to our subscriber.
        for _ in range(5):
            if sub_out.get_num_connections() > 0:
                break
            rospy.sleep(1)
        self.assertGreater(sub_out.get_num_connections(), 0,
                           "no publisher connected to /stealth_relay/output")

        # Phase 1: only the output is subscribed; nothing must arrive.
        rospy.sleep(5)
        self.assertEqual(self.out_msg_count, 0,
                         "output received messages before monitoring began")

        # Phase 2: a second subscriber listens on the monitored topic.
        sub_monitor = rospy.Subscriber("/original_topic/relay", String,
                                       self.monitor_callback, queue_size=1)
        try:
            rospy.sleep(5)
            self.assertGreater(self.monitor_msg_count, 0,
                               "no message seen on /original_topic/relay")
            self.assertGreater(self.out_msg_count, 0,
                               "no message seen on /stealth_relay/output")
            cnt = self.out_msg_count
        finally:
            # Always drop the monitor subscription, pass or fail.
            sub_monitor.unregister()

        # Phase 3: monitor removed; the output count may only grow by a
        # small number of messages (fewer than 30) within 3 seconds.
        rospy.sleep(3)
        self.assertLess(abs(cnt - self.out_msg_count), 30,
                        "output kept receiving after the monitor was removed")
00041
00042
if __name__ == '__main__':
    # rostest is imported here so it is only needed when run as a script.
    import rostest

    # Initialise the ROS node first; the test case creates subscribers.
    rospy.init_node("test_stealth_relay")
    # Arguments: package name, test name, test case class.
    rostest.rosrun(
        "jsk_topic_tools",
        "test_stealth_relay",
        TestStealthRelay,
    )