test_subscription_modifiers.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 import time
7 
8 from rosbridge_library.internal import subscription_modifiers as subscribe
9 
10 
11 class TestMessageHandlers(unittest.TestCase):
12 
13  def setUp(self):
14  rospy.init_node("test_message_handlers")
15 
16  def dummy_cb(self, msg):
17  pass
18 
20  handler = subscribe.MessageHandler(None, self.dummy_cb)
21  self.help_test_default(handler)
22 
24  handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
25  self.help_test_throttle(handler, 50)
26 
28  handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
29  self.help_test_queue(handler, 1000)
30  handler.finish()
31 
33  received = {"msgs": []}
34 
35  def cb(msg):
36  received["msgs"].append(msg)
37 
38  handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
39 
40  self.assertTrue(handler.is_alive())
41 
42  handler.finish()
43 
44  self.assertFalse(handler.is_alive())
45 
47  received = {"msgs": []}
48 
49  def cb(msg):
50  received["msgs"].append(msg)
51 
52  msgs = range(1000)
53 
54  handler = subscribe.MessageHandler(None, cb)
55 
56  handler = handler.set_throttle_rate(10000)
57  handler = handler.set_queue_length(10)
58  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
59 
60  # 'hello' is handled immediately
61  handler.handle_message("hello")
62  time.sleep(0.02)
63  # queue is now empty, but throttling is in effect
64  # no messages will be handled in the next 10 seconds
65 
66  # these will fill up the queue, with newer values displacing old ones
67  # nothing gets sent because the throttle rate
68  for x in msgs:
69  handler.handle_message(x)
70 
71  handler = handler.set_throttle_rate(0)
72 
73  time.sleep(0.1)
74 
75  try:
76  self.assertEqual(["hello"] + list(range(990, 1000)), received["msgs"])
77  except:
78  handler.finish()
79  raise
80 
81  handler.finish()
82 
84  handler = subscribe.MessageHandler(None, self.dummy_cb)
85  self.help_test_queue_rate(handler, 50, 10)
86  handler.finish()
87 
88  # Helper methods for each of the three Handler types, plus one for Queue+Rate.
89  # Used in standalone testing as well as the test_transition_functionality test
90  def help_test_default(self, handler):
91  handler = handler.set_queue_length(0)
92  handler = handler.set_throttle_rate(0)
93  self.assertIsInstance(handler, subscribe.MessageHandler)
94 
95  msg = "test_default_message_handler"
96  received = {"msg": None}
97 
98  def cb(msg):
99  received["msg"] = msg
100  handler.publish = cb
101 
102  self.assertTrue(handler.time_remaining() == 0)
103  t1 = time.time()
104  handler.handle_message(msg)
105  t2 = time.time()
106 
107  self.assertEqual(received["msg"], msg)
108  self.assertLessEqual(t1, handler.last_publish)
109  self.assertLessEqual(handler.last_publish, t2)
110  self.assertEqual(handler.time_remaining(), 0)
111 
112  received = {"msgs": []}
113  def cb(msg):
114  received["msgs"].append(msg)
115  handler.publish = cb
116  xs = list(range(10000))
117  for x in xs:
118  handler.handle_message(x)
119 
120  self.assertEqual(received["msgs"], xs)
121  return handler
122 
123  def help_test_throttle(self, handler, throttle_rate):
124  handler = handler.set_queue_length(0)
125  handler = handler.set_throttle_rate(throttle_rate)
126  self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
127 
128  msg = "test_throttle_message_handler"
129 
130  # First, try with a single message
131  received = {"msg": None}
132 
133  def cb(msg):
134  received["msg"] = msg
135 
136  handler.publish = cb
137 
138  # ensure the handler doesn't swallow this message
139  time.sleep(2.0 * handler.throttle_rate)
140  handler.handle_message(msg)
141  self.assertEqual(received["msg"], msg)
142 
143  # sleep to make sure the handler sends right away for the second part
144  time.sleep(2.0 * handler.throttle_rate)
145 
146  received = {"msgs": []}
147  def cb(msg):
148  received["msgs"].append(msg)
149 
150  handler.publish = cb
151  x = 0
152  time_padding = handler.throttle_rate / 4.0
153  for i in range(1, 10):
154  # We guarantee that in the while loop below only the first message is handled
155  # All subsequent messages (within throttling window - time_padding ) are dropped
156  # Time padding is a test-only hack around race condition when time.time() - fin is within
157  # the throttling window, but handler.handle_message(x) gets a later timestamp that is outside.
158  time.sleep(2.0 * time_padding)
159  fin = time.time() + throttle_rate / 1000.0 - time_padding
160  while time.time() < fin:
161  handler.handle_message(x)
162  x = x + 1
163  self.assertEqual(len(received["msgs"]), i)
164  return handler
165 
166  def help_test_queue(self, handler, queue_length):
167  handler = handler.set_queue_length(queue_length)
168  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
169 
170  received = {"msgs": []}
171 
172  def cb(msg):
173  received["msgs"].append(msg)
174 
175  handler.publish = cb
176 
177  msgs = list(range(queue_length))
178  for x in msgs:
179  handler.handle_message(x)
180 
181  time.sleep(0.1)
182 
183  self.assertEqual(msgs, received["msgs"])
184  return handler
185 
186  def help_test_queue_rate(self, handler, throttle_rate, queue_length):
187  handler = handler.set_throttle_rate(throttle_rate)
188  handler = handler.set_queue_length(queue_length)
189  self.assertIsInstance(handler, subscribe.QueueMessageHandler)
190 
191  received = {"msg": None}
192 
193  def cb(msg):
194  received["msg"] = msg
195 
196  handler.publish = cb
197 
198  throttle_rate_sec = throttle_rate / 1000.0
199 
200  # ensure previous tests' last sent time is long enough ago
201  time.sleep(throttle_rate_sec)
202  for x in range(queue_length):
203  handler.handle_message(x)
204 
205  time.sleep(throttle_rate_sec / 2.0)
206 
207  try:
208  for x in range(10):
209  self.assertEqual(x, received["msg"])
210  time.sleep(throttle_rate_sec)
211  except:
212  handler.finish()
213  raise
214 
215  return handler
216 
217 # Test that each transition works and is stable
218  def test_transitions(self):
219  # MessageHandler.transition is stable
220  handler = subscribe.MessageHandler(None, self.dummy_cb)
221  next_handler = handler.transition()
222  self.assertEqual(handler, next_handler)
223 
224  # Going from MessageHandler to ThrottleMessageHandler...
225  handler = subscribe.MessageHandler(None, self.dummy_cb)
226  next_handler = handler.set_throttle_rate(100)
227  self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
228  handler = next_handler
229  # Testing transition returns another ThrottleMessageHandler
230  next_handler = handler.transition()
231  self.assertEqual(handler, next_handler)
232  # And finally going back to MessageHandler
233  next_handler = handler.set_throttle_rate(0)
234  self.assertIsInstance(next_handler, subscribe.MessageHandler)
235 
236  # Same for QueueMessageHandler
237  handler = subscribe.MessageHandler(None, self.dummy_cb)
238  next_handler = handler.set_queue_length(100)
239  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
240  handler = next_handler
241  next_handler = handler.transition()
242  self.assertEqual(handler, next_handler)
243  next_handler = handler.set_queue_length(0)
244  self.assertIsInstance(next_handler, subscribe.MessageHandler)
245 
246  # Checking a QueueMessageHandler with rate limit can be generated both ways
247  handler = subscribe.MessageHandler(None, self.dummy_cb)
248  next_handler = handler.set_queue_length(100).set_throttle_rate(100)
249  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
250  next_handler.finish()
251  next_handler = handler.set_throttle_rate(100).set_queue_length(100)
252  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
253  next_handler.finish()
254  handler = next_handler
255  next_handler = handler.transition()
256  self.assertEqual(handler, next_handler)
257  # Check both steps on the way back to plain MessageHandler
258  next_handler = handler.set_throttle_rate(0)
259  self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
260  next_handler = handler.set_queue_length(0)
261  self.assertIsInstance(next_handler, subscribe.MessageHandler)
262 
264  # Test individually
265  handler = subscribe.MessageHandler(None, None)
266  handler = self.help_test_queue(handler, 10)
267  handler.finish()
268 
269  handler = subscribe.MessageHandler(None, None)
270  handler = self.help_test_throttle(handler, 50)
271  handler.finish()
272 
273  handler = subscribe.MessageHandler(None, None)
274  handler = self.help_test_default(handler)
275  handler.finish()
276 
277  # Test combinations
278  handler = subscribe.MessageHandler(None, None)
279  handler = self.help_test_queue(handler, 10)
280  handler = self.help_test_throttle(handler, 50)
281  handler = self.help_test_default(handler)
282  handler.finish()
283 
284  handler = subscribe.MessageHandler(None, None)
285  handler = self.help_test_queue(handler, 10)
286  handler = self.help_test_default(handler)
287  handler = self.help_test_throttle(handler, 50)
288  handler.finish()
289 
290  handler = subscribe.MessageHandler(None, None)
291  handler = self.help_test_throttle(handler, 50)
292  handler = self.help_test_queue_rate(handler, 50, 10)
293  handler = self.help_test_default(handler)
294  handler.finish()
295 
296  handler = subscribe.MessageHandler(None, None)
297  handler = self.help_test_throttle(handler, 50)
298  handler = self.help_test_default(handler)
299  handler = self.help_test_queue_rate(handler, 50, 10)
300  handler.finish()
301 
302  handler = subscribe.MessageHandler(None, None)
303  handler = self.help_test_default(handler)
304  handler = self.help_test_throttle(handler, 50)
305  handler = self.help_test_queue_rate(handler, 50, 10)
306  handler.finish()
307 
308  handler = subscribe.MessageHandler(None, None)
309  handler = self.help_test_default(handler)
310  handler = self.help_test_queue(handler, 10)
311  handler = self.help_test_throttle(handler, 50)
312  handler.finish()
313 
314  # Test duplicates
315  handler = subscribe.MessageHandler(None, None)
316  handler = self.help_test_queue_rate(handler, 50, 10)
317  handler = self.help_test_queue_rate(handler, 100, 10)
318  handler.finish()
319 
320  handler = subscribe.MessageHandler(None, None)
321  handler = self.help_test_throttle(handler, 50)
322  handler = self.help_test_throttle(handler, 100)
323  handler.finish()
324 
325  handler = subscribe.MessageHandler(None, None)
326  handler = self.help_test_default(handler)
327  handler = self.help_test_default(handler)
328  handler.finish()
329 
330 
331 # handler = self.help_test_throttle(handler, 50)
332 # handler = self.help_test_default(handler)
333 # handler = self.help_test_throttle(handler, 50)
334 # handler = self.help_test_default(handler)
335 # handler = self.help_test_throttle(handler, 50)
336 
337 
338 PKG = 'rosbridge_library'
339 NAME = 'test_message_handlers'
340 if __name__ == '__main__':
341  rostest.unitrun(PKG, NAME, TestMessageHandlers)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02