test_subscription_modifiers.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 
00008 from rosbridge_library.internal import subscription_modifiers as subscribe
00009 
00010 
class TestMessageHandlers(unittest.TestCase):
    """Tests for the handlers in ``rosbridge_library.internal.subscription_modifiers``.

    The tests exercise ``MessageHandler``, ``ThrottleMessageHandler`` and
    ``QueueMessageHandler`` and the transitions between them that are
    triggered by ``set_throttle_rate`` / ``set_queue_length``.

    Most tests are timing based (they sleep and compare against wall-clock
    time), so they can be sensitive to a heavily loaded machine.
    """

    def setUp(self):
        """Initialise the ROS node before each test."""
        rospy.init_node("test_message_handlers")

    def dummy_cb(self, msg):
        """Publish callback that deliberately ignores the message."""
        pass

    def test_default_message_handler(self):
        """Check the plain handler: pass-through publishing and transitions."""
        msg = "test_default_message_handler"
        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        handler = subscribe.MessageHandler(None, cb)
        self.assertEqual(handler.time_remaining(), 0)
        t1 = time.time()
        handler.handle_message(msg)
        t2 = time.time()

        # The message is delivered immediately and last_publish is stamped
        # somewhere between the two clock readings taken around the call.
        self.assertEqual(received["msg"], msg)
        self.assertLessEqual(t1, handler.last_publish)
        self.assertLessEqual(handler.last_publish, t2)
        self.assertEqual(handler.time_remaining(), 0)

        # With nothing configured, transition() hands back the same handler.
        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.transition()
        self.assertEqual(handler, next_handler)

        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.set_throttle_rate(100)
        self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)

        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.set_queue_length(100)
        self.assertIsInstance(next_handler, subscribe.MessageHandler)

        handler = subscribe.MessageHandler(None, self.dummy_cb)
        next_handler = handler.set_queue_length(100).set_throttle_rate(100)
        self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
        # Stop the queue handler again so it does not outlive this test.
        next_handler.finish()

        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        # The original (base class) handler object must still forward every
        # message, in order, to whatever publish callback it is given.
        handler.publish = cb
        vals = list(range(10000, 20000))
        for x in vals:
            handler.handle_message(x)

        # Compare list against list: a Python 3 ``range`` never equals a list.
        self.assertEqual(vals, received["msgs"])

    def test_throttle_message_handler(self):
        """Check that a throttled handler lets one message through per window."""
        msg = "test_throttle_message_handler"

        # First, try with a single message
        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        handler = subscribe.MessageHandler(None, cb)
        handler = handler.set_throttle_rate(10)
        self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
        handler.handle_message(msg)
        self.assertEqual(received["msg"], msg)

        # Now, saturate with messages, see how many we get
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)
        handler.publish = cb

        numsent = 0
        last = -1
        for i in range(1, 10):
            # Flood the handler for one throttle window.  handler.throttle_rate
            # is compared directly against elapsed seconds here.
            start = time.time()
            while time.time() - start < handler.throttle_rate:
                handler.handle_message(numsent)
                numsent += 1

            # Many messages were sent, exactly one more was published, and the
            # published value is newer than anything sent in earlier windows.
            self.assertGreater(numsent, i)
            self.assertEqual(len(received["msgs"]), i)
            self.assertGreater(received["msgs"][-1], last)
            last = numsent

    def test_queue_message_handler_passes_msgs(self):
        """Check that a queue handler with enough room delivers every message."""
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
        handler.queue_length = 1000

        msgs = list(range(1000))
        try:
            for x in msgs:
                handler.handle_message(x)

            # Give the handler time to drain its queue before stopping it.
            time.sleep(0.1)
        finally:
            # Always stop the handler, even if feeding it raised.
            handler.finish()

        # Compare list against list: a Python 3 ``range`` never equals a list.
        self.assertEqual(msgs, received["msgs"])

    def test_queue_message_handler_stops(self):
        """Check that finish() stops a running queue handler."""
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))

        self.assertTrue(handler.is_alive())

        handler.finish()

        self.assertFalse(handler.is_alive())

    def test_queue_message_handler_queue(self):
        """Check that a full queue keeps only the newest ``queue_length`` messages."""
        received = {"msgs": []}

        def cb(msg):
            received["msgs"].append(msg)

        msgs = range(1000)

        handler = subscribe.MessageHandler(None, cb)
        handler.handle_message("hello")
        # A very large throttle rate keeps the queue from draining while the
        # 1000 messages below are pushed into a queue of length 10.
        handler = handler.set_throttle_rate(10000)
        handler = handler.set_queue_length(10)

        for x in msgs:
            handler.handle_message(x)

        # Remove the throttle; the returned handler is not used here, the test
        # keeps observing what the callback has received.
        handler.set_throttle_rate(0)

        time.sleep(0.1)

        try:
            # list(...) is required on Python 3, where list + range raises
            # TypeError; on Python 2 it is a harmless copy.
            self.assertEqual(["hello"] + list(range(990, 1000)), received["msgs"])
        finally:
            # Stop the handler whether or not the assertion passed.
            handler.finish()

    def test_queue_message_handler_rate(self):
        """Check that queued messages are released one per throttle period."""
        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        handler = subscribe.MessageHandler(None, cb)
        handler = handler.set_throttle_rate(50)
        handler = handler.set_queue_length(10)

        for x in range(10):
            handler.handle_message(x)

        # Sample half-way into each 0.05 s period so the check does not race
        # with the moment the next message is released.
        time.sleep(0.025)

        try:
            for x in range(10):
                self.assertEqual(x, received["msg"])
                time.sleep(0.05)
        finally:
            # Stop the handler whether or not an assertion failed.
            handler.finish()

    def test_transitions(self):
        """Check that handlers keep working when switched between modes.

        Three nested checks (default, throttled, queued) each reconfigure the
        handler they are given, verify its behaviour and return the resulting
        handler.  They are then chained in every order, and with repeats, to
        verify that each mode works regardless of the mode it came from.
        """

        def check_default(handler):
            # No queue and no throttle: every message must be passed through.
            handler = handler.set_queue_length(0)
            handler = handler.set_throttle_rate(0)
            received = {"msgs": []}

            def cb(msg):
                received["msgs"].append(msg)

            handler.publish = cb
            xs = list(range(10000))
            for x in xs:
                handler.handle_message(x)

            # Compare list against list so this also holds on Python 3.
            self.assertEqual(received["msgs"], xs)
            return handler

        def check_throttle(handler, throttle_rate):
            # throttle_rate is given in milliseconds (it is divided by 1000.0
            # below before being added to time.time()).
            received = {"msgs": []}

            def cb(msg):
                received["msgs"].append(msg)

            handler = handler.set_queue_length(0)
            handler = handler.set_throttle_rate(throttle_rate)
            handler.publish = cb
            x = 0
            time_padding = 0.01
            for i in range(1, 10):
                # We guarantee that in the while loop below only the first message is handled
                # All subsequent messages (within throttling window - time_padding ) are dropped
                # Time padding is a test-only hack around race condition when time.time() - fin is within
                # the throttling window, but handler.handle_message(x) gets a later timestamp that is outside.
                time.sleep(2 * time_padding)
                fin = time.time() + throttle_rate / 1000.0 - time_padding
                while time.time() < fin:
                    handler.handle_message(x)
                    x += 1
                self.assertEqual(len(received["msgs"]), i)
            return handler

        def check_queue(handler, throttle_rate, queue_length):
            # throttle_rate is given in milliseconds; on success the handler
            # is returned still running so that the caller can chain it.
            received = {"msgs": []}

            def cb(msg):
                received["msgs"].append(msg)

            handler = handler.set_throttle_rate(throttle_rate)
            handler = handler.set_queue_length(queue_length)

            throttle_secs = throttle_rate / 1000.0

            # Wait out any pending throttle window, then restart it from now.
            time.sleep(throttle_secs + 0.01)
            handler.last_publish = time.time()
            # Offset the first sample by a quarter period so that checks land
            # between releases rather than on top of them.
            last_msg = time.time() + throttle_secs / 4.0

            handler.publish = cb

            xs = list(range(1000))
            for x in xs:
                handler.handle_message(x)

            # Only the newest queue_length messages are expected to survive.
            first = len(xs) - queue_length
            try:
                for i in range(queue_length - 1):
                    # Clamp at zero: time.sleep() rejects negative durations,
                    # which could otherwise happen if the machine stalls.
                    time.sleep(max(0.0, throttle_secs - time.time() + last_msg))
                    last_msg = time.time()
                    self.assertEqual(received["msgs"], xs[first:first + i + 1])
            except BaseException:
                # Stop the handler before propagating any failure (including
                # interrupts), exactly as the previous bare ``except`` did.
                handler.finish()
                raise

            return handler

        def queue_50(handler):
            return check_queue(handler, 50, 10)

        def queue_100(handler):
            return check_queue(handler, 100, 10)

        def throttle_50(handler):
            return check_throttle(handler, 50)

        def throttle_100(handler):
            return check_throttle(handler, 100)

        def run_chain(*steps):
            # Start from a fresh default handler, feed it through each step in
            # order, then shut down whatever handler comes out at the end.
            handler = subscribe.MessageHandler(None, None)
            for step in steps:
                handler = step(handler)
            handler.finish()

        # Test individually
        run_chain(queue_50)
        run_chain(throttle_50)
        run_chain(check_default)

        # Test combinations
        run_chain(queue_50, throttle_50, check_default)
        run_chain(queue_50, check_default, throttle_50)
        run_chain(throttle_50, queue_50, check_default)
        run_chain(throttle_50, check_default, queue_50)
        run_chain(check_default, throttle_50, queue_50)
        run_chain(check_default, queue_50, throttle_50)

        # Test duplicates
        run_chain(queue_50, queue_100)
        run_chain(throttle_50, throttle_100)
        run_chain(check_default, check_default)
00320 
00321 
00322 #        handler = test_throttle(handler, 50)
00323 #        handler = test_default(handler)
00324 #        handler = test_throttle(handler, 50)
00325 #        handler = test_default(handler)
00326 #        handler = test_throttle(handler, 50)
00327 
00328 
# Package name and test name handed to rostest when run as a script.
PKG, NAME = 'rosbridge_library', 'test_message_handlers'

if __name__ == '__main__':
    # Run the test case through rostest's unit-test runner.
    rostest.unitrun(PKG, NAME, TestMessageHandlers)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:09