test_dispatch.py
Go to the documentation of this file.
1 """
2 test_dispatch.py
3 
4 By Paul Malmsten, 2010
5 pmalmsten@gmail.com
6 
7 Tests the Dispatch module.
8 """
9 import unittest
10 from xbee.helpers.dispatch import Dispatch
11 from xbee.helpers.dispatch.tests.fake import FakeXBee
12 
13 class CallbackCheck(object):
14  def __init__(self):
15  self.called = False
16 
17  def call(self, name, data):
18  self.called = True
19 
20 class TestDispatch(unittest.TestCase):
21  """
22  Tests xbee.helpers.dispatch for expected behavior
23  """
24 
25  def setUp(self):
26  self.xbee = FakeXBee(None)
27  self.dispatch = Dispatch(xbee=self.xbee)
29 
31  """
32  After registerring a callback function with a filter function,
33  the callback should be called when data arrives.
34  """
35  self.dispatch.register("test1", self.callback_check.call, lambda data: True)
36  self.dispatch.run(oneshot=True)
37  self.assertTrue(self.callback_check.called)
38 
40  """
41  After registerring a callback function with a filter function,
42  the callback should not be called if a packet which does not
43  satisfy the callback's filter arrives.
44  """
45  self.dispatch.register("test1", self.callback_check.call, lambda data: False)
46  self.dispatch.run(oneshot=True)
47  self.assertFalse(self.callback_check.called)
48 
50  """
51  Many callbacks should be called on the same packet if each
52  callback's filter method is satisfied.
53  """
54  callbacks = []
55 
56  for i in range(0,10):
57  check = CallbackCheck()
58  callbacks.append(check)
59  self.dispatch.register("test%d" % i, check.call, lambda data: True)
60 
61  self.dispatch.run(oneshot=True)
62 
63  for callback in callbacks:
64  if not callback.called:
65  self.fail("All callback methods should be called")
66 
68  """
69  If a call to register() results in attempting to register a
70  callback with the same name as another callback should result
71  in a ValueError exception being raised.
72  """
73  self.dispatch.register("test", None, None)
74  self.assertRaises(ValueError, self.dispatch.register, "test", None, None)
75 
76 
77 class TestHeadlessDispatch(unittest.TestCase):
78  """
79  Tests Dispatch functionality when it is not constructed with a serial
80  port or an XBee
81  """
82  def setUp(self):
83  self.headless = Dispatch()
84 
86  """
87  A user may construct a Dispatch with neither a serial port nor
88  an XBee. This allows one to configure an XBee to asynchronously
89  call dispatch() whenever a packet arrives.
90  """
91  pass
92 
94  """
95  A ValueError must be raised by a headless Dispatch instance if
96  a user attempts to call run().
97  """
98  self.assertRaises(ValueError, self.headless.run)


rosserial_xbee
Author(s): Adam Stambler
autogenerated on Mon Jun 10 2019 14:53:52