zigbee.py
Go to the documentation of this file.
1 """
2 zigbee.py
3 
4 By Greg Rapp, 2010
5 Inspired by code written by Paul Malmsten, 2010
6 Inspired by code written by Amit Synderman and Marco Sangalli
7 gdrapp@gmail.com
8 
9 This module implements an XBee ZB (ZigBee) API library.
10 """
11 import struct
12 from xbee.base import XBeeBase
13 
15  """
16  Provides an implementation of the XBee API for XBee ZB (ZigBee) modules
17  with recent firmware.
18 
19  Commands may be sent to a device by instantiating this class with
20  a serial port object (see PySerial) and then calling the send
21  method with the proper information specified by the API. Data may
22  be read from a device synchronously by calling wait_read_frame.
23  For asynchronous reads, see the defintion of XBeeBase.
24  """
25  # Packets which can be sent to an XBee
26 
27  # Format:
28  # {name of command:
29  # [{name:field name, len:field length, default: default value sent}
30  # ...
31  # ]
32  # ...
33  # }
34  api_commands = {"at":
35  [{'name':'id', 'len':1, 'default':'\x08'},
36  {'name':'frame_id', 'len':1, 'default':'\x01'},
37  {'name':'command', 'len':2, 'default':None},
38  {'name':'parameter', 'len':None, 'default':None}],
39  "queued_at":
40  [{'name':'id', 'len':1, 'default':'\x09'},
41  {'name':'frame_id', 'len':1, 'default':'\x01'},
42  {'name':'command', 'len':2, 'default':None},
43  {'name':'parameter', 'len':None, 'default':None}],
44  "remote_at":
45  [{'name':'id', 'len':1, 'default':'\x17'},
46  {'name':'frame_id', 'len':1, 'default':'\x00'},
47  # dest_addr_long is 8 bytes (64 bits), so use an unsigned long long
48  {'name':'dest_addr_long', 'len':8, 'default':struct.pack('>Q', 0)},
49  {'name':'dest_addr', 'len':2, 'default':'\xFF\xFE'},
50  {'name':'options', 'len':1, 'default':'\x02'},
51  {'name':'command', 'len':2, 'default':None},
52  {'name':'parameter', 'len':None, 'default':None}],
53  "tx":
54  [{'name':'id', 'len':1, 'default':'\x10'},
55  {'name':'frame_id', 'len':1, 'default':'\x01'},
56  {'name':'dest_addr_long', 'len':8, 'default':None},
57  {'name':'dest_addr', 'len':2, 'default':None},
58  {'name':'broadcast_radius','len':1, 'default':'\x00'},
59  {'name':'options', 'len':1, 'default':'\x00'},
60  {'name':'data', 'len':None, 'default':None}],
61  "tx_explicit":
62  [{'name':'id', 'len':1, 'default':'\x11'},
63  {'name':'frame_id', 'len':1, 'default':'\x00'},
64  {'name':'dest_addr_long', 'len':8, 'default':None},
65  {'name':'dest_addr', 'len':2, 'default':None},
66  {'name':'src_endpoint', 'len':1, 'default':None},
67  {'name':'dest_endpoint', 'len':1, 'default':None},
68  {'name':'cluster', 'len':1, 'default':None},
69  {'name':'profile', 'len':1, 'default':None},
70  {'name':'broadcast_radius','len':1, 'default':'\x00'},
71  {'name':'options', 'len':1, 'default':'\x00'},
72  {'name':'data', 'len':None, 'default':None}]
73  }
74 
75  # Packets which can be received from an XBee
76 
77  # Format:
78  # {id byte received from XBee:
79  # {name: name of response
80  # structure:
81  # [ {'name': name of field, 'len':length of field}
82  # ...
83  # ]
84  # parse_as_io_samples:name of field to parse as io
85  # }
86  # ...
87  # }
88  #
89  api_responses = {"\x90":
90  {'name':'rx',
91  'structure':
92  [{'name':'source_addr_long','len':8},
93  {'name':'source_addr', 'len':2},
94  {'name':'options', 'len':1},
95  {'name':'rf_data', 'len':None}]},
96  "\x91":
97  {'name':'rx_explicit',
98  'structure':
99  [{'name':'source_addr_long','len':8},
100  {'name':'source_addr', 'len':2},
101  {'name':'source_endpoint', 'len':1},
102  {'name':'dest_endpoint', 'len':1},
103  {'name':'cluster', 'len':2},
104  {'name':'profile', 'len':2},
105  {'name':'options', 'len':1},
106  {'name':'rf_data', 'len':None}]},
107  "\x92": # Checked by GDR-parse_samples_header function appears to need update to support
108  {'name':'rx_io_data_long_addr',
109  'structure':
110  [{'name':'source_addr_long','len':8},
111  {'name':'source_addr', 'len':2},
112  {'name':'options', 'len':1},
113  {'name':'samples', 'len':None}],
114  'parse_as_io_samples':'samples'},
115  "\x8b":
116  {'name':'tx_status',
117  'structure':
118  [{'name':'frame_id', 'len':1},
119  {'name':'dest_addr', 'len':2},
120  {'name':'retries', 'len':1},
121  {'name':'deliver_status', 'len':1},
122  {'name':'discover_status', 'len':1}]},
123  "\x8a":
124  {'name':'status',
125  'structure':
126  [{'name':'status', 'len':1}]},
127  "\x88":
128  {'name':'at_response',
129  'structure':
130  [{'name':'frame_id', 'len':1},
131  {'name':'command', 'len':2},
132  {'name':'status', 'len':1},
133  {'name':'parameter', 'len':None}]},
134  "\x97": #Checked GDR (not sure about parameter, could be 4 bytes)
135  {'name':'remote_at_response',
136  'structure':
137  [{'name':'frame_id', 'len':1},
138  {'name':'source_addr_long','len':8},
139  {'name':'source_addr', 'len':2},
140  {'name':'command', 'len':2},
141  {'name':'status', 'len':1},
142  {'name':'parameter', 'len':None}]},
143  "\x95":
144  {'name':'node_id_indicator',
145  'structure':
146  [{'name':'sender_addr_long','len':8},
147  {'name':'sender_addr', 'len':2},
148  {'name':'options', 'len':1},
149  {'name':'source_addr', 'len':2},
150  {'name':'source_addr_long','len':8},
151  {'name':'node_id', 'len':'null_terminated'},
152  {'name':'parent_source_addr','len':2},
153  {'name':'device_type', 'len':1},
154  {'name':'source_event', 'len':1},
155  {'name':'digi_profile_id', 'len':2},
156  {'name':'manufacturer_id', 'len':2}]}
157  }
158 
159  def __init__(self, *args, **kwargs):
160  # Call the super class constructor to save the serial port
161  super(ZigBee, self).__init__(*args, **kwargs)
162 
163  def _parse_samples_header(self, io_bytes):
164  """
165  _parse_samples_header: binary data in XBee ZB IO data format ->
166  (int, [int ...], [int ...], int, int)
167 
168  _parse_samples_header will read the first three bytes of the
169  binary data given and will return the number of samples which
170  follow, a list of enabled digital inputs, a list of enabled
171  analog inputs, the dio_mask, and the size of the header in bytes
172 
173  _parse_samples_header is overloaded here to support the additional
174  IO lines offered by the XBee ZB
175  """
176  header_size = 4
177 
178  # number of samples (always 1?) is the first byte
179  sample_count = ord(io_bytes[0])
180 
181  # bytes 1 and 2 are the DIO mask; bits 9 and 8 aren't used
182  dio_mask = (ord(io_bytes[1]) << 8 | ord(io_bytes[2])) & 0x0E7F
183 
184  # byte 3 is the AIO mask
185  aio_mask = ord(io_bytes[3])
186 
187  # sorted lists of enabled channels; value is position of bit in mask
188  dio_chans = []
189  aio_chans = []
190 
191  for i in range(0,13):
192  if dio_mask & (1 << i):
193  dio_chans.append(i)
194 
195  dio_chans.sort()
196 
197  for i in range(0,8):
198  if aio_mask & (1 << i):
199  aio_chans.append(i)
200 
201  aio_chans.sort()
202 
203  return (sample_count, dio_chans, aio_chans, dio_mask, header_size)
Definition: base.py:1
def _parse_samples_header(self, io_bytes)
Definition: zigbee.py:163
def __init__(self, args, kwargs)
Definition: zigbee.py:159


rosserial_xbee
Author(s): Adam Stambler
autogenerated on Mon Jun 10 2019 14:53:52