00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007
00008 from rosbridge_library.internal import subscription_modifiers as subscribe
00009
00010
00011 class TestMessageHandlers(unittest.TestCase):
00012
00013 def setUp(self):
00014 rospy.init_node("test_message_handlers")
00015
00016 def dummy_cb(self, msg):
00017 pass
00018
00019 def test_default_message_handler(self):
00020 msg = "test_default_message_handler"
00021 received = {"msg": None}
00022
00023 def cb(msg):
00024 received["msg"] = msg
00025
00026 handler = subscribe.MessageHandler(None, cb)
00027 self.assertTrue(handler.time_remaining() == 0)
00028 t1 = time.time()
00029 handler.handle_message(msg)
00030 t2 = time.time()
00031
00032 self.assertEqual(received["msg"], msg)
00033 self.assertLessEqual(t1, handler.last_publish)
00034 self.assertLessEqual(handler.last_publish, t2)
00035 self.assertEqual(handler.time_remaining(), 0)
00036
00037 handler = subscribe.MessageHandler(None, self.dummy_cb)
00038 next_handler = handler.transition()
00039 self.assertEqual(handler, next_handler)
00040
00041 handler = subscribe.MessageHandler(None, self.dummy_cb)
00042 next_handler = handler.set_throttle_rate(100)
00043 self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
00044
00045 handler = subscribe.MessageHandler(None, self.dummy_cb)
00046 next_handler = handler.set_queue_length(100)
00047 self.assertIsInstance(next_handler, subscribe.MessageHandler)
00048
00049 handler = subscribe.MessageHandler(None, self.dummy_cb)
00050 next_handler = handler.set_queue_length(100).set_throttle_rate(100)
00051 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00052 next_handler.finish()
00053
00054 received = {"msgs": []}
00055
00056 def cb(msg):
00057 received["msgs"].append(msg)
00058
00059 handler.publish = cb
00060 vals = range(10000, 20000)
00061 for x in vals:
00062 handler.handle_message(x)
00063
00064 self.assertEqual(vals, received["msgs"])
00065
00066 def test_throttle_message_handler(self):
00067 msg = "test_throttle_message_handler"
00068
00069
00070 received = {"msg": None}
00071
00072 def cb(msg):
00073 received["msg"] = msg
00074
00075 handler = subscribe.MessageHandler(None, cb)
00076 handler = handler.set_throttle_rate(10)
00077 self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
00078 handler.handle_message(msg)
00079 self.assertEqual(received["msg"], msg)
00080
00081
00082 received = {"msgs": []}
00083
00084 def cb(msg):
00085 received["msgs"].append(msg)
00086 handler.publish = cb
00087
00088 numsent = 0
00089 last = -1
00090 for i in range(1, 10):
00091 start = time.time()
00092 while (time.time() - start < handler.throttle_rate):
00093 handler.handle_message(numsent)
00094 numsent = numsent + 1
00095
00096 self.assertGreater(numsent, i)
00097 self.assertEqual(len(received["msgs"]), i)
00098 self.assertGreater(received["msgs"][-1], last)
00099 last = numsent
00100
00101 def test_queue_message_handler_passes_msgs(self):
00102 received = {"msgs": []}
00103
00104 def cb(msg):
00105 received["msgs"].append(msg)
00106
00107 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00108 handler.queue_length = 1000
00109
00110 msgs = range(1000)
00111 for x in msgs:
00112 handler.handle_message(x)
00113
00114 time.sleep(0.1)
00115 handler.finish()
00116
00117 self.assertEqual(msgs, received["msgs"])
00118
00119 def test_queue_message_handler_stops(self):
00120 received = {"msgs": []}
00121
00122 def cb(msg):
00123 received["msgs"].append(msg)
00124
00125 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00126
00127 self.assertTrue(handler.is_alive())
00128
00129 handler.finish()
00130
00131 self.assertFalse(handler.is_alive())
00132
00133 def test_queue_message_handler_queue(self):
00134 received = {"msgs": []}
00135
00136 def cb(msg):
00137 received["msgs"].append(msg)
00138
00139 msgs = range(1000)
00140
00141 handler = subscribe.MessageHandler(None, cb)
00142 handler.handle_message("hello")
00143 handler = handler.set_throttle_rate(10000)
00144 handler = handler.set_queue_length(10)
00145
00146 for x in msgs:
00147 handler.handle_message(x)
00148
00149 handler.set_throttle_rate(0)
00150
00151 time.sleep(0.1)
00152
00153 try:
00154 self.assertEqual(["hello"] + range(990, 1000), received["msgs"])
00155 except:
00156 handler.finish()
00157 raise
00158
00159 handler.finish()
00160
00161 def test_queue_message_handler_rate(self):
00162 received = {"msg": None}
00163
00164 def cb(msg):
00165 received["msg"] = msg
00166
00167 handler = subscribe.MessageHandler(None, cb)
00168 handler = handler.set_throttle_rate(50)
00169 handler = handler.set_queue_length(10)
00170
00171 for x in range(10):
00172 handler.handle_message(x)
00173
00174 time.sleep(0.025)
00175
00176 try:
00177 for x in range(10):
00178 self.assertEqual(x, received["msg"])
00179 time.sleep(0.05)
00180 except:
00181 handler.finish()
00182 raise
00183
00184 handler.finish()
00185
00186 def test_transitions(self):
00187 def test_default(handler):
00188 handler = handler.set_queue_length(0)
00189 handler = handler.set_throttle_rate(0)
00190 received = {"msgs": []}
00191 def cb(msg):
00192 received["msgs"].append(msg)
00193 handler.publish = cb
00194 xs = range(10000)
00195 for x in xs:
00196 handler.handle_message(x)
00197
00198 self.assertEqual(received["msgs"], xs)
00199 return handler
00200
00201 def test_throttle(handler, throttle_rate):
00202 received = {"msgs": []}
00203 def cb(msg):
00204 received["msgs"].append(msg)
00205 handler = handler.set_queue_length(0)
00206 handler = handler.set_throttle_rate(throttle_rate)
00207 handler.publish = cb
00208 x = 0
00209 time_padding = 0.01
00210 for i in range(1, 10):
00211
00212
00213
00214
00215 time.sleep(2*time_padding)
00216 fin = time.time() + throttle_rate / 1000.0 - time_padding
00217 while time.time() < fin:
00218 handler.handle_message(x)
00219 x = x + 1
00220 self.assertEqual(len(received["msgs"]), i)
00221 return handler
00222
00223 def test_queue(handler, throttle_rate, queue_length):
00224 received = {"msgs": []}
00225
00226 def cb(msg):
00227 received["msgs"].append(msg)
00228
00229 handler = handler.set_throttle_rate(throttle_rate)
00230 handler = handler.set_queue_length(queue_length)
00231
00232 throttle_rate = throttle_rate / 1000.0
00233
00234 time.sleep(throttle_rate + 0.01)
00235 handler.last_publish = time.time()
00236 last_msg = time.time() + throttle_rate / 4.0
00237
00238 handler.publish = cb
00239
00240 xs = range(1000)
00241 for x in xs:
00242 handler.handle_message(x)
00243
00244 try:
00245 for i in range(0, queue_length - 1):
00246 time.sleep(throttle_rate - time.time() + last_msg)
00247 last_msg = time.time()
00248 self.assertEqual(received["msgs"], xs[len(xs) - queue_length:len(xs) - queue_length + i + 1])
00249 except:
00250 handler.finish()
00251 raise
00252
00253 return handler
00254
00255
00256 handler = subscribe.MessageHandler(None, None)
00257 handler = test_queue(handler, 50, 10)
00258 handler.finish()
00259
00260 handler = subscribe.MessageHandler(None, None)
00261 handler = test_throttle(handler, 50)
00262 handler.finish()
00263
00264 handler = subscribe.MessageHandler(None, None)
00265 handler = test_default(handler)
00266 handler.finish()
00267
00268
00269 handler = subscribe.MessageHandler(None, None)
00270 handler = test_queue(handler, 50, 10)
00271 handler = test_throttle(handler, 50)
00272 handler = test_default(handler)
00273 handler.finish()
00274
00275 handler = subscribe.MessageHandler(None, None)
00276 handler = test_queue(handler, 50, 10)
00277 handler = test_default(handler)
00278 handler = test_throttle(handler, 50)
00279 handler.finish()
00280
00281 handler = subscribe.MessageHandler(None, None)
00282 handler = test_throttle(handler, 50)
00283 handler = test_queue(handler, 50, 10)
00284 handler = test_default(handler)
00285 handler.finish()
00286
00287 handler = subscribe.MessageHandler(None, None)
00288 handler = test_throttle(handler, 50)
00289 handler = test_default(handler)
00290 handler = test_queue(handler, 50, 10)
00291 handler.finish()
00292
00293 handler = subscribe.MessageHandler(None, None)
00294 handler = test_default(handler)
00295 handler = test_throttle(handler, 50)
00296 handler = test_queue(handler, 50, 10)
00297 handler.finish()
00298
00299 handler = subscribe.MessageHandler(None, None)
00300 handler = test_default(handler)
00301 handler = test_queue(handler, 50, 10)
00302 handler = test_throttle(handler, 50)
00303 handler.finish()
00304
00305
00306 handler = subscribe.MessageHandler(None, None)
00307 handler = test_queue(handler, 50, 10)
00308 handler = test_queue(handler, 100, 10)
00309 handler.finish()
00310
00311 handler = subscribe.MessageHandler(None, None)
00312 handler = test_throttle(handler, 50)
00313 handler = test_throttle(handler, 100)
00314 handler.finish()
00315
00316 handler = subscribe.MessageHandler(None, None)
00317 handler = test_default(handler)
00318 handler = test_default(handler)
00319 handler.finish()
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329 PKG = 'rosbridge_library'
00330 NAME = 'test_message_handlers'
00331 if __name__ == '__main__':
00332 rostest.unitrun(PKG, NAME, TestMessageHandlers)