$search
"""
test_dispatch.py

By Paul Malmsten, 2010
pmalmsten@gmail.com

Tests the Dispatch module.
"""
import unittest
from xbee.helpers.dispatch import Dispatch
from xbee.helpers.dispatch.tests.fake import FakeXBee


class CallbackCheck(object):
    """
    Records whether its call() method has been invoked, so a test can
    hand call() to Dispatch as a callback and inspect the result later.
    """

    def __init__(self):
        # Flipped to True by call(); never reset.
        self.called = False

    def call(self, name, data):
        """
        Callback entry point. Both arguments are accepted and ignored;
        the only effect is to mark this instance as called.
        """
        self.called = True


class TestDispatch(unittest.TestCase):
    """
    Tests xbee.helpers.dispatch for expected behavior
    """

    def setUp(self):
        # Each test gets a fresh Dispatch wired to a fake XBee device.
        # NOTE(review): FakeXBee is presumably a stub that supplies a
        # canned packet to run() -- confirm against tests/fake.py.
        self.xbee = FakeXBee(None)
        self.dispatch = Dispatch(xbee=self.xbee)
        self.callback_check = CallbackCheck()

    def test_callback_is_called_when_registered(self):
        """
        After registering a callback function with a filter function,
        the callback should be called when data arrives.
        """
        # The filter accepts every packet.
        self.dispatch.register("test1", self.callback_check.call, lambda data: True)
        self.dispatch.run(oneshot=True)
        self.assertTrue(self.callback_check.called)

    def test_callback_not_called_when_filter_not_satisfied(self):
        """
        After registering a callback function with a filter function,
        the callback should not be called if a packet which does not
        satisfy the callback's filter arrives.
        """
        # The filter rejects every packet.
        self.dispatch.register("test1", self.callback_check.call, lambda data: False)
        self.dispatch.run(oneshot=True)
        self.assertFalse(self.callback_check.called)

    def test_multiple_callbacks(self):
        """
        Many callbacks should be called on the same packet if each
        callback's filter method is satisfied.
        """
        checks = []

        # Register ten independent callbacks under distinct names, each
        # with a filter that accepts every packet.
        for i in range(10):
            check = CallbackCheck()
            checks.append(check)
            self.dispatch.register("test%d" % i, check.call, lambda data: True)

        self.dispatch.run(oneshot=True)

        self.assertTrue(
            all(check.called for check in checks),
            "All callback methods should be called"
        )

    def test_callback_name_collisions_raise_valueerror(self):
        """
        Attempting to register a callback under a name that is already
        used by another callback should raise a ValueError.
        """
        self.dispatch.register("test", None, None)
        self.assertRaises(ValueError, self.dispatch.register, "test", None, None)


class TestHeadlessDispatch(unittest.TestCase):
    """
    Tests Dispatch functionality when it is not constructed with a serial
    port or an XBee
    """

    def setUp(self):
        self.headless = Dispatch()

    def test_dispatch_can_be_created(self):
        """
        A user may construct a Dispatch with neither a serial port nor
        an XBee. This allows one to configure an XBee to asynchronously
        call dispatch() whenever a packet arrives.
        """
        # Construction happens in setUp(); reaching this point without an
        # exception is the whole test.
        pass

    def test_run_raises_exception(self):
        """
        A ValueError must be raised by a headless Dispatch instance if
        a user attempts to call run().
        """
        self.assertRaises(ValueError, self.headless.run)