5 Inspired by code written by Amit Synderman and Marco Sangalli 8 _wait_for_frame modified by Adam Stambler to allow for non 12 XBee superclass module 15 This class defines data and methods common to all XBee modules. 16 This class should be subclassed in order to provide 17 series-specific functionality. 19 import struct, threading, time
27 Abstract base class providing command generation and response 28 parsing methods for XBee modules. 30 Constructor arguments: 31 ser: The file-like serial port to use. 34 shorthand: boolean flag which determines whether shorthand command 35 calls (i.e. xbee.at(...) instead of xbee.send("at",...) 38 callback: function which should be called with frame data 39 whenever a frame arrives from the serial port. 40 When this is not None, a background thread to monitor 41 the port and call the given function is automatically 44 escaped: boolean flag which determines whether the library should 45 operate in escaped mode. In this mode, certain data bytes 46 in the output and input streams will be escaped and unescaped 47 in accordance with the XBee API. This setting must match 48 the appropriate api_mode setting of an XBee device; see your 49 XBee device's documentation for more information. 52 def __init__(self, ser, shorthand=True, callback=None, escaped=False):
70 If this instance has a separate thread running, it will be 71 halted. This method will wait until the thread has cleaned 76 self._thread_quit.wait()
80 _write: binary data -> None 82 Packages the given binary data in an API frame and writes the 83 result to the serial port 86 self.serial.write(frame)
92 This method overrides threading.Thread.run() and is automatically 93 called when an instance is created with threading enabled. 98 except ThreadQuitException:
100 self._thread_quit.set()
104 _wait_for_frame: None -> binary data 106 _wait_for_frame will read from the serial port until a valid 107 API frame arrives. It will then return the binary data 108 contained within the frame. 110 If this method is called as a separate thread 111 and self.thread_continue is set to False, the thread will 112 exit by raising a ThreadQuitException. 118 raise ThreadQuitException
120 while ( self.serial.inWaiting() <1):
122 byte = self.serial.read()
127 if byte == APIFrame.START_BYTE:
134 if ( (mode==1)
and (frame.remaining_bytes() <=0) ) :
147 _build_command: string (binary data) ... -> binary data 149 _build_command will construct a command packet according to the 150 specified command's specification in api_commands. It will expect 151 named arguments for all fields other than those with a default 152 value or a length of 'None'. 154 Each field will be written out in the order they are defined 155 in the command definition. 158 cmd_spec = self.api_commands[cmd]
159 except AttributeError:
160 raise NotImplementedError(
"API command specifications could not be found; use a derived class which defines 'api_commands'.")
164 for field
in cmd_spec:
167 data = kwargs[field[
'name']]
171 if field[
'len']
is not None:
173 default_value = field[
'default']
180 "The expected field %s of length %d was not provided" 181 % (field[
'name'], field[
'len']))
187 if field[
'len']
and len(data) != field[
'len']:
189 "The data provided for '%s' was not %d bytes long"\
190 % (field[
'name'], field[
'len']))
202 _split_response: binary data -> {'id':str, 206 _split_response takes a data packet received from an XBee device 207 and converts it into a dictionary. This dictionary provides 208 names for each segment of binary data as specified in the 215 packet = self.api_responses[packet_id]
216 except AttributeError:
217 raise NotImplementedError(
"API response specifications could not be found; use a derived class which defines 'api_responses'.")
220 "Unrecognized response packet with id byte %s" 227 info = {
'id':packet[
'name']}
228 packet_spec = packet[
'structure']
231 for field
in packet_spec:
232 if field[
'len'] ==
'null_terminated':
235 while data[index] !=
'\x00':
236 field_data += data[index]
240 info[field[
'name']] = field_data
241 elif field[
'len']
is not None:
245 if index + field[
'len'] > len(data):
247 "Response packet was shorter than expected")
249 field_data = data[index:index + field[
'len']]
250 info[field[
'name']] = field_data
252 index += field[
'len']
256 field_data = data[index:]
261 info[field[
'name']] = field_data
262 index += len(field_data)
266 if index < len(data):
268 "Response packet was longer than expected; expected: %d, got: %d bytes" % (index,
273 if 'parse_as_io_samples' in packet:
274 field_to_process = packet[
'parse_as_io_samples']
276 info[field_to_process])
282 _parse_samples_header: binary data in XBee IO data format -> 283 (int, [int ...], [int ...], int, int) 285 _parse_samples_header will read the first three bytes of the 286 binary data given and will return the number of samples which 287 follow, a list of enabled digital inputs, a list of enabled 288 analog inputs, the dio_mask, and the size of the header in bytes 293 sample_count = ord(io_bytes[0])
296 dio_mask = (ord(io_bytes[1]) << 8 | ord(io_bytes[2])) & 0x01FF
299 aio_mask = (ord(io_bytes[1]) & 0xFE) >> 1
306 if dio_mask & (1 << i):
312 if aio_mask & (1 << i):
317 return (sample_count, dio_chans, aio_chans, dio_mask, header_size)
321 _parse_samples: binary data in XBee IO data format -> 326 _parse_samples reads binary data from an XBee device in the IO 327 data format specified by the API. It will then return a 328 dictionary indicating the status of each enabled IO port. 331 sample_count, dio_chans, aio_chans, dio_mask, header_size = \
337 sample_bytes = [ord(c)
for c
in io_bytes[header_size:]]
340 for sample_ind
in range(0, sample_count):
345 digital_data_set = (sample_bytes.pop(0) << 8 | sample_bytes.pop(0))
346 digital_values = dio_mask & digital_data_set
349 tmp_samples[
'dio-%d' % i] =
True if (digital_values >> i) & 1
else False 353 analog_sample = (sample_bytes.pop(0) << 8 | sample_bytes.pop(0)) & 0x03FF
354 tmp_samples[
'adc-%d' % i] = analog_sample
356 samples.append(tmp_samples)
362 send: string param=binary data ... -> None 364 When send is called with the proper arguments, an API command 365 will be written to the serial port for this XBee device 366 containing the proper instructions and data. 368 This method must be called with named arguments in accordance 369 with the api_command specification. Arguments matching all 370 field names other than those in reserved_names (like 'id' and 371 'order') should be given, unless they are of variable length 372 (of 'None' in the specification. Those are optional). 380 wait_read_frame: None -> frame info dictionary 382 wait_read_frame calls XBee._wait_for_frame() and waits until a 383 valid frame appears on the serial port. Once it receives a frame, 384 wait_read_frame attempts to parse the data contained within it 385 and returns the resulting dictionary 393 If a method by the name of a valid api command is called, 394 the arguments will be automatically sent to an appropriate 400 if name ==
'api_commands':
401 raise NotImplementedError(
"API command specifications could not be found; use a derived class which defines 'api_commands'.")
404 if self.
shorthand and name
in self.api_commands:
407 return lambda **kwargs: self.
send(name, **kwargs)
409 raise AttributeError(
"XBee has no attribute '%s'" % name)
def _split_response(self, data)
def _parse_samples(self, io_bytes)
def _parse_samples_header(self, io_bytes)
def _wait_for_frame(self)
def send(self, cmd, kwargs)
def _build_command(self, cmd, kwargs)
def __getattr__(self, name)
def __init__(self, ser, shorthand=True, callback=None, escaped=False)
def wait_read_frame(self)