test_subscription_modifiers.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 import time
7 
8 from rosbridge_library.internal import subscription_modifiers as subscribe
9 
10 
11 class TestMessageHandlers(unittest.TestCase):
12 
13  def setUp(self):
14  rospy.init_node("test_message_handlers")
15 
16  def dummy_cb(self, msg):
17  pass
18 
20  handler = subscribe.MessageHandler(None, self.dummy_cb)
21  self.help_test_default(handler)
22 
24  handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
25  self.help_test_throttle(handler, 50)
26 
28  handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
29  self.help_test_queue(handler, 1000)
30  handler.finish()
31 
33  received = {"msgs": []}
34 
35  def cb(msg):
36  received["msgs"].append(msg)
37 
38  handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
39 
40  self.assertTrue(handler.is_alive())
41 
42  handler.finish()
43 
44  self.assertFalse(handler.is_alive())
45 
47  received = {"msgs": []}
48 
49  def cb(msg):
50  received["msgs"].append(msg)
51 
52  msgs = range(1000)
53 
54  handler = subscribe.MessageHandler(None, cb)
55 
56  handler = handler.set_throttle_rate(10000)
57  handler = handler.set_queue_length(10)
58  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
59 
60  # 'hello' is handled immediately
61  handler.handle_message("hello")
62  time.sleep(0.02)
63  # queue is now empty, but throttling is in effect
64  # no messages will be handled in the next 10 seconds
65 
66  # these will fill up the queue, with newer values displacing old ones
67  # nothing gets sent because the throttle rate
68  for x in msgs:
69  handler.handle_message(x)
70 
71  handler = handler.set_throttle_rate(0)
72 
73  time.sleep(0.1)
74 
75  try:
76  self.assertEqual(["hello"] + list(range(990, 1000)), received["msgs"])
77  except:
78  handler.finish()
79  raise
80 
81  handler.finish()
82 
84  received = {"msgs": []}
85 
86  def cb(msg):
87  received["msgs"].append(msg)
88  time.sleep(1)
89 
90  queue_length = 5
91  msgs = range(queue_length * 5)
92 
93  handler = subscribe.MessageHandler(None, cb)
94 
95  handler = handler.set_queue_length(queue_length)
96  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
97 
98  # yield the thread to let QueueMessageHandler reach wait().
99  time.sleep(0.001)
100 
101  # send all messages at once.
102  # only the first and the last queue_length should get through,
103  # because the callbacks are blocked.
104  for x in msgs:
105  handler.handle_message(x)
106  # yield the thread so the first callback can append,
107  # otherwise the first handled value is non-deterministic.
108  time.sleep(0.001)
109 
110  # wait long enough for all the callbacks, and then some.
111  time.sleep(queue_length + 3)
112 
113  try:
114  self.assertEqual([msgs[0]] + msgs[-queue_length:], received["msgs"])
115  except:
116  handler.finish()
117  raise
118 
119  handler.finish()
120 
122  handler = subscribe.MessageHandler(None, self.dummy_cb)
123  self.help_test_queue_rate(handler, 50, 10)
124  handler.finish()
125 
126  # Helper methods for each of the three Handler types, plus one for Queue+Rate.
127  # Used in standalone testing as well as the test_transition_functionality test
128  def help_test_default(self, handler):
129  handler = handler.set_queue_length(0)
130  handler = handler.set_throttle_rate(0)
131  self.assertIsInstance(handler, subscribe.MessageHandler)
132 
133  msg = "test_default_message_handler"
134  received = {"msg": None}
135 
136  def cb(msg):
137  received["msg"] = msg
138  handler.publish = cb
139 
140  self.assertTrue(handler.time_remaining() == 0)
141  t1 = time.time()
142  handler.handle_message(msg)
143  t2 = time.time()
144 
145  self.assertEqual(received["msg"], msg)
146  self.assertLessEqual(t1, handler.last_publish)
147  self.assertLessEqual(handler.last_publish, t2)
148  self.assertEqual(handler.time_remaining(), 0)
149 
150  received = {"msgs": []}
151  def cb(msg):
152  received["msgs"].append(msg)
153  handler.publish = cb
154  xs = list(range(10000))
155  for x in xs:
156  handler.handle_message(x)
157 
158  self.assertEqual(received["msgs"], xs)
159  return handler
160 
161  def help_test_throttle(self, handler, throttle_rate):
162  handler = handler.set_queue_length(0)
163  handler = handler.set_throttle_rate(throttle_rate)
164  self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
165 
166  msg = "test_throttle_message_handler"
167 
168  # First, try with a single message
169  received = {"msg": None}
170 
171  def cb(msg):
172  received["msg"] = msg
173 
174  handler.publish = cb
175 
176  # ensure the handler doesn't swallow this message
177  time.sleep(2.0 * handler.throttle_rate)
178  handler.handle_message(msg)
179  self.assertEqual(received["msg"], msg)
180 
181  # sleep to make sure the handler sends right away for the second part
182  time.sleep(2.0 * handler.throttle_rate)
183 
184  received = {"msgs": []}
185  def cb(msg):
186  received["msgs"].append(msg)
187 
188  handler.publish = cb
189  x = 0
190  time_padding = handler.throttle_rate / 4.0
191  for i in range(1, 10):
192  # We guarantee that in the while loop below only the first message is handled
193  # All subsequent messages (within throttling window - time_padding ) are dropped
194  # Time padding is a test-only hack around race condition when time.time() - fin is within
195  # the throttling window, but handler.handle_message(x) gets a later timestamp that is outside.
196  time.sleep(2.0 * time_padding)
197  fin = time.time() + throttle_rate / 1000.0 - time_padding
198  while time.time() < fin:
199  handler.handle_message(x)
200  x = x + 1
201  self.assertEqual(len(received["msgs"]), i)
202  return handler
203 
204  def help_test_queue(self, handler, queue_length):
205  handler = handler.set_queue_length(queue_length)
206  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
207 
208  received = {"msgs": []}
209 
210  def cb(msg):
211  received["msgs"].append(msg)
212 
213  handler.publish = cb
214 
215  msgs = list(range(queue_length))
216  for x in msgs:
217  handler.handle_message(x)
218 
219  time.sleep(0.1)
220 
221  self.assertEqual(msgs, received["msgs"])
222  return handler
223 
224  def help_test_queue_rate(self, handler, throttle_rate, queue_length):
225  handler = handler.set_throttle_rate(throttle_rate)
226  handler = handler.set_queue_length(queue_length)
227  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
228 
229  received = {"msg": None}
230 
231  def cb(msg):
232  received["msg"] = msg
233 
234  handler.publish = cb
235 
236  throttle_rate_sec = throttle_rate / 1000.0
237 
238  # ensure previous tests' last sent time is long enough ago
239  time.sleep(throttle_rate_sec)
240  for x in range(queue_length):
241  handler.handle_message(x)
242 
243  time.sleep(throttle_rate_sec / 2.0)
244 
245  try:
246  for x in range(10):
247  self.assertEqual(x, received["msg"])
248  time.sleep(throttle_rate_sec)
249  except:
250  handler.finish()
251  raise
252 
253  return handler
254 
255 # Test that each transition works and is stable
256  def test_transitions(self):
257  # MessageHandler.transition is stable
258  handler = subscribe.MessageHandler(None, self.dummy_cb)
259  next_handler = handler.transition()
260  self.assertEqual(handler, next_handler)
261 
262  # Going from MessageHandler to ThrottleMessageHandler...
263  handler = subscribe.MessageHandler(None, self.dummy_cb)
264  next_handler = handler.set_throttle_rate(100)
265  self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
266  handler = next_handler
267  # Testing transition returns another ThrottleMessageHandler
268  next_handler = handler.transition()
269  self.assertEqual(handler, next_handler)
270  # And finally going back to MessageHandler
271  next_handler = handler.set_throttle_rate(0)
272  self.assertIsInstance(next_handler, subscribe.MessageHandler)
273 
274  # Same for QueueMessageHandler
275  handler = subscribe.MessageHandler(None, self.dummy_cb)
276  next_handler = handler.set_queue_length(100)
277  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
278  handler = next_handler
279  next_handler = handler.transition()
280  self.assertEqual(handler, next_handler)
281  next_handler = handler.set_queue_length(0)
282  self.assertIsInstance(next_handler, subscribe.MessageHandler)
283 
284  # Checking a QueueMessageHandler with rate limit can be generated both ways
285  handler = subscribe.MessageHandler(None, self.dummy_cb)
286  next_handler = handler.set_queue_length(100).set_throttle_rate(100)
287  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
288  next_handler.finish()
289  next_handler = handler.set_throttle_rate(100).set_queue_length(100)
290  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
291  next_handler.finish()
292  handler = next_handler
293  next_handler = handler.transition()
294  self.assertEqual(handler, next_handler)
295  # Check both steps on the way back to plain MessageHandler
296  next_handler = handler.set_throttle_rate(0)
297  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
298  next_handler = handler.set_queue_length(0)
299  self.assertIsInstance(next_handler, subscribe.MessageHandler)
300 
302  # Test individually
303  handler = subscribe.MessageHandler(None, None)
304  handler = self.help_test_queue(handler, 10)
305  handler.finish()
306 
307  handler = subscribe.MessageHandler(None, None)
308  handler = self.help_test_throttle(handler, 50)
309  handler.finish()
310 
311  handler = subscribe.MessageHandler(None, None)
312  handler = self.help_test_default(handler)
313  handler.finish()
314 
315  # Test combinations
316  handler = subscribe.MessageHandler(None, None)
317  handler = self.help_test_queue(handler, 10)
318  handler = self.help_test_throttle(handler, 50)
319  handler = self.help_test_default(handler)
320  handler.finish()
321 
322  handler = subscribe.MessageHandler(None, None)
323  handler = self.help_test_queue(handler, 10)
324  handler = self.help_test_default(handler)
325  handler = self.help_test_throttle(handler, 50)
326  handler.finish()
327 
328  handler = subscribe.MessageHandler(None, None)
329  handler = self.help_test_throttle(handler, 50)
330  handler = self.help_test_queue_rate(handler, 50, 10)
331  handler = self.help_test_default(handler)
332  handler.finish()
333 
334  handler = subscribe.MessageHandler(None, None)
335  handler = self.help_test_throttle(handler, 50)
336  handler = self.help_test_default(handler)
337  handler = self.help_test_queue_rate(handler, 50, 10)
338  handler.finish()
339 
340  handler = subscribe.MessageHandler(None, None)
341  handler = self.help_test_default(handler)
342  handler = self.help_test_throttle(handler, 50)
343  handler = self.help_test_queue_rate(handler, 50, 10)
344  handler.finish()
345 
346  handler = subscribe.MessageHandler(None, None)
347  handler = self.help_test_default(handler)
348  handler = self.help_test_queue(handler, 10)
349  handler = self.help_test_throttle(handler, 50)
350  handler.finish()
351 
352  # Test duplicates
353  handler = subscribe.MessageHandler(None, None)
354  handler = self.help_test_queue_rate(handler, 50, 10)
355  handler = self.help_test_queue_rate(handler, 100, 10)
356  handler.finish()
357 
358  handler = subscribe.MessageHandler(None, None)
359  handler = self.help_test_throttle(handler, 50)
360  handler = self.help_test_throttle(handler, 100)
361  handler.finish()
362 
363  handler = subscribe.MessageHandler(None, None)
364  handler = self.help_test_default(handler)
365  handler = self.help_test_default(handler)
366  handler.finish()
367 
368 
369 # handler = self.help_test_throttle(handler, 50)
370 # handler = self.help_test_default(handler)
371 # handler = self.help_test_throttle(handler, 50)
372 # handler = self.help_test_default(handler)
373 # handler = self.help_test_throttle(handler, 50)
374 
375 
376 PKG = 'rosbridge_library'
377 NAME = 'test_message_handlers'
378 if __name__ == '__main__':
379  rostest.unitrun(PKG, NAME, TestMessageHandlers)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14