00001
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007
00008 import unittest
00009 import time
00010
00011 from rosbridge_library.internal import subscription_modifiers as subscribe
00012
00013
00014 class TestMessageHandlers(unittest.TestCase):
00015
00016 def setUp(self):
00017 rospy.init_node("test_message_handlers")
00018
00019 def dummy_cb(self, msg):
00020 pass
00021
00022 def test_default_message_handler(self):
00023 msg = "test_default_message_handler"
00024 received = {"msg": None}
00025
00026 def cb(msg):
00027 received["msg"] = msg
00028
00029 handler = subscribe.MessageHandler(None, cb)
00030 self.assertTrue(handler.time_remaining() == 0)
00031 t1 = time.time()
00032 handler.handle_message(msg)
00033 t2 = time.time()
00034
00035 self.assertEqual(received["msg"], msg)
00036 self.assertLessEqual(t1, handler.last_publish)
00037 self.assertLessEqual(handler.last_publish, t2)
00038 self.assertEqual(handler.time_remaining(), 0)
00039
00040 handler = subscribe.MessageHandler(None, self.dummy_cb)
00041 next_handler = handler.transition()
00042 self.assertEqual(handler, next_handler)
00043
00044 handler = subscribe.MessageHandler(None, self.dummy_cb)
00045 next_handler = handler.set_throttle_rate(100)
00046 self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler)
00047
00048 handler = subscribe.MessageHandler(None, self.dummy_cb)
00049 next_handler = handler.set_queue_length(100)
00050 self.assertIsInstance(next_handler, subscribe.MessageHandler)
00051
00052 handler = subscribe.MessageHandler(None, self.dummy_cb)
00053 next_handler = handler.set_queue_length(100).set_throttle_rate(100)
00054 self.assertIsInstance(next_handler, subscribe.QueueMessageHandler)
00055 next_handler.finish()
00056
00057 received = {"msgs": []}
00058
00059 def cb(msg):
00060 received["msgs"].append(msg)
00061
00062 handler.publish = cb
00063 vals = range(10000, 20000)
00064 for x in vals:
00065 handler.handle_message(x)
00066
00067 self.assertEqual(vals, received["msgs"])
00068
00069 def test_throttle_message_handler(self):
00070 msg = "test_throttle_message_handler"
00071
00072
00073 received = {"msg": None}
00074
00075 def cb(msg):
00076 received["msg"] = msg
00077
00078 handler = subscribe.MessageHandler(None, cb)
00079 handler = handler.set_throttle_rate(10)
00080 self.assertIsInstance(handler, subscribe.ThrottleMessageHandler)
00081 handler.handle_message(msg)
00082 self.assertEqual(received["msg"], msg)
00083
00084
00085 received = {"msgs": []}
00086
00087 def cb(msg):
00088 received["msgs"].append(msg)
00089 handler.publish = cb
00090
00091 numsent = 0
00092 last = -1
00093 for i in range(1, 10):
00094 start = time.time()
00095 while (time.time() - start < handler.throttle_rate):
00096 handler.handle_message(numsent)
00097 numsent = numsent + 1
00098
00099 self.assertGreater(numsent, i)
00100 self.assertEqual(len(received["msgs"]), i)
00101 self.assertGreater(received["msgs"][-1], last)
00102 last = numsent
00103
00104 def test_queue_message_handler_passes_msgs(self):
00105 received = {"msgs": []}
00106
00107 def cb(msg):
00108 received["msgs"].append(msg)
00109
00110 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00111 handler.queue_length = 1000
00112
00113 msgs = range(1000)
00114 for x in msgs:
00115 handler.handle_message(x)
00116
00117 time.sleep(0.1)
00118 handler.finish()
00119
00120 self.assertEqual(msgs, received["msgs"])
00121
00122 def test_queue_message_handler_stops(self):
00123 received = {"msgs": []}
00124
00125 def cb(msg):
00126 received["msgs"].append(msg)
00127
00128 handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb))
00129
00130 self.assertTrue(handler.is_alive())
00131
00132 handler.finish()
00133
00134 self.assertFalse(handler.is_alive())
00135
00136 def test_queue_message_handler_queue(self):
00137 received = {"msgs": []}
00138
00139 def cb(msg):
00140 received["msgs"].append(msg)
00141
00142 msgs = range(1000)
00143
00144 handler = subscribe.MessageHandler(None, cb)
00145 handler.handle_message("hello")
00146 handler = handler.set_throttle_rate(10000)
00147 handler = handler.set_queue_length(10)
00148
00149 for x in msgs:
00150 handler.handle_message(x)
00151
00152 handler.set_throttle_rate(0)
00153
00154 time.sleep(0.1)
00155
00156 try:
00157 self.assertEqual(["hello"] + range(990, 1000), received["msgs"])
00158 except:
00159 handler.finish()
00160 raise
00161
00162 handler.finish()
00163
00164 def test_queue_message_handler_rate(self):
00165 received = {"msg": None}
00166
00167 def cb(msg):
00168 received["msg"] = msg
00169
00170 handler = subscribe.MessageHandler(None, cb)
00171 handler = handler.set_throttle_rate(50)
00172 handler = handler.set_queue_length(10)
00173
00174 for x in range(10):
00175 handler.handle_message(x)
00176
00177 time.sleep(0.025)
00178
00179 try:
00180 for x in range(10):
00181 self.assertEqual(x, received["msg"])
00182 time.sleep(0.05)
00183 except:
00184 handler.finish()
00185 raise
00186
00187 handler.finish()
00188
00189 def test_transitions(self):
00190 def test_default(handler):
00191 handler = handler.set_queue_length(0)
00192 handler = handler.set_throttle_rate(0)
00193 received = {"msgs": []}
00194 def cb(msg):
00195 received["msgs"].append(msg)
00196 handler.publish = cb
00197 xs = range(10000)
00198 for x in xs:
00199 handler.handle_message(x)
00200
00201 self.assertEqual(received["msgs"], xs)
00202 return handler
00203
00204 def test_throttle(handler, throttle_rate):
00205 received = {"msgs": []}
00206 def cb(msg):
00207 received["msgs"].append(msg)
00208 handler = handler.set_queue_length(0)
00209 handler = handler.set_throttle_rate(throttle_rate)
00210 handler.publish = cb
00211 x = 0
00212 for i in range(1, 10):
00213 fin = time.time() + throttle_rate / 1000.0
00214 while time.time() < fin:
00215 handler.handle_message(x)
00216 x = x + 1
00217 self.assertEqual(len(received["msgs"]), i)
00218 return handler
00219
00220 def test_queue(handler, throttle_rate, queue_length):
00221 received = {"msgs": []}
00222
00223 def cb(msg):
00224 received["msgs"].append(msg)
00225
00226 handler = handler.set_throttle_rate(throttle_rate)
00227 handler = handler.set_queue_length(queue_length)
00228
00229 throttle_rate = throttle_rate / 1000.0
00230
00231 time.sleep(throttle_rate + 0.01)
00232 handler.last_publish = time.time()
00233 last_msg = time.time() + throttle_rate / 4.0
00234
00235 handler.publish = cb
00236
00237 xs = range(1000)
00238 for x in xs:
00239 handler.handle_message(x)
00240
00241 try:
00242 for i in range(0, queue_length - 1):
00243 time.sleep(throttle_rate - time.time() + last_msg)
00244 last_msg = time.time()
00245 self.assertEqual(received["msgs"], xs[len(xs) - queue_length:len(xs) - queue_length + i + 1])
00246 except:
00247 handler.finish()
00248 raise
00249
00250 return handler
00251
00252
00253 handler = subscribe.MessageHandler(None, None)
00254 handler = test_queue(handler, 50, 10)
00255 handler.finish()
00256
00257 handler = subscribe.MessageHandler(None, None)
00258 handler = test_throttle(handler, 50)
00259 handler.finish()
00260
00261 handler = subscribe.MessageHandler(None, None)
00262 handler = test_default(handler)
00263 handler.finish()
00264
00265
00266 handler = subscribe.MessageHandler(None, None)
00267 handler = test_queue(handler, 50, 10)
00268 handler = test_throttle(handler, 50)
00269 handler = test_default(handler)
00270 handler.finish()
00271
00272 handler = subscribe.MessageHandler(None, None)
00273 handler = test_queue(handler, 50, 10)
00274 handler = test_default(handler)
00275 handler = test_throttle(handler, 50)
00276 handler.finish()
00277
00278 handler = subscribe.MessageHandler(None, None)
00279 handler = test_throttle(handler, 50)
00280 handler = test_queue(handler, 50, 10)
00281 handler = test_default(handler)
00282 handler.finish()
00283
00284 handler = subscribe.MessageHandler(None, None)
00285 handler = test_throttle(handler, 50)
00286 handler = test_default(handler)
00287 handler = test_queue(handler, 50, 10)
00288 handler.finish()
00289
00290 handler = subscribe.MessageHandler(None, None)
00291 handler = test_default(handler)
00292 handler = test_throttle(handler, 50)
00293 handler = test_queue(handler, 50, 10)
00294 handler.finish()
00295
00296 handler = subscribe.MessageHandler(None, None)
00297 handler = test_default(handler)
00298 handler = test_queue(handler, 50, 10)
00299 handler = test_throttle(handler, 50)
00300 handler.finish()
00301
00302
00303 handler = subscribe.MessageHandler(None, None)
00304 handler = test_queue(handler, 50, 10)
00305 handler = test_queue(handler, 100, 10)
00306 handler.finish()
00307
00308 handler = subscribe.MessageHandler(None, None)
00309 handler = test_throttle(handler, 50)
00310 handler = test_throttle(handler, 100)
00311 handler.finish()
00312
00313 handler = subscribe.MessageHandler(None, None)
00314 handler = test_default(handler)
00315 handler = test_default(handler)
00316 handler.finish()
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 if __name__ == '__main__':
00328 import rostest
00329 rostest.rosrun(PKG, 'test_message_handlers', TestMessageHandlers)