00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 """
00038 Provides interface to Viaflo Single Channel Vision Pipette with Bluetooth
00039 connection.
00040
00041 Interface allows opening, closing, polling and pipette operations.
00042 """
00043
00044 import bluetooth
00045 import re
00046 from collections import defaultdict
00047
class NotConnectedException(Exception):
    """Raised when a message is sent while the pipette is not connected."""


class InvalidChecksumError(Exception):
    """Raised when the bytes of a response do not sum to zero modulo 256."""


class IncompleteResponseError(Exception):
    """Raised when a response is too short, badly framed or of the wrong length."""


class InvalidMessageTypeError(Exception):
    """Raised when a response carries an unexpected or unsupported message type."""


class InvalidEscapeSequenceError(Exception):
    """Error type for malformed escape sequences in received data."""


# This class used to be defined twice in a row; the second definition only
# rebound the same name, so a single definition is equivalent.
class SeqNumMismatchError(Exception):
    """Raised when a response's sequence number differs from the expected one."""


class ResponseError(Exception):
    """Raised when a response reports a non-zero error code."""
00056
def _short_num_to_bytes(num):
    """
    Split a number into a big-endian pair of bytes
    @param num int : Number to convert; only its low 16 bits are kept
    @return [int,int] : High byte followed by low byte
    """
    high_byte, low_byte = divmod(num & 0xFFFF, 0x100)
    return [high_byte, low_byte]
00064
def _bytes_to_num(data):
    """
    Combine a sequence of bytes into one integer, most significant byte first
    @param data : iterable of byte values (integers)
    @return int : Number (0 when data is empty)
    """
    value = 0
    for octet in data:
        # Make room for the next byte, then add it in
        value <<= 8
        value += octet
    return value
00075
def _convert_bytes_to_str(data):
    """
    Build a string with one character per byte value in data
    """
    return ''.join(map(chr, data))
00082
def _convert_str_to_bytes(data):
    """
    Return the character codes of a string as a list of integers
    """
    return list(map(ord, data))
00088
00089
00090
00091
# Framing bytes of the wire protocol
_STX = 0x02  # first byte of a framed message

_ETX = 0x03  # closes a framed message

_ESC = 0x1B  # placed in front of a literal STX, ETX or ESC inside a message

_LF = 0x0A   # last byte of an outgoing command


# Names of the pipettor action codes; codes that are not listed map to "???"
_action_dict = defaultdict(lambda: "???", {
    1: 'Position Inquiry',
    2: 'Homing',
    3: 'Purge',
    4: 'Aspirate',
    5: 'Aspirate with jog',
    6: 'Dispense',
    7: 'Dispense to 0',
    8: 'Mix',
    9: 'Prompt',
})
00102
00103
class ViafloResponse(object):
    """
    Base class for responses received from the pipette.

    Parses the nine byte header that starts every response and keeps the
    remaining bytes as the body.

    Attributes set by the constructor:
      length     - message length read from bytes 0-1
      checksum   - byte 2
      seqnum     - sequence number read from bytes 3-4
      resend     - byte 5
      msg_type   - message type read from bytes 6-7
      error_code - byte 8
      body       - all bytes from index 9 onwards
    """

    # Number of header bytes in front of the body
    _HEADER_LENGTH = 9

    def __init__(self, data):
        """
        @param data : list of byte values (integers) of one message
        @raise IncompleteResponseError : data is shorter than the header, or
               the length field differs from len(data)
        @raise InvalidChecksumError : the bytes do not sum to 0 modulo 256
        @raise ResponseError : the error code byte is not 0
        """
        # Without this guard a truncated message fails with a bare IndexError
        # while the header is being read.
        if len(data) < self._HEADER_LENGTH:
            raise IncompleteResponseError("Incomplete response. Header needs %d bytes, received %d" % (self._HEADER_LENGTH, len(data)))

        self.length = _bytes_to_num((data[0], data[1]))
        if self.length != len(data):
            # Diagnostic dump of the offending message
            print(data)

            raise IncompleteResponseError("Incomplete response. Expected data of length %d, received %d" % (self.length, len(data)))

        self.checksum = data[2]

        # The checksum byte is part of the sum, so a valid message adds up to
        # zero in the low eight bits.
        data_checksum = sum(data) & 0xFF
        if data_checksum != 0:
            raise InvalidChecksumError("Checksum is invalid. Computed sum %d, Msg checksum %d" % (data_checksum, self.checksum))

        self.seqnum = _bytes_to_num((data[3], data[4]))
        self.resend = data[5]
        self.msg_type = _bytes_to_num((data[6], data[7]))
        self.error_code = data[8]

        if self.error_code != 0:
            raise ResponseError("Got response error code %d" % self.error_code)

        self.body = data[self._HEADER_LENGTH:]

    def _body_substr(self, msg_offset, length):
        """
        Semi-knock off of perl substring function. Makes porting perl code to python easier.
        @param msg_offset int : Offset counted from the start of the message (header included)
        @param length int : Number of bytes to return
        @return list : Body bytes starting at msg_offset; shorter than length if the body ends early
        @raise RuntimeError : msg_offset points into the header
        """
        if msg_offset < self._HEADER_LENGTH:
            raise RuntimeError("Invalid offset %d" % msg_offset)
        start = msg_offset - self._HEADER_LENGTH
        return self.body[start:start + length]

    def _body_at(self, msg_offset):
        """
        Semi-knock off of perl substr function. Returns value at index
        @param msg_offset int : Offset counted from the start of the message (header included)
        @return int : Body byte at that offset
        @raise RuntimeError : msg_offset points into the header
        @raise IndexError : msg_offset lies beyond the end of the body
        """
        if msg_offset < self._HEADER_LENGTH:
            raise RuntimeError("Invalid offset %d" % msg_offset)
        return self.body[msg_offset - self._HEADER_LENGTH]
00149
00150
class ViafloInfoResponse(ViafloResponse):
    """
    Response of message type 1 (pipette information).

    Every field is read from a fixed position in the message. Positions in
    the tables below are message offsets as understood by _body_substr
    (offset 9 is the first body byte).
    """

    # (attribute name, message offset, byte count) of the text fields
    _TEXT_FIELDS = (
        ('owner', 10, 12),
        ('serial', 22, 10),
        ('model', 36, 5),
    )

    # (attribute name, message offset, byte count) of the numeric fields
    _NUMERIC_FIELDS = (
        ('fwmajor', 32, 2),
        ('fwminor', 34, 2),
        ('type', 41, 1),
        ('voyager_flag', 42, 1),
        ('aspirate_dispense_table_page', 43, 2),
        ('aspirate_factory_factor', 45, 2),
        ('dispense_factory_factor', 47, 2),
        ('aspirate_dispense_factor', 49, 2),
        ('dispense_dispense_factor', 51, 2),
        ('pipette_min_stops', 53, 2),
        ('pipette_max_stops', 55, 2),
        ('first_flash_page', 57, 2),
        ('calibration_timer_on', 59, 2),
        ('calibration_timer_time_months', 61, 2),
        ('time_high', 63, 2),
        ('time_low', 65, 2),
        ('motor_steps_millions', 67, 4),
        ('motor_steps', 71, 4),
        ('display_decimal', 75, 2),
    )

    # Numeric fields that are only decoded for firmware 1.7 or newer
    _NUMERIC_FIELDS_FW_1_7 = (
        ('firmware_engineering_revision', 77, 2),
        ('firmware_type', 79, 2),
    )

    def __init__(self, data):
        """
        @param data : list of byte values (integers) of one message
        @raise InvalidMessageTypeError : message type is not 1
        """
        ViafloResponse.__init__(self, data)

        if self.msg_type != 1:
            raise InvalidMessageTypeError("Info message should be of type 1. Got type %d" % self.msg_type)

        for attr, offset, count in self._TEXT_FIELDS:
            setattr(self, attr, _convert_bytes_to_str(self._body_substr(offset, count)))

        for attr, offset, count in self._NUMERIC_FIELDS:
            setattr(self, attr, _bytes_to_num(self._body_substr(offset, count)))

        # True for firmware version 1.7 and anything newer
        self.at_least_1_7 = (self.fwmajor, self.fwminor) >= (1, 7)

        if self.at_least_1_7:
            for attr, offset, count in self._NUMERIC_FIELDS_FW_1_7:
                setattr(self, attr, _bytes_to_num(self._body_substr(offset, count)))

    def __str__(self):
        """
        Multi-line, tab indented summary of the decoded fields
        """
        type_names = {1: 'Single Channel', 2: 'Multi Channel', 3: 'Step'}

        lines = [
            "ViafloInfoResponse",
            "\tOwner: '%s'" % self.owner,
            "\tSerial: '%s'" % self.serial,
            "\tFW Major: %d" % self.fwmajor,
            "\tFW Minor: %d" % self.fwminor,
            "\tModel: '%s'" % self.model,
            "\tType: %d '%s'" % (self.type, type_names.get(self.type, "???")),
            "\tVoyager Flag: %d" % self.voyager_flag,
            "\tMotor Steps Millions: %d" % self.motor_steps_millions,
            "\tMotor Steps: %d" % self.motor_steps,
            "\tMin Stops: %d" % self.pipette_min_stops,
            "\tMax Stops: %d" % self.pipette_max_stops,
            "\tCalibration Timer On: %d" % self.calibration_timer_on,
            "\tCalibration Timer Time Months: %d" % self.calibration_timer_time_months,
            "\tDisplay Decimal: %d" % self.display_decimal,
        ]

        # These two attributes only exist for firmware 1.7 or newer
        if self.at_least_1_7:
            lines.append("\tEngineering FW Rev: %d" % self.firmware_engineering_revision)
            lines.append("\tFW Type: %d" % self.firmware_type)

        lines.append("\tError Code (msg): %d" % self.error_code)
        lines.append("\tBody Length: %d" % len(self.body))

        return '\n'.join(lines)
00219
class ViafloPipettorActionResponse(ViafloResponse):
    """
    Response of message type 4 (pipettor action).

    Attributes added to those of ViafloResponse:
      action       - action code, one byte at message offset 10
      pipette_stop - two byte number at message offset 11
    """

    def __init__(self, data):
        """
        @param data : list of byte values (integers) of one message
        @raise InvalidMessageTypeError : message type is not 4
        """
        ViafloResponse.__init__(self, data)

        if self.msg_type != 4:
            raise InvalidMessageTypeError("Action response should be of type 4. Got type %d" % self.msg_type)

        action_bytes = self._body_substr(10, 1)
        self.action = _bytes_to_num(action_bytes)

        stop_bytes = self._body_substr(11, 2)
        self.pipette_stop = _bytes_to_num(stop_bytes)

    def __str__(self):
        """
        Multi-line, tab indented summary of the decoded fields
        """
        action_name = _action_dict[self.action]
        return '\n'.join([
            "ViafloPipettorActionResponse ",
            "\tAction: %d '%s'" % (self.action, action_name),
            "\tPipette Stop: %d" % self.pipette_stop,
            "\tError Code: %d" % self.error_code,
            "\tBody Length: %d" % len(self.body),
        ])
00237
class ViafloVersionResponse(ViafloResponse):
    """
    Response of message type 10 (version information).

    Each version is stored as a string of decimal numbers joined by dots.
    Offsets below are message offsets as understood by _body_substr and
    _body_at.

    Attributes added to those of ViafloResponse:
      tracking_number          - five bytes at offset 10
      overall_version          - bytes at offsets 19, 20 and 27
      firmware_version         - two byte numbers at offsets 15 and 17, then the byte at 28
      interpreter_version      - two bytes at offset 21
      hardware_control_version - two bytes at offset 23
      motor_control_version    - two bytes at offset 25
    """

    def __init__(self, data):
        """
        @param data : list of byte values (integers) of one message
        @raise InvalidMessageTypeError : message type is not 10
        @raise IndexError : the body is too short to hold offsets 19, 20, 27 or 28
        """
        ViafloResponse.__init__(self, data)

        # The info and action responses reject messages of a foreign type;
        # do the same here instead of decoding unrelated bytes as versions.
        if self.msg_type != 10:
            raise InvalidMessageTypeError("Version response should be of type 10. Got type %d" % self.msg_type)

        def dot_join(parts):
            return '.'.join([str(n) for n in parts])

        self.tracking_number = dot_join(self._body_substr(10, 5))
        self.overall_version = dot_join([self._body_at(19), self._body_at(20), self._body_at(27)])
        self.firmware_version = dot_join([
            _bytes_to_num(self._body_substr(15, 2)),
            _bytes_to_num(self._body_substr(17, 2)),
            self._body_at(28),
        ])
        self.interpreter_version = dot_join(self._body_substr(21, 2))
        self.hardware_control_version = dot_join(self._body_substr(23, 2))
        self.motor_control_version = dot_join(self._body_substr(25, 2))

    def __str__(self):
        """
        Multi-line, tab indented summary of the decoded versions
        """
        out = [ "ViafloVersionResponse "]
        out.append("\tTracking Number: '%s'" % (self.tracking_number) )
        out.append("\tOverall Version: '%s'" % (self.overall_version) )
        out.append("\tFirmware Version: '%s'" % (self.firmware_version) )
        out.append("\tInterpreter Version: '%s'" % (self.interpreter_version) )
        out.append("\tHW Control Version: '%s'" % (self.hardware_control_version) )
        out.append("\tMotor Control Version: '%s'" % (self.motor_control_version) )
        return '\n'.join(out)
00261
00262
00263
class ViafloCommand(object):
    """
    Base class for commands sent to the pipette.

    Attributes:
      type     - message type written into the header
      body     - list of byte values that follow the eight byte header
      length   - header plus body length, computed when the command is created
      resend   - always 0 here
      checksum - always 0 here; the transmitted checksum is computed in pack()
    """

    def __init__(self, msg_type, body):
        """
        @param msg_type int : Message type
        @param body : list of byte values (integers) forming the message body
        """
        self.type = msg_type
        self.body = body
        self.length = 8 + len(body)
        self.resend = 0
        self.checksum = 0

    def pack(self, seqnum):
        """
        Packs message for transport
        @param seqnum int : Sequence number written into the header
        @return str : STX, the escaped message, three ETX bytes and LF
        """
        # Header layout: length (2), checksum (1), sequence number (2),
        # one zero byte, message type (2)
        frame = list(_short_num_to_bytes(self.length))
        frame.append(0)
        frame.extend(_short_num_to_bytes(seqnum))
        frame.append(0)
        frame.extend(_short_num_to_bytes(self.type))
        frame.extend(self.body)

        # Choose the checksum so that all bytes sum to zero modulo 256
        frame[2] = (-sum(frame)) & 0xff

        # Framing bytes that occur inside the message get an ESC prefix
        special = (_STX, _ETX, _ESC)
        escaped = []
        for octet in frame:
            if octet in special:
                escaped.append(_ESC)
            escaped.append(octet)

        wire = [_STX]
        wire.extend(escaped)
        wire.extend([_ETX, _ETX, _ETX, _LF])
        return _convert_bytes_to_str(wire)
00309
class ViafloInfoCommand(ViafloCommand):
    """
    Info command: message type 1 with an empty body
    """

    def __init__(self):
        ViafloCommand.__init__(self, msg_type=1, body=[])
00313
class ViafloVersionCommand(ViafloCommand):
    """
    Version command: message type 10 with an empty body
    """

    def __init__(self):
        ViafloCommand.__init__(self, msg_type=10, body=[])
00317
class ViafloPurgeCommand(ViafloCommand):
    """
    Purge command: message type 4 whose body starts with action code 3
    """

    def __init__(self, speed, number_of_bells=0):
        """
        @param speed int : Second body byte
        @param number_of_bells int : Added as a trailing body byte unless it is 0
        """
        payload = [3, speed]
        if number_of_bells != 0:
            payload.append(number_of_bells)
        ViafloCommand.__init__(self, msg_type=4, body=payload)
00324
class ViafloAspirateCommand(ViafloCommand):
    """
    Aspirate command: message type 4 whose body starts with action code 4
    """

    def __init__(self, speed, stops, number_of_bells=0):
        """
        @param speed int : Second body byte
        @param stops int : Sent as two bytes, high byte first
        @param number_of_bells int : Added as a trailing body byte unless it is 0
        """
        payload = [4, speed]
        payload.extend(_short_num_to_bytes(stops))
        if number_of_bells != 0:
            payload.append(number_of_bells)
        ViafloCommand.__init__(self, msg_type=4, body=payload)
00331
class ViafloDispenseCommand(ViafloCommand):
    """
    Dispense command: message type 4 whose body starts with action code 6
    """

    def __init__(self, speed, stops, number_of_bells=0):
        """
        @param speed int : Second body byte
        @param stops int : Sent as two bytes, high byte first
        @param number_of_bells int : Added as a trailing body byte unless it is 0
        """
        payload = [6, speed]
        payload.extend(_short_num_to_bytes(stops))
        if number_of_bells != 0:
            payload.append(number_of_bells)
        ViafloCommand.__init__(self, msg_type=4, body=payload)
00338
class ViafloPositionQueryCommand(ViafloCommand):
    """
    Position query: message type 4 whose body is the single action code 1
    """

    def __init__(self):
        ViafloCommand.__init__(self, msg_type=4, body=[1])
00342
00343
def unpack_msg(raw_data, expected_seq_num):
    """
    Unpacks data and returns response message

    The raw data is unescaped, its STX/ETX framing is checked and removed,
    the sequence number is compared with the expected one and the response
    class is chosen from the message type. Errors raised by the response
    constructors (bad length, checksum, error code) pass through unchanged.

    @param raw_data str : Escaped message, from STX up to and including ETX
    @param expected_seq_num int : Sequence number the response must carry
    @return ViafloResponse : ViafloInfoResponse (type 1),
            ViafloPipettorActionResponse (type 4) or ViafloVersionResponse (type 10)
    @raise IncompleteResponseError : fewer than 11 bytes after unescaping, or
           the data does not start with STX and end with ETX
    @raise SeqNumMismatchError : sequence number differs from expected_seq_num
    @raise InvalidMessageTypeError : message type is not 1, 4 or 10
    """
    valid_escape = "\x1B([\x02\x03\x1B])"

    # Take the valid pairs out before looking for bad ones. Searching the raw
    # data directly reports a false problem whenever an escaped ESC is
    # followed by an ordinary byte, because the second ESC of the pair looks
    # like the start of an invalid sequence.
    leftovers = re.sub(valid_escape, "", raw_data)
    if re.search("\x1B[^\x02\x03\x1B]", leftovers):
        # Only a warning: the data is still processed
        print("Invalid escape sequence found")

    # Replace every escaped STX/ETX/ESC with the plain byte
    raw_data = re.sub(valid_escape, "\\1", raw_data)

    data = _convert_str_to_bytes(raw_data)

    if len(data) < 11:
        raise IncompleteResponseError("Incomplete response. Data was only %d long." % len(data))

    if data[0] != _STX:
        print(data)
        raise IncompleteResponseError("Data began with %d, expected STX (%d)." % (data[0], _STX))
    if data[-1] != _ETX:
        print(data)
        raise IncompleteResponseError("Data ended with %d, not ETX (%d)." % (data[-1], _ETX))

    # Strip the framing bytes
    data = data[1:-1]

    seqnum = _bytes_to_num((data[3], data[4]))
    if seqnum != expected_seq_num:
        print(data)
        raise SeqNumMismatchError("Message # mismatch, got %d, expected %d" % (seqnum, expected_seq_num))

    msg_type = _bytes_to_num((data[6], data[7]))

    response_classes = {
        1: ViafloInfoResponse,
        4: ViafloPipettorActionResponse,
        10: ViafloVersionResponse,
    }
    response_class = response_classes.get(msg_type)
    if response_class is None:
        print('Data length %d' % len(data))
        print('MSG data: %s' % (data,))
        raise InvalidMessageTypeError("Got message type of %d. Unable to process" % msg_type)

    return response_class(data)
00393
00394
class ViafloPipette(object):
    """
    Connection to a pipette over a Bluetooth RFCOMM socket.

    Typical use: open(), any number of send_msg() calls, then close().
    """

    def __init__(self):
        # Must be the attribute the other methods use (_sock). The constructor
        # used to set a differently named attribute, so send_msg() or close()
        # before open() failed with AttributeError.
        self._sock = None
        self._seq_num = 0
        self._bd_addr = None
        self._port = None

    def open(self, bd_addr, port):
        """
        Open specified device.

        Resets the sequence number, connects and reads once from the socket.
        A connection that is still open is closed first. If connecting fails
        the new socket is closed again and the error is re-raised.

        @param bd_addr : Bluetooth address handed to the socket's connect()
        @param port : Port handed to the socket's connect()
        """
        if self._sock is not None:
            self.close()

        self._bd_addr = bd_addr
        self._port = port
        self._seq_num = 0

        sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        try:
            sock.connect((self._bd_addr, self._port))
            # Whatever arrives first is read and dropped.
            # NOTE(review): presumably a greeting sent by the pipette - confirm
            sock.recv(1024)
        except Exception:
            sock.close()
            raise

        # Only a connected socket is stored, so a failed open() leaves the
        # object in the "not connected" state.
        self._sock = sock

    def _recv_data(self):
        """
        Read from the socket until the data ends with the terminating ETX.

        @return str : Everything received, still escaped
        @raise NotConnectedException : the socket returned no data, i.e. the
               connection was closed before the terminator arrived
        """
        etx = chr(_ETX)
        esc = chr(_ESC)
        data = ""
        while True:
            chunk = self._sock.recv(1024)
            if not chunk:
                # Without this check a closed connection spins forever
                raise NotConnectedException("Connection closed while waiting for a response")
            data += chunk

            if data[-1] != etx:
                continue

            # An ETX that follows an odd number of ESC bytes is an escaped
            # data byte that happens to end this chunk, not the terminator.
            head = data[:-1]
            escapes = len(head) - len(head.rstrip(esc))
            if escapes % 2 == 0:
                return data

    def send_msg(self, message, receive = True):
        """
        @param message ViafloCommand : Command to send; its pack() method builds the bytes
        @param receive bool : Whether to wait and receive data
        @return ViafloResponse : Response message, or None when receive is false
        @raise NotConnectedException : the pipette isn't connected
        """
        if self._sock is None:
            raise NotConnectedException("Pipette isn't connected")

        # pack() sends the sequence number in two bytes. Wrap the counter the
        # same way so the expected value still matches after 65535 messages.
        self._seq_num = (self._seq_num + 1) & 0xFFFF
        self._sock.send( message.pack(self._seq_num) )

        if not receive:
            return None

        data = self._recv_data()

        return unpack_msg(data, self._seq_num)

    def close(self):
        """
        Close the connection. Does nothing when no connection is open.
        """
        if self._sock is None:
            return
        try:
            self._sock.close()
        finally:
            self._sock = None
00446
00447