00001
00002
00003 from __future__ import with_statement
00004
00005 import thread
00006 import weakref
00007
00008
00009
00010
class MultipleUnsubscribe(Exception):
    """Raised when a subscription that is no longer registered is removed.

    The dispatcher raises this from its _unsubscribe method when the given
    handle is not (or is no longer) among its subscribers."""
00013
class Unsubscribe(Exception):
    """Raised by a callback to ask that its own subscription be removed.

    The dispatcher catches this exception around the callback invocation
    and then drops the subscription as if it had been non-repeating."""
00016
00017 class _LivenessSensor():
00018 def __init__(self):
00019 self._cb = None
00020
00021 def set_cb(self, cb = None, *args, **kwargs):
00022 self._cb = cb
00023 self._args = args
00024 self._kwargs = kwargs
00025 if not cb:
00026 self._args = None
00027 self._kwargs = None
00028
00029 def __del__(self):
00030 if self._cb:
00031 self._cb(*self._args, **self._kwargs)
00032
class EventCallbackHandle:
    """Handle returned by a subscription; lets the subscriber cancel it.

    The handle keeps only a weak reference to the dispatcher, so holding a
    handle does not by itself keep the dispatcher alive."""

    def __init__(self, dispatcher):
        """Create a handle bound to dispatcher (referenced weakly)."""
        self._dispatcher = weakref.ref(dispatcher)
        # Set to True once the subscription has been dropped automatically
        # (the dispatcher assigns this attribute directly after a one-shot
        # callback has run or a callback raised Unsubscribe).
        self._auto_unsubscribed = False

    def unsubscribe(self):
        """Cancel the subscription this handle refers to.

        Does nothing if the subscription was already dropped automatically
        or if the dispatcher no longer exists.  Otherwise the request is
        forwarded to the dispatcher's _unsubscribe(), and whatever that
        raises (MultipleUnsubscribe for an unknown handle) propagates."""
        if self._auto_unsubscribed:
            return
        dispatcher = self._dispatcher()
        if dispatcher is not None:
            dispatcher._unsubscribe(self)

    def _mark_auto_unsubscribed(self):
        """Record that the subscription was dropped automatically.

        Replaces the former method named _auto_unsubscribed, which was
        declared without self and was shadowed by the instance attribute of
        the same name, so it could never be called.  Safe to call more than
        once."""
        self._auto_unsubscribed = True
        # The dispatcher is never consulted again once the flag is set.
        self._dispatcher = None
00047
00048 class _EventDispatcher:
00049 def __init__(self):
00050 self._subscribers = {}
00051
00052 def _unsubscribe(self, h):
00053 try:
00054 del self._subscribers[h]
00055 except KeyError:
00056 raise MultipleUnsubscribe("Tried to unsubscribe from event more than once.")
00057
00058 def _trigger(self, *args, **kwargs):
00059 for (h, (cb, firstargs, firstkwargs, repeating, _)) in self._subscribers.items():
00060 if not h in self._subscribers:
00061 continue
00062
00063 allargs = firstargs + args
00064
00065
00066 allkwargs = dict(kwargs)
00067 allkwargs.update(firstkwargs)
00068 try:
00069 cb(*allargs, **allkwargs)
00070 except Unsubscribe:
00071 repeating = False
00072 except:
00073 import traceback
00074 traceback.print_exc()
00075 if not repeating:
00076 del self._subscribers[h]
00077 h._auto_unsubscribed = True
00078
class Event:
    """An event that callbacks can subscribe to and that can be triggered.

    Attributes set in the constructor:
    trigger       -- bound _trigger method of the internal dispatcher;
                     calling it invokes the subscribed callbacks.
    set_close_cb  -- bound set_cb method of the internal liveness sensor;
                     installs a callback that runs when the sensor is
                     finalized.  Both the Event and every subscription
                     entry hold a reference to the sensor."""

    def __init__(self):
        self._liveness_sensor = _LivenessSensor()
        self._dispatcher = _EventDispatcher()
        self.trigger = self._dispatcher._trigger
        self.set_close_cb = self._liveness_sensor.set_cb

    def subscribe(self, cb, args, kwargs, repeating = True):
        """Subscribes to an event.

        Can be called at any time and from any thread. Subscriptions that
        occur while an event is being triggered will not be called until
        the next time the event is triggered.

        cb         -- callable to invoke when the event is triggered.
        args       -- tuple of positional arguments placed before the
                      trigger's own positional arguments.
        kwargs     -- dict of keyword arguments; these override the
                      trigger's keyword arguments of the same name.
        repeating  -- if false, the subscription is removed after the
                      callback has been invoked once.

        Returns an EventCallbackHandle that can cancel the subscription."""
        # NOTE(review): no lock is taken here; the "any thread" statement
        # above is inherited from the original docstring -- confirm.
        h = EventCallbackHandle(self._dispatcher)
        # The entry keeps the liveness sensor referenced for as long as the
        # subscription exists.
        self._dispatcher._subscribers[h] = (cb, args, kwargs, repeating, self._liveness_sensor)
        return h

    def subscribe_once(*args, **kwargs):
        """Subscribe cb so that it is called at most once.

        Call as event.subscribe_once(cb, *cb_args, **cb_kwargs).  The event
        and cb are taken from *args, which leaves every keyword name
        (including "self" and "cb") free to be forwarded to the callback."""
        return args[0].subscribe(args[1], args[2:], kwargs, repeating = False)

    def subscribe_repeating(*args, **kwargs):
        """Subscribe cb so that it is called on every trigger.

        Call as event.subscribe_repeating(cb, *cb_args, **cb_kwargs).  The
        event and cb are taken from *args, which leaves every keyword name
        (including "self" and "cb") free to be forwarded to the callback."""
        return args[0].subscribe(args[1], args[2:], kwargs, repeating = True)

    def unsubscribe_all(self):
        """Remove every current subscription from this event."""
        # Iterate over a snapshot of the handles: _unsubscribe deletes from
        # the dict, which must not be mutated while it is being iterated.
        for h in list(self._dispatcher._subscribers):
            self._dispatcher._unsubscribe(h)
00110
# Self-test: runs the unit tests below when the module is executed directly.
if __name__ == "__main__":
    import unittest
    import sys

    def append_cb(l, *args, **kwargs):
        """Record the arguments the callback received by appending them to l."""
        l.append((args, kwargs))

    class Unsubscriber:
        """Callback holder whose callback unsubscribes itself when called.

        The subscription handle must be stored in the attribute h before the
        event is triggered."""
        def __init__(self, l, blocking):
            self.l = l
            if blocking:
                raise Exception("blocking unsubscribe has been removed")

        def cb(self, *args, **kwargs):
            append_cb(self.l, *args, **kwargs)
            self.h.unsubscribe()

    class BasicTest(unittest.TestCase):
        def test_basic(self):
            """Tests basic functionality.

            Adds a couple of callbacks. Makes sure they are called the
            right number of times. Checks that parameters are correct,
            including keyword arguments giving priority to subscribe over
            trigger."""
            e = Event()
            l1 = []
            l2 = []
            h1 = e.subscribe_repeating(append_cb, l1, 'd', e = 'f')
            e.subscribe_once(append_cb, l2, 'a', b = 'c')
            e.trigger('1', g = 'h')
            e.trigger('2', e = 'x')
            h1.unsubscribe()
            e.trigger('3')
            sys.stdout.flush()
            self.assertEqual(l1, [
                (('d', '1'), { 'e' : 'f', 'g' : 'h'}),
                (('d', '2'), { 'e' : 'f'}),
                ])
            self.assertEqual(l2, [
                (('a', '1'), { 'b' : 'c', 'g': 'h'}),
                ])

        def test_unsub_myself(self):
            """Tests that a callback can unsubscribe itself."""
            e = Event()
            l = []
            u = Unsubscriber(l, False)
            u.h = e.subscribe_repeating(u.cb)
            e.trigger('t1')
            e.trigger('t2')
            self.assertEqual(l, [
                (('t1',), {}),
                ])

        def test_multiple_unsubscribe_repeating(self):
            """Tests exception on multiple unsubscribe for repeating subscribers."""
            e = Event()
            h = e.subscribe_repeating(None)
            h.unsubscribe()
            self.assertRaises(MultipleUnsubscribe, h.unsubscribe)

        def test_multiple_unsubscribe_once(self):
            """Tests exception on multiple unsubscribe for non-repeating subscribers."""
            e = Event()
            # Previously this called subscribe_repeating, which made the test
            # an exact duplicate of the repeating case above.
            h = e.subscribe_once(None)
            h.unsubscribe()
            self.assertRaises(MultipleUnsubscribe, h.unsubscribe)

        def test_unsubscribe_all(self):
            """Tests basic unsubscribe_all functionality."""
            e = Event()
            for _ in range(5):
                e.subscribe_repeating(None)
            e.unsubscribe_all()
            self.assertEqual(len(e._dispatcher._subscribers), 0)

        def test_unsub_myself_nonblocking(self):
            """Tests that a nonblocking unsubscribe on myself does not raise."""
            e = Event()
            l = []
            u = Unsubscriber(l, False)
            u.h = e.subscribe_repeating(u.cb)
            e.trigger('t1')

        def test_norun_sub_during_trig(self):
            """Tests that a callback that gets added during a trigger is
            not run."""
            e = Event()
            l = []
            def add_cb(value):
                l.append(('add_cb', value))
                e.subscribe_repeating(append_cb, l)
            h = e.subscribe_repeating(add_cb)
            e.trigger('v1')
            h.unsubscribe()
            e.trigger('v2')
            self.assertEqual(l, [('add_cb', 'v1'), (('v2', ), {})])

        def test_norun_unsub_during_trig(self):
            """Tests that a callback that gets deleted during a trigger is
            not run."""
            e = Event()
            l = []
            def rm_cb():
                l.append('rm')
                e.unsubscribe_all()
            # Two identical subscriptions: whichever runs first removes the
            # other, so exactly one 'rm' is expected.
            e.subscribe_repeating(rm_cb)
            e.subscribe_repeating(rm_cb)
            e.trigger()
            self.assertEqual(l, ['rm'])

        # NOTE(review): the liveness tests below assert right after a del,
        # so they presumably rely on immediate reference-counted
        # finalization -- confirm on the target interpreter.

        def test_liveness_sensor_no_source(self):
            """Tests when the liveness sensor gets set off when the event
            disappears and there is no event source."""
            e = Event()
            l = []
            e.set_close_cb(l.append, 'closed')
            e.subscribe_repeating(lambda : None)
            self.assertEqual(l, [])
            del e
            self.assertEqual(l, ['closed'])

        def test_liveness_sensor_with_source(self):
            """Tests when the liveness sensor gets set off when the event
            disappears and there is an event source."""
            e = Event()
            # Holding the bound trigger method keeps the dispatcher, and with
            # it the subscription entry, alive after the Event is deleted.
            t = e.trigger
            l = []
            e.set_close_cb(l.append, 'closed')
            h = e.subscribe_repeating(lambda : None)
            del e
            self.assertEqual(l, [])
            h.unsubscribe()
            self.assertEqual(l, ['closed'])

        def test_liveness_sensor_with_dropping_source(self):
            """Tests when the liveness sensor gets set off when the event
            disappears and the event source is dropped afterwards."""
            e = Event()
            # Holding the bound trigger method keeps the dispatcher, and with
            # it the subscription entry, alive after the Event is deleted.
            t = e.trigger
            l = []
            e.set_close_cb(l.append, 'closed')
            h = e.subscribe_repeating(lambda : None)
            del e
            self.assertEqual(l, [])
            del t
            self.assertEqual(l, ['closed'])

    # When launched with a --gtest_output= argument, run through rostest;
    # otherwise fall back to the plain unittest runner.
    if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
        import roslib
        roslib.load_manifest('multi_interface_roam')
        import rostest
        rostest.unitrun('multi_interface_roam', 'event_basic', BasicTest)
    else:
        unittest.main()