37 from __future__
import print_function
40 NAME =
'test_pubsub_order' 51 LPNODE =
'listenerpublisher' 52 LPTOPIC =
'listenerpublisher' 68 self.assert_(self.
callback_data is None,
"invalid test fixture")
71 timeout_t = time.time() + 5.0
72 while not rostest.is_subscriber(
73 rospy.resolve_name(PUBTOPIC),
74 rospy.resolve_name(LPNODE))
and time.time() < timeout_t:
77 self.assert_(rostest.is_subscriber(
78 rospy.resolve_name(PUBTOPIC),
79 rospy.resolve_name(LPNODE)),
"%s is not up"%LPNODE)
81 print(
"Publishing to ", PUBTOPIC)
82 pub = rospy.Publisher(PUBTOPIC, MSG)
87 val = random.randint(0, 109812312)
89 for i
in range(0, 10):
95 self.assert_(self.
callback_data is not None,
"no callback data from listenerpublisher")
96 self.assertEquals(msg, self.callback_data.data,
"callback data from listenerpublisher does not match")
98 if __name__ ==
'__main__':
100 rostest.run(PKG, NAME, TestPubSubOrder, sys.argv)
def _test_subscriber_first_callback(self, data)
def test_subscriber_first(self)
Test subscriber first makes sure that if a subscriber is up first that it is able to successfully rec...