Go to the documentation of this file.00001
00002
00003 import unittest
00004
00005 import rospy
00006
00007 try:
00008 from std_msgs.msg import String
00009 except:
00010 import roslib
00011 roslib.load_manifest("jsk_topic_tools")
00012 from std_msgs.msg import String
00013
00014
00015 PKG = 'jsk_topic_tools'
00016 NAME = 'test_block'
00017
00018
00019 class TestBlock(unittest.TestCase):
00020 input_msg = None
00021 subscriber = None
00022
00023 def __init__(self, *args):
00024 super(TestBlock, self).__init__(*args)
00025 rospy.init_node(NAME)
00026 self.output_original_pub = rospy.Publisher(
00027 'output_original', String, queue_size=1)
00028 self.input_original_sub = rospy.Subscriber(
00029 'input_original', String, self.original_topic_cb)
00030
00031 def original_topic_cb(self, msg):
00032 self.running = True
00033 self.output_original_pub.publish(msg)
00034
00035 def reset_subscriber(self):
00036 if self.subscriber:
00037 self.subscriber.unregister()
00038 self.subscriber = None
00039 self.input_msg = None
00040 self.running = False
00041
00042 def test_subscribe(self):
00043 self.reset_subscriber()
00044 self.subscriber = rospy.Subscriber("output", String, self.cb)
00045 rospy.loginfo("wait 10 seconds...")
00046 rospy.sleep(10)
00047 self.assertFalse(self.input_msg is None)
00048
00049 def cb(self, msg):
00050 self.input_msg = msg
00051
00052 def test_no_subscribe(self):
00053 self.reset_subscriber()
00054
00055 rospy.loginfo("wait 10 seconds...")
00056 rospy.sleep(10)
00057 self.assertTrue(self.input_msg is None)
00058
00059
00060 if __name__ == "__main__":
00061 import rostest
00062 rostest.rosrun(PKG, NAME, TestBlock)