base.py
Go to the documentation of this file.
1 """
2 xbee.py
3 
4 By Paul Malmsten, 2010
5 Inspired by code written by Amit Synderman and Marco Sangalli
6 pmalmsten@gmail.com
7 
8  _wait_for_frame modified by Adam Stambler to allow for non
9  blocking io
10  Adam Stambler, 2011
11 
12 XBee superclass module
13 
14 
15 This class defines data and methods common to all XBee modules.
16 This class should be subclassed in order to provide
17 series-specific functionality.
18 """
19 import struct, threading, time
20 from xbee.frame import APIFrame
21 
class ThreadQuitException(Exception):
    """Raised by the frame-reading loop to tell the reader thread to stop."""
24 
25 class XBeeBase(threading.Thread):
26  """
27  Abstract base class providing command generation and response
28  parsing methods for XBee modules.
29 
30  Constructor arguments:
31  ser: The file-like serial port to use.
32 
33 
34  shorthand: boolean flag which determines whether shorthand command
35  calls (i.e. xbee.at(...) instead of xbee.send("at",...)
36  are allowed.
37 
38  callback: function which should be called with frame data
39  whenever a frame arrives from the serial port.
40  When this is not None, a background thread to monitor
41  the port and call the given function is automatically
42  started.
43 
44  escaped: boolean flag which determines whether the library should
45  operate in escaped mode. In this mode, certain data bytes
46  in the output and input streams will be escaped and unescaped
47  in accordance with the XBee API. This setting must match
48  the appropriate api_mode setting of an XBee device; see your
49  XBee device's documentation for more information.
50  """
51 
52  def __init__(self, ser, shorthand=True, callback=None, escaped=False):
53  super(XBeeBase, self).__init__()
54  self.serial = ser
55  self.shorthand = shorthand
56  self._callback = None
57  self._thread_continue = False
58  self._escaped = escaped
59 
60  if callback:
61  self._callback = callback
62  self._thread_continue = True
63  self._thread_quit = threading.Event()
64  self.start()
65 
66  def halt(self):
67  """
68  halt: None -> None
69 
70  If this instance has a separate thread running, it will be
71  halted. This method will wait until the thread has cleaned
72  up before returning.
73  """
74  if self._callback:
75  self._thread_continue = False
76  self._thread_quit.wait()
77 
78  def _write(self, data):
79  """
80  _write: binary data -> None
81 
82  Packages the given binary data in an API frame and writes the
83  result to the serial port
84  """
85  frame = APIFrame(data, self._escaped).output()
86  self.serial.write(frame)
87 
88  def run(self):
89  """
90  run: None -> None
91 
92  This method overrides threading.Thread.run() and is automatically
93  called when an instance is created with threading enabled.
94  """
95  while True:
96  try:
97  self._callback(self.wait_read_frame())
98  except ThreadQuitException:
99  break
100  self._thread_quit.set()
101 
102  def _wait_for_frame(self):
103  """
104  _wait_for_frame: None -> binary data
105 
106  _wait_for_frame will read from the serial port until a valid
107  API frame arrives. It will then return the binary data
108  contained within the frame.
109 
110  If this method is called as a separate thread
111  and self.thread_continue is set to False, the thread will
112  exit by raising a ThreadQuitException.
113  """
114  frame = APIFrame(escaped=self._escaped)
115  mode = 0
116  while True:
117  if self._callback and not self._thread_continue:
118  raise ThreadQuitException
119 
120  while ( self.serial.inWaiting() <1):
121  time.sleep(0.01)
122  byte = self.serial.read()
123  if byte =='':
124  continue
125 
126  if (mode ==0):
127  if byte == APIFrame.START_BYTE:
128  mode=1
129  else:
130  continue
131 
132  frame.fill(byte)
133 
134  if ( (mode==1) and (frame.remaining_bytes() <=0) ) :
135  try:
136  # Try to parse and return result
137  frame.parse()
138  mode =0
139  return frame
140  except ValueError:
141  # Bad frame, so restart
142  mode=0
143  frame = APIFrame(escaped=self._escaped)
144 
145  def _build_command(self, cmd, **kwargs):
146  """
147  _build_command: string (binary data) ... -> binary data
148 
149  _build_command will construct a command packet according to the
150  specified command's specification in api_commands. It will expect
151  named arguments for all fields other than those with a default
152  value or a length of 'None'.
153 
154  Each field will be written out in the order they are defined
155  in the command definition.
156  """
157  try:
158  cmd_spec = self.api_commands[cmd]
159  except AttributeError:
160  raise NotImplementedError("API command specifications could not be found; use a derived class which defines 'api_commands'.")
161 
162  packet = ''
163 
164  for field in cmd_spec:
165  try:
166  # Read this field's name from the function arguments dict
167  data = kwargs[field['name']]
168  except KeyError:
169  # Data wasn't given
170  # Only a problem if the field has a specific length
171  if field['len'] is not None:
172  # Was a default value specified?
173  default_value = field['default']
174  if default_value:
175  # If so, use it
176  data = default_value
177  else:
178  # Otherwise, fail
179  raise KeyError(
180  "The expected field %s of length %d was not provided"
181  % (field['name'], field['len']))
182  else:
183  # No specific length, ignore it
184  data = None
185 
186  # Ensure that the proper number of elements will be written
187  if field['len'] and len(data) != field['len']:
188  raise ValueError(
189  "The data provided for '%s' was not %d bytes long"\
190  % (field['name'], field['len']))
191 
192  # Add the data to the packet, if it has been specified
193  # Otherwise, the parameter was of variable length, and not
194  # given
195  if data:
196  packet += data
197 
198  return packet
199 
200  def _split_response(self, data):
201  """
202  _split_response: binary data -> {'id':str,
203  'param':binary data,
204  ...}
205 
206  _split_response takes a data packet received from an XBee device
207  and converts it into a dictionary. This dictionary provides
208  names for each segment of binary data as specified in the
209  api_responses spec.
210  """
211  # Fetch the first byte, identify the packet
212  # If the spec doesn't exist, raise exception
213  packet_id = data[0]
214  try:
215  packet = self.api_responses[packet_id]
216  except AttributeError:
217  raise NotImplementedError("API response specifications could not be found; use a derived class which defines 'api_responses'.")
218  except KeyError:
219  raise KeyError(
220  "Unrecognized response packet with id byte %s"
221  % data[0])
222 
223  # Current byte index in the data stream
224  index = 1
225 
226  # Result info
227  info = {'id':packet['name']}
228  packet_spec = packet['structure']
229 
230  # Parse the packet in the order specified
231  for field in packet_spec:
232  if field['len'] == 'null_terminated':
233  field_data = ''
234 
235  while data[index] != '\x00':
236  field_data += data[index]
237  index += 1
238 
239  index += 1
240  info[field['name']] = field_data
241  elif field['len'] is not None:
242  # Store the number of bytes specified
243 
244  # Are we trying to read beyond the last data element?
245  if index + field['len'] > len(data):
246  raise ValueError(
247  "Response packet was shorter than expected")
248 
249  field_data = data[index:index + field['len']]
250  info[field['name']] = field_data
251 
252  index += field['len']
253  # If the data field has no length specified, store any
254  # leftover bytes and quit
255  else:
256  field_data = data[index:]
257 
258  # Were there any remaining bytes?
259  if field_data:
260  # If so, store them
261  info[field['name']] = field_data
262  index += len(field_data)
263  break
264 
265  # If there are more bytes than expected, raise an exception
266  if index < len(data):
267  raise ValueError(
268  "Response packet was longer than expected; expected: %d, got: %d bytes" % (index,
269  len(data)))
270 
271  # Check if this packet was an IO sample
272  # If so, process the sample data
273  if 'parse_as_io_samples' in packet:
274  field_to_process = packet['parse_as_io_samples']
275  info[field_to_process] = self._parse_samples(
276  info[field_to_process])
277 
278  return info
279 
280  def _parse_samples_header(self, io_bytes):
281  """
282  _parse_samples_header: binary data in XBee IO data format ->
283  (int, [int ...], [int ...], int, int)
284 
285  _parse_samples_header will read the first three bytes of the
286  binary data given and will return the number of samples which
287  follow, a list of enabled digital inputs, a list of enabled
288  analog inputs, the dio_mask, and the size of the header in bytes
289  """
290  header_size = 3
291 
292  # number of samples (always 1?) is the first byte
293  sample_count = ord(io_bytes[0])
294 
295  # part of byte 1 and byte 2 are the DIO mask ( 9 bits )
296  dio_mask = (ord(io_bytes[1]) << 8 | ord(io_bytes[2])) & 0x01FF
297 
298  # upper 7 bits of byte 1 is the AIO mask
299  aio_mask = (ord(io_bytes[1]) & 0xFE) >> 1
300 
301  # sorted lists of enabled channels; value is position of bit in mask
302  dio_chans = []
303  aio_chans = []
304 
305  for i in range(0,9):
306  if dio_mask & (1 << i):
307  dio_chans.append(i)
308 
309  dio_chans.sort()
310 
311  for i in range(0,7):
312  if aio_mask & (1 << i):
313  aio_chans.append(i)
314 
315  aio_chans.sort()
316 
317  return (sample_count, dio_chans, aio_chans, dio_mask, header_size)
318 
319  def _parse_samples(self, io_bytes):
320  """
321  _parse_samples: binary data in XBee IO data format ->
322  [ {"dio-0":True,
323  "dio-1":False,
324  "adc-0":100"}, ...]
325 
326  _parse_samples reads binary data from an XBee device in the IO
327  data format specified by the API. It will then return a
328  dictionary indicating the status of each enabled IO port.
329  """
330 
331  sample_count, dio_chans, aio_chans, dio_mask, header_size = \
332  self._parse_samples_header(io_bytes)
333 
334  samples = []
335 
336  # split the sample data into a list, so it can be pop()'d
337  sample_bytes = [ord(c) for c in io_bytes[header_size:]]
338 
339  # repeat for every sample provided
340  for sample_ind in range(0, sample_count):
341  tmp_samples = {}
342 
343  if dio_chans:
344  # we have digital data
345  digital_data_set = (sample_bytes.pop(0) << 8 | sample_bytes.pop(0))
346  digital_values = dio_mask & digital_data_set
347 
348  for i in dio_chans:
349  tmp_samples['dio-%d' % i] = True if (digital_values >> i) & 1 else False
350 
351  for i in aio_chans:
352  # only first 10 bits are significant
353  analog_sample = (sample_bytes.pop(0) << 8 | sample_bytes.pop(0)) & 0x03FF
354  tmp_samples['adc-%d' % i] = analog_sample
355 
356  samples.append(tmp_samples)
357 
358  return samples
359 
360  def send(self, cmd, **kwargs):
361  """
362  send: string param=binary data ... -> None
363 
364  When send is called with the proper arguments, an API command
365  will be written to the serial port for this XBee device
366  containing the proper instructions and data.
367 
368  This method must be called with named arguments in accordance
369  with the api_command specification. Arguments matching all
370  field names other than those in reserved_names (like 'id' and
371  'order') should be given, unless they are of variable length
372  (of 'None' in the specification. Those are optional).
373  """
374  # Pass through the keyword arguments
375  self._write(self._build_command(cmd, **kwargs))
376 
377 
378  def wait_read_frame(self):
379  """
380  wait_read_frame: None -> frame info dictionary
381 
382  wait_read_frame calls XBee._wait_for_frame() and waits until a
383  valid frame appears on the serial port. Once it receives a frame,
384  wait_read_frame attempts to parse the data contained within it
385  and returns the resulting dictionary
386  """
387 
388  frame = self._wait_for_frame()
389  return self._split_response(frame.data)
390 
391  def __getattr__(self, name):
392  """
393  If a method by the name of a valid api command is called,
394  the arguments will be automatically sent to an appropriate
395  send() call
396  """
397 
398  # If api_commands is not defined, raise NotImplementedError\
399  # If its not defined, _getattr__ will be called with its name
400  if name == 'api_commands':
401  raise NotImplementedError("API command specifications could not be found; use a derived class which defines 'api_commands'.")
402 
403  # Is shorthand enabled, and is the called name a command?
404  if self.shorthand and name in self.api_commands:
405  # If so, simply return a function which passes its arguments
406  # to an appropriate send() call
407  return lambda **kwargs: self.send(name, **kwargs)
408  else:
409  raise AttributeError("XBee has no attribute '%s'" % name)
def _split_response(self, data)
Definition: base.py:200
def halt(self)
Definition: base.py:66
def _parse_samples(self, io_bytes)
Definition: base.py:319
def _parse_samples_header(self, io_bytes)
Definition: base.py:280
def _wait_for_frame(self)
Definition: base.py:102
def _write(self, data)
Definition: base.py:78
def send(self, cmd, **kwargs)
Definition: base.py:360
def _build_command(self, cmd, **kwargs)
Definition: base.py:145
def run(self)
Definition: base.py:88
def __getattr__(self, name)
Definition: base.py:391
def __init__(self, ser, shorthand=True, callback=None, escaped=False)
Definition: base.py:52
def wait_read_frame(self)
Definition: base.py:378


rosserial_xbee
Author(s): Adam Stambler
autogenerated on Mon Jun 10 2019 14:53:52