Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 from __future__ import print_function
00038
00039 PKG = 'test_rospy'
00040 NAME = 'test_pubsub_order'
00041
00042 import sys
00043 import time
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048 from std_msgs.msg import String
00049
00050 PUBTOPIC = "chatter"
00051 LPNODE = 'listenerpublisher'
00052 LPTOPIC = 'listenerpublisher'
00053 MSG = String
00054
00055 TIMEOUT = 10.0
00056
00057 class TestPubSubOrder(unittest.TestCase):
00058
00059 def setUp(self):
00060 self.callback_data = None
00061
00062 def _test_subscriber_first_callback(self, data):
00063 self.callback_data = data
00064
00065
00066
00067 def test_subscriber_first(self):
00068 self.assert_(self.callback_data is None, "invalid test fixture")
00069
00070
00071 timeout_t = time.time() + 5.0
00072 while not rostest.is_subscriber(
00073 rospy.resolve_name(PUBTOPIC),
00074 rospy.resolve_name(LPNODE)) and time.time() < timeout_t:
00075 time.sleep(0.1)
00076
00077 self.assert_(rostest.is_subscriber(
00078 rospy.resolve_name(PUBTOPIC),
00079 rospy.resolve_name(LPNODE)), "%s is not up"%LPNODE)
00080
00081 print("Publishing to ", PUBTOPIC)
00082 pub = rospy.Publisher(PUBTOPIC, MSG)
00083 rospy.Subscriber(LPTOPIC, MSG, self._test_subscriber_first_callback)
00084
00085
00086 import random
00087 val = random.randint(0, 109812312)
00088 msg = "hi [%s]"%val
00089 for i in range(0, 10):
00090 pub.publish(MSG(msg))
00091 time.sleep(0.1)
00092
00093
00094
00095 self.assert_(self.callback_data is not None, "no callback data from listenerpublisher")
00096 self.assertEquals(msg, self.callback_data.data, "callback data from listenerpublisher does not match")
00097
00098 if __name__ == '__main__':
00099 rospy.init_node(NAME)
00100 rostest.run(PKG, NAME, TestPubSubOrder, sys.argv)