$search
00001 #! /usr/bin/env python 00002 00003 from __future__ import with_statement 00004 00005 import thread 00006 import weakref 00007 00008 # TODO: 00009 # - Add an exception that allows you to unsubscribe from a callback. 00010 00011 class MultipleUnsubscribe(Exception): 00012 pass 00013 00014 class Unsubscribe(Exception): 00015 pass 00016 00017 class _LivenessSensor(): 00018 def __init__(self): 00019 self._cb = None 00020 00021 def set_cb(self, cb = None, *args, **kwargs): 00022 self._cb = cb 00023 self._args = args 00024 self._kwargs = kwargs 00025 if not cb: 00026 self._args = None 00027 self._kwargs = None 00028 00029 def __del__(self): 00030 if self._cb: 00031 self._cb(*self._args, **self._kwargs) 00032 00033 class EventCallbackHandle: 00034 def __init__(self, dispatcher): 00035 self._dispatcher = weakref.ref(dispatcher) 00036 self._auto_unsubscribed = False 00037 00038 def unsubscribe(self): 00039 if not self._auto_unsubscribed: 00040 d = self._dispatcher() 00041 if d: 00042 d._unsubscribe(self) 00043 00044 def _auto_unsubscribed(): 00045 self._auto_unsubscribed = True 00046 del self._dispatcher 00047 00048 class _EventDispatcher: 00049 def __init__(self): 00050 self._subscribers = {} 00051 00052 def _unsubscribe(self, h): 00053 try: 00054 del self._subscribers[h] 00055 except KeyError: 00056 raise MultipleUnsubscribe("Tried to unsubscribe from event more than once.") 00057 00058 def _trigger(self, *args, **kwargs): 00059 for (h, (cb, firstargs, firstkwargs, repeating, _)) in self._subscribers.items(): 00060 if not h in self._subscribers: 00061 continue # Don't call if deleted during triggering. 00062 # Prepare the positional parameters 00063 allargs = firstargs + args 00064 # Prepare the named parameters. Parameters given to 00065 # the subscribe call have precedence. 00066 allkwargs = dict(kwargs) 00067 allkwargs.update(firstkwargs) 00068 try: 00069 cb(*allargs, **allkwargs) 00070 except Unsubscribe: 00071 repeating = False 00072 except: 00073 import traceback 00074 traceback.print_exc() 00075 if not repeating: 00076 del self._subscribers[h] 00077 h._auto_unsubscribed = True 00078 00079 class Event: 00080 def __init__(self): 00081 self._liveness_sensor = _LivenessSensor() 00082 self._dispatcher = _EventDispatcher() 00083 self.trigger = self._dispatcher._trigger 00084 self.set_close_cb = self._liveness_sensor.set_cb 00085 00086 def subscribe(self, cb, args, kwargs, repeating = True): 00087 """Subscribes to an event. 00088 00089 Can be called at any time and from any thread. Subscriptions that 00090 occur while an event is being triggered will not be called until 00091 the next time the event is triggered.""" 00092 00093 h = EventCallbackHandle(self._dispatcher) 00094 self._dispatcher._subscribers[h] = (cb, args, kwargs, repeating, self._liveness_sensor) 00095 return h 00096 00097 def subscribe_once(*args, **kwargs): 00098 # We don't want the names we use to limit what the user can put in 00099 # kwargs. So do all our arguments positional. 00100 return args[0].subscribe(args[1], args[2:], kwargs, repeating = False) 00101 00102 def subscribe_repeating(*args, **kwargs): 00103 # We don't want the names we use to limit what the user can put in 00104 # kwargs. So do all our arguments positional. 00105 return args[0].subscribe(args[1], args[2:], kwargs, repeating = True) 00106 00107 def unsubscribe_all(self): 00108 for h in self._dispatcher._subscribers.keys(): 00109 self._dispatcher._unsubscribe(h) 00110 00111 if __name__ == "__main__": 00112 import unittest 00113 import sys 00114 00115 def append_cb(l, *args, **kwargs): 00116 l.append((args, kwargs)) 00117 00118 class Unsubscriber: 00119 def __init__(self, l, blocking): 00120 self.l = l 00121 if blocking: 00122 raise Exception("blocking unsubscribe has been removed") 00123 00124 def cb(self, *args, **kwargs): 00125 append_cb(self.l, *args, **kwargs) 00126 self.h.unsubscribe() 00127 00128 class BasicTest(unittest.TestCase): 00129 def test_basic(self): 00130 """Tests basic functionality. 00131 00132 Adds a couple of callbacks. Makes sure they are called the 00133 right number of times. Checks that parameters are correct, 00134 including keyword arguments giving priority to subscribe over 00135 trigger.""" 00136 e = Event() 00137 l1 = [] 00138 l2 = [] 00139 h1 = e.subscribe_repeating(append_cb, l1, 'd', e = 'f') 00140 e.subscribe_once(append_cb, l2, 'a', b = 'c') 00141 e.trigger('1', g = 'h') 00142 e.trigger('2', e = 'x') 00143 h1.unsubscribe() 00144 e.trigger('3') 00145 sys.stdout.flush() 00146 self.assertEqual(l1, [ 00147 (('d', '1'), { 'e' : 'f', 'g' : 'h'}), 00148 (('d', '2'), { 'e' : 'f'}), 00149 ]) 00150 self.assertEqual(l2, [ 00151 (('a', '1'), { 'b' : 'c', 'g': 'h'}), 00152 ]) 00153 00154 # def test_subscription_change(self): 00155 # """Test that the _subscription_change is called appropriately.""" 00156 # l = [] 00157 # class SubChangeEvent(Event): 00158 # def _subscription_change(self): 00159 # l.append(len(self._subscribers)) 00160 # e = SubChangeEvent() 00161 # h1 = e.subscribe_repeating(None) 00162 # h2 = e.subscribe_repeating(None) 00163 # h1.unsubscribe() 00164 # h3 = e.subscribe_repeating(None) 00165 # h2.unsubscribe() 00166 # h3.unsubscribe() 00167 # self.assertEqual(l, [0, 1, 2, 1, 2, 1, 0]) 00168 00169 def test_unsub_myself(self): 00170 """Tests that a callback can unsubscribe itself.""" 00171 e = Event() 00172 l = [] 00173 u = Unsubscriber(l, False) 00174 u.h = e.subscribe_repeating(u.cb) 00175 e.trigger('t1') 00176 e.trigger('t2') 00177 self.assertEqual(l, [ 00178 (('t1',), {}), 00179 ]) 00180 00181 def test_multiple_unsubscribe_repeating(self): 00182 """Tests exceptoin on multiple unsubscribe for repeating subscribers.""" 00183 e = Event() 00184 h = e.subscribe_repeating(None) 00185 h.unsubscribe() 00186 self.assertRaises(MultipleUnsubscribe, h.unsubscribe) 00187 00188 def test_multiple_unsubscribe_once(self): 00189 """Tests exceptoin on multiple unsubscribe for non-repeating subscribers.""" 00190 e = Event() 00191 h = e.subscribe_repeating(None) 00192 h.unsubscribe() 00193 self.assertRaises(MultipleUnsubscribe, h.unsubscribe) 00194 00195 def test_unsubscribe_all(self): 00196 """Tests basic unsubscribe_all functionality.""" 00197 e = Event() 00198 e.subscribe_repeating(None) 00199 e.subscribe_repeating(None) 00200 e.subscribe_repeating(None) 00201 e.subscribe_repeating(None) 00202 e.subscribe_repeating(None) 00203 e.unsubscribe_all() 00204 self.assertEqual(len(e._dispatcher._subscribers), 0) 00205 00206 # def test_unsub_myself_blocking(self): 00207 # """Tests that a blocking unsubscribe on myself raises exception.""" 00208 # e = Event() 00209 # l = [] 00210 # u = Unsubscriber(l, True) 00211 # u.h = e.subscribe_repeating(u.cb) 00212 # self.assertRaises(DeadlockException, e.trigger, ['t1']) 00213 00214 def test_unsub_myself_nonblocking(self): 00215 """Tests that a nonblocking unsubscribe on myself does not raise.""" 00216 e = Event() 00217 l = [] 00218 u = Unsubscriber(l, False) 00219 u.h = e.subscribe_repeating(u.cb) 00220 e.trigger('t1') 00221 00222 def test_norun_sub_during_trig(self): 00223 """Tests that a callback that gets added during a trigger is 00224 not run.""" 00225 e = Event() 00226 l = [] 00227 def add_cb(iter): 00228 l.append(('add_cb', iter)) 00229 e.subscribe_repeating(append_cb, l) 00230 h = e.subscribe_repeating(add_cb) 00231 e.trigger('v1') 00232 h.unsubscribe() 00233 e.trigger('v2') 00234 self.assertEqual(l, [('add_cb', 'v1'), (('v2', ), {})]) 00235 00236 def test_norun_unsub_during_trig(self): 00237 """Tests that a callback that gets deleted during a trigger is 00238 not run.""" 00239 e = Event() 00240 l = [] 00241 def rm_cb(): 00242 l.append('rm') 00243 e.unsubscribe_all() 00244 e.subscribe_repeating(rm_cb) 00245 e.subscribe_repeating(rm_cb) 00246 e.trigger() 00247 self.assertEqual(l, ['rm']) 00248 00249 def test_liveness_sensor_no_source(self): 00250 """Tests when the liveness sensor gets set off when the event 00251 disappears and there is no event source.""" 00252 e = Event() 00253 l = [] 00254 e.set_close_cb(l.append, 'closed') 00255 e.subscribe_repeating(lambda : None) 00256 self.assertEqual(l, []) 00257 del e 00258 self.assertEqual(l, ['closed']) 00259 00260 def test_liveness_sensor_with_source(self): 00261 """Tests when the liveness sensor gets set off when the event 00262 disappears and there is an event source.""" 00263 e = Event() 00264 t = e.trigger 00265 l = [] 00266 e.set_close_cb(l.append, 'closed') 00267 h = e.subscribe_repeating(lambda : None) 00268 del e 00269 self.assertEqual(l, []) 00270 h.unsubscribe() 00271 self.assertEqual(l, ['closed']) 00272 00273 def test_liveness_sensor_with_dropping_source(self): 00274 """Tests when the liveness sensor gets set off when the event 00275 disappears and there is an event source.""" 00276 e = Event() 00277 t = e.trigger 00278 l = [] 00279 e.set_close_cb(l.append, 'closed') 00280 h = e.subscribe_repeating(lambda : None) 00281 del e 00282 self.assertEqual(l, []) 00283 del t 00284 self.assertEqual(l, ['closed']) 00285 00286 if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="): 00287 import roslib; roslib.load_manifest('multi_interface_roam') 00288 import rostest 00289 rostest.unitrun('multi_interface_roam', 'event_basic', BasicTest) 00290 else: 00291 unittest.main()