test_subscription_modifiers.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007 
00008 import unittest
00009 import time
00010 
00011 from rosbridge_library.internal import subscription_modifiers as subscribe
00012 
00013 
00014 class TestMessageHandlers(unittest.TestCase):
00015 
00016     def setUp(self):
00017         rospy.init_node("test_message_handlers")
00018 
00019     def dummy_cb(self, msg):
00020         pass
00021 
00022     def test_default_message_handler(self):
00023         msg = "test_default_message_handler"
00024         received = {"msg": None}
00025 
00026         def cb(msg):
00027             received["msg"] = msg
00028 
00029         handler = subscribe.MessageHandler(None, cb)
00030         self.assertTrue(handler.time_remaining() == 0)
00031         t1 = time.time()
00032         handler.handle_message(msg)
00033         t2 = time.time()
00034 
00035         self.assertEqual(received["msg"], msg)
00036         self.assertLessEqual(t1, handler.last_publish)
00037         self.assertLessEqual(handler.last_publish, t2)
00038         self.assertEqual(handler.time_remaining(), 0)
00039 
00040         handler = subscribe.MessageHandler(None, self.dummy_cb)
00041         next_handler = handler.transition()
00042         self.assertEqual(handler, next_handler)
00043 
00044         handler = subscribe.MessageHandler(None, self.dummy_cb)
00045         next_handler = handler.set_throttle_rate(100)
00046         self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
00047 
00048         handler = subscribe.MessageHandler(None, self.dummy_cb)
00049         next_handler = handler.set_queue_length(100)
00050         self.assertIsInstance(next_handler, subscribe.MessageHandler)
00051 
00052         handler = subscribe.MessageHandler(None, self.dummy_cb)
00053         next_handler = handler.set_queue_length(100).set_throttle_rate(100)
00054         self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00055         next_handler.finish()
00056 
00057         received = {"msgs": []}
00058 
00059         def cb(msg):
00060             received["msgs"].append(msg)
00061 
00062         handler.publish = cb
00063         vals = range(10000, 20000)
00064         for x in vals:
00065             handler.handle_message(x)
00066 
00067         self.assertEqual(vals, received["msgs"])
00068 
00069     def test_throttle_message_handler(self):
00070         msg = "test_throttle_message_handler"
00071 
00072         # First, try with a single message
00073         received = {"msg": None}
00074 
00075         def cb(msg):
00076             received["msg"] = msg
00077 
00078         handler = subscribe.MessageHandler(None, cb)
00079         handler = handler.set_throttle_rate(10)
00080         self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
00081         handler.handle_message(msg)
00082         self.assertEqual(received["msg"], msg)
00083 
00084         # Now, saturate with messages, see how many we get
00085         received = {"msgs": []}
00086 
00087         def cb(msg):
00088             received["msgs"].append(msg)
00089         handler.publish = cb
00090 
00091         numsent = 0
00092         last = -1
00093         for i in range(1, 10):
00094             start = time.time()
00095             while (time.time() - start < handler.throttle_rate):
00096                 handler.handle_message(numsent)
00097                 numsent = numsent + 1
00098 
00099             self.assertGreater(numsent, i)
00100             self.assertEqual(len(received["msgs"]), i)
00101             self.assertGreater(received["msgs"][-1], last)
00102             last = numsent
00103 
00104     def test_queue_message_handler_passes_msgs(self):
00105         received = {"msgs": []}
00106 
00107         def cb(msg):
00108             received["msgs"].append(msg)
00109 
00110         handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00111         handler.queue_length = 1000
00112 
00113         msgs = range(1000)
00114         for x in msgs:
00115             handler.handle_message(x)
00116 
00117         time.sleep(0.1)
00118         handler.finish()
00119 
00120         self.assertEqual(msgs, received["msgs"])
00121 
00122     def test_queue_message_handler_stops(self):
00123         received = {"msgs": []}
00124 
00125         def cb(msg):
00126             received["msgs"].append(msg)
00127 
00128         handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00129 
00130         self.assertTrue(handler.is_alive())
00131 
00132         handler.finish()
00133 
00134         self.assertFalse(handler.is_alive())
00135 
00136     def test_queue_message_handler_queue(self):
00137         received = {"msgs": []}
00138 
00139         def cb(msg):
00140             received["msgs"].append(msg)
00141 
00142         msgs = range(1000)
00143 
00144         handler = subscribe.MessageHandler(None, cb)
00145         handler.handle_message("hello")
00146         handler = handler.set_throttle_rate(10000)
00147         handler = handler.set_queue_length(10)
00148 
00149         for x in msgs:
00150             handler.handle_message(x)
00151 
00152         handler.set_throttle_rate(0)
00153 
00154         time.sleep(0.1)
00155 
00156         try:
00157             self.assertEqual(["hello"] + range(990, 1000), received["msgs"])
00158         except:
00159             handler.finish()
00160             raise
00161 
00162         handler.finish()
00163 
00164     def test_queue_message_handler_rate(self):
00165         received = {"msg": None}
00166 
00167         def cb(msg):
00168             received["msg"] = msg
00169 
00170         handler = subscribe.MessageHandler(None, cb)
00171         handler = handler.set_throttle_rate(50)
00172         handler = handler.set_queue_length(10)
00173 
00174         for x in range(10):
00175             handler.handle_message(x)
00176 
00177         time.sleep(0.025)
00178 
00179         try:
00180             for x in range(10):
00181                 self.assertEqual(x, received["msg"])
00182                 time.sleep(0.05)
00183         except:
00184             handler.finish()
00185             raise
00186 
00187         handler.finish()
00188 
00189     def test_transitions(self):
00190         def test_default(handler):
00191             handler = handler.set_queue_length(0)
00192             handler = handler.set_throttle_rate(0)
00193             received = {"msgs": []}
00194             def cb(msg):
00195                 received["msgs"].append(msg)
00196             handler.publish = cb
00197             xs = range(10000)
00198             for x in xs:
00199                 handler.handle_message(x)
00200 
00201             self.assertEqual(received["msgs"], xs)
00202             return handler
00203 
00204         def test_throttle(handler, throttle_rate):
00205             received = {"msgs": []}
00206             def cb(msg):
00207                 received["msgs"].append(msg)
00208             handler = handler.set_queue_length(0)
00209             handler = handler.set_throttle_rate(throttle_rate)
00210             handler.publish = cb
00211             x = 0
00212             for i in range(1, 10):
00213                 fin = time.time() + throttle_rate / 1000.0
00214                 while time.time() < fin:
00215                     handler.handle_message(x)
00216                     x = x + 1
00217                 self.assertEqual(len(received["msgs"]), i)
00218             return handler
00219 
00220         def test_queue(handler, throttle_rate, queue_length):
00221             received = {"msgs": []}
00222 
00223             def cb(msg):
00224                 received["msgs"].append(msg)
00225 
00226             handler = handler.set_throttle_rate(throttle_rate)
00227             handler = handler.set_queue_length(queue_length)
00228 
00229             throttle_rate = throttle_rate / 1000.0
00230 
00231             time.sleep(throttle_rate + 0.01)
00232             handler.last_publish = time.time()
00233             last_msg = time.time() + throttle_rate / 4.0
00234 
00235             handler.publish = cb
00236 
00237             xs = range(1000)
00238             for x in xs:
00239                 handler.handle_message(x)
00240 
00241             try:
00242                 for i in range(0, queue_length - 1):
00243                     time.sleep(throttle_rate - time.time() + last_msg)
00244                     last_msg = time.time()
00245                     self.assertEqual(received["msgs"], xs[len(xs) - queue_length:len(xs) - queue_length + i + 1])
00246             except:
00247                 handler.finish()
00248                 raise
00249 
00250             return handler
00251 
00252         # Test individually
00253         handler = subscribe.MessageHandler(None, None)
00254         handler = test_queue(handler, 50, 10)
00255         handler.finish()
00256 
00257         handler = subscribe.MessageHandler(None, None)
00258         handler = test_throttle(handler, 50)
00259         handler.finish()
00260 
00261         handler = subscribe.MessageHandler(None, None)
00262         handler = test_default(handler)
00263         handler.finish()
00264 
00265         # Test combinations
00266         handler = subscribe.MessageHandler(None, None)
00267         handler = test_queue(handler, 50, 10)
00268         handler = test_throttle(handler, 50)
00269         handler = test_default(handler)
00270         handler.finish()
00271 
00272         handler = subscribe.MessageHandler(None, None)
00273         handler = test_queue(handler, 50, 10)
00274         handler = test_default(handler)
00275         handler = test_throttle(handler, 50)
00276         handler.finish()
00277 
00278         handler = subscribe.MessageHandler(None, None)
00279         handler = test_throttle(handler, 50)
00280         handler = test_queue(handler, 50, 10)
00281         handler = test_default(handler)
00282         handler.finish()
00283 
00284         handler = subscribe.MessageHandler(None, None)
00285         handler = test_throttle(handler, 50)
00286         handler = test_default(handler)
00287         handler = test_queue(handler, 50, 10)
00288         handler.finish()
00289 
00290         handler = subscribe.MessageHandler(None, None)
00291         handler = test_default(handler)
00292         handler = test_throttle(handler, 50)
00293         handler = test_queue(handler, 50, 10)
00294         handler.finish()
00295 
00296         handler = subscribe.MessageHandler(None, None)
00297         handler = test_default(handler)
00298         handler = test_queue(handler, 50, 10)
00299         handler = test_throttle(handler, 50)
00300         handler.finish()
00301 
00302         # Test duplicates
00303         handler = subscribe.MessageHandler(None, None)
00304         handler = test_queue(handler, 50, 10)
00305         handler = test_queue(handler, 100, 10)
00306         handler.finish()
00307 
00308         handler = subscribe.MessageHandler(None, None)
00309         handler = test_throttle(handler, 50)
00310         handler = test_throttle(handler, 100)
00311         handler.finish()
00312 
00313         handler = subscribe.MessageHandler(None, None)
00314         handler = test_default(handler)
00315         handler = test_default(handler)
00316         handler.finish()
00317 
00318 
00319 #        handler = test_throttle(handler, 50)
00320 #        handler = test_default(handler)
00321 #        handler = test_throttle(handler, 50)
00322 #        handler = test_default(handler)
00323 #        handler = test_throttle(handler, 50)
00324 
00325 
00326 
00327 if __name__ == '__main__':
00328     import rostest
00329     rostest.rosrun(PKG, 'test_message_handlers', TestMessageHandlers)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04