Go to the documentation of this file.00001
00002
00003 import os
00004 import sys
00005
00006 import unittest
00007
00008 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..",
00009 "scripts")))
00010
00011 import rospy
00012
00013 try:
00014 from std_msgs.msg import String
00015 except:
00016 import roslib; roslib.load_manifest("jsk_topic_tools")
00017 from std_msgs.msg import String
00018
00019
00020 class TestBlock(unittest.TestCase):
00021 input_msg = None
00022 subscriber = None
00023 def reset_subscribers(self):
00024 global running
00025 if self.subscriber:
00026 self.subscriber.unregister()
00027 seff.subscriber = None
00028 self._input_msg = None
00029 running = False
00030 def cb(self, msg):
00031 self.input_msg = msg
00032 def test_no_(self):
00033 global running
00034
00035 self.reset_subscribers()
00036 rospy.loginfo("wait 10 seconds...")
00037 rospy.sleep(10)
00038 self.assertTrue(self.input_msg == None)
00039 def test_sub(self):
00040 global running
00041 self.reset_subscribers()
00042 self.subscribe = rospy.Subscriber("/output", String, self.cb)
00043 rospy.loginfo("wait 10 seconds...")
00044 rospy.sleep(10)
00045 self.assertFalse(self.input_msg == None)
00046
00047 running = False
00048 output_original_pub = None
00049 def topic_cb(msg):
00050 global running
00051 running = True
00052 output_original_pub.publish(msg)
00053
00054 if __name__ == "__main__":
00055 import rostest
00056 rospy.init_node("test_blocke")
00057 output_original_pub = rospy.Publisher("/output_original", String)
00058 s = rospy.Subscriber("/input_original", String, topic_cb)
00059 rostest.rosrun("jsk_topic_tools", "test_block", TestBlock)
00060