37 from __future__
import print_function
40 NAME =
'test_pubsub_order'
51 LPNODE =
'listenerpublisher'
52 LPTOPIC =
'listenerpublisher'
68 self.assertTrue(self.
callback_data is None,
"invalid test fixture")
71 timeout_t = time.time() + 5.0
72 while not rostest.is_subscriber(
73 rospy.resolve_name(PUBTOPIC),
74 rospy.resolve_name(LPNODE))
and time.time() < timeout_t:
77 self.assertTrue(rostest.is_subscriber(
78 rospy.resolve_name(PUBTOPIC),
79 rospy.resolve_name(LPNODE)),
"%s is not up"%LPNODE)
81 print(
"Publishing to ", PUBTOPIC)
82 pub = rospy.Publisher(PUBTOPIC, MSG, queue_size=0)
87 val = random.randint(0, 109812312)
89 for i
in range(0, 10):
95 self.assertTrue(self.
callback_data is not None,
"no callback data from listenerpublisher")
96 self.assertEqual(msg, self.
callback_data.data,
"callback data from listenerpublisher does not match")
98 if __name__ ==
'__main__':
100 rostest.run(PKG, NAME, TestPubSubOrder, sys.argv)