Go to the documentation of this file.00001
00002
00003 import sys
00004 import unittest
00005 import rospy
00006 from std_msgs.msg import String
00007 import rosunit
00008
00009 class TestListener(unittest.TestCase):
00010
00011 def setUp(self):
00012 rospy.init_node('listener', anonymous=True)
00013 rospy.Subscriber("chatter", String, self.listener_callback)
00014 self.received_data = None
00015
00016 def listener_callback(self, talker):
00017 rospy.loginfo("Listener: I heard %s" % talker.data)
00018 self.received_data = talker.data
00019
00020 def test_receive_data(self):
00021 start_time = rospy.Time.now()
00022 timeout = rospy.Duration(5.0)
00023 timed_out = False
00024 while not rospy.is_shutdown() and not self.received_data and not (rospy.Time.now() - start_time > timeout):
00025 rospy.loginfo("Listener: waiting for data")
00026 rospy.rostime.wallsleep(0.2)
00027 self.assertEquals("dude", self.received_data)
00028
00029 def tearDown(self):
00030 pass
00031
00032 NAME = 'test_listener'
00033 if __name__ == '__main__':
00034 rosunit.unitrun('test_listener', NAME, TestListener, sys.argv, coverage_packages=['rocon_test'])