00001
00002 from twisted.internet.defer import Deferred, DeferredQueue, inlineCallbacks, returnValue
00003 from twisted.internet import reactor
00004 from twisted.internet.protocol import Protocol
00005 from collections import deque
00006 from weakcallback import WeakCallbackCb
00007 import weakref
00008 import sys
00009 from event import Unsubscribe, Event
00010
00011
00012
def async_sleep(t):
    """Return a Deferred that the reactor fires with None after ``t`` seconds.

    Negative values of ``t`` are clamped to zero before scheduling.
    """
    delay = max(0, t)
    wakeup = Deferred()
    reactor.callLater(delay, wakeup.callback, None)
    return wakeup
00017
def event_queue(event):
    """Return a DeferredQueue that receives every firing of ``event``.

    Each firing is stored as an ``(args, kwargs)`` tuple.  The handle
    returned by ``event.subscribe_repeating`` exposes ``unsubscribe``; that
    method is attached to the queue as ``queue.unsubscribe`` so the caller
    can stop the feed.
    """
    queue = DeferredQueue()

    def enqueue(*args, **kwargs):
        queue.put((args, kwargs))

    subscription = event.subscribe_repeating(enqueue)
    queue.unsubscribe = subscription.unsubscribe
    return queue
00025
def now():
    """Return a Deferred fired with None by a zero-delay reactor call."""
    immediate = Deferred()
    reactor.callLater(0, immediate.callback, None)
    return immediate
00030
def wait_for_state(state, condition = None):
    """Return a Deferred fired with the first acceptable new state.

    ``state`` must provide ``subscribe(cb)``; the callback is invoked as
    ``cb(old_state, new_state)``.  When ``condition`` is None every change is
    accepted, otherwise ``condition(new_state)`` must be true.  On acceptance
    the Deferred is fired with ``new_state`` and ``Unsubscribe`` is raised
    from the callback.
    """
    result = Deferred()

    def on_change(old_state, new_state):
        if condition is not None and not condition(new_state):
            # Not the state we are waiting for; stay subscribed.
            return
        result.callback(new_state)
        raise Unsubscribe

    state.subscribe(on_change)
    return result
00041
def wait_for_event(event, condition = None):
    """Return a Deferred fired by the first acceptable firing of ``event``.

    ``condition``, when given, is called as ``condition(args, kwargs)`` (the
    positional tuple and keyword dict, not unpacked) and must return true
    for the firing to be accepted.

    A Deferred carries exactly one result, so the event payload is collapsed
    as follows:

    * exactly one positional argument and no keywords: that argument;
    * no arguments at all: ``None``;
    * anything else: the ``(args, kwargs)`` tuple, the same representation
      ``event_queue`` uses.

    After firing the Deferred the callback raises ``Unsubscribe``.
    """
    result = Deferred()

    def on_event(*args, **kwargs):
        if condition is not None and not condition(args, kwargs):
            # Not the firing we are waiting for; stay subscribed.
            return
        if len(args) == 1 and not kwargs:
            value = args[0]
        elif not args and not kwargs:
            value = None
        else:
            # Deferred.callback() takes a single result; forwarding
            # *args/**kwargs directly would raise TypeError here and the
            # Unsubscribe below would never be reached.
            value = (args, kwargs)
        result.callback(value)
        raise Unsubscribe

    event.subscribe_repeating(on_event)
    return result
00050
class EventStream:
    """Event stream class to be used with select and switch.

    Incoming events are queued as ``(args, kwargs)`` tuples.  At most one
    consumer at a time may wait on the stream through ``listen()``.
    """

    def __init__(self, event = None):
        """Create the stream and, if ``event`` is given, subscribe to it."""
        self._queue = deque()
        # Deferred handed out by listen() that has not fired yet, or None.
        self._invoke_listener = None
        # Public entry point for producers: _put wrapped in a WeakCallbackCb.
        self.put = WeakCallbackCb(self._put)
        # When true, incoming events are dropped instead of queued.
        self.discard = False
        if event:
            event.subscribe_repeating(self.put)

    def _put(*args, **kwargs):
        """Puts an element into the queue, and notifies an eventual
        listener. Use put rather than _put or you will create cycles and
        prevent reference counts from going to zero."""
        # self is taken from *args so that a keyword argument named "self"
        # in the event payload cannot clash with the instance parameter.
        self = args[0]
        if self.discard:
            return
        self._queue.append((args[1:], kwargs))
        if self._invoke_listener:
            self._trigger()

    def get(self):
        """Pops the next element off the queue.

        Raises IndexError if the queue is empty."""
        return self._queue.popleft()

    def _trigger(self):
        """Used internally to trigger the callback."""
        # Clear the slot before firing: if a callback re-enters put() or
        # listen() on this stream, it must not see (and fire a second time)
        # the Deferred that is currently being called.
        listener = self._invoke_listener
        self._invoke_listener = None
        listener.callback(None)

    def listen(self):
        """Returns a Deferred that will be called the next time an event
        arrives, possibly immediately.

        Raises Exception if a previous listen() is still pending."""
        if self._invoke_listener:
            raise Exception("Event stream in use from multiple places simultaneously.")
        d = self._invoke_listener = Deferred()
        if self._queue:
            self._trigger()
        return d

    def stop_listen(self):
        """Discards the Deferred set by listen."""
        # Either the listener is still pending, or it already fired because
        # data was queued; anything else means listen() was never called.
        assert self._invoke_listener or self._queue
        self._invoke_listener = None

    def set_discard(self, discard):
        """Tells the event stream whether it should discard all queued and
        incoming events."""
        self.discard = discard
        if discard:
            self._queue = deque()
00102
class EventStreamFromDeferred(EventStream):
    """EventStream that receives one event when a Deferred fires.

    The Deferred is available as ``self.deferred``; a fresh one is created
    when none is supplied.  Its result is queued through ``self.put``.
    """

    def __init__(self, d = None):
        EventStream.__init__(self)
        self.deferred = Deferred() if d is None else d
        self.deferred.addCallback(self.put)
00110
def StateCondition(*args, **kwargs):
    """Return an EventStreamFromDeferred wrapping ``wait_for_state``.

    All arguments are forwarded unchanged to ``wait_for_state``.
    """
    pending = wait_for_state(*args, **kwargs)
    return EventStreamFromDeferred(pending)
00113
def EventCondition(*args, **kwargs):
    """Return an EventStreamFromDeferred wrapping ``wait_for_event``.

    All arguments are forwarded unchanged to ``wait_for_event``.
    """
    pending = wait_for_event(*args, **kwargs)
    return EventStreamFromDeferred(pending)
00116
class Timeout(EventStream):
    """EventStream that receives one argument-less event after a delay.

    ``timeout`` is in seconds; negative values are clamped to zero.
    """

    def __init__(self, timeout):
        EventStream.__init__(self)
        delayed_call = reactor.callLater(max(0, timeout), self.put)

        # The closure captures only the delayed call, never self, so it does
        # not add a reference to the stream.
        def cancel_if_pending():
            if delayed_call.called:
                return
            delayed_call.cancel()

        # Registered as the "deleted" callback of the weak put wrapper
        # (test_dies_despite_cb shows it running when the stream goes away).
        self.put.set_deleted_cb(cancel_if_pending)
00125
class Now(EventStream):
    """EventStream that receives one argument-less event from a zero-delay
    reactor call."""

    def __init__(self):
        EventStream.__init__(self)
        no_delay = 0
        reactor.callLater(no_delay, self.put)
00130
class ReadDescrEventStream(EventStream):
    """EventStream fed with the data received on a reactor port.

    The port is created with ``reactor.listenWith(portType, protocol, *args,
    **kwargs)`` and kept in ``self.port``.  Every ``dataReceived`` call on
    the protocol queues the data as a single positional argument.
    """

    def __init__(self, portType, *args, **kwargs):
        EventStream.__init__(self)
        # Capture the weak put wrapper, not self, inside the protocol class.
        enqueue = self.put

        class _ReadDescrEventStreamProto(Protocol):
            def dataReceived(self, data):
                enqueue(data)

        protocol = _ReadDescrEventStreamProto()
        self.port = reactor.listenWith(portType, protocol, *args, **kwargs)

        # The port is passed as an argument so the callback holds no
        # reference to the stream itself.
        def stop_port(port):
            port.stopListening()

        self.put.set_deleted_cb(stop_port, self.port)

    def recv(self):
        """Pop the next queued event and return its first positional
        argument (the received data)."""
        positional, _keywords = self.get()
        return positional[0]
00145
@inlineCallbacks
def select(*events):
    """Listens to the provided EventStreams, and returns a list of integers
    (indices into ``events``) indicating which ones have data ready, as soon
    as there is at least one that is ready.

    If ``listen()`` raises for one of the streams (for example because it is
    already in use elsewhere), the streams that were already being listened
    to are released again before the exception propagates."""
    ready_list = []
    done = Deferred()
    done_called = []

    def _select_cb(_, i):
        ready_list.append(i)
        if not done_called:
            # Fire "done" from a later reactor call rather than at once, so
            # other streams that become ready before that call runs are
            # reported in the same ready_list.
            done_called.append(None)
            reactor.callLater(0, done.callback, None)

    # Streams on which this call has successfully started listening; only
    # these may be passed to stop_listen() afterwards.
    listening = []
    try:
        for i in range(len(events)):
            events[i].listen().addCallback(_select_cb, i)
            listening.append(events[i])
        yield done
    finally:
        # Bind e first so the del below is valid even when nothing was
        # listened to.
        e = None
        for e in listening:
            e.stop_listen()
        # Drop every reference this frame holds to the streams so they can
        # be freed as soon as the caller lets go of them.
        del events, listening, e
    returnValue(ready_list)
00171
@inlineCallbacks
def switch(cases, multiple = False):
    """Wait on several EventStreams and dispatch to the matching action.

    ``cases`` maps each EventStream to a callable, or to a false value for
    "consume the event and do nothing".  Once ``select`` reports ready
    streams, one event is popped from each ready stream in turn and its
    action is called with the event's arguments.  Unless ``multiple`` is
    true, only the first ready stream is handled.
    """
    events, actions = zip(*cases.items())

    ready_list = yield select(*events)

    for index in ready_list:
        args, kwargs = events[index].get()
        handler = actions[index]
        if handler:
            handler(*args, **kwargs)
        if not multiple:
            break
00184
def wrap_function(f):
    """Turn ``f(g, *args, **kwargs)`` into a decorator.

    The returned decorator takes a function ``g`` and gives back a wrapper
    that calls ``f(g, *args, **kwargs)`` and returns its result.  The wrapper
    keeps the name and docstring of ``g``, and the decorator keeps those of
    ``f``.
    """
    import functools

    def run_f(g):
        def run_g(*args, **kwargs):
            return f(g, *args, **kwargs)
        try:
            return functools.wraps(g)(run_g)
        except AttributeError:
            # Older functools.wraps raises when g lacks __name__ (e.g. a
            # partial object); fall back to the bare wrapper.
            return run_g

    # Not functools.wraps here: run_f takes g, not f's arguments, so it must
    # not advertise f's signature through __wrapped__.
    for attr in ('__name__', '__doc__'):
        try:
            setattr(run_f, attr, getattr(f, attr))
        except AttributeError:
            pass
    return run_f
00191
@wrap_function
def mainThreadCallback(f, *args, **kwargs):
    """Decorator that causes the function to be called in the main thread.

    The call is handed to ``reactor.callFromThread``; the decorated function
    therefore returns None to its caller.
    """
    reactor.callFromThread(f, *args, **kwargs)
00196
@wrap_function
def async_test(f, *args, **kwargs):
    """Starts an asynchronous test, waits for it to complete, and returns
    its result.

    ``f`` is wrapped with ``inlineCallbacks`` and started; the reactor is
    then iterated until the resulting Deferred has fired.  On failure the
    traceback is printed and the exception is re-raised.
    """
    # Filled with [succeeded, value] once the Deferred fires.
    outcome = []

    def record(value, good):
        outcome.extend([good, value])

    inlineCallbacks(f)(*args, **kwargs).addCallbacks(
        callback = record, callbackArgs = [True],
        errback = record, errbackArgs = [False])

    while not outcome:
        reactor.iterate(0.02)

    succeeded, value = outcome
    if succeeded:
        return value
    value.printTraceback()
    value.raiseException()
00216
def unittest_with_reactor(run_ros_tests):
    """Run the test suite inside a running reactor, then exit the process.

    When the first command-line argument starts with ``--gtest_output=`` the
    ROS path is taken: ``rostest`` is imported into module globals and
    ``run_ros_tests()`` is called.  Otherwise ``unittest.main()`` is run.
    The process exits with the code carried by SystemExit, with 0 if the
    tests returned normally, or with 1 if anything else went wrong.
    """
    exitval = []

    def run_test():
        try:
            if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
                import roslib
                roslib.load_manifest('multi_interface_roam')
                # Published as a module global because run_ros_tests refers
                # to rostest by name.
                global rostest
                import rostest
                run_ros_tests()
            else:
                import unittest
                unittest.main()
            exitval.append(0)
        except SystemExit as v:
            exitval.append(v.code)
        except:
            # Deliberate catch-all at the process boundary: report the
            # error, record a failing exit code, and still stop the reactor.
            import traceback
            traceback.print_exc()
            exitval.append(1)
        finally:
            reactor.stop()

    reactor.callWhenRunning(run_test)
    reactor.run()
    # exitval can only be empty if the reactor stopped before run_test ran
    # to completion; treat that as a failure instead of raising IndexError.
    sys.exit(exitval[0] if exitval else 1)
00241
if __name__ == "__main__":
    import unittest
    import sys
    import gc
    import threading

    class EventStreamTest(unittest.TestCase):
        def test_dies_despite_cb(self):
            """Test that EventStream gets unallocated despite its callback
            being held. As without another reference, calling the callback
            will have no effect."""
            stream = EventStream()
            stream_ref = weakref.ref(stream)
            putter = stream.put
            log = []
            putter.set_deleted_cb(lambda : log.append('deleted'))
            self.assertEqual(log, [])
            self.assertEqual(stream_ref(), stream)
            # Dropping the only strong reference must free the stream even
            # though its put wrapper is still held in "putter".
            del stream
            self.assertEqual(log, ['deleted'])
            self.assertEqual(stream_ref(), None)

        @async_test
        def test_timeout_memory_frees_correctly(self):
            """Had a lot of subtle bugs getting select to free up properly.
            This test case is what pointed me at them."""
            before = 0
            after = 0
            # Retry a few times; the test passes as soon as one select()
            # leaves the number of gc-tracked objects unchanged.
            for attempt in range(10):
                before = len(gc.get_objects())
                yield select(Timeout(0.001))
                after = len(gc.get_objects())
                if before == after:
                    break
            self.assertEqual(before, after)

        @async_test
        def test_timeout_autocancel(self):
            """The Timeout that did not fire must leave no delayed call
            behind once select has returned."""
            self.assertEqual(len(reactor.getDelayedCalls()), 0)
            yield select(Timeout(0.3), Timeout(0.001))
            self.assertEqual(len(reactor.getDelayedCalls()), 0)

        @async_test
        def test_event_stream_from_event(self):
            """An EventStream subscribed to an Event queues its firings."""
            source = Event()
            stream = EventStream(source)
            source.trigger('hello world')
            yield select(stream)
            self.assertEqual(stream.get(), (('hello world',),{}))

    class SelectTest(unittest.TestCase):
        @async_test
        def test_select_param1(self):
            "Tests that the first parameter can get returned."
            ready = yield select(Timeout(.01), Timeout(100))
            self.assertEqual(ready, [0])

        @async_test
        def test_select_param2(self):
            "Tests that the second parameter can get returned."
            ready = yield select(Timeout(100), Timeout(.01))
            self.assertEqual(ready, [1])

        @async_test
        def test_select_both(self):
            "Tests that the both parameters can get returned."
            first = EventStream()
            second = EventStream()
            first.put(None)
            second.put(None)
            ready = yield select(first, second)
            self.assertEqual(ready, [0, 1])

    class SwitchTest(unittest.TestCase):
        @async_test
        def test_switch_param(self):
            "Tests switch on single outcome."
            yield switch({
                Timeout(.01): lambda : None,
                Timeout(100): lambda : self.fail('Wrong switch'),
                })

        @async_test
        def test_switch_empty_action(self):
            "Tests that a None action doesn't cause an exception."
            yield switch({
                Timeout(.01): None,
                })

        @async_test
        def test_switch_both_single(self):
            "Tests switch on simultaneous, non-multiple."
            first = EventStream()
            second = EventStream()
            first.put()
            second.put()
            hits = []
            yield switch({
                first: lambda : hits.append(None),
                second: lambda : hits.append(None),
                })
            self.assertEqual(len(hits), 1)

        @async_test
        def test_switch_both_multiple(self):
            "Tests switch on simultaneous, multiple."
            first = EventStreamFromDeferred()
            second = EventStreamFromDeferred()
            first.deferred.callback(None)
            second.deferred.callback(None)
            hits = []
            yield switch({
                first: lambda _: hits.append(None),
                second: lambda _: hits.append(None),
                }, multiple = True)
            self.assertEqual(len(hits), 2)

        @async_test
        def test_switch_parameters(self):
            "Tests that switch passes parameters correctly."
            stream = EventStream()
            stream.put(3)
            stream.put(4)
            # Each switch consumes exactly one queued event, in order.
            yield switch({
                stream: lambda v: self.assertEqual(v, 3),
                }, multiple = True)
            yield switch({
                stream: lambda v: self.assertEqual(v, 4),
                }, multiple = True)
            # The queue must now be empty.
            self.assertRaises(IndexError, stream._queue.pop)

    class DecoratorTest(unittest.TestCase):
        def test_main_thread_callback(self):
            "Tests that mainThreadCallback works."
            done = []

            @mainThreadCallback
            def cb(*args, **kwargs):
                done.append((args, kwargs))

            threading.Thread(target=cb, args=[1,2], kwargs={'a':'b'}).start()
            # Pump the reactor until the call arrives, at most 100 times.
            for _ in range(0, 100):
                if done:
                    break
                reactor.iterate(0.02)
            self.assertEqual(done, [((1,2), {'a':'b'})])

    def run_ros_tests():
        # rostest is made a module global by unittest_with_reactor before
        # this function is called.
        rostest.unitrun('multi_interface_roam', 'eventstream', EventStreamTest)
        rostest.unitrun('multi_interface_roam', 'select', SelectTest)
        rostest.unitrun('multi_interface_roam', 'switch', SwitchTest)
        rostest.unitrun('multi_interface_roam', 'decorators', DecoratorTest)

    unittest_with_reactor(run_ros_tests)