14 rospy.init_node(
"test_message_handlers")
20 handler = subscribe.MessageHandler(
None, self.dummy_cb)
21 self.help_test_default(handler)
24 handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(
None, self.
dummy_cb))
28 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(
None, self.
dummy_cb))
33 received = {
"msgs": []}
36 received[
"msgs"].append(msg)
38 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(
None, cb))
40 self.assertTrue(handler.is_alive())
44 self.assertFalse(handler.is_alive())
47 received = {
"msgs": []}
50 received[
"msgs"].append(msg)
54 handler = subscribe.MessageHandler(
None, cb)
56 handler = handler.set_throttle_rate(10000)
57 handler = handler.set_queue_length(10)
58 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
61 handler.handle_message(
"hello")
69 handler.handle_message(x)
71 handler = handler.set_throttle_rate(0)
76 self.assertEqual([
"hello"] + list(range(990, 1000)), received[
"msgs"])
84 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
91 handler = handler.set_queue_length(0)
92 handler = handler.set_throttle_rate(0)
93 self.assertIsInstance(handler, subscribe.MessageHandler)
95 msg =
"test_default_message_handler" 96 received = {
"msg":
None}
102 self.assertTrue(handler.time_remaining() == 0)
104 handler.handle_message(msg)
107 self.assertEqual(received[
"msg"], msg)
108 self.assertLessEqual(t1, handler.last_publish)
109 self.assertLessEqual(handler.last_publish, t2)
110 self.assertEqual(handler.time_remaining(), 0)
112 received = {
"msgs": []}
114 received[
"msgs"].append(msg)
116 xs = list(range(10000))
118 handler.handle_message(x)
120 self.assertEqual(received[
"msgs"], xs)
124 handler = handler.set_queue_length(0)
125 handler = handler.set_throttle_rate(throttle_rate)
126 self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
128 msg =
"test_throttle_message_handler" 131 received = {
"msg":
None}
134 received[
"msg"] = msg
139 time.sleep(2.0 * handler.throttle_rate)
140 handler.handle_message(msg)
141 self.assertEqual(received[
"msg"], msg)
144 time.sleep(2.0 * handler.throttle_rate)
146 received = {
"msgs": []}
148 received[
"msgs"].append(msg)
152 time_padding = handler.throttle_rate / 4.0
153 for i
in range(1, 10):
158 time.sleep(2.0 * time_padding)
159 fin = time.time() + throttle_rate / 1000.0 - time_padding
160 while time.time() < fin:
161 handler.handle_message(x)
163 self.assertEqual(len(received[
"msgs"]), i)
167 handler = handler.set_queue_length(queue_length)
168 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
170 received = {
"msgs": []}
173 received[
"msgs"].append(msg)
177 msgs = list(range(queue_length))
179 handler.handle_message(x)
183 self.assertEqual(msgs, received[
"msgs"])
187 handler = handler.set_throttle_rate(throttle_rate)
188 handler = handler.set_queue_length(queue_length)
189 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
191 received = {
"msg":
None}
194 received[
"msg"] = msg
198 throttle_rate_sec = throttle_rate / 1000.0
201 time.sleep(throttle_rate_sec)
202 for x
in range(queue_length):
203 handler.handle_message(x)
205 time.sleep(throttle_rate_sec / 2.0)
209 self.assertEqual(x, received[
"msg"])
210 time.sleep(throttle_rate_sec)
220 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
221 next_handler = handler.transition()
222 self.assertEqual(handler, next_handler)
225 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
226 next_handler = handler.set_throttle_rate(100)
227 self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
228 handler = next_handler
230 next_handler = handler.transition()
231 self.assertEqual(handler, next_handler)
233 next_handler = handler.set_throttle_rate(0)
234 self.assertIsInstance(next_handler, subscribe.MessageHandler)
237 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
238 next_handler = handler.set_queue_length(100)
239 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
240 handler = next_handler
241 next_handler = handler.transition()
242 self.assertEqual(handler, next_handler)
243 next_handler = handler.set_queue_length(0)
244 self.assertIsInstance(next_handler, subscribe.MessageHandler)
247 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
248 next_handler = handler.set_queue_length(100).set_throttle_rate(100)
249 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
250 next_handler.finish()
251 next_handler = handler.set_throttle_rate(100).set_queue_length(100)
252 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
253 next_handler.finish()
254 handler = next_handler
255 next_handler = handler.transition()
256 self.assertEqual(handler, next_handler)
258 next_handler = handler.set_throttle_rate(0)
259 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
260 next_handler = handler.set_queue_length(0)
261 self.assertIsInstance(next_handler, subscribe.MessageHandler)
265 handler = subscribe.MessageHandler(
None,
None)
269 handler = subscribe.MessageHandler(
None,
None)
273 handler = subscribe.MessageHandler(
None,
None)
278 handler = subscribe.MessageHandler(
None,
None)
284 handler = subscribe.MessageHandler(
None,
None)
290 handler = subscribe.MessageHandler(
None,
None)
296 handler = subscribe.MessageHandler(
None,
None)
302 handler = subscribe.MessageHandler(
None,
None)
308 handler = subscribe.MessageHandler(
None,
None)
315 handler = subscribe.MessageHandler(
None,
None)
320 handler = subscribe.MessageHandler(
None,
None)
325 handler = subscribe.MessageHandler(
None,
None)
338 PKG =
'rosbridge_library' 339 NAME =
'test_message_handlers' 340 if __name__ ==
'__main__':
341 rostest.unitrun(PKG, NAME, TestMessageHandlers)
def test_queue_message_handler_passes_msgs(self)
def test_queue_message_handler_rate(self)
def help_test_queue(self, handler, queue_length)
def help_test_default(self, handler)
def help_test_throttle(self, handler, throttle_rate)
def test_queue_message_handler_queue(self)
def test_queue_message_handler_stops(self)
def test_default_message_handler(self)
def test_transition_functionality(self)
def test_transitions(self)
def help_test_queue_rate(self, handler, throttle_rate, queue_length)
def test_throttle_message_handler(self)