dispatch.py
Go to the documentation of this file.
1 """
2 dispatch.py
3 
4 By Paul Malmsten, 2010
5 pmalmsten@gmail.com
6 
7 Provides the Dispatch class, which allows one to filter incoming data
8 packets from an XBee device and call an appropriate method when
9 one arrives.
10 """
11 
12 from xbee import XBee
13 
class Dispatch(object):
    """
    Dispatch

    Keeps an ordered list of (name, callback, filter) registrations and
    hands each packet to every callback whose filter accepts it. Packets
    can be pushed in directly through dispatch(), or pulled from an XBee
    object through run().
    """

    def __init__(self, ser=None, xbee=None):
        """
        __init__: serial port (optional), XBee (optional) -> None

        A truthy xbee argument is stored as-is and takes precedence.
        Otherwise a truthy ser argument is passed to XBee() and the result
        is stored. If neither is given, self.xbee stays None and only
        dispatch() is usable; run() will raise.
        """
        if xbee:
            device = xbee
        elif ser:
            device = XBee(ser)
        else:
            device = None
        self.xbee = device

        # Registrations in insertion order; each entry is a dict with the
        # keys 'name', 'callback' and 'filter'.
        self.handlers = []
        # Names already taken, used to reject duplicate registrations.
        self.names = set()

    def register(self, name, callback, filter):
        """
        register: string, function: string, data -> None, function: data -> boolean -> None

        Stores the name, callback and filter together so they can be
        consulted whenever a packet is dispatched. For each packet, the
        filter is called with the packet; when it returns a true value,
        the callback is called with the registered name and that packet.

        Raises ValueError if the name has already been registered.
        """
        # NOTE: the 'filter' parameter shadows the builtin of the same name
        # inside this method; the name is part of the public signature.
        if name in self.names:
            raise ValueError("A callback has already been registered with the name '%s'" % name)

        entry = dict(name=name, callback=callback, filter=filter)
        self.handlers.append(entry)
        self.names.add(name)

    def run(self, oneshot=False):
        """
        run: boolean -> None

        Repeatedly reads a frame from the XBee object via
        wait_read_frame() and passes it to dispatch(). When oneshot is
        true, returns after the first frame has been dispatched; otherwise
        the loop never ends on its own.

        Raises ValueError if no XBee object is available.
        """
        if not self.xbee:
            raise ValueError("Either a serial port or an XBee must be provided to __init__ to execute run()")

        while True:
            # self.xbee is looked up again on every pass, exactly as the
            # packet source is consulted for each new frame.
            frame = self.xbee.wait_read_frame()
            self.dispatch(frame)

            if oneshot:
                return

    def dispatch(self, packet):
        """
        dispatch: XBee data dict -> None

        Walks the registrations in the order they were added. Every
        registration whose filter returns a true value for the packet has
        its callback invoked; the others are skipped.
        """
        for entry in self.handlers:
            accepts = entry['filter']
            if not accepts(packet):
                continue

            # The callback receives the name it was registered under and
            # the packet that passed its filter.
            entry['callback'](entry['name'], packet)
def register(self, name, callback, filter)
Definition: dispatch.py:25
def __init__(self, ser=None, xbee=None)
Definition: dispatch.py:15
def run(self, oneshot=False)
Definition: dispatch.py:47


rosserial_xbee
Author(s): Adam Stambler
autogenerated on Mon Jun 10 2019 14:53:52