test_subscription_modifiers.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 import time
7 
8 from rosbridge_library.internal import subscription_modifiers as subscribe
9 
10 
# Package name and test name handed to rosunit.unitrun() by the __main__ guard.
PKG = 'rosbridge_library'
# NAME is also used as the ROS node name in TestMessageHandlers.setUp().
# NOTE(review): the value does not match this file's name
# (test_subscription_modifiers) -- confirm whether that is intentional.
NAME = 'test_message_handlers'
13 
14 
class TestMessageHandlers(unittest.TestCase):
    """Tests for the handler classes of ``subscription_modifiers``.

    Covers ``MessageHandler``, ``ThrottleMessageHandler`` and
    ``QueueMessageHandler`` individually, and the transitions between them
    that ``set_throttle_rate()`` / ``set_queue_length()`` trigger.

    The ``help_test_*`` methods are shared checks: each one puts the given
    handler into the mode under test, exercises it and returns the resulting
    handler so that calls can be chained.
    """

    def setUp(self):
        """Initialise the ROS node before every test."""
        rospy.init_node(NAME)

    def dummy_cb(self, msg):
        """Callback that ignores the message it is given."""
        pass

    def test_default_message_handler(self):
        """A plain MessageHandler passes the default-handler checks."""
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        self.help_test_default(handler)

    def test_throttle_message_handler(self):
        """A ThrottleMessageHandler passes the throttle checks."""
        handler = subscribe.ThrottleMessageHandler(
            subscribe.MessageHandler(None, self.dummy_cb))
        self.help_test_throttle(handler, 50)

    def test_queue_message_handler_passes_msgs(self):
        """A QueueMessageHandler delivers every queued message."""
        handler = subscribe.QueueMessageHandler(
            subscribe.MessageHandler(None, self.dummy_cb))
        try:
            # Keep the handler returned by the helper so that the one being
            # finished is the one that was actually exercised.
            handler = self.help_test_queue(handler, 1000)
        finally:
            # Always finish the handler, even when an assertion failed.
            handler.finish()

    def test_queue_message_handler_stops(self):
        """finish() stops a QueueMessageHandler (is_alive() turns False)."""
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        handler = subscribe.QueueMessageHandler(
            subscribe.MessageHandler(None, cb))

        self.assertTrue(handler.is_alive())

        handler.finish()

        self.assertFalse(handler.is_alive())

    def test_queue_message_handler_queue(self):
        """With throttling active only the newest queue_length messages stay."""
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        msgs = range(1000)

        handler = subscribe.MessageHandler(None, cb)
        handler = handler.set_throttle_rate(10000)
        handler = handler.set_queue_length(10)

        try:
            self.assertIsInstance(handler, subscribe.QueueMessageHandler)

            # 'hello' is handled immediately
            handler.handle_message("hello")
            time.sleep(0.02)
            # queue is now empty, but throttling is in effect
            # no messages will be handled in the next 10 seconds

            # these will fill up the queue, with newer values displacing
            # old ones; nothing gets sent because of the throttle rate
            for x in msgs:
                handler.handle_message(x)

            # lifting the throttle lets the queued messages through
            handler = handler.set_throttle_rate(0)

            time.sleep(0.1)

            self.assertEqual(["hello"] + list(range(990, 1000)),
                             received["msgs"])
        finally:
            handler.finish()

    def test_queue_message_handler_dropping(self):
        """While the callback blocks, only the newest messages are kept."""
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)
            # block the callback so that later messages pile up in the queue
            time.sleep(1)

        queue_length = 5
        msgs = list(range(queue_length * 5))

        handler = subscribe.MessageHandler(None, cb)
        handler = handler.set_queue_length(queue_length)

        try:
            self.assertIsInstance(handler, subscribe.QueueMessageHandler)

            # yield the thread to let QueueMessageHandler reach wait().
            time.sleep(0.001)

            # send all messages at once.
            # only the first and the last queue_length should get through,
            # because the callbacks are blocked.
            for x in msgs:
                handler.handle_message(x)
                # yield the thread so the first callback can append,
                # otherwise the first handled value is non-deterministic.
                time.sleep(0.001)

            # wait long enough for all the callbacks, and then some.
            time.sleep(queue_length + 3)

            self.assertEqual([msgs[0]] + msgs[-queue_length:],
                             received["msgs"])
        finally:
            handler.finish()

    def test_queue_message_handler_rate(self):
        """A handler with both queue length and throttle rate is rate limited."""
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        # help_test_queue_rate returns the QueueMessageHandler it created (and
        # finishes it itself when a check fails); that returned handler, not
        # the initial MessageHandler, is the one that must be finished.
        handler = self.help_test_queue_rate(handler, 50, 10)
        handler.finish()

    # Helper methods for each of the three Handler types, plus one for
    # Queue+Rate. Used in standalone testing as well as in the
    # test_transition_functionality test.
    def help_test_default(self, handler):
        """Switch ``handler`` to the plain mode and check immediate delivery.

        Sets queue length and throttle rate to 0, then verifies that each
        handled message reaches ``publish`` right away, that ``last_publish``
        is updated and that ``time_remaining()`` stays 0.

        Returns the handler obtained after the mode switch.
        """
        handler = handler.set_queue_length(0)
        handler = handler.set_throttle_rate(0)
        self.assertIsInstance(handler, subscribe.MessageHandler)

        msg = "test_default_message_handler"
        received = {"msg": None}

        def store_last(msg):
            received["msg"] = msg

        handler.publish = store_last

        self.assertEqual(handler.time_remaining(), 0)
        t1 = time.time()
        handler.handle_message(msg)
        t2 = time.time()

        self.assertEqual(received["msg"], msg)
        # last_publish must fall inside the window of the handle_message call
        self.assertLessEqual(t1, handler.last_publish)
        self.assertLessEqual(handler.last_publish, t2)
        self.assertEqual(handler.time_remaining(), 0)

        received = {"msgs": []}

        def store_all(msg):
            received["msgs"].append(msg)

        handler.publish = store_all
        xs = list(range(10000))
        for x in xs:
            handler.handle_message(x)

        # every message arrives, in order
        self.assertEqual(received["msgs"], xs)
        return handler

    def help_test_throttle(self, handler, throttle_rate):
        """Switch ``handler`` to throttle mode and check message dropping.

        ``throttle_rate`` is passed to ``set_throttle_rate()`` and is treated
        as milliseconds here (it is divided by 1000.0 to get seconds).
        ``handler.throttle_rate`` is used directly as a sleep duration, so it
        is presumably already in seconds -- verify against the handler.

        Returns the handler obtained after the mode switch.
        """
        handler = handler.set_queue_length(0)
        handler = handler.set_throttle_rate(throttle_rate)
        self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)

        msg = "test_throttle_message_handler"

        # First, try with a single message
        received = {"msg": None}

        def store_last(msg):
            received["msg"] = msg

        handler.publish = store_last

        # ensure the handler doesn't swallow this message
        time.sleep(2.0 * handler.throttle_rate)
        handler.handle_message(msg)
        self.assertEqual(received["msg"], msg)

        # sleep to make sure the handler sends right away for the second part
        time.sleep(2.0 * handler.throttle_rate)

        received = {"msgs": []}

        def store_all(msg):
            received["msgs"].append(msg)

        handler.publish = store_all
        x = 0
        time_padding = handler.throttle_rate / 4.0
        for i in range(1, 10):
            # We guarantee that in the while loop below only the first
            # message is handled. All subsequent messages (within throttling
            # window - time_padding) are dropped.
            # Time padding is a test-only hack around the race condition when
            # time.time() - fin is within the throttling window, but
            # handler.handle_message(x) gets a later timestamp that is
            # outside of it.
            time.sleep(2.0 * time_padding)
            fin = time.time() + throttle_rate / 1000.0 - time_padding
            while time.time() < fin:
                handler.handle_message(x)
                x = x + 1
            # exactly one more message got through in this window
            self.assertEqual(len(received["msgs"]), i)
        return handler

    def help_test_queue(self, handler, queue_length):
        """Switch ``handler`` to queue mode and check in-order delivery.

        Sends ``queue_length`` messages and expects all of them, in order,
        after a short wait.

        Returns the handler obtained after the mode switch; the caller is
        responsible for calling ``finish()`` on it.
        """
        handler = handler.set_queue_length(queue_length)
        self.assertIsInstance(handler, subscribe.QueueMessageHandler)

        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        handler.publish = cb

        msgs = list(range(queue_length))
        for x in msgs:
            handler.handle_message(x)

        # give the handler time to deliver the queued messages
        time.sleep(0.1)

        self.assertEqual(msgs, received["msgs"])
        return handler

    def help_test_queue_rate(self, handler, throttle_rate, queue_length):
        """Switch ``handler`` to queue+throttle mode and check the pacing.

        Queues ``queue_length`` messages and expects them to be delivered one
        per throttle period (``throttle_rate`` is in milliseconds).

        Returns the handler obtained after the mode switch. On success the
        caller must call ``finish()`` on it; if a check fails the handler is
        finished here before the error is re-raised.
        """
        handler = handler.set_throttle_rate(throttle_rate)
        handler = handler.set_queue_length(queue_length)
        self.assertIsInstance(handler, subscribe.QueueMessageHandler)

        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        handler.publish = cb

        throttle_rate_sec = throttle_rate / 1000.0

        # ensure previous tests' last sent time is long enough ago
        time.sleep(throttle_rate_sec)
        for x in range(queue_length):
            handler.handle_message(x)

        # sample in the middle of each throttle period
        time.sleep(throttle_rate_sec / 2.0)

        try:
            # one check per queued message
            for x in range(queue_length):
                self.assertEqual(x, received["msg"])
                time.sleep(throttle_rate_sec)
        except BaseException:
            # The caller never receives the handler when a check fails, so it
            # has to be finished here before propagating the error.
            handler.finish()
            raise

        return handler

    def test_transitions(self):
        """Test that each transition works and is stable."""
        # MessageHandler.transition is stable
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.transition()
        self.assertEqual(handler, next_handler)

        # Going from MessageHandler to ThrottleMessageHandler...
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.set_throttle_rate(100)
        self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
        handler = next_handler
        # Testing transition returns another ThrottleMessageHandler
        next_handler = handler.transition()
        self.assertEqual(handler, next_handler)
        # And finally going back to MessageHandler
        next_handler = handler.set_throttle_rate(0)
        self.assertIsInstance(next_handler, subscribe.MessageHandler)

        # Same for QueueMessageHandler
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.set_queue_length(100)
        self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
        handler = next_handler
        next_handler = handler.transition()
        self.assertEqual(handler, next_handler)
        next_handler = handler.set_queue_length(0)
        self.assertIsInstance(next_handler, subscribe.MessageHandler)

        # Checking a QueueMessageHandler with rate limit can be generated
        # both ways
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.set_queue_length(100).set_throttle_rate(100)
        self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
        next_handler.finish()
        next_handler = handler.set_throttle_rate(100).set_queue_length(100)
        self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
        next_handler.finish()
        handler = next_handler
        next_handler = handler.transition()
        self.assertEqual(handler, next_handler)
        # Check both steps on the way back to plain MessageHandler
        next_handler = handler.set_throttle_rate(0)
        self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
        next_handler = handler.set_queue_length(0)
        self.assertIsInstance(next_handler, subscribe.MessageHandler)

    def _run_chain(self, *steps):
        """Run the given helper steps on one fresh MessageHandler.

        Each step is a ``(helper, args)`` pair; the helper is called as
        ``helper(handler, *args)`` and its return value becomes the handler
        for the next step. The final handler is finished.
        """
        handler = subscribe.MessageHandler(None, None)
        for helper, args in steps:
            handler = helper(handler, *args)
        handler.finish()

    def test_transition_functionality(self):
        """Handlers keep working after switching between the modes."""
        queue = (self.help_test_queue, (10,))
        throttle = (self.help_test_throttle, (50,))
        default = (self.help_test_default, ())
        queue_rate = (self.help_test_queue_rate, (50, 10))

        # Test individually
        self._run_chain(queue)
        self._run_chain(throttle)
        self._run_chain(default)

        # Test combinations
        self._run_chain(queue, throttle, default)
        self._run_chain(queue, default, throttle)
        self._run_chain(throttle, queue_rate, default)
        self._run_chain(throttle, default, queue_rate)
        self._run_chain(default, throttle, queue_rate)
        self._run_chain(default, queue, throttle)

        # Test duplicates
        self._run_chain(queue_rate, (self.help_test_queue_rate, (100, 10)))
        self._run_chain(throttle, (self.help_test_throttle, (100,)))
        self._run_chain(default, default)
372 
373 # handler = self.help_test_throttle(handler, 50)
374 # handler = self.help_test_default(handler)
375 # handler = self.help_test_throttle(handler, 50)
376 # handler = self.help_test_default(handler)
377 # handler = self.help_test_throttle(handler, 50)
378 
379 
if __name__ == '__main__':
    # Run the test case through rosunit when the file is executed directly.
    rosunit.unitrun(PKG, NAME, TestMessageHandlers)
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_queue_message_handler_passes_msgs
def test_queue_message_handler_passes_msgs(self)
Definition: test_subscription_modifiers.py:31
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_transitions
def test_transitions(self)
Definition: test_subscription_modifiers.py:260
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.help_test_default
def help_test_default(self, handler)
Definition: test_subscription_modifiers.py:132
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_transition_functionality
def test_transition_functionality(self)
Definition: test_subscription_modifiers.py:305
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers
Definition: test_subscription_modifiers.py:15
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_default_message_handler
def test_default_message_handler(self)
Definition: test_subscription_modifiers.py:23
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.help_test_queue
def help_test_queue(self, handler, queue_length)
Definition: test_subscription_modifiers.py:208
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_queue_message_handler_queue
def test_queue_message_handler_queue(self)
Definition: test_subscription_modifiers.py:50
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.help_test_throttle
def help_test_throttle(self, handler, throttle_rate)
Definition: test_subscription_modifiers.py:165
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.dummy_cb
def dummy_cb(self, msg)
Definition: test_subscription_modifiers.py:20
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_queue_message_handler_rate
def test_queue_message_handler_rate(self)
Definition: test_subscription_modifiers.py:125
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_queue_message_handler_dropping
def test_queue_message_handler_dropping(self)
Definition: test_subscription_modifiers.py:87
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_throttle_message_handler
def test_throttle_message_handler(self)
Definition: test_subscription_modifiers.py:27
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.test_queue_message_handler_stops
def test_queue_message_handler_stops(self)
Definition: test_subscription_modifiers.py:36
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.help_test_queue_rate
def help_test_queue_rate(self, handler, throttle_rate, queue_length)
Definition: test_subscription_modifiers.py:228
rosbridge_library.internal
Definition: src/rosbridge_library/internal/__init__.py:1
test.internal.subscribers.test_subscription_modifiers.TestMessageHandlers.setUp
def setUp(self)
Definition: test_subscription_modifiers.py:17


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45