async_helpers.py
Go to the documentation of this file.
00001 #! /usr/bin/env python 
00002 from twisted.internet.defer import Deferred, DeferredQueue, inlineCallbacks, returnValue
00003 from twisted.internet import reactor
00004 from twisted.internet.protocol import Protocol
00005 from collections import deque
00006 from weakcallback import WeakCallbackCb
00007 import weakref
00008 import sys
00009 from event import Unsubscribe, Event
00010 
00011 # FIXME Add test for ReadDescrEventStream
00012 
def async_sleep(t):
    """Return a Deferred that the reactor fires with None after t seconds.

    Negative delays are clamped to zero, so the Deferred is then
    scheduled for the next pass through the reactor.
    """
    delay = max(0, t)
    wakeup = Deferred()
    reactor.callLater(delay, wakeup.callback, None)
    return wakeup
00017 
def event_queue(event):
    """Return a DeferredQueue that receives every firing of event.

    Each firing is stored as an (args, kwargs) tuple. The returned queue
    gets an extra unsubscribe attribute, taken from the subscription
    handle, so the caller can detach the queue from the event.
    """
    queue = DeferredQueue()

    def enqueue(*args, **kwargs):
        queue.put((args, kwargs))

    subscription = event.subscribe_repeating(enqueue)
    queue.unsubscribe = subscription.unsubscribe
    return queue
00025 
def now():
    """Return a Deferred that is fired with None by a zero-delay reactor call."""
    fired_soon = Deferred()
    reactor.callLater(0, fired_soon.callback, None)
    return fired_soon
00030 
def wait_for_state(state, condition=None):
    """Return a Deferred fired with the first acceptable new state.

    state     -- object offering subscribe(callback); the callback is
                 invoked as callback(old_state, new_state).
    condition -- optional predicate applied to new_state. When None,
                 the first reported transition is accepted.

    After firing the Deferred the callback raises Unsubscribe
    (presumably the event module's way of removing a subscription from
    inside a callback -- confirm against event.py).
    """
    result = Deferred()

    def on_transition(old_state, new_state):
        if condition is not None and not condition(new_state):
            return
        result.callback(new_state)
        raise Unsubscribe

    state.subscribe(on_transition)
    return result
00041 
def wait_for_event(event, condition=None):
    """Return a Deferred fired by the first acceptable firing of event.

    event     -- object offering subscribe_repeating(callback).
    condition -- optional predicate called as condition(args, kwargs)
                 with the positional and keyword arguments of the
                 firing. When None, the first firing is accepted.

    After firing the Deferred the callback raises Unsubscribe.

    NOTE(review): the event arguments are forwarded unchanged to
    Deferred.callback, which takes a single result, so this looks like
    it only works for events fired with exactly one value -- confirm.
    """
    fired = Deferred()

    def on_event(*args, **kwargs):
        if condition is not None and not condition(args, kwargs):
            return
        fired.callback(*args, **kwargs)
        raise Unsubscribe

    event.subscribe_repeating(on_event)
    return fired
00050 
class EventStream:
    """Queue of events meant to be consumed through select and switch.

    Events are stored as (args, kwargs) tuples. At most one listener
    Deferred (see listen) can be outstanding at a time.
    """

    def __init__(self, event = None):
        """Create an empty stream, optionally fed by event.

        event -- optional object offering subscribe_repeating; when
                 given (and truthy) every firing is queued via put.
        """
        self.discard = False
        self._invoke_listener = None
        self._queue = deque()
        # put is the public entry point: a WeakCallbackCb wrapper around
        # _put, so that holding put does not keep the stream alive.
        self.put = WeakCallbackCb(self._put)
        if event:
            event.subscribe_repeating(self.put)

    def _put(*args, **kwargs):
        """Append an event to the queue and wake a pending listener.

        Hand out put rather than _put, or you will create cycles and
        prevent reference counts from going to zero.

        The instance is taken from args[0] instead of a named self
        parameter, so a keyword argument called 'self' can be queued
        like any other.
        """
        stream, payload = args[0], args[1:]
        if stream.discard:
            return
        stream._queue.append((payload, kwargs))
        if stream._invoke_listener:
            stream._trigger()

    def get(self):
        """Remove and return the oldest queued (args, kwargs) tuple.

        Raises IndexError (from deque.popleft) when the queue is empty.
        """
        return self._queue.popleft()

    def _trigger(self):
        """Used internally to fire the listener Deferred and forget it."""
        listener = self._invoke_listener
        listener.callback(None)
        self._invoke_listener = None

    def listen(self):
        """Return a Deferred that fires the next time an event is
        available, possibly immediately if the queue is not empty.

        Raises Exception if a previous listener is still outstanding.
        """
        if self._invoke_listener:
            raise Exception("Event stream in use from multiple places simultaneously.")
        pending = Deferred()
        self._invoke_listener = pending
        if self._queue:
            self._trigger()
        # Return the local: _trigger() resets self._invoke_listener.
        return pending

    def stop_listen(self):
        """Discard the Deferred set by listen."""
        # Either the listener is still pending, or it has fired, in
        # which case there is a queued element.
        assert self._invoke_listener or self._queue
        self._invoke_listener = None

    def set_discard(self, discard):
        """Tell the stream whether to discard all queued and incoming
        events. Turning discarding on replaces the queue by an empty one."""
        self.discard = discard
        if not discard:
            return
        self._queue = deque()
00102 
class EventStreamFromDeferred(EventStream):
    """Event stream that receives the result of a Deferred as its event."""

    def __init__(self, d = None):
        """Attach the stream to d, or to a fresh Deferred when d is None.

        The Deferred in use is kept as self.deferred, so callers that
        did not supply one can fire it themselves.
        """
        EventStream.__init__(self)
        self.deferred = Deferred() if d is None else d
        self.deferred.addCallback(self.put)
00110 
def StateCondition(*args, **kwargs):
    """Build an event stream fed by wait_for_state(*args, **kwargs)."""
    state_reached = wait_for_state(*args, **kwargs)
    return EventStreamFromDeferred(state_reached)
00113 
def EventCondition(*args, **kwargs):
    """Build an event stream fed by wait_for_event(*args, **kwargs)."""
    event_seen = wait_for_event(*args, **kwargs)
    return EventStreamFromDeferred(event_seen)
00116 
class Timeout(EventStream):
    """Event stream that receives one argument-less event after a delay."""

    def __init__(self, timeout):
        """Schedule put to be called after timeout seconds.

        Negative timeouts are clamped to zero. A deleted-callback is
        registered on put that cancels the delayed call if it has not
        run yet (presumably invoked when the stream is released --
        confirm against WeakCallbackCb).
        """
        EventStream.__init__(self)
        delayed_call = reactor.callLater(max(0, timeout), self.put)

        def cancel_if_pending():
            if delayed_call.called:
                return
            delayed_call.cancel()

        self.put.set_deleted_cb(cancel_if_pending)
00125 
class Now(EventStream):
    """Event stream that receives one argument-less event from a
    zero-delay reactor call."""

    def __init__(self):
        EventStream.__init__(self)
        put = self.put
        reactor.callLater(0, put)
00130 
class ReadDescrEventStream(EventStream):
    """Event stream whose events are the data chunks received on a port."""

    def __init__(self, portType, *args, **kwargs):
        """Open a port with reactor.listenWith and queue incoming data.

        portType, *args and **kwargs are passed on to reactor.listenWith
        together with a protocol instance whose dataReceived queues the
        data. The resulting port is kept as self.port, and a
        deleted-callback is registered on put that stops it listening.
        """
        EventStream.__init__(self)
        # The protocol closes over the put wrapper rather than over self
        # (presumably so that it does not keep the stream alive).
        enqueue = self.put

        class _ReadDescrEventStreamProto(Protocol):
            def dataReceived(self, data):
                enqueue(data)

        protocol = _ReadDescrEventStreamProto()
        self.port = reactor.listenWith(portType, protocol, *args, **kwargs)

        def stop_port(port):
            port.stopListening()

        self.put.set_deleted_cb(stop_port, self.port)

    def recv(self):
        """Pop the oldest event and return its first positional argument,
        i.e. the data chunk that was queued by dataReceived."""
        args, _kwargs = self.get()
        return args[0]
00145     
@inlineCallbacks
def select(*events):
    """Listen to the provided EventStreams and return, as soon as there
    is at least one that is ready, which ones have data ready.

    events -- the EventStream objects to wait on. A listener is
              installed on each one with listen() and removed again
              with stop_listen() before this function finishes.

    Returns (through returnValue) a list with the positional indices of
    the ready streams, in the order in which their listeners fired.
    """
    ready_list = []
    done = Deferred()
    done_called = []

    def _select_cb(_, i):
        ready_list.append(i)
        if not done_called:
            # We don't do the callback directly or else cycles tend to form
            # which cause delays in releasing objects.
            done_called.append(None)
            reactor.callLater(0, done.callback, None)

    for i, e in enumerate(events):
        e.listen().addCallback(_select_cb, i)
    try:
        yield done
    finally:
        for e in events:
            e.stop_listen()
        # Drop the references to the streams to avoid creating a cycle
        # when this frame gets put into the traceback associated with
        # returnValue. Rebinding (rather than del) releases them just
        # the same and cannot fail when no streams were passed, in
        # which case e was never bound.
        events = None
        e = None
    returnValue(ready_list)
00171 
@inlineCallbacks
def switch(cases, multiple = False):
    """Wait on several EventStreams and run the action of a ready one.

    cases    -- dict mapping each EventStream to its action. An action
                is called with the positional and keyword arguments of
                the event popped from its stream; a false action (such
                as None) just consumes the event.
    multiple -- when false (the default) only the first ready stream is
                handled; when true every stream reported ready by
                select is handled, one event each.
    """
    # items() rather than iteritems(): same pairs, and it also exists on
    # Python 3 dictionaries.
    events, actions = zip(*cases.items())

    ready_list = yield select(*events)

    for i in ready_list:
        args, kwargs = events[i].get()
        action = actions[i]
        if action:
            action(*args, **kwargs)
        if not multiple:
            break
00184 
def wrap_function(f):
    """Turn f(g, *args, **kwargs) into a decorator for functions g.

    The returned decorator replaces g by a function that, when called
    with (*args, **kwargs), returns f(g, *args, **kwargs).

    Both the decorator and the functions it produces take over the name
    and docstring of what they wrap, so decorated functions stay
    recognisable in tracebacks, test reports and help().
    """
    import functools  # local import keeps this block self-contained

    def _copy_metadata(wrapper, wrapped):
        try:
            return functools.update_wrapper(wrapper, wrapped)
        except AttributeError:
            # Some callables (e.g. functools.partial objects on
            # Python 2) have no __name__; wrapping them must still work.
            return wrapper

    def run_f(g):
        def run_g(*args, **kwargs):
            return f(g, *args, **kwargs)
        return _copy_metadata(run_g, g)

    return _copy_metadata(run_f, f)
00191 
@wrap_function
def mainThreadCallback(f, *args, **kwargs):
    """Decorator that causes the function to be called in the reactor
    thread.

    Each call is handed to reactor.callFromThread together with its
    arguments. The decorated function itself returns None; the return
    value of f is not passed back to the caller.
    """
    reactor.callFromThread(f, *args, **kwargs)
00196 
@wrap_function
def async_test(f, *args, **kwargs):
    """Starts an asynchronous test, waits for it to complete, and
    returns its result.

    f is run through inlineCallbacks, and the reactor is iterated by
    hand until the resulting Deferred has fired. On failure the
    traceback is printed and the original exception is re-raised.
    """
    outcome = []

    def record(value, good):
        outcome.append((good, value))

    inlineCallbacks(f)(*args, **kwargs).addCallbacks(
        callback = record, callbackArgs = [True],
        errback  = record, errbackArgs  = [False])
    while not outcome:
        reactor.iterate(0.02)
    succeeded, value = outcome[0]
    if succeeded:
        # To check that all the tests really are being run to
        # completion, temporarily raise Exception("Success") here.
        return value
    value.printTraceback()
    value.raiseException()
00216 
def unittest_with_reactor(run_ros_tests):
    """Run the test suite inside a running reactor, then exit the process.

    run_ros_tests -- callable without arguments; used instead of
                     unittest.main() when the first command line
                     argument starts with "--gtest_output=".

    The tests are started once the reactor is running and the reactor
    is stopped when they finish. The process then exits with 0 when the
    tests returned normally, with the code of a SystemExit they raised
    (unittest.main() reports its result that way), or with 1 when they
    raised anything else or never got to record a result.
    """
    exitval = []

    def run_test():
        try:
            if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
                import roslib
                roslib.load_manifest('multi_interface_roam')
                # rostest is made a module global because run_ros_tests
                # looks it up there.
                global rostest
                import rostest
                run_ros_tests()
            else:
                import unittest
                unittest.main()
            exitval.append(0)
        except SystemExit as v:
            exitval.append(v.code)
        except:
            # Deliberately broad: this is the top-level boundary of the
            # test run. Report the problem and make sure it results in
            # a failing exit status.
            import traceback
            traceback.print_exc()
            exitval.append(1)
        finally:
            reactor.stop()

    reactor.callWhenRunning(run_test)
    reactor.run()
    # exitval is still empty if the reactor stopped before run_test
    # recorded anything; treat that as a failure too.
    sys.exit(exitval[0] if exitval else 1)
00241     
00242 if __name__ == "__main__":
00243     import unittest
00244     import sys
00245     import gc
00246     import threading
00247     #from twisted.internet.defer import setDebugging
00248     #setDebugging(True)
00249 
00250     class EventStreamTest(unittest.TestCase):
00251         def test_dies_despite_cb(self):
00252             """Test that EventStream gets unallocated despite its callback
00253             being held. As without another reference, calling the callback
00254             will have no effect."""
00255             es = EventStream()
00256             esr = weakref.ref(es)
00257             putter = es.put
00258             l = []
00259             putter.set_deleted_cb(lambda : l.append('deleted'))
00260             self.assertEqual(l, [])
00261             self.assertEqual(esr(), es)
00262             del es
00263             self.assertEqual(l, ['deleted'])
00264             self.assertEqual(esr(), None)
00265 
00266         @async_test
00267         def test_timeout_memory_frees_correctly(self):
00268             """Had a lot of subtle bugs getting select to free up properly.
00269             This test case is what pointed me at them."""
00270             before = 0
00271             after = 0
00272 
00273             # First few times through seems to create some new data, so
00274             # retry. (#4518)
00275             for iter in range(10):
00276                 before = len(gc.get_objects())
00277                 # The yielding statement seems necessary, some stuff only gets
00278                 # cleaned up when going through the reactor main loop.
00279                 yield select(Timeout(0.001))
00280                 after = len(gc.get_objects())
00281                 if before == after:
00282                     break
00283             self.assertEqual(before, after)
00284             
00285         @async_test
00286         def test_timeout_autocancel(self):
00287             self.assertEqual(len(reactor.getDelayedCalls()), 0)
00288             yield select(Timeout(0.3), Timeout(0.001))
00289             self.assertEqual(len(reactor.getDelayedCalls()), 0)
00290         
00291         @async_test
00292         def test_event_stream_from_event(self):
00293             e = Event()
00294             es = EventStream(e)
00295             e.trigger('hello world')
00296             yield select(es)
00297             self.assertEqual(es.get(), (('hello world',),{}))
00298             
00299     class SelectTest(unittest.TestCase):
00300         @async_test
00301         def test_select_param1(self):
00302             "Tests that the first parameter can get returned."
00303             self.assertEqual((yield (select(Timeout(.01), Timeout(100)))), [0])
00304         
00305         @async_test
00306         def test_select_param2(self):
00307             "Tests that the second parameter can get returned."
00308             self.assertEqual((yield (select(Timeout(100), Timeout(.01)))), [1])
00309         
00310         @async_test
00311         def test_select_both(self):
00312             "Tests that the both parameters can get returned."
00313             es1 = EventStream()
00314             es2 = EventStream()
00315             es1.put(None)
00316             es2.put(None)
00317             self.assertEqual((yield select(es1, es2)), [0, 1])
00318      
00319     class SwitchTest(unittest.TestCase):
00320         @async_test
00321         def test_switch_param(self):
00322             "Tests switch on single outcome."
00323             yield switch({
00324                 Timeout(.01): lambda : None,
00325                 Timeout(100): lambda : self.fail('Wrong switch'),
00326                 })
00327 
00328         @async_test
00329         def test_switch_empty_action(self):
00330             "Tests that a None action doesn't cause an exception."
00331             yield switch({
00332                 Timeout(.01): None,
00333                 })
00334 
00335         @async_test
00336         def test_switch_both_single(self):
00337             "Tests switch on simultaneous, non-multiple."
00338             es1 = EventStream()
00339             es2 = EventStream()
00340             es1.put()
00341             es2.put()
00342             hits = []
00343             yield switch({
00344                 es1: lambda : hits.append(None),
00345                 es2: lambda : hits.append(None),
00346                     })
00347             self.assertEqual(len(hits), 1)
00348         
00349         @async_test
00350         def test_switch_both_multiple(self):
00351             "Tests switch on simultaneous, multiple."
00352             es1 = EventStreamFromDeferred()
00353             es2 = EventStreamFromDeferred()
00354             es1.deferred.callback(None)
00355             es2.deferred.callback(None)
00356             hits = []
00357             yield switch({
00358                 es1: lambda _: hits.append(None),
00359                 es2: lambda _: hits.append(None),
00360                     }, multiple = True)
00361             self.assertEqual(len(hits), 2)
00362 
00363         @async_test
00364         def test_switch_parameters(self):
00365             "Tests that switch passes parameters correctly."
00366             es = EventStream()
00367             es.put(3)
00368             es.put(4)
00369             yield switch({
00370                 es: lambda v: self.assertEqual(v, 3),
00371                     }, multiple = True)
00372             yield switch({
00373                 es: lambda v: self.assertEqual(v, 4),
00374                     }, multiple = True)
00375             self.assertRaises(IndexError, es._queue.pop)
00376 
00377     class DecoratorTest(unittest.TestCase):
00378         def test_main_thread_callback(self):
00379             "Tests that mainThreadCallback works."
00380             done = []
00381             @mainThreadCallback
00382             def cb(*args, **kwargs):
00383                 done.append((args, kwargs))
00384             threading.Thread(target=cb, args=[1,2], kwargs={'a':'b'}).start()
00385             for i in range(0, 100):
00386                 if done:
00387                     break
00388                 reactor.iterate(0.02)
00389             self.assertEqual(done, [((1,2), {'a':'b'})])
00390 
00391     def run_ros_tests():
00392         rostest.unitrun('multi_interface_roam', 'eventstream', EventStreamTest)
00393         rostest.unitrun('multi_interface_roam', 'select', SelectTest)
00394         rostest.unitrun('multi_interface_roam', 'switch', SwitchTest)
00395         rostest.unitrun('multi_interface_roam', 'decorators', DecoratorTest)
00396 
00397     unittest_with_reactor(run_ros_tests)


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Thu Jan 2 2014 11:26:15