test_subscription_modifiers.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 
00008 from rosbridge_library.internal import subscription_modifiers as subscribe
00009 
00010 
00011 class TestMessageHandlers(unittest.TestCase):
00012 
00013     def setUp(self):
00014         rospy.init_node("test_message_handlers")
00015 
00016     def dummy_cb(self, msg):
00017         pass
00018 
00019     def test_default_message_handler(self):
00020         handler = subscribe.MessageHandler(None, self.dummy_cb)
00021         self.help_test_default(handler)
00022 
00023     def test_throttle_message_handler(self):
00024         handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
00025         self.help_test_throttle(handler, 50)
00026 
00027     def test_queue_message_handler_passes_msgs(self):
00028         handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
00029         self.help_test_queue(handler, 1000)
00030         handler.finish()
00031 
00032     def test_queue_message_handler_stops(self):
00033         received = {"msgs": []}
00034 
00035         def cb(msg):
00036             received["msgs"].append(msg)
00037 
00038         handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00039 
00040         self.assertTrue(handler.is_alive())
00041 
00042         handler.finish()
00043 
00044         self.assertFalse(handler.is_alive())
00045 
00046     def test_queue_message_handler_queue(self):
00047         received = {"msgs": []}
00048 
00049         def cb(msg):
00050             received["msgs"].append(msg)
00051 
00052         msgs = range(1000)
00053 
00054         handler = subscribe.MessageHandler(None, cb)
00055 
00056         handler = handler.set_throttle_rate(10000)
00057         handler = handler.set_queue_length(10)
00058         self.assertIsInstance(handler, subscribe.QueueMessageHandler)
00059 
00060         # 'hello' is handled immediately
00061         handler.handle_message("hello")
00062         time.sleep(0.02)
00063         # queue is now empty, but throttling is in effect
00064         # no messages will be handled in the next 10 seconds
00065 
00066         # these will fill up the queue, with newer values displacing old ones
00067         # nothing gets sent because the throttle rate 
00068         for x in msgs:
00069             handler.handle_message(x)
00070 
00071         handler = handler.set_throttle_rate(0)
00072 
00073         time.sleep(0.1)
00074 
00075         try:
00076             self.assertEqual(["hello"] + range(990, 1000), received["msgs"])
00077         except:
00078             handler.finish()
00079             raise
00080 
00081         handler.finish()
00082 
00083     def test_queue_message_handler_rate(self):
00084         handler = subscribe.MessageHandler(None, self.dummy_cb)
00085         self.help_test_queue_rate(handler, 50, 10)
00086         handler.finish()
00087 
00088     # Helper methods for each of the three Handler types, plus one for Queue+Rate.
00089     # Used in standalone testing as well as the test_transition_functionality test
00090     def help_test_default(self, handler):
00091         handler = handler.set_queue_length(0)
00092         handler = handler.set_throttle_rate(0)
00093         self.assertIsInstance(handler, subscribe.MessageHandler)
00094 
00095         msg = "test_default_message_handler"
00096         received = {"msg": None}
00097 
00098         def cb(msg):
00099             received["msg"] = msg
00100         handler.publish = cb
00101 
00102         self.assertTrue(handler.time_remaining() == 0)
00103         t1 = time.time()
00104         handler.handle_message(msg)
00105         t2 = time.time()
00106 
00107         self.assertEqual(received["msg"], msg)
00108         self.assertLessEqual(t1, handler.last_publish)
00109         self.assertLessEqual(handler.last_publish, t2)
00110         self.assertEqual(handler.time_remaining(), 0)
00111 
00112         received = {"msgs": []}
00113         def cb(msg):
00114             received["msgs"].append(msg)
00115         handler.publish = cb
00116         xs = range(10000)
00117         for x in xs:
00118             handler.handle_message(x)
00119 
00120         self.assertEqual(received["msgs"], xs)
00121         return handler
00122 
00123     def help_test_throttle(self, handler, throttle_rate):
00124         handler = handler.set_queue_length(0)
00125         handler = handler.set_throttle_rate(throttle_rate)
00126         self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
00127         
00128         msg = "test_throttle_message_handler"
00129         
00130         # First, try with a single message
00131         received = {"msg": None}
00132             
00133         def cb(msg):
00134             received["msg"] = msg
00135                 
00136         handler.publish = cb
00137 
00138         # ensure the handler doesn't swallow this message
00139         time.sleep(2.0 * handler.throttle_rate)
00140         handler.handle_message(msg)
00141         self.assertEqual(received["msg"], msg)
00142         
00143         # sleep to make sure the handler sends right away for the second part
00144         time.sleep(2.0 * handler.throttle_rate)
00145 
00146         received = {"msgs": []}
00147         def cb(msg):
00148             received["msgs"].append(msg)
00149 
00150         handler.publish = cb
00151         x = 0
00152         time_padding = handler.throttle_rate / 4.0
00153         for i in range(1, 10):
00154             # We guarantee that in the while loop below only the first message is handled
00155             # All subsequent messages (within throttling window - time_padding ) are dropped
00156             # Time padding is a test-only hack around race condition when time.time() - fin is within
00157             # the throttling window, but handler.handle_message(x) gets a later timestamp that is outside.
00158             time.sleep(2.0 * time_padding)
00159             fin = time.time() + throttle_rate / 1000.0 - time_padding
00160             while time.time() < fin:
00161                 handler.handle_message(x)
00162                 x = x + 1
00163             self.assertEqual(len(received["msgs"]), i)
00164         return handler
00165 
00166     def help_test_queue(self, handler, queue_length):
00167         handler = handler.set_queue_length(queue_length)
00168         self.assertIsInstance(handler, subscribe.QueueMessageHandler)
00169         
00170         received = {"msgs": []}
00171 
00172         def cb(msg):
00173             received["msgs"].append(msg)
00174 
00175         handler.publish = cb
00176 
00177         msgs = range(queue_length)
00178         for x in msgs:
00179             handler.handle_message(x)
00180 
00181         time.sleep(0.1)
00182 
00183         self.assertEqual(msgs, received["msgs"])
00184         return handler
00185 
00186     def help_test_queue_rate(self, handler, throttle_rate, queue_length):
00187         handler = handler.set_throttle_rate(throttle_rate)
00188         handler = handler.set_queue_length(queue_length)
00189         self.assertIsInstance(handler, subscribe.QueueMessageHandler)
00190 
00191         received = {"msg": None}
00192 
00193         def cb(msg):
00194             received["msg"] = msg
00195 
00196         handler.publish = cb
00197 
00198         throttle_rate_sec = throttle_rate / 1000.0
00199         
00200         # ensure previous tests' last sent time is long enough ago
00201         time.sleep(throttle_rate_sec)
00202         for x in range(queue_length):
00203             handler.handle_message(x)
00204 
00205         time.sleep(throttle_rate_sec / 2.0)
00206 
00207         try:
00208             for x in range(10):
00209                 self.assertEqual(x, received["msg"])
00210                 time.sleep(throttle_rate_sec)
00211         except:
00212             handler.finish()
00213             raise
00214 
00215         return handler
00216 
00217 # Test that each transition works and is stable
00218     def test_transitions(self):
00219         # MessageHandler.transition is stable
00220         handler = subscribe.MessageHandler(None, self.dummy_cb)
00221         next_handler = handler.transition()
00222         self.assertEqual(handler, next_handler)
00223 
00224         # Going from MessageHandler to ThrottleMessageHandler...
00225         handler = subscribe.MessageHandler(None, self.dummy_cb)
00226         next_handler = handler.set_throttle_rate(100)
00227         self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
00228         handler = next_handler
00229         # Testing transition returns another ThrottleMessageHandler
00230         next_handler = handler.transition()
00231         self.assertEqual(handler, next_handler)
00232         # And finally going back to MessageHandler
00233         next_handler = handler.set_throttle_rate(0)
00234         self.assertIsInstance(next_handler, subscribe.MessageHandler)
00235 
00236         # Same for QueueMessageHandler
00237         handler = subscribe.MessageHandler(None, self.dummy_cb)
00238         next_handler = handler.set_queue_length(100)
00239         self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00240         handler = next_handler
00241         next_handler = handler.transition()
00242         self.assertEqual(handler, next_handler)
00243         next_handler = handler.set_queue_length(0)
00244         self.assertIsInstance(next_handler, subscribe.MessageHandler)
00245 
00246         # Checking a QueueMessageHandler with rate limit can be generated both ways
00247         handler = subscribe.MessageHandler(None, self.dummy_cb)
00248         next_handler = handler.set_queue_length(100).set_throttle_rate(100)
00249         self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00250         next_handler.finish()
00251         next_handler = handler.set_throttle_rate(100).set_queue_length(100)
00252         self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00253         next_handler.finish()
00254         handler = next_handler
00255         next_handler = handler.transition()
00256         self.assertEqual(handler, next_handler)
00257         # Check both steps on the way back to plain MessageHandler
00258         next_handler = handler.set_throttle_rate(0)
00259         self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00260         next_handler = handler.set_queue_length(0)
00261         self.assertIsInstance(next_handler, subscribe.MessageHandler)
00262 
00263     def test_transition_functionality(self):
00264         # Test individually
00265         handler = subscribe.MessageHandler(None, None)
00266         handler = self.help_test_queue(handler, 10)
00267         handler.finish()
00268 
00269         handler = subscribe.MessageHandler(None, None)
00270         handler = self.help_test_throttle(handler, 50)
00271         handler.finish()
00272 
00273         handler = subscribe.MessageHandler(None, None)
00274         handler = self.help_test_default(handler)
00275         handler.finish()
00276 
00277         # Test combinations
00278         handler = subscribe.MessageHandler(None, None)
00279         handler = self.help_test_queue(handler, 10)
00280         handler = self.help_test_throttle(handler, 50)
00281         handler = self.help_test_default(handler)
00282         handler.finish()
00283 
00284         handler = subscribe.MessageHandler(None, None)
00285         handler = self.help_test_queue(handler, 10)
00286         handler = self.help_test_default(handler)
00287         handler = self.help_test_throttle(handler, 50)
00288         handler.finish()
00289 
00290         handler = subscribe.MessageHandler(None, None)
00291         handler = self.help_test_throttle(handler, 50)
00292         handler = self.help_test_queue_rate(handler, 50, 10)
00293         handler = self.help_test_default(handler)
00294         handler.finish()
00295 
00296         handler = subscribe.MessageHandler(None, None)
00297         handler = self.help_test_throttle(handler, 50)
00298         handler = self.help_test_default(handler)
00299         handler = self.help_test_queue_rate(handler, 50, 10)
00300         handler.finish()
00301 
00302         handler = subscribe.MessageHandler(None, None)
00303         handler = self.help_test_default(handler)
00304         handler = self.help_test_throttle(handler, 50)
00305         handler = self.help_test_queue_rate(handler, 50, 10)
00306         handler.finish()
00307 
00308         handler = subscribe.MessageHandler(None, None)
00309         handler = self.help_test_default(handler)
00310         handler = self.help_test_queue(handler, 10)
00311         handler = self.help_test_throttle(handler, 50)
00312         handler.finish()
00313 
00314         # Test duplicates
00315         handler = subscribe.MessageHandler(None, None)
00316         handler = self.help_test_queue_rate(handler, 50, 10)
00317         handler = self.help_test_queue_rate(handler, 100, 10)
00318         handler.finish()
00319 
00320         handler = subscribe.MessageHandler(None, None)
00321         handler = self.help_test_throttle(handler, 50)
00322         handler = self.help_test_throttle(handler, 100)
00323         handler.finish()
00324 
00325         handler = subscribe.MessageHandler(None, None)
00326         handler = self.help_test_default(handler)
00327         handler = self.help_test_default(handler)
00328         handler.finish()
00329 
00330 
00331 #        handler = self.help_test_throttle(handler, 50)
00332 #        handler = self.help_test_default(handler)
00333 #        handler = self.help_test_throttle(handler, 50)
00334 #        handler = self.help_test_default(handler)
00335 #        handler = self.help_test_throttle(handler, 50)
00336 
00337 
00338 PKG = 'rosbridge_library'
00339 NAME = 'test_message_handlers'
00340 if __name__ == '__main__':
00341     rostest.unitrun(PKG, NAME, TestMessageHandlers)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:35