$search
00001 """ 00002 xbee.py 00003 00004 By Paul Malmsten, 2010 00005 Inspired by code written by Amit Synderman and Marco Sangalli 00006 pmalmsten@gmail.com 00007 00008 _wait_for_frame modified by Adam Stambler to allow for non 00009 blocking io 00010 Adam Stambler, 2011 00011 00012 XBee superclass module 00013 00014 00015 This class defines data and methods common to all XBee modules. 00016 This class should be subclassed in order to provide 00017 series-specific functionality. 00018 """ 00019 import struct, threading, time 00020 from xbee.frame import APIFrame 00021 00022 class ThreadQuitException(Exception): 00023 pass 00024 00025 class XBeeBase(threading.Thread): 00026 """ 00027 Abstract base class providing command generation and response 00028 parsing methods for XBee modules. 00029 00030 Constructor arguments: 00031 ser: The file-like serial port to use. 00032 00033 00034 shorthand: boolean flag which determines whether shorthand command 00035 calls (i.e. xbee.at(...) instead of xbee.send("at",...) 00036 are allowed. 00037 00038 callback: function which should be called with frame data 00039 whenever a frame arrives from the serial port. 00040 When this is not None, a background thread to monitor 00041 the port and call the given function is automatically 00042 started. 00043 00044 escaped: boolean flag which determines whether the library should 00045 operate in escaped mode. In this mode, certain data bytes 00046 in the output and input streams will be escaped and unescaped 00047 in accordance with the XBee API. This setting must match 00048 the appropriate api_mode setting of an XBee device; see your 00049 XBee device's documentation for more information. 00050 """ 00051 00052 def __init__(self, ser, shorthand=True, callback=None, escaped=False): 00053 super(XBeeBase, self).__init__() 00054 self.serial = ser 00055 self.shorthand = shorthand 00056 self._callback = None 00057 self._thread_continue = False 00058 self._escaped = escaped 00059 00060 if callback: 00061 self._callback = callback 00062 self._thread_continue = True 00063 self._thread_quit = threading.Event() 00064 self.start() 00065 00066 def halt(self): 00067 """ 00068 halt: None -> None 00069 00070 If this instance has a separate thread running, it will be 00071 halted. This method will wait until the thread has cleaned 00072 up before returning. 00073 """ 00074 if self._callback: 00075 self._thread_continue = False 00076 self._thread_quit.wait() 00077 00078 def _write(self, data): 00079 """ 00080 _write: binary data -> None 00081 00082 Packages the given binary data in an API frame and writes the 00083 result to the serial port 00084 """ 00085 frame = APIFrame(data, self._escaped).output() 00086 self.serial.write(frame) 00087 00088 def run(self): 00089 """ 00090 run: None -> None 00091 00092 This method overrides threading.Thread.run() and is automatically 00093 called when an instance is created with threading enabled. 00094 """ 00095 while True: 00096 try: 00097 self._callback(self.wait_read_frame()) 00098 except ThreadQuitException: 00099 break 00100 self._thread_quit.set() 00101 00102 def _wait_for_frame(self): 00103 """ 00104 _wait_for_frame: None -> binary data 00105 00106 _wait_for_frame will read from the serial port until a valid 00107 API frame arrives. It will then return the binary data 00108 contained within the frame. 00109 00110 If this method is called as a separate thread 00111 and self.thread_continue is set to False, the thread will 00112 exit by raising a ThreadQuitException. 00113 """ 00114 frame = APIFrame(escaped=self._escaped) 00115 mode = 0 00116 while True: 00117 if self._callback and not self._thread_continue: 00118 raise ThreadQuitException 00119 00120 while ( self.serial.inWaiting() <1): 00121 time.sleep(0.01) 00122 byte = self.serial.read() 00123 if byte =='': 00124 continue 00125 00126 if (mode ==0): 00127 if byte == APIFrame.START_BYTE: 00128 mode=1 00129 else: 00130 continue 00131 00132 frame.fill(byte) 00133 00134 if ( (mode==1) and (frame.remaining_bytes() <=0) ) : 00135 try: 00136 # Try to parse and return result 00137 frame.parse() 00138 mode =0 00139 return frame 00140 except ValueError: 00141 # Bad frame, so restart 00142 mode=0 00143 frame = APIFrame(escaped=self._escaped) 00144 00145 def _build_command(self, cmd, **kwargs): 00146 """ 00147 _build_command: string (binary data) ... -> binary data 00148 00149 _build_command will construct a command packet according to the 00150 specified command's specification in api_commands. It will expect 00151 named arguments for all fields other than those with a default 00152 value or a length of 'None'. 00153 00154 Each field will be written out in the order they are defined 00155 in the command definition. 00156 """ 00157 try: 00158 cmd_spec = self.api_commands[cmd] 00159 except AttributeError: 00160 raise NotImplementedError("API command specifications could not be found; use a derived class which defines 'api_commands'.") 00161 00162 packet = '' 00163 00164 for field in cmd_spec: 00165 try: 00166 # Read this field's name from the function arguments dict 00167 data = kwargs[field['name']] 00168 except KeyError: 00169 # Data wasn't given 00170 # Only a problem if the field has a specific length 00171 if field['len'] is not None: 00172 # Was a default value specified? 00173 default_value = field['default'] 00174 if default_value: 00175 # If so, use it 00176 data = default_value 00177 else: 00178 # Otherwise, fail 00179 raise KeyError( 00180 "The expected field %s of length %d was not provided" 00181 % (field['name'], field['len'])) 00182 else: 00183 # No specific length, ignore it 00184 data = None 00185 00186 # Ensure that the proper number of elements will be written 00187 if field['len'] and len(data) != field['len']: 00188 raise ValueError( 00189 "The data provided for '%s' was not %d bytes long"\ 00190 % (field['name'], field['len'])) 00191 00192 # Add the data to the packet, if it has been specified 00193 # Otherwise, the parameter was of variable length, and not 00194 # given 00195 if data: 00196 packet += data 00197 00198 return packet 00199 00200 def _split_response(self, data): 00201 """ 00202 _split_response: binary data -> {'id':str, 00203 'param':binary data, 00204 ...} 00205 00206 _split_response takes a data packet received from an XBee device 00207 and converts it into a dictionary. This dictionary provides 00208 names for each segment of binary data as specified in the 00209 api_responses spec. 00210 """ 00211 # Fetch the first byte, identify the packet 00212 # If the spec doesn't exist, raise exception 00213 packet_id = data[0] 00214 try: 00215 packet = self.api_responses[packet_id] 00216 except AttributeError: 00217 raise NotImplementedError("API response specifications could not be found; use a derived class which defines 'api_responses'.") 00218 except KeyError: 00219 raise KeyError( 00220 "Unrecognized response packet with id byte %s" 00221 % data[0]) 00222 00223 # Current byte index in the data stream 00224 index = 1 00225 00226 # Result info 00227 info = {'id':packet['name']} 00228 packet_spec = packet['structure'] 00229 00230 # Parse the packet in the order specified 00231 for field in packet_spec: 00232 if field['len'] == 'null_terminated': 00233 field_data = '' 00234 00235 while data[index] != '\x00': 00236 field_data += data[index] 00237 index += 1 00238 00239 index += 1 00240 info[field['name']] = field_data 00241 elif field['len'] is not None: 00242 # Store the number of bytes specified 00243 00244 # Are we trying to read beyond the last data element? 00245 if index + field['len'] > len(data): 00246 raise ValueError( 00247 "Response packet was shorter than expected") 00248 00249 field_data = data[index:index + field['len']] 00250 info[field['name']] = field_data 00251 00252 index += field['len'] 00253 # If the data field has no length specified, store any 00254 # leftover bytes and quit 00255 else: 00256 field_data = data[index:] 00257 00258 # Were there any remaining bytes? 00259 if field_data: 00260 # If so, store them 00261 info[field['name']] = field_data 00262 index += len(field_data) 00263 break 00264 00265 # If there are more bytes than expected, raise an exception 00266 if index < len(data): 00267 raise ValueError( 00268 "Response packet was longer than expected; expected: %d, got: %d bytes" % (index, 00269 len(data))) 00270 00271 # Check if this packet was an IO sample 00272 # If so, process the sample data 00273 if 'parse_as_io_samples' in packet: 00274 field_to_process = packet['parse_as_io_samples'] 00275 info[field_to_process] = self._parse_samples( 00276 info[field_to_process]) 00277 00278 return info 00279 00280 def _parse_samples_header(self, io_bytes): 00281 """ 00282 _parse_samples_header: binary data in XBee IO data format -> 00283 (int, [int ...], [int ...], int, int) 00284 00285 _parse_samples_header will read the first three bytes of the 00286 binary data given and will return the number of samples which 00287 follow, a list of enabled digital inputs, a list of enabled 00288 analog inputs, the dio_mask, and the size of the header in bytes 00289 """ 00290 header_size = 3 00291 00292 # number of samples (always 1?) is the first byte 00293 sample_count = ord(io_bytes[0]) 00294 00295 # part of byte 1 and byte 2 are the DIO mask ( 9 bits ) 00296 dio_mask = (ord(io_bytes[1]) << 8 | ord(io_bytes[2])) & 0x01FF 00297 00298 # upper 7 bits of byte 1 is the AIO mask 00299 aio_mask = (ord(io_bytes[1]) & 0xFE) >> 1 00300 00301 # sorted lists of enabled channels; value is position of bit in mask 00302 dio_chans = [] 00303 aio_chans = [] 00304 00305 for i in range(0,9): 00306 if dio_mask & (1 << i): 00307 dio_chans.append(i) 00308 00309 dio_chans.sort() 00310 00311 for i in range(0,7): 00312 if aio_mask & (1 << i): 00313 aio_chans.append(i) 00314 00315 aio_chans.sort() 00316 00317 return (sample_count, dio_chans, aio_chans, dio_mask, header_size) 00318 00319 def _parse_samples(self, io_bytes): 00320 """ 00321 _parse_samples: binary data in XBee IO data format -> 00322 [ {"dio-0":True, 00323 "dio-1":False, 00324 "adc-0":100"}, ...] 00325 00326 _parse_samples reads binary data from an XBee device in the IO 00327 data format specified by the API. It will then return a 00328 dictionary indicating the status of each enabled IO port. 00329 """ 00330 00331 sample_count, dio_chans, aio_chans, dio_mask, header_size = \ 00332 self._parse_samples_header(io_bytes) 00333 00334 samples = [] 00335 00336 # split the sample data into a list, so it can be pop()'d 00337 sample_bytes = [ord(c) for c in io_bytes[header_size:]] 00338 00339 # repeat for every sample provided 00340 for sample_ind in range(0, sample_count): 00341 tmp_samples = {} 00342 00343 if dio_chans: 00344 # we have digital data 00345 digital_data_set = (sample_bytes.pop(0) << 8 | sample_bytes.pop(0)) 00346 digital_values = dio_mask & digital_data_set 00347 00348 for i in dio_chans: 00349 tmp_samples['dio-%d' % i] = True if (digital_values >> i) & 1 else False 00350 00351 for i in aio_chans: 00352 # only first 10 bits are significant 00353 analog_sample = (sample_bytes.pop(0) << 8 | sample_bytes.pop(0)) & 0x03FF 00354 tmp_samples['adc-%d' % i] = analog_sample 00355 00356 samples.append(tmp_samples) 00357 00358 return samples 00359 00360 def send(self, cmd, **kwargs): 00361 """ 00362 send: string param=binary data ... -> None 00363 00364 When send is called with the proper arguments, an API command 00365 will be written to the serial port for this XBee device 00366 containing the proper instructions and data. 00367 00368 This method must be called with named arguments in accordance 00369 with the api_command specification. Arguments matching all 00370 field names other than those in reserved_names (like 'id' and 00371 'order') should be given, unless they are of variable length 00372 (of 'None' in the specification. Those are optional). 00373 """ 00374 # Pass through the keyword arguments 00375 self._write(self._build_command(cmd, **kwargs)) 00376 00377 00378 def wait_read_frame(self): 00379 """ 00380 wait_read_frame: None -> frame info dictionary 00381 00382 wait_read_frame calls XBee._wait_for_frame() and waits until a 00383 valid frame appears on the serial port. Once it receives a frame, 00384 wait_read_frame attempts to parse the data contained within it 00385 and returns the resulting dictionary 00386 """ 00387 00388 frame = self._wait_for_frame() 00389 return self._split_response(frame.data) 00390 00391 def __getattr__(self, name): 00392 """ 00393 If a method by the name of a valid api command is called, 00394 the arguments will be automatically sent to an appropriate 00395 send() call 00396 """ 00397 00398 # If api_commands is not defined, raise NotImplementedError\ 00399 # If its not defined, _getattr__ will be called with its name 00400 if name == 'api_commands': 00401 raise NotImplementedError("API command specifications could not be found; use a derived class which defines 'api_commands'.") 00402 00403 # Is shorthand enabled, and is the called name a command? 00404 if self.shorthand and name in self.api_commands: 00405 # If so, simply return a function which passes its arguments 00406 # to an appropriate send() call 00407 return lambda **kwargs: self.send(name, **kwargs) 00408 else: 00409 raise AttributeError("XBee has no attribute '%s'" % name)