14 rospy.init_node(
"test_message_handlers")
20 handler = subscribe.MessageHandler(
None, self.dummy_cb)
21 self.help_test_default(handler)
24 handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(
None, self.
dummy_cb))
28 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(
None, self.
dummy_cb))
33 received = {
"msgs": []}
36 received[
"msgs"].append(msg)
38 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(
None, cb))
40 self.assertTrue(handler.is_alive())
44 self.assertFalse(handler.is_alive())
47 received = {
"msgs": []}
50 received[
"msgs"].append(msg)
54 handler = subscribe.MessageHandler(
None, cb)
56 handler = handler.set_throttle_rate(10000)
57 handler = handler.set_queue_length(10)
58 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
61 handler.handle_message(
"hello")
69 handler.handle_message(x)
71 handler = handler.set_throttle_rate(0)
76 self.assertEqual([
"hello"] + list(range(990, 1000)), received[
"msgs"])
84 received = {
"msgs": []}
87 received[
"msgs"].append(msg)
91 msgs = range(queue_length * 5)
93 handler = subscribe.MessageHandler(
None, cb)
95 handler = handler.set_queue_length(queue_length)
96 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
105 handler.handle_message(x)
111 time.sleep(queue_length + 3)
114 self.assertEqual([msgs[0]] + msgs[-queue_length:], received[
"msgs"])
122 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
129 handler = handler.set_queue_length(0)
130 handler = handler.set_throttle_rate(0)
131 self.assertIsInstance(handler, subscribe.MessageHandler)
133 msg =
"test_default_message_handler" 134 received = {
"msg":
None}
137 received[
"msg"] = msg
140 self.assertTrue(handler.time_remaining() == 0)
142 handler.handle_message(msg)
145 self.assertEqual(received[
"msg"], msg)
146 self.assertLessEqual(t1, handler.last_publish)
147 self.assertLessEqual(handler.last_publish, t2)
148 self.assertEqual(handler.time_remaining(), 0)
150 received = {
"msgs": []}
152 received[
"msgs"].append(msg)
154 xs = list(range(10000))
156 handler.handle_message(x)
158 self.assertEqual(received[
"msgs"], xs)
162 handler = handler.set_queue_length(0)
163 handler = handler.set_throttle_rate(throttle_rate)
164 self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
166 msg =
"test_throttle_message_handler" 169 received = {
"msg":
None}
172 received[
"msg"] = msg
177 time.sleep(2.0 * handler.throttle_rate)
178 handler.handle_message(msg)
179 self.assertEqual(received[
"msg"], msg)
182 time.sleep(2.0 * handler.throttle_rate)
184 received = {
"msgs": []}
186 received[
"msgs"].append(msg)
190 time_padding = handler.throttle_rate / 4.0
191 for i
in range(1, 10):
196 time.sleep(2.0 * time_padding)
197 fin = time.time() + throttle_rate / 1000.0 - time_padding
198 while time.time() < fin:
199 handler.handle_message(x)
201 self.assertEqual(len(received[
"msgs"]), i)
205 handler = handler.set_queue_length(queue_length)
206 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
208 received = {
"msgs": []}
211 received[
"msgs"].append(msg)
215 msgs = list(range(queue_length))
217 handler.handle_message(x)
221 self.assertEqual(msgs, received[
"msgs"])
225 handler = handler.set_throttle_rate(throttle_rate)
226 handler = handler.set_queue_length(queue_length)
227 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
229 received = {
"msg":
None}
232 received[
"msg"] = msg
236 throttle_rate_sec = throttle_rate / 1000.0
239 time.sleep(throttle_rate_sec)
240 for x
in range(queue_length):
241 handler.handle_message(x)
243 time.sleep(throttle_rate_sec / 2.0)
247 self.assertEqual(x, received[
"msg"])
248 time.sleep(throttle_rate_sec)
258 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
259 next_handler = handler.transition()
260 self.assertEqual(handler, next_handler)
263 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
264 next_handler = handler.set_throttle_rate(100)
265 self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
266 handler = next_handler
268 next_handler = handler.transition()
269 self.assertEqual(handler, next_handler)
271 next_handler = handler.set_throttle_rate(0)
272 self.assertIsInstance(next_handler, subscribe.MessageHandler)
275 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
276 next_handler = handler.set_queue_length(100)
277 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
278 handler = next_handler
279 next_handler = handler.transition()
280 self.assertEqual(handler, next_handler)
281 next_handler = handler.set_queue_length(0)
282 self.assertIsInstance(next_handler, subscribe.MessageHandler)
285 handler = subscribe.MessageHandler(
None, self.
dummy_cb)
286 next_handler = handler.set_queue_length(100).set_throttle_rate(100)
287 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
288 next_handler.finish()
289 next_handler = handler.set_throttle_rate(100).set_queue_length(100)
290 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
291 next_handler.finish()
292 handler = next_handler
293 next_handler = handler.transition()
294 self.assertEqual(handler, next_handler)
296 next_handler = handler.set_throttle_rate(0)
297 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
298 next_handler = handler.set_queue_length(0)
299 self.assertIsInstance(next_handler, subscribe.MessageHandler)
303 handler = subscribe.MessageHandler(
None,
None)
307 handler = subscribe.MessageHandler(
None,
None)
311 handler = subscribe.MessageHandler(
None,
None)
316 handler = subscribe.MessageHandler(
None,
None)
322 handler = subscribe.MessageHandler(
None,
None)
328 handler = subscribe.MessageHandler(
None,
None)
334 handler = subscribe.MessageHandler(
None,
None)
340 handler = subscribe.MessageHandler(
None,
None)
346 handler = subscribe.MessageHandler(
None,
None)
353 handler = subscribe.MessageHandler(
None,
None)
358 handler = subscribe.MessageHandler(
None,
None)
363 handler = subscribe.MessageHandler(
None,
None)
376 PKG =
'rosbridge_library' 377 NAME =
'test_message_handlers' 378 if __name__ ==
'__main__':
379 rostest.unitrun(PKG, NAME, TestMessageHandlers)
def test_queue_message_handler_passes_msgs(self)
def test_queue_message_handler_rate(self)
def help_test_queue(self, handler, queue_length)
def help_test_default(self, handler)
def help_test_throttle(self, handler, throttle_rate)
def test_queue_message_handler_queue(self)
def test_queue_message_handler_stops(self)
def test_default_message_handler(self)
def test_transition_functionality(self)
def test_transitions(self)
def help_test_queue_rate(self, handler, throttle_rate, queue_length)
def test_throttle_message_handler(self)
def test_queue_message_handler_dropping(self)