test_publisher_consistency_listener.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys
4 import rospy
5 import rostest
6 import unittest
7 
8 from time import sleep, time
9 
11 from rosbridge_library.internal import ros_loader
13 from std_msgs.msg import String, Int32
14 
15 
16 class TestPublisherConsistencyListener(unittest.TestCase):
17 
18  def setUp(self):
19  rospy.init_node("test_publisher_consistency_listener")
20 
22  """ See whether the listener can correctly time out """
23  topic = "/test_listener_timeout"
24  type = String
25 
26  publisher = rospy.Publisher(topic, type)
27 
28  listener = PublisherConsistencyListener()
29  listener.attach(publisher)
30 
31  self.assertFalse(listener.timed_out())
32 
33  sleep(listener.timeout / 2.0)
34 
35  self.assertFalse(listener.timed_out())
36 
37  sleep(listener.timeout / 2.0 + 0.1)
38 
39  self.assertTrue(listener.timed_out())
40 
42  """ See whether the listener actually attaches and detaches itself """
43  topic = "/test_listener_attach_detach"
44  type = String
45 
46  publisher = rospy.Publisher(topic, type)
47  orig_publish = publisher.publish
48 
49  listener = PublisherConsistencyListener()
50  listener_publish = listener.publish_override
51 
52  self.assertNotEqual(orig_publish, listener_publish)
53  self.assertNotIn(listener, publisher.impl.subscriber_listeners)
54 
55  listener.attach(publisher)
56 
57  self.assertEqual(publisher.publish, listener_publish)
58  self.assertNotEqual(publisher.publish, orig_publish)
59  self.assertIn(listener, publisher.impl.subscriber_listeners)
60 
61  listener.detach()
62 
63  self.assertEqual(publisher.publish, orig_publish)
64  self.assertNotEqual(publisher.publish, listener_publish)
65  self.assertNotIn(listener, publisher.impl.subscriber_listeners)
66 
68  """ This test makes sure the failure case that the PublisherConsistency
69  Listener is trying to solve, is indeed a failure case """
70  topic = "/test_immediate_publish_fails_without"
71  msg_class = String
72 
73  msg = String()
74  string = "why halo thar"
75  msg.data = string
76 
77  received = {"msg": None}
78  def callback(msg):
79  received["msg"] = msg
80 
81  rospy.Subscriber(topic, msg_class, callback)
82  publisher = rospy.Publisher(topic, msg_class)
83  publisher.publish(msg)
84  sleep(0.5)
85 
86  self.assertNotEqual(received["msg"], msg)
87  self.assertEqual(received["msg"], None)
88 
90  """ This test makes sure the PublisherConsistencyListener is working"""
91  topic = "/test_immediate_publish"
92  msg_class = String
93 
94  msg = String()
95  string = "why halo thar"
96  msg.data = string
97 
98  received = {"msg": None}
99  def callback(msg):
100  print("Received a msg! ", msg)
101  received["msg"] = msg
102 
103  rospy.Subscriber(topic, msg_class, callback)
104 
105  class temp_listener(rospy.SubscribeListener):
106  def peer_subscribe(self, topic_name, topic_publish, peer_publish):
107  print("peer subscribe in temp listener")
108 
109  listener = PublisherConsistencyListener()
110  publisher = rospy.Publisher(topic, msg_class, temp_listener())
111  listener.attach(publisher)
112  publisher.publish(msg)
113  sleep(0.5)
114 
115  self.assertEqual(received["msg"], msg)
116 
118  """ This test makes sure the failure case that the PublisherConsistency
119  Listener is trying to solve, is indeed a failure case, even for large
120  message buffers """
121  topic = "/test_immediate_multi_publish_fails_without"
122  msg_class = Int32
123 
124  msgs = []
125  for i in range(100):
126  msg = Int32()
127  msg.data = i
128  msgs.append(msg)
129 
130  received = {"msgs": []}
131  def callback(msg):
132  received["msgs"].append(msg)
133 
134  rospy.Subscriber(topic, msg_class, callback)
135 
136  publisher = rospy.Publisher(topic, msg_class)
137  for msg in msgs:
138  publisher.publish(msg)
139  sleep(0.5)
140 
141  self.assertEqual(len(received["msgs"]), 0)
142  self.assertNotEqual(received["msgs"], msgs)
143 
145  """ This test makes sure the PublisherConsistencyListener is working
146  even with a huge message buffer"""
147  topic = "/test_immediate_multi_publish"
148  msg_class = Int32
149 
150  msgs = []
151  for i in range(100):
152  msg = Int32()
153  msg.data = i
154  msgs.append(msg)
155 
156  received = {"msgs": []}
157  def callback(msg):
158  received["msgs"].append(msg)
159 
160  rospy.Subscriber(topic, msg_class, callback)
161 
162  listener = PublisherConsistencyListener()
163  publisher = rospy.Publisher(topic, msg_class)
164  listener.attach(publisher)
165  for msg in msgs:
166  publisher.publish(msg)
167  sleep(0.5)
168 
169  self.assertEqual(len(received["msgs"]), len(msgs))
170  self.assertEqual(received["msgs"], msgs)
171 
172 
173 PKG = 'rosbridge_library'
174 NAME = 'test_publisher_consistency_listener'
175 if __name__ == '__main__':
176  rostest.unitrun(PKG, NAME, TestPublisherConsistencyListener)
177 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02