2 from __future__
import print_function
8 from time
import sleep, time
19 rospy.init_node(
"test_publisher_consistency_listener")
22 """ See whether the listener can correctly time out """ 23 topic =
"/test_listener_timeout" 26 publisher = rospy.Publisher(topic, type)
29 listener.attach(publisher)
31 self.assertFalse(listener.timed_out())
33 sleep(listener.timeout / 2.0)
35 self.assertFalse(listener.timed_out())
37 sleep(listener.timeout / 2.0 + 0.1)
39 self.assertTrue(listener.timed_out())
42 """ See whether the listener actually attaches and detaches itself """ 43 topic =
"/test_listener_attach_detach" 46 publisher = rospy.Publisher(topic, type)
47 orig_publish = publisher.publish
50 listener_publish = listener.publish_override
52 self.assertNotEqual(orig_publish, listener_publish)
53 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
55 listener.attach(publisher)
57 self.assertEqual(publisher.publish, listener_publish)
58 self.assertNotEqual(publisher.publish, orig_publish)
59 self.assertIn(listener, publisher.impl.subscriber_listeners)
63 self.assertEqual(publisher.publish, orig_publish)
64 self.assertNotEqual(publisher.publish, listener_publish)
65 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
68 """ This test makes sure the failure case that the PublisherConsistency 69 Listener is trying to solve, is indeed a failure case """ 70 topic =
"/test_immediate_publish_fails_without" 74 string =
"why halo thar" 77 received = {
"msg":
None}
81 rospy.Subscriber(topic, msg_class, callback)
82 publisher = rospy.Publisher(topic, msg_class)
83 publisher.publish(msg)
86 self.assertNotEqual(received[
"msg"], msg)
87 self.assertEqual(received[
"msg"],
None)
90 """ This test makes sure the PublisherConsistencyListener is working""" 91 topic =
"/test_immediate_publish" 95 string =
"why halo thar" 98 received = {
"msg":
None}
100 print(
"Received a msg! ", msg)
101 received[
"msg"] = msg
103 rospy.Subscriber(topic, msg_class, callback)
105 class temp_listener(rospy.SubscribeListener):
106 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
107 print(
"peer subscribe in temp listener")
110 publisher = rospy.Publisher(topic, msg_class, temp_listener())
111 listener.attach(publisher)
112 publisher.publish(msg)
115 self.assertEqual(received[
"msg"], msg)
118 """ This test makes sure the failure case that the PublisherConsistency 119 Listener is trying to solve, is indeed a failure case, even for large 121 topic =
"/test_immediate_multi_publish_fails_without" 130 received = {
"msgs": []}
132 received[
"msgs"].append(msg)
134 rospy.Subscriber(topic, msg_class, callback)
136 publisher = rospy.Publisher(topic, msg_class)
138 publisher.publish(msg)
141 self.assertEqual(len(received[
"msgs"]), 0)
142 self.assertNotEqual(received[
"msgs"], msgs)
145 """ This test makes sure the PublisherConsistencyListener is working 146 even with a huge message buffer""" 147 topic =
"/test_immediate_multi_publish" 156 received = {
"msgs": []}
158 received[
"msgs"].append(msg)
160 rospy.Subscriber(topic, msg_class, callback)
163 publisher = rospy.Publisher(topic, msg_class)
164 listener.attach(publisher)
166 publisher.publish(msg)
169 self.assertEqual(len(received[
"msgs"]), len(msgs))
170 self.assertEqual(received[
"msgs"], msgs)
173 PKG =
'rosbridge_library' 174 NAME =
'test_publisher_consistency_listener' 175 if __name__ ==
'__main__':
176 rostest.unitrun(PKG, NAME, TestPublisherConsistencyListener)
def test_immediate_publish(self)
def test_immediate_multi_publish(self)
def test_immediate_publish_fails_without(self)
def test_listener_attach_detach(self)
def test_listener_timeout(self)
def test_immediate_multi_publish_fails_without(self)