2 from __future__
import print_function
8 from time
import sleep, time
16 PKG =
'rosbridge_library'
17 NAME =
'test_publisher_consistency_listener'
26 """ See whether the listener can correctly time out """
27 topic =
"/test_listener_timeout"
30 publisher = rospy.Publisher(topic, type)
33 listener.attach(publisher)
35 self.assertFalse(listener.timed_out())
37 sleep(listener.timeout / 2.0)
39 self.assertFalse(listener.timed_out())
41 sleep(listener.timeout / 2.0 + 0.1)
43 self.assertTrue(listener.timed_out())
46 """ See whether the listener actually attaches and detaches itself """
47 topic =
"/test_listener_attach_detach"
50 publisher = rospy.Publisher(topic, type)
51 orig_publish = publisher.publish
54 listener_publish = listener.publish_override
56 self.assertNotEqual(orig_publish, listener_publish)
57 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
59 listener.attach(publisher)
61 self.assertEqual(publisher.publish, listener_publish)
62 self.assertNotEqual(publisher.publish, orig_publish)
63 self.assertIn(listener, publisher.impl.subscriber_listeners)
67 self.assertEqual(publisher.publish, orig_publish)
68 self.assertNotEqual(publisher.publish, listener_publish)
69 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
72 """ This test makes sure the failure case that the PublisherConsistency
73 Listener is trying to solve, is indeed a failure case """
74 topic =
"/test_immediate_publish_fails_without"
78 string =
"why halo thar"
81 received = {
"msg":
None}
85 rospy.Subscriber(topic, msg_class, callback)
86 publisher = rospy.Publisher(topic, msg_class)
87 publisher.publish(msg)
90 self.assertNotEqual(received[
"msg"], msg)
91 self.assertEqual(received[
"msg"],
None)
94 """ This test makes sure the PublisherConsistencyListener is working"""
95 topic =
"/test_immediate_publish"
99 string =
"why halo thar"
102 received = {
"msg":
None}
104 print(
"Received a msg! ", msg)
105 received[
"msg"] = msg
107 rospy.Subscriber(topic, msg_class, callback)
109 class temp_listener(rospy.SubscribeListener):
110 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
111 print(
"peer subscribe in temp listener")
114 publisher = rospy.Publisher(topic, msg_class, temp_listener())
115 listener.attach(publisher)
116 publisher.publish(msg)
119 self.assertEqual(received[
"msg"], msg)
122 """ This test makes sure the failure case that the PublisherConsistency
123 Listener is trying to solve, is indeed a failure case, even for large
125 topic =
"/test_immediate_multi_publish_fails_without"
134 received = {
"msgs": []}
136 received[
"msgs"].append(msg)
138 rospy.Subscriber(topic, msg_class, callback)
140 publisher = rospy.Publisher(topic, msg_class)
142 publisher.publish(msg)
145 self.assertEqual(len(received[
"msgs"]), 0)
146 self.assertNotEqual(received[
"msgs"], msgs)
149 """ This test makes sure the PublisherConsistencyListener is working
150 even with a huge message buffer"""
151 topic =
"/test_immediate_multi_publish"
160 received = {
"msgs": []}
162 received[
"msgs"].append(msg)
164 rospy.Subscriber(topic, msg_class, callback)
167 publisher = rospy.Publisher(topic, msg_class)
168 listener.attach(publisher)
170 publisher.publish(msg)
173 self.assertEqual(len(received[
"msgs"]), len(msgs))
174 self.assertEqual(received[
"msgs"], msgs)
177 if __name__ ==
'__main__':
178 rosunit.unitrun(PKG, NAME, TestPublisherConsistencyListener)