#! /usr/bin/env python
"""Helpers for writing asynchronous code on top of the Twisted reactor.

Provides Deferred-returning sleep/wait helpers, the EventStream family of
queueing event sources, the select/switch primitives that wait on several
EventStreams at once, and decorators/utilities for running unit tests
while the reactor is running.
"""
from twisted.internet.defer import Deferred, DeferredQueue, inlineCallbacks, returnValue
from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from collections import deque
from weakcallback import WeakCallbackCb
import weakref
import sys
from event import Unsubscribe, Event

# FIXME Add test for ReadDescrEventStream

def async_sleep(t):
    """Returns a Deferred that fires with None after t seconds.

    Negative values of t are clamped to zero."""
    d = Deferred()
    reactor.callLater(max(0, t), d.callback, None)
    return d

def event_queue(event):
    """Returns a DeferredQueue that receives an (args, kwargs) tuple each
    time event calls its subscribers.

    The returned queue gets an extra unsubscribe attribute, taken from the
    subscription handle, which can be used to stop feeding the queue."""
    q = DeferredQueue()
    def cb(*args, **kwargs):
        q.put((args, kwargs))
    h = event.subscribe_repeating(cb)
    q.unsubscribe = h.unsubscribe
    return q

def now():
    """Returns a Deferred that fires with None on a later reactor
    iteration (scheduled with a delay of zero)."""
    d = Deferred()
    reactor.callLater(0, d.callback, None)
    return d

def wait_for_state(state, condition = None):
    """Returns a Deferred that fires with the new state the first time
    state notifies a change for which condition(new_state) is true.

    With condition left as None the first notified change is accepted.
    The subscription is dropped (by raising Unsubscribe) once the Deferred
    has been fired."""
    d = Deferred()
    def cb(old_state, new_state):
        #print "wait_for_state, cb", old_state, new_state, condition
        if condition is None or condition(new_state):
            #print "wait_for_state, cb, hit"
            d.callback(new_state)
            raise Unsubscribe
    state.subscribe(cb)
    return d

def wait_for_event(event, condition = None):
    """Returns a Deferred that fires the first time event is triggered
    with arguments for which condition(args, kwargs) is true.

    With condition left as None the first trigger is accepted. The
    subscription is dropped (by raising Unsubscribe) once the Deferred has
    been fired."""
    d = Deferred()
    def cb(*args, **kwargs):
        if condition is None or condition(args, kwargs):
            # NOTE(review): Deferred.callback accepts exactly one result
            # argument, so this only works when the event is triggered with
            # a single argument. Confirm that this is the intended contract.
            d.callback(*args, **kwargs)
            raise Unsubscribe
    event.subscribe_repeating(cb)
    return d

class EventStream:
    """Event stream class to be used with select and switch."""
    def __init__(self, event = None):
        """Creates an empty stream, optionally subscribing it to event so
        that each trigger of event is queued on the stream."""
        self._queue = deque()
        self._invoke_listener = None
        # put is the public entry point. It wraps _put in a WeakCallbackCb
        # so that handing put to other objects does not keep this stream
        # alive (see test_dies_despite_cb).
        self.put = WeakCallbackCb(self._put)
        self.discard = False
        if event:
            event.subscribe_repeating(self.put)

    def _put(*args, **kwargs):
        """Puts an element into the queue, and notifies an eventual
        listener. Use put rather than _put or you will create cycles and
        prevent reference counts from going to zero."""
        # self is taken from args rather than declared as a parameter, so
        # every keyword argument ends up in kwargs.
        self = args[0]
        if self.discard:
            return
        self._queue.append((args[1:], kwargs))
        if self._invoke_listener:
            self._trigger()

    def get(self):
        """Pops the next element off the queue.

        The element is an (args, kwargs) tuple. Raises IndexError if the
        queue is empty."""
        return self._queue.popleft()

    def _trigger(self):
        """Used internally to trigger the callback."""
        self._invoke_listener.callback(None)
        self._invoke_listener = None

    def listen(self):
        """Returns a Deferred that will be called the next time an event
        arrives, possibly immediately.

        Raises Exception if a previous listen is still pending."""
        if self._invoke_listener:
            raise Exception("Event stream in use from multiple places simultaneously.")
        d = self._invoke_listener = Deferred()
        if self._queue:
            self._trigger()
        return d # Using d because self._trigger() may change self._invoke_listener

    def stop_listen(self):
        """Discards the Deferred set by listen."""
        # Either the listener is still pending, or it has already fired,
        # in which case there must be queued data.
        assert self._invoke_listener or self._queue
        self._invoke_listener = None

    def set_discard(self, discard):
        """Tells the event stream whether it should discard all queued and
        incoming events."""
        self.discard = discard
        if discard:
            self._queue = deque()

class EventStreamFromDeferred(EventStream):
    """EventStream that receives the result of a Deferred as its event.

    The Deferred is available as the deferred attribute. When none is
    given a fresh one is created."""
    def __init__(self, d = None):
        EventStream.__init__(self)
        if d is None:
            d = Deferred()
        self.deferred = d
        d.addCallback(self.put)

def StateCondition(*args, **kwargs):
    """Returns an EventStream fed by wait_for_state(*args, **kwargs)."""
    return EventStreamFromDeferred(wait_for_state(*args, **kwargs))

def EventCondition(*args, **kwargs):
    """Returns an EventStream fed by wait_for_event(*args, **kwargs)."""
    return EventStreamFromDeferred(wait_for_event(*args, **kwargs))

class Timeout(EventStream):
    """EventStream that receives one empty event after timeout seconds.

    Negative timeouts are clamped to zero."""
    def __init__(self, timeout):
        EventStream.__init__(self)
        h = reactor.callLater(max(0, timeout), self.put)
        def cancel():
            # active() is False once the call has run or been cancelled, so
            # the pending call is never cancelled twice.
            if h.active():
                h.cancel()
        # Cancel the pending reactor call when put reports deletion, so
        # that an abandoned Timeout leaves no delayed call behind (see
        # test_timeout_autocancel).
        self.put.set_deleted_cb(cancel)

class Now(EventStream):
    """EventStream that receives one empty event on a later reactor
    iteration (scheduled with a delay of zero)."""
    def __init__(self):
        EventStream.__init__(self)
        reactor.callLater(0, self.put)

class ReadDescrEventStream(EventStream):
    """EventStream that queues every chunk of data received on a port.

    The port is opened with reactor.listenWith(portType, protocol, *args,
    **kwargs) and is stored in the port attribute."""
    def __init__(self, portType, *args, **kwargs):
        EventStream.__init__(self)
        # Bind put locally so that the protocol class below closes over
        # the weak callback rather than over self.
        put = self.put
        class _ReadDescrEventStreamProto(Protocol):
            def dataReceived(self, data):
                put(data)
        # NOTE(review): reactor.listenWith appears to have been deprecated
        # and later removed from Twisted; confirm against the Twisted
        # version in use.
        self.port = reactor.listenWith(portType, _ReadDescrEventStreamProto(), *args, **kwargs)
        def cancel(port):
            port.stopListening()
        self.put.set_deleted_cb(cancel, self.port)

    def recv(self):
        """Pops the next element and returns its first positional
        argument, i.e. the received data."""
        return self.get()[0][0]

@inlineCallbacks
def select(*events):
    """Listens to the provided EventStreams, and returns a list of integers
    indicating which ones have data ready, as soon as there is at least
    one that is ready.

    The integers are the positions of the ready streams in the argument
    list, in the order in which the streams became ready."""
    ready_list = []
    done = Deferred()
    done_called = []
    def _select_cb(_, i):
        ready_list.append(i)
        if not done_called:
            # We don't do the callback directly or else cycles tend to form
            # which cause delays in releasing objects.
            done_called.append(None)
            reactor.callLater(0, done.callback, None)
    for i in range(len(events)):
        events[i].listen().addCallback(_select_cb, i)
    # Bound up front so that the "del e" below cannot hit an unbound local
    # when select is called without any stream.
    e = None
    try:
        yield done
    finally:
        for e in events:
            e.stop_listen()
        del events # Needed to avoid creating a cycle when events gets put
        del e      # into the traceback associated with returnValue.
    returnValue(ready_list)

@inlineCallbacks
def switch(cases, multiple = False):
    """Waits for one of several EventStreams and runs the matching action.

    cases is a dictionary mapping EventStreams to actions. An action is
    either None or a callable, which is called with the arguments of the
    event popped from its stream. When multiple is false only the first
    ready stream is handled; otherwise every stream reported ready by
    select is handled."""
    # items() rather than iteritems() so that this also runs on Python 3.
    events, actions = zip(*cases.items())

    ready_list = yield select(*events)

    for i in ready_list:
        args, kwargs = events[i].get()
        if actions[i]:
            actions[i](*args, **kwargs)
        if not multiple:
            break

def wrap_function(f):
    """Turns f(g, *args, **kwargs) into a decorator.

    Applying the resulting decorator to a function g produces a function
    that calls f(g, *args, **kwargs) and returns its result."""
    def run_f(g):
        def run_g(*args, **kwargs):
            return f(g, *args, **kwargs)
        return run_g
    return run_f

@wrap_function
def mainThreadCallback(f, *args, **kwargs):
    "Decorator that causes the function to be called in the main thread."
    reactor.callFromThread(f, *args, **kwargs)

@wrap_function
def async_test(f, *args, **kwargs):
    "Starts an asynchronous test, waits for it to complete, and returns its result."
    # result ends up as [succeeded, value_or_failure].
    result = []
    def cb(value, good):
        result.append(good)
        result.append(value)
    inlineCallbacks(f)(*args, **kwargs).addCallbacks(callback = cb, callbackArgs = [True],
                                                    errback = cb, errbackArgs = [False])
    # Drive the reactor by hand until the test has completed.
    while not result:
        reactor.iterate(0.02)
    if result[0]:
        # Uncomment the following line to check that all the tests
        # really are being run to completion.
        #raise(Exception("Success"))
        return result[1]
    else:
        result[1].printTraceback()
        result[1].raiseException()

def unittest_with_reactor(run_ros_tests):
    """Runs the unit tests from within a running reactor, then exits.

    When the first command line argument starts with "--gtest_output="
    the tests are run through run_ros_tests(), otherwise through
    unittest.main(). The process exits with 0 on success, with the code
    carried by SystemExit if one is raised, or with 1 if the test run
    fails with any other exception."""
    exitval = []
    def run_test():
        try:
            if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
                import roslib
                roslib.load_manifest('multi_interface_roam')
                # rostest is made global because run_ros_tests looks it up
                # as a module level name.
                global rostest
                import rostest
                run_ros_tests()
            else:
                import unittest
                unittest.main()
            exitval.append(0)
        except SystemExit as v:
            exitval.append(v.code)
        except:
            import traceback
            traceback.print_exc()
            # Record a failure status; without it exitval stays empty and
            # the sys.exit call below fails with an IndexError.
            exitval.append(1)
        finally:
            reactor.stop()

    reactor.callWhenRunning(run_test)
    reactor.run()
    sys.exit(exitval[0])

if __name__ == "__main__":
    import unittest
    import sys
    import gc
    import threading
    #from twisted.internet.defer import setDebugging
    #setDebugging(True)

    class EventStreamTest(unittest.TestCase):
        def test_dies_despite_cb(self):
            """Test that EventStream gets unallocated despite its callback
            being held. As without another reference, calling the callback
            will have no effect."""
            es = EventStream()
            esr = weakref.ref(es)
            putter = es.put
            l = []
            putter.set_deleted_cb(lambda : l.append('deleted'))
            self.assertEqual(l, [])
            self.assertEqual(esr(), es)
            del es
            self.assertEqual(l, ['deleted'])
            self.assertEqual(esr(), None)

        @async_test
        def test_timeout_memory_frees_correctly(self):
            """Had a lot of subtle bugs getting select to free up properly.
            This test case is what pointed me at them."""
            before = 0
            after = 0

            # First few times through seems to create some new data, so
            # retry. (#4518)
            for attempt in range(10):
                before = len(gc.get_objects())
                # The yielding statement seems necessary, some stuff only gets
                # cleaned up when going through the reactor main loop.
                yield select(Timeout(0.001))
                after = len(gc.get_objects())
                if before == after:
                    break
            self.assertEqual(before, after)

        @async_test
        def test_timeout_autocancel(self):
            "Tests that an unused Timeout cancels its delayed call."
            self.assertEqual(len(reactor.getDelayedCalls()), 0)
            yield select(Timeout(0.3), Timeout(0.001))
            self.assertEqual(len(reactor.getDelayedCalls()), 0)

        @async_test
        def test_event_stream_from_event(self):
            "Tests that an EventStream queues the triggers of its Event."
            e = Event()
            es = EventStream(e)
            e.trigger('hello world')
            yield select(es)
            self.assertEqual(es.get(), (('hello world',),{}))

    class SelectTest(unittest.TestCase):
        @async_test
        def test_select_param1(self):
            "Tests that the first parameter can get returned."
            self.assertEqual((yield (select(Timeout(.01), Timeout(100)))), [0])

        @async_test
        def test_select_param2(self):
            "Tests that the second parameter can get returned."
            self.assertEqual((yield (select(Timeout(100), Timeout(.01)))), [1])

        @async_test
        def test_select_both(self):
            "Tests that both parameters can get returned."
            es1 = EventStream()
            es2 = EventStream()
            es1.put(None)
            es2.put(None)
            self.assertEqual((yield select(es1, es2)), [0, 1])

    class SwitchTest(unittest.TestCase):
        @async_test
        def test_switch_param(self):
            "Tests switch on single outcome."
            yield switch({
                Timeout(.01): lambda : None,
                Timeout(100): lambda : self.fail('Wrong switch'),
                })

        @async_test
        def test_switch_empty_action(self):
            "Tests that a None action doesn't cause an exception."
            yield switch({
                Timeout(.01): None,
                })

        @async_test
        def test_switch_both_single(self):
            "Tests switch on simultaneous, non-multiple."
            es1 = EventStream()
            es2 = EventStream()
            es1.put()
            es2.put()
            hits = []
            yield switch({
                es1: lambda : hits.append(None),
                es2: lambda : hits.append(None),
                })
            self.assertEqual(len(hits), 1)

        @async_test
        def test_switch_both_multiple(self):
            "Tests switch on simultaneous, multiple."
            es1 = EventStreamFromDeferred()
            es2 = EventStreamFromDeferred()
            es1.deferred.callback(None)
            es2.deferred.callback(None)
            hits = []
            yield switch({
                es1: lambda _: hits.append(None),
                es2: lambda _: hits.append(None),
                }, multiple = True)
            self.assertEqual(len(hits), 2)

        @async_test
        def test_switch_parameters(self):
            "Tests that switch passes parameters correctly."
            es = EventStream()
            es.put(3)
            es.put(4)
            yield switch({
                es: lambda v: self.assertEqual(v, 3),
                }, multiple = True)
            yield switch({
                es: lambda v: self.assertEqual(v, 4),
                }, multiple = True)
            self.assertRaises(IndexError, es._queue.pop)

    class DecoratorTest(unittest.TestCase):
        def test_main_thread_callback(self):
            "Tests that mainThreadCallback works."
            done = []
            @mainThreadCallback
            def cb(*args, **kwargs):
                done.append((args, kwargs))
            threading.Thread(target=cb, args=[1,2], kwargs={'a':'b'}).start()
            # Give the reactor up to 100 iterations to run the callback.
            for i in range(0, 100):
                if done:
                    break
                reactor.iterate(0.02)
            self.assertEqual(done, [((1,2), {'a':'b'})])

    def run_ros_tests():
        rostest.unitrun('multi_interface_roam', 'eventstream', EventStreamTest)
        rostest.unitrun('multi_interface_roam', 'select', SelectTest)
        rostest.unitrun('multi_interface_roam', 'switch', SwitchTest)
        rostest.unitrun('multi_interface_roam', 'decorators', DecoratorTest)

    unittest_with_reactor(run_ros_tests)