event.py
Go to the documentation of this file.
00001 #! /usr/bin/env python 
00002 
00003 from __future__ import with_statement
00004 
00005 import thread
00006 import weakref
00007 
00008 # TODO:
00009 # - Add an exception that allows you to unsubscribe from a callback.
00010 
00011 class MultipleUnsubscribe(Exception):
00012     pass
00013 
00014 class Unsubscribe(Exception):
00015     pass
00016 
00017 class _LivenessSensor():
00018     def __init__(self):
00019         self._cb = None
00020 
00021     def set_cb(self, cb = None, *args, **kwargs):
00022         self._cb = cb
00023         self._args = args
00024         self._kwargs = kwargs
00025         if not cb:
00026             self._args = None
00027             self._kwargs = None
00028 
00029     def __del__(self):
00030         if self._cb:
00031             self._cb(*self._args, **self._kwargs)
00032 
00033 class EventCallbackHandle:
00034     def __init__(self, dispatcher):
00035         self._dispatcher = weakref.ref(dispatcher)
00036         self._auto_unsubscribed = False
00037 
00038     def unsubscribe(self):
00039         if not self._auto_unsubscribed:
00040             d = self._dispatcher()
00041             if d:
00042                 d._unsubscribe(self)
00043 
00044     def _auto_unsubscribed():
00045         self._auto_unsubscribed = True
00046         del self._dispatcher
00047 
00048 class _EventDispatcher:
00049     def __init__(self):
00050         self._subscribers = {}
00051 
00052     def _unsubscribe(self, h):
00053         try:
00054             del self._subscribers[h]
00055         except KeyError:
00056             raise MultipleUnsubscribe("Tried to unsubscribe from event more than once.")
00057 
00058     def _trigger(self, *args, **kwargs):
00059         for (h, (cb, firstargs, firstkwargs, repeating, _)) in self._subscribers.items():
00060             if not h in self._subscribers:
00061                 continue # Don't call if deleted during triggering.
00062             # Prepare the positional parameters
00063             allargs = firstargs + args
00064             # Prepare the named parameters. Parameters given to
00065             # the subscribe call have precedence.
00066             allkwargs = dict(kwargs)
00067             allkwargs.update(firstkwargs)
00068             try:
00069                 cb(*allargs, **allkwargs)
00070             except Unsubscribe:
00071                 repeating = False
00072             except:
00073                 import traceback
00074                 traceback.print_exc()
00075             if not repeating:
00076                 del self._subscribers[h]
00077                 h._auto_unsubscribed = True
00078 
00079 class Event:
00080     def __init__(self):
00081         self._liveness_sensor = _LivenessSensor()
00082         self._dispatcher = _EventDispatcher()
00083         self.trigger = self._dispatcher._trigger
00084         self.set_close_cb = self._liveness_sensor.set_cb
00085     
00086     def subscribe(self, cb, args, kwargs, repeating = True):
00087         """Subscribes to an event. 
00088         
00089         Can be called at any time and from any thread. Subscriptions that
00090         occur while an event is being triggered will not be called until
00091         the next time the event is triggered."""
00092 
00093         h = EventCallbackHandle(self._dispatcher)
00094         self._dispatcher._subscribers[h] = (cb, args, kwargs, repeating, self._liveness_sensor)
00095         return h
00096     
00097     def subscribe_once(*args, **kwargs):
00098         # We don't want the names we use to limit what the user can put in
00099         # kwargs. So do all our arguments positional.
00100         return args[0].subscribe(args[1], args[2:], kwargs, repeating = False)
00101 
00102     def subscribe_repeating(*args, **kwargs):
00103         # We don't want the names we use to limit what the user can put in
00104         # kwargs. So do all our arguments positional.
00105         return args[0].subscribe(args[1], args[2:], kwargs, repeating = True)
00106     
00107     def unsubscribe_all(self):
00108         for h in self._dispatcher._subscribers.keys():
00109             self._dispatcher._unsubscribe(h)
00110 
00111 if __name__ == "__main__":
00112     import unittest
00113     import sys
00114         
00115     def append_cb(l, *args, **kwargs):
00116         l.append((args, kwargs))
00117             
00118     class Unsubscriber:
00119         def __init__(self, l, blocking):
00120             self.l = l
00121             if blocking:
00122                 raise Exception("blocking unsubscribe has been removed")
00123 
00124         def cb(self, *args, **kwargs):
00125             append_cb(self.l, *args, **kwargs)
00126             self.h.unsubscribe()
00127 
00128     class BasicTest(unittest.TestCase):
00129         def test_basic(self):
00130             """Tests basic functionality.
00131             
00132             Adds a couple of callbacks. Makes sure they are called the
00133             right number of times. Checks that parameters are correct,
00134             including keyword arguments giving priority to subscribe over
00135             trigger."""
00136             e = Event()
00137             l1 = []
00138             l2 = []
00139             h1 = e.subscribe_repeating(append_cb, l1, 'd', e = 'f')
00140             e.subscribe_once(append_cb, l2, 'a', b = 'c')
00141             e.trigger('1', g = 'h')
00142             e.trigger('2', e = 'x')
00143             h1.unsubscribe()
00144             e.trigger('3')
00145             sys.stdout.flush()
00146             self.assertEqual(l1, [
00147                 (('d', '1'), { 'e' : 'f', 'g' : 'h'}), 
00148                 (('d', '2'), { 'e' : 'f'}),
00149                 ])
00150             self.assertEqual(l2, [
00151                 (('a', '1'), { 'b' : 'c', 'g': 'h'}),
00152                 ])
00153 
00154 #        def test_subscription_change(self):
00155 #            """Test that the _subscription_change is called appropriately."""
00156 #            l = []
00157 #            class SubChangeEvent(Event):
00158 #                def _subscription_change(self):
00159 #                    l.append(len(self._subscribers))
00160 #            e = SubChangeEvent()
00161 #            h1 = e.subscribe_repeating(None)
00162 #            h2 = e.subscribe_repeating(None)
00163 #            h1.unsubscribe()
00164 #            h3 = e.subscribe_repeating(None)
00165 #            h2.unsubscribe()
00166 #            h3.unsubscribe()
00167 #            self.assertEqual(l, [0, 1, 2, 1, 2, 1, 0])
00168 
00169         def test_unsub_myself(self):
00170             """Tests that a callback can unsubscribe itself."""
00171             e = Event()
00172             l = []
00173             u = Unsubscriber(l, False)
00174             u.h = e.subscribe_repeating(u.cb)
00175             e.trigger('t1')
00176             e.trigger('t2')
00177             self.assertEqual(l, [
00178                 (('t1',), {}),
00179                 ])
00180 
00181         def test_multiple_unsubscribe_repeating(self):
00182             """Tests exceptoin on multiple unsubscribe for repeating subscribers."""
00183             e = Event()
00184             h = e.subscribe_repeating(None)
00185             h.unsubscribe()
00186             self.assertRaises(MultipleUnsubscribe, h.unsubscribe)
00187 
00188         def test_multiple_unsubscribe_once(self):
00189             """Tests exceptoin on multiple unsubscribe for non-repeating subscribers."""
00190             e = Event()
00191             h = e.subscribe_repeating(None)
00192             h.unsubscribe()
00193             self.assertRaises(MultipleUnsubscribe, h.unsubscribe)
00194 
00195         def test_unsubscribe_all(self):
00196             """Tests basic unsubscribe_all functionality."""
00197             e = Event()
00198             e.subscribe_repeating(None)
00199             e.subscribe_repeating(None)
00200             e.subscribe_repeating(None)
00201             e.subscribe_repeating(None)
00202             e.subscribe_repeating(None)
00203             e.unsubscribe_all()
00204             self.assertEqual(len(e._dispatcher._subscribers), 0)
00205 
00206 #        def test_unsub_myself_blocking(self):
00207 #            """Tests that a blocking unsubscribe on myself raises exception."""
00208 #            e = Event()
00209 #            l = []
00210 #            u = Unsubscriber(l, True)
00211 #            u.h = e.subscribe_repeating(u.cb)
00212 #            self.assertRaises(DeadlockException, e.trigger, ['t1'])
00213 
00214         def test_unsub_myself_nonblocking(self):
00215             """Tests that a nonblocking unsubscribe on myself does not raise."""
00216             e = Event()
00217             l = []
00218             u = Unsubscriber(l, False)
00219             u.h = e.subscribe_repeating(u.cb)
00220             e.trigger('t1')
00221         
00222         def test_norun_sub_during_trig(self):
00223             """Tests that a callback that gets added during a trigger is
00224             not run."""
00225             e = Event()
00226             l = []
00227             def add_cb(iter):
00228                 l.append(('add_cb', iter))
00229                 e.subscribe_repeating(append_cb, l)
00230             h = e.subscribe_repeating(add_cb)
00231             e.trigger('v1')
00232             h.unsubscribe()
00233             e.trigger('v2')
00234             self.assertEqual(l, [('add_cb', 'v1'), (('v2', ), {})])
00235         
00236         def test_norun_unsub_during_trig(self):
00237             """Tests that a callback that gets deleted during a trigger is
00238             not run."""
00239             e = Event()
00240             l = []
00241             def rm_cb():
00242                 l.append('rm')
00243                 e.unsubscribe_all()
00244             e.subscribe_repeating(rm_cb)
00245             e.subscribe_repeating(rm_cb)
00246             e.trigger()
00247             self.assertEqual(l, ['rm'])
00248 
00249         def test_liveness_sensor_no_source(self):
00250             """Tests when the liveness sensor gets set off when the event
00251             disappears and there is no event source."""
00252             e = Event()
00253             l = []
00254             e.set_close_cb(l.append, 'closed')
00255             e.subscribe_repeating(lambda : None)
00256             self.assertEqual(l, [])
00257             del e
00258             self.assertEqual(l, ['closed'])
00259 
00260         def test_liveness_sensor_with_source(self):
00261             """Tests when the liveness sensor gets set off when the event
00262             disappears and there is an event source."""
00263             e = Event()
00264             t = e.trigger
00265             l = []
00266             e.set_close_cb(l.append, 'closed')
00267             h = e.subscribe_repeating(lambda : None)
00268             del e
00269             self.assertEqual(l, [])
00270             h.unsubscribe()
00271             self.assertEqual(l, ['closed'])
00272 
00273         def test_liveness_sensor_with_dropping_source(self):
00274             """Tests when the liveness sensor gets set off when the event
00275             disappears and there is an event source."""
00276             e = Event()
00277             t = e.trigger
00278             l = []
00279             e.set_close_cb(l.append, 'closed')
00280             h = e.subscribe_repeating(lambda : None)
00281             del e
00282             self.assertEqual(l, [])
00283             del t
00284             self.assertEqual(l, ['closed'])
00285 
00286     if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
00287         import roslib; roslib.load_manifest('multi_interface_roam')
00288         import rostest
00289         rostest.unitrun('multi_interface_roam', 'event_basic', BasicTest)
00290     else:
00291         unittest.main()


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Thu Apr 24 2014 15:34:18