event.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 from __future__ import with_statement
00004 
00005 import thread
00006 import threading
00007 import weakref
00008 
00009 # TODO:
00010 # - Add support for using a reactor instead of running directly.
00011 # - Add an exception that allows you to unsubscribe from a callback.
00012 
00013 class DeadlockException(Exception):
00014     pass
00015 
00016 class MultipleUnsubscribe(Exception):
00017     pass
00018             
00019 class ReentrantDetectingLockEnterHelper:
00020     def __init__(self, parent):
00021         self.parent = parent
00022 
00023     def __enter__(self):
00024         pass
00025 
00026     def __exit__(self, *args):
00027         return self.parent.release()
00028 
00029 class ReentrantDetectingLock():
00030     def __init__(self, *args, **kwargs):
00031         self._lock = threading.Lock(*args, **kwargs)
00032         self._owner = None
00033 
00034     def acquire(self, blocking=1):
00035         ti = thread.get_ident()
00036         if self._owner == ti and blocking:
00037             raise DeadlockException("Tried to recursively lock a non recursive lock.")
00038         if self._lock.acquire(blocking):
00039             self._owner = ti
00040             return True
00041         else:
00042             return False
00043 
00044     def release(self):
00045         self._owner = None
00046         self._lock.release()
00047 
00048     def __enter__(self):
00049         self.acquire()
00050 
00051     def __exit__(self, *args):
00052         self.release()
00053 
00054     def __call__(self, msg):
00055         try:
00056             self.acquire()
00057             return ReentrantDetectingLockEnterHelper(self)
00058         except DeadlockException:
00059             raise DeadlockException(msg)
00060 
00061 class EventCallbackHandle:
00062     def __init__(self, event, cb, args, kwargs, repeating):
00063         # Use a weakref to avoid creating a loop for the GC.
00064         self._event = weakref.ref(event)
00065         self._cb = cb
00066         self._args = args
00067         self._kwargs = kwargs
00068         self._call_lock = ReentrantDetectingLock()
00069         self._repeating = repeating
00070 
00071     def __enter__(self):
00072         return self
00073 
00074     def __exit__(self, *args):
00075         self.unsubscribe()
00076 
00077     def _trigger(self, args, kwargs):
00078         # Prepare the positional parameters
00079         allargs = self._args + args
00080     
00081         # Prepare the named parameters. Parameters given to
00082         # the subscribe call have precedence.
00083         allkwargs = dict(kwargs)
00084         allkwargs.update(self._kwargs)
00085     
00086         # Call the callback
00087         cb = self._cb
00088         with self._call_lock("Callback recursively triggered itself."):
00089             if cb is not None:
00090                 if not self._repeating:
00091                     self.unsubscribe(blocking = False, _not_called_from_trigger = False)
00092                 cb(*allargs, **allkwargs)
00093     
00094     def unsubscribe(self, blocking = True, _not_called_from_trigger = True):
00095         # Kill as many references as we can.
00096         # Clear _cb first as it is the one that gets checked in _trigger. 
00097         self._cb = None
00098         self._args = ()
00099         self._kwargs = ()
00100         event = self._event()
00101         if event is not None:
00102             try:
00103                 del event._subscribers[self]
00104                 event._subscription_change()
00105             except KeyError:
00106                 # Don't check once callbacks because there is a race
00107                 # between manual deletion and automatic deletion. If a once
00108                 # callback is manually cleared, _repeating will be true, so
00109                 # we will catch the second attempt.
00110                 if self._repeating and _not_called_from_trigger:
00111                     raise MultipleUnsubscribe("Repeating callback unsubscribed multiple times.")
00112         # If not an automatic unsubscribe then set _repeating to true so
00113         # that the next unsubscribe can raise a MultipleUnsubscribe
00114         # exception. 
00115         if _not_called_from_trigger:
00116             self._repeating = True
00117         if blocking:
00118             # Wait until the _call_lock gets released.
00119             with self._call_lock("Callback tried to blocking unsubscribe itself."):
00120                 pass
00121 
00122 class Event:
00123     def __init__(self, name = "Unnamed Event"): 
00124         self._name = name
00125         self._subscribers = {}
00126         self._subscription_change()
00127 
00128     def subscribe(self, cb, args, kwargs, repeating = True):
00129         """Subscribes to an event. 
00130         
00131         Can be called at any time and from any thread. Subscriptions that
00132         occur while an event is being triggered will not be called until
00133         the next time the event is triggered."""
00134 
00135         h = EventCallbackHandle(self, cb, args, kwargs, repeating)
00136         self._subscribers[h] = None
00137         self._subscription_change()
00138         return h
00139     
00140     def subscribe_once(*args, **kwargs):
00141         # We don't want the names we use to limit what the user can put in
00142         # kwargs. So do all our arguments positional.
00143         return args[0].subscribe(args[1], args[2:], kwargs, repeating = False)
00144 
00145     def subscribe_repeating(*args, **kwargs):
00146         # We don't want the names we use to limit what the user can put in
00147         # kwargs. So do all our arguments positional.
00148         return args[0].subscribe(args[1], args[2:], kwargs, repeating = True)
00149 
00150     def trigger(*args, **kwargs):
00151         """Triggers an event.
00152 
00153         Concurrent triggers of a given callback are serialized using a lock, so 
00154         triggering from a callback will cause a deadlock."""
00155         
00156         self = args[0]
00157         args = args[1:]
00158 
00159         for h in self._subscribers.keys():
00160             h._trigger(args, kwargs)
00161 
00162     def _subscription_change(self):
00163         """Called at the end of each subscribe/unsubscribe. Can be
00164         overloaded in derived classes."""
00165         pass
00166 
00167     def unsubscribe_all(self, blocking = True):
00168         """Unsubscribes all subscribers that were present at the start of
00169         the call."""
00170         subs = self._subscribers.keys()
00171         for s in subs:
00172             s._repeating = False
00173             s.unsubscribe(blocking, _not_called_from_trigger = False)
00174 
00175 if __name__ == "__main__":
00176     import unittest
00177     import sys
00178         
00179     def append_cb(l, *args, **kwargs):
00180         l.append((args, kwargs))
00181             
00182     class Unsubscriber:
00183         def __init__(self, l, blocking):
00184             self.l = l
00185             self.blocking = blocking
00186 
00187         def cb(self, *args, **kwargs):
00188             append_cb(self.l, *args, **kwargs)
00189             self.h.unsubscribe(blocking = self.blocking)
00190 
00191     class BasicTest(unittest.TestCase):
00192         def test_basic(self):
00193             """Tests basic functionality.
00194             
00195             Adds a couple of callbacks. Makes sure they are called the
00196             right number of times. Checks that parameters are correct,
00197             including keyword arguments giving priority to subscribe over
00198             trigger."""
00199             e = Event()
00200             l1 = []
00201             l2 = []
00202             h1 = e.subscribe_repeating(append_cb, l1, 'd', e = 'f')
00203             e.subscribe_once(append_cb, l2, 'a', b = 'c')
00204             e.trigger('1', g = 'h')
00205             e.trigger('2', e = 'x')
00206             h1.unsubscribe()
00207             e.trigger('3')
00208             sys.stdout.flush()
00209             self.assertEqual(l1, [
00210                 (('d', '1'), { 'e' : 'f', 'g' : 'h'}), 
00211                 (('d', '2'), { 'e' : 'f'}),
00212                 ])
00213             self.assertEqual(l2, [
00214                 (('a', '1'), { 'b' : 'c', 'g': 'h'}),
00215                 ])
00216 
00217         def test_subscription_change(self):
00218             """Test that the _subscription_change is called appropriately."""
00219             l = []
00220             class SubChangeEvent(Event):
00221                 def _subscription_change(self):
00222                     l.append(len(self._subscribers))
00223             e = SubChangeEvent()
00224             h1 = e.subscribe_repeating(None)
00225             h2 = e.subscribe_repeating(None)
00226             h1.unsubscribe()
00227             h3 = e.subscribe_repeating(None)
00228             h2.unsubscribe()
00229             h3.unsubscribe()
00230             self.assertEqual(l, [0, 1, 2, 1, 2, 1, 0])
00231 
00232         def test_unsub_myself(self):
00233             """Tests that a callback can unsubscribe itself."""
00234             e = Event()
00235             l = []
00236             u = Unsubscriber(l, False)
00237             u.h = e.subscribe_repeating(u.cb)
00238             e.trigger('t1')
00239             e.trigger('t2')
00240             self.assertEqual(l, [
00241                 (('t1',), {}),
00242                 ])
00243 
00244         def test_multiple_unsubscribe_repeating(self):
00245             """Tests exceptoin on multiple unsubscribe for repeating subscribers."""
00246             e = Event()
00247             h = e.subscribe_repeating(None)
00248             h.unsubscribe()
00249             self.assertRaises(MultipleUnsubscribe, h.unsubscribe)
00250 
00251         def test_multiple_unsubscribe_once(self):
00252             """Tests exceptoin on multiple unsubscribe for non-repeating subscribers."""
00253             e = Event()
00254             h = e.subscribe_repeating(None)
00255             h.unsubscribe()
00256             self.assertRaises(MultipleUnsubscribe, h.unsubscribe)
00257 
00258         def test_unsubscribe_all(self):
00259             """Tests basic unsubscribe_all functionality."""
00260             e = Event()
00261             e.subscribe_repeating(None)
00262             e.subscribe_repeating(None)
00263             e.subscribe_repeating(None)
00264             e.subscribe_repeating(None)
00265             e.subscribe_repeating(None)
00266             e.unsubscribe_all()
00267             self.assertEqual(len(e._subscribers), 0)
00268 
00269         def test_unsub_myself_blocking(self):
00270             """Tests that a blocking unsubscribe on myself raises exception."""
00271             e = Event()
00272             l = []
00273             u = Unsubscriber(l, True)
00274             u.h = e.subscribe_repeating(u.cb)
00275             self.assertRaises(DeadlockException, e.trigger, ['t1'])
00276 
00277         def test_unsub_myself_nonblocking(self):
00278             """Tests that a nonblocking unsubscribe on myself does not raise."""
00279             e = Event()
00280             l = []
00281             u = Unsubscriber(l, False)
00282             u.h = e.subscribe_repeating(u.cb)
00283             e.trigger('t1')
00284 
00285     def wait_cv(cv, l, cb, trig):
00286         with cv:
00287             l.append((cb, trig, 'pre'))
00288             cv.notify_all()
00289             cv.wait()
00290             l.append((cb, trig, 'post'))
00291 
00292     class ThreadTest(unittest.TestCase):
00293         def setUp(self):
00294             self.e = Event()
00295             self.cv = threading.Condition()
00296             self.l = []
00297             self.h1 = self.e.subscribe_once(wait_cv, self.cv, self.l, 'cb1')
00298             self.h2 = self.e.subscribe_once(wait_cv, self.cv, self.l, 'cb1')
00299             self.t = threading.Thread(target = self.e.trigger, args=['t1'])
00300             self.t.start()
00301 
00302         def test_norun_sub_during_trig(self):
00303             """Tests that a callback that gets added during a trigger is
00304             not run."""
00305             
00306             # Trigger event
00307             with self.cv:
00308                 # Handle first callback.
00309                 while not self.l:
00310                     self.cv.wait()
00311                 # This runs while wait_cv is waiting
00312                 self.l.append('main')
00313                 self.e.subscribe_repeating(append_cb, self.l, 'cb2')
00314                 self.cv.notify_all()
00315 
00316                 # Handle second wait_cv callback.
00317                 while len(self.l) != 4:
00318                     self.cv.wait()
00319                 self.l.append('main2')
00320                 self.cv.notify_all()
00321             
00322             # Let the trigger finish
00323             self.t.join()
00324 
00325             self.expected = [
00326                 ('cb1', 't1', 'pre'), 
00327                 'main',
00328                 ('cb1', 't1', 'post'), 
00329                 ('cb1', 't1', 'pre'), 
00330                 'main2',
00331                 ('cb1', 't1', 'post'), 
00332                 (('cb2', 't2'), {}),
00333                 ]
00334             
00335             # Trigger event again
00336             self.e.trigger('t2')
00337             
00338             self.assertEqual(self.l, self.expected)
00339 
00340         def test_norun_unsub_during_trig(self):
00341             """Tests that a callback that gets deleted during a trigger is
00342             not run."""
00343             
00344             # Trigger event
00345             with self.cv:
00346                 # Handle first callback.
00347                 while not self.l:
00348                     self.cv.wait()
00349                 # This runs while wait_cv is waiting
00350                 self.l.append('main')
00351                 unsubed = 0
00352                 self.h1.unsubscribe(blocking = False)
00353                 self.h2.unsubscribe(blocking = False)
00354                 self.cv.notify_all()
00355             
00356             # Let the trigger finish
00357             self.t.join()
00358 
00359             self.expected = [
00360                 ('cb1', 't1', 'pre'), 
00361                 'main',
00362                 ('cb1', 't1', 'post'), 
00363                 ]
00364 
00365             # Trigger event again
00366             self.e.trigger('t2')
00367             
00368             self.assertEqual(self.l, self.expected)
00369 
00370     if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
00371         import roslib; roslib.load_manifest('multi_interface_roam')
00372         import rostest
00373         rostest.unitrun('multi_interface_roam', 'event_basic', BasicTest)
00374         rostest.unitrun('multi_interface_roam', 'event_thread', ThreadTest)
00375     else:
00376         unittest.main()


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Thu Jan 2 2014 11:26:15