00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007
00008 from rosbridge_library.internal import subscription_modifiers as subscribe
00009
00010
00011 class TestMessageHandlers(unittest.TestCase):
00012
00013 def setUp(self):
00014 rospy.init_node("test_message_handlers")
00015
00016 def dummy_cb(self, msg):
00017 pass
00018
00019 def test_default_message_handler(self):
00020 handler = subscribe.MessageHandler(None, self.dummy_cb)
00021 self.help_test_default(handler)
00022
00023 def test_throttle_message_handler(self):
00024 handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
00025 self.help_test_throttle(handler, 50)
00026
00027 def test_queue_message_handler_passes_msgs(self):
00028 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, self.dummy_cb))
00029 self.help_test_queue(handler, 1000)
00030 handler.finish()
00031
00032 def test_queue_message_handler_stops(self):
00033 received = {"msgs": []}
00034
00035 def cb(msg):
00036 received["msgs"].append(msg)
00037
00038 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00039
00040 self.assertTrue(handler.is_alive())
00041
00042 handler.finish()
00043
00044 self.assertFalse(handler.is_alive())
00045
00046 def test_queue_message_handler_queue(self):
00047 received = {"msgs": []}
00048
00049 def cb(msg):
00050 received["msgs"].append(msg)
00051
00052 msgs = range(1000)
00053
00054 handler = subscribe.MessageHandler(None, cb)
00055
00056 handler = handler.set_throttle_rate(10000)
00057 handler = handler.set_queue_length(10)
00058 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
00059
00060
00061 handler.handle_message("hello")
00062 time.sleep(0.02)
00063
00064
00065
00066
00067
00068 for x in msgs:
00069 handler.handle_message(x)
00070
00071 handler = handler.set_throttle_rate(0)
00072
00073 time.sleep(0.1)
00074
00075 try:
00076 self.assertEqual(["hello"] + range(990, 1000), received["msgs"])
00077 except:
00078 handler.finish()
00079 raise
00080
00081 handler.finish()
00082
00083 def test_queue_message_handler_rate(self):
00084 handler = subscribe.MessageHandler(None, self.dummy_cb)
00085 self.help_test_queue_rate(handler, 50, 10)
00086 handler.finish()
00087
00088
00089
00090 def help_test_default(self, handler):
00091 handler = handler.set_queue_length(0)
00092 handler = handler.set_throttle_rate(0)
00093 self.assertIsInstance(handler, subscribe.MessageHandler)
00094
00095 msg = "test_default_message_handler"
00096 received = {"msg": None}
00097
00098 def cb(msg):
00099 received["msg"] = msg
00100 handler.publish = cb
00101
00102 self.assertTrue(handler.time_remaining() == 0)
00103 t1 = time.time()
00104 handler.handle_message(msg)
00105 t2 = time.time()
00106
00107 self.assertEqual(received["msg"], msg)
00108 self.assertLessEqual(t1, handler.last_publish)
00109 self.assertLessEqual(handler.last_publish, t2)
00110 self.assertEqual(handler.time_remaining(), 0)
00111
00112 received = {"msgs": []}
00113 def cb(msg):
00114 received["msgs"].append(msg)
00115 handler.publish = cb
00116 xs = range(10000)
00117 for x in xs:
00118 handler.handle_message(x)
00119
00120 self.assertEqual(received["msgs"], xs)
00121 return handler
00122
00123 def help_test_throttle(self, handler, throttle_rate):
00124 handler = handler.set_queue_length(0)
00125 handler = handler.set_throttle_rate(throttle_rate)
00126 self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
00127
00128 msg = "test_throttle_message_handler"
00129
00130
00131 received = {"msg": None}
00132
00133 def cb(msg):
00134 received["msg"] = msg
00135
00136 handler.publish = cb
00137
00138
00139 time.sleep(2.0 * handler.throttle_rate)
00140 handler.handle_message(msg)
00141 self.assertEqual(received["msg"], msg)
00142
00143
00144 time.sleep(2.0 * handler.throttle_rate)
00145
00146 received = {"msgs": []}
00147 def cb(msg):
00148 received["msgs"].append(msg)
00149
00150 handler.publish = cb
00151 x = 0
00152 time_padding = handler.throttle_rate / 4.0
00153 for i in range(1, 10):
00154
00155
00156
00157
00158 time.sleep(2.0 * time_padding)
00159 fin = time.time() + throttle_rate / 1000.0 - time_padding
00160 while time.time() < fin:
00161 handler.handle_message(x)
00162 x = x + 1
00163 self.assertEqual(len(received["msgs"]), i)
00164 return handler
00165
00166 def help_test_queue(self, handler, queue_length):
00167 handler = handler.set_queue_length(queue_length)
00168 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
00169
00170 received = {"msgs": []}
00171
00172 def cb(msg):
00173 received["msgs"].append(msg)
00174
00175 handler.publish = cb
00176
00177 msgs = range(queue_length)
00178 for x in msgs:
00179 handler.handle_message(x)
00180
00181 time.sleep(0.1)
00182
00183 self.assertEqual(msgs, received["msgs"])
00184 return handler
00185
00186 def help_test_queue_rate(self, handler, throttle_rate, queue_length):
00187 handler = handler.set_throttle_rate(throttle_rate)
00188 handler = handler.set_queue_length(queue_length)
00189 self.assertIsInstance(handler, subscribe.QueueMessageHandler)
00190
00191 received = {"msg": None}
00192
00193 def cb(msg):
00194 received["msg"] = msg
00195
00196 handler.publish = cb
00197
00198 throttle_rate_sec = throttle_rate / 1000.0
00199
00200
00201 time.sleep(throttle_rate_sec)
00202 for x in range(queue_length):
00203 handler.handle_message(x)
00204
00205 time.sleep(throttle_rate_sec / 2.0)
00206
00207 try:
00208 for x in range(10):
00209 self.assertEqual(x, received["msg"])
00210 time.sleep(throttle_rate_sec)
00211 except:
00212 handler.finish()
00213 raise
00214
00215 return handler
00216
00217
00218 def test_transitions(self):
00219
00220 handler = subscribe.MessageHandler(None, self.dummy_cb)
00221 next_handler = handler.transition()
00222 self.assertEqual(handler, next_handler)
00223
00224
00225 handler = subscribe.MessageHandler(None, self.dummy_cb)
00226 next_handler = handler.set_throttle_rate(100)
00227 self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
00228 handler = next_handler
00229
00230 next_handler = handler.transition()
00231 self.assertEqual(handler, next_handler)
00232
00233 next_handler = handler.set_throttle_rate(0)
00234 self.assertIsInstance(next_handler, subscribe.MessageHandler)
00235
00236
00237 handler = subscribe.MessageHandler(None, self.dummy_cb)
00238 next_handler = handler.set_queue_length(100)
00239 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00240 handler = next_handler
00241 next_handler = handler.transition()
00242 self.assertEqual(handler, next_handler)
00243 next_handler = handler.set_queue_length(0)
00244 self.assertIsInstance(next_handler, subscribe.MessageHandler)
00245
00246
00247 handler = subscribe.MessageHandler(None, self.dummy_cb)
00248 next_handler = handler.set_queue_length(100).set_throttle_rate(100)
00249 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00250 next_handler.finish()
00251 next_handler = handler.set_throttle_rate(100).set_queue_length(100)
00252 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00253 next_handler.finish()
00254 handler = next_handler
00255 next_handler = handler.transition()
00256 self.assertEqual(handler, next_handler)
00257
00258 next_handler = handler.set_throttle_rate(0)
00259 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00260 next_handler = handler.set_queue_length(0)
00261 self.assertIsInstance(next_handler, subscribe.MessageHandler)
00262
00263 def test_transition_functionality(self):
00264
00265 handler = subscribe.MessageHandler(None, None)
00266 handler = self.help_test_queue(handler, 10)
00267 handler.finish()
00268
00269 handler = subscribe.MessageHandler(None, None)
00270 handler = self.help_test_throttle(handler, 50)
00271 handler.finish()
00272
00273 handler = subscribe.MessageHandler(None, None)
00274 handler = self.help_test_default(handler)
00275 handler.finish()
00276
00277
00278 handler = subscribe.MessageHandler(None, None)
00279 handler = self.help_test_queue(handler, 10)
00280 handler = self.help_test_throttle(handler, 50)
00281 handler = self.help_test_default(handler)
00282 handler.finish()
00283
00284 handler = subscribe.MessageHandler(None, None)
00285 handler = self.help_test_queue(handler, 10)
00286 handler = self.help_test_default(handler)
00287 handler = self.help_test_throttle(handler, 50)
00288 handler.finish()
00289
00290 handler = subscribe.MessageHandler(None, None)
00291 handler = self.help_test_throttle(handler, 50)
00292 handler = self.help_test_queue_rate(handler, 50, 10)
00293 handler = self.help_test_default(handler)
00294 handler.finish()
00295
00296 handler = subscribe.MessageHandler(None, None)
00297 handler = self.help_test_throttle(handler, 50)
00298 handler = self.help_test_default(handler)
00299 handler = self.help_test_queue_rate(handler, 50, 10)
00300 handler.finish()
00301
00302 handler = subscribe.MessageHandler(None, None)
00303 handler = self.help_test_default(handler)
00304 handler = self.help_test_throttle(handler, 50)
00305 handler = self.help_test_queue_rate(handler, 50, 10)
00306 handler.finish()
00307
00308 handler = subscribe.MessageHandler(None, None)
00309 handler = self.help_test_default(handler)
00310 handler = self.help_test_queue(handler, 10)
00311 handler = self.help_test_throttle(handler, 50)
00312 handler.finish()
00313
00314
00315 handler = subscribe.MessageHandler(None, None)
00316 handler = self.help_test_queue_rate(handler, 50, 10)
00317 handler = self.help_test_queue_rate(handler, 100, 10)
00318 handler.finish()
00319
00320 handler = subscribe.MessageHandler(None, None)
00321 handler = self.help_test_throttle(handler, 50)
00322 handler = self.help_test_throttle(handler, 100)
00323 handler.finish()
00324
00325 handler = subscribe.MessageHandler(None, None)
00326 handler = self.help_test_default(handler)
00327 handler = self.help_test_default(handler)
00328 handler.finish()
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338 PKG = 'rosbridge_library'
00339 NAME = 'test_message_handlers'
00340 if __name__ == '__main__':
00341 rostest.unitrun(PKG, NAME, TestMessageHandlers)