00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 from .robotis_def import *
00023
00024 TXPACKET_MAX_LEN = 250
00025 RXPACKET_MAX_LEN = 250
00026
00027
00028 PKT_HEADER0 = 0
00029 PKT_HEADER1 = 1
00030 PKT_ID = 2
00031 PKT_LENGTH = 3
00032 PKT_INSTRUCTION = 4
00033 PKT_ERROR = 4
00034 PKT_PARAMETER0 = 5
00035
00036
00037 ERRBIT_VOLTAGE = 1
00038 ERRBIT_ANGLE = 2
00039 ERRBIT_OVERHEAT = 4
00040 ERRBIT_RANGE = 8
00041 ERRBIT_CHECKSUM = 16
00042 ERRBIT_OVERLOAD = 32
00043 ERRBIT_INSTRUCTION = 64
00044
00045
00046 class Protocol1PacketHandler(object):
00047 def getProtocolVersion(self):
00048 return 1.0
00049
00050 def getTxRxResult(self, result):
00051 if result == COMM_SUCCESS:
00052 return "[TxRxResult] Communication success!"
00053 elif result == COMM_PORT_BUSY:
00054 return "[TxRxResult] Port is in use!"
00055 elif result == COMM_TX_FAIL:
00056 return "[TxRxResult] Failed transmit instruction packet!"
00057 elif result == COMM_RX_FAIL:
00058 return "[TxRxResult] Failed get status packet from device!"
00059 elif result == COMM_TX_ERROR:
00060 return "[TxRxResult] Incorrect instruction packet!"
00061 elif result == COMM_RX_WAITING:
00062 return "[TxRxResult] Now receiving status packet!"
00063 elif result == COMM_RX_TIMEOUT:
00064 return "[TxRxResult] There is no status packet!"
00065 elif result == COMM_RX_CORRUPT:
00066 return "[TxRxResult] Incorrect status packet!"
00067 elif result == COMM_NOT_AVAILABLE:
00068 return "[TxRxResult] Protocol does not support this function!"
00069 else:
00070 return ""
00071
00072 def getRxPacketError(self, error):
00073 if error & ERRBIT_VOLTAGE:
00074 return "[RxPacketError] Input voltage error!"
00075
00076 if error & ERRBIT_ANGLE:
00077 return "[RxPacketError] Angle limit error!"
00078
00079 if error & ERRBIT_OVERHEAT:
00080 return "[RxPacketError] Overheat error!"
00081
00082 if error & ERRBIT_RANGE:
00083 return "[RxPacketError] Out of range error!"
00084
00085 if error & ERRBIT_CHECKSUM:
00086 return "[RxPacketError] Checksum error!"
00087
00088 if error & ERRBIT_OVERLOAD:
00089 return "[RxPacketError] Overload error!"
00090
00091 if error & ERRBIT_INSTRUCTION:
00092 return "[RxPacketError] Instruction code error!"
00093
00094 return ""
00095
00096 def txPacket(self, port, txpacket):
00097 checksum = 0
00098 total_packet_length = txpacket[PKT_LENGTH] + 4
00099
00100 if port.is_using:
00101 return COMM_PORT_BUSY
00102 port.is_using = True
00103
00104
00105 if total_packet_length > TXPACKET_MAX_LEN:
00106 port.is_using = False
00107 return COMM_TX_ERROR
00108
00109
00110 txpacket[PKT_HEADER0] = 0xFF
00111 txpacket[PKT_HEADER1] = 0xFF
00112
00113
00114 for idx in range(2, total_packet_length - 1):
00115 checksum += txpacket[idx]
00116
00117 txpacket[total_packet_length - 1] = ~checksum & 0xFF
00118
00119
00120
00121
00122 port.clearPort()
00123 written_packet_length = port.writePort(txpacket)
00124 if total_packet_length != written_packet_length:
00125 port.is_using = False
00126 return COMM_TX_FAIL
00127
00128 return COMM_SUCCESS
00129
00130 def rxPacket(self, port):
00131 rxpacket = []
00132
00133 result = COMM_TX_FAIL
00134 checksum = 0
00135 rx_length = 0
00136 wait_length = 6
00137
00138 while True:
00139 rxpacket.extend(port.readPort(wait_length - rx_length))
00140 rx_length = len(rxpacket)
00141 if rx_length >= wait_length:
00142
00143 for idx in range(0, (rx_length - 1)):
00144 if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
00145 break
00146
00147 if idx == 0:
00148 if (rxpacket[PKT_ID] > 0xFD) or (rxpacket[PKT_LENGTH] > RXPACKET_MAX_LEN) or (
00149 rxpacket[PKT_ERROR] > 0x7F):
00150
00151
00152 del rxpacket[0]
00153 rx_length -= 1
00154 continue
00155
00156
00157 if wait_length != (rxpacket[PKT_LENGTH] + PKT_LENGTH + 1):
00158 wait_length = rxpacket[PKT_LENGTH] + PKT_LENGTH + 1
00159 continue
00160
00161 if rx_length < wait_length:
00162
00163 if port.isPacketTimeout():
00164 if rx_length == 0:
00165 result = COMM_RX_TIMEOUT
00166 else:
00167 result = COMM_RX_CORRUPT
00168 break
00169 else:
00170 continue
00171
00172
00173 for i in range(2, wait_length - 1):
00174 checksum += rxpacket[i]
00175 checksum = ~checksum & 0xFF
00176
00177
00178 if rxpacket[wait_length - 1] == checksum:
00179 result = COMM_SUCCESS
00180 else:
00181 result = COMM_RX_CORRUPT
00182 break
00183
00184 else:
00185
00186 del rxpacket[0: idx]
00187 rx_length -= idx
00188
00189 else:
00190
00191 if port.isPacketTimeout():
00192 if rx_length == 0:
00193 result = COMM_RX_TIMEOUT
00194 else:
00195 result = COMM_RX_CORRUPT
00196 break
00197
00198 port.is_using = False
00199
00200
00201
00202 return rxpacket, result
00203
00204
00205 def txRxPacket(self, port, txpacket):
00206 rxpacket = None
00207 error = 0
00208
00209
00210 result = self.txPacket(port, txpacket)
00211 if result != COMM_SUCCESS:
00212 return rxpacket, result, error
00213
00214
00215 if txpacket[PKT_INSTRUCTION] == INST_BULK_READ:
00216 result = COMM_NOT_AVAILABLE
00217
00218
00219 if (txpacket[PKT_ID] == BROADCAST_ID):
00220 port.is_using = False
00221 return rxpacket, result, error
00222
00223
00224 if txpacket[PKT_INSTRUCTION] == INST_READ:
00225 port.setPacketTimeout(txpacket[PKT_PARAMETER0 + 1] + 6)
00226 else:
00227 port.setPacketTimeout(6)
00228
00229
00230 while True:
00231 rxpacket, result = self.rxPacket(port)
00232 if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
00233 break
00234
00235 if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
00236 error = rxpacket[PKT_ERROR]
00237
00238 return rxpacket, result, error
00239
00240 def ping(self, port, dxl_id):
00241 model_number = 0
00242 error = 0
00243
00244 txpacket = [0] * 6
00245
00246 if dxl_id >= BROADCAST_ID:
00247 return model_number, COMM_NOT_AVAILABLE, error
00248
00249 txpacket[PKT_ID] = dxl_id
00250 txpacket[PKT_LENGTH] = 2
00251 txpacket[PKT_INSTRUCTION] = INST_PING
00252
00253 rxpacket, result, error = self.txRxPacket(port, txpacket)
00254
00255 if result == COMM_SUCCESS:
00256 data_read, result, error = self.readTxRx(port, dxl_id, 0, 2)
00257 if result == COMM_SUCCESS:
00258 model_number = DXL_MAKEWORD(data_read[0], data_read[1])
00259
00260 return model_number, result, error
00261
00262 def broadcastPing(self, port):
00263 data_list = None
00264 return data_list, COMM_NOT_AVAILABLE
00265
00266 def action(self, port, dxl_id):
00267 txpacket = [0] * 6
00268
00269 txpacket[PKT_ID] = dxl_id
00270 txpacket[PKT_LENGTH] = 2
00271 txpacket[PKT_INSTRUCTION] = INST_ACTION
00272
00273 _, result, _ = self.txRxPacket(port, txpacket)
00274
00275 return result
00276
00277 def reboot(self, port, dxl_id):
00278 return COMM_NOT_AVAILABLE, 0
00279
00280 def factoryReset(self, port, dxl_id):
00281 txpacket = [0] * 6
00282
00283 txpacket[PKT_ID] = dxl_id
00284 txpacket[PKT_LENGTH] = 2
00285 txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
00286
00287 _, result, error = self.txRxPacket(port, txpacket)
00288
00289 return result, error
00290
00291 def readTx(self, port, dxl_id, address, length):
00292
00293 txpacket = [0] * 8
00294
00295 if dxl_id >= BROADCAST_ID:
00296 return COMM_NOT_AVAILABLE
00297
00298 txpacket[PKT_ID] = dxl_id
00299 txpacket[PKT_LENGTH] = 4
00300 txpacket[PKT_INSTRUCTION] = INST_READ
00301 txpacket[PKT_PARAMETER0 + 0] = address
00302 txpacket[PKT_PARAMETER0 + 1] = length
00303
00304 result = self.txPacket(port, txpacket)
00305
00306
00307 if result == COMM_SUCCESS:
00308 port.setPacketTimeout(length + 6)
00309
00310 return result
00311
00312 def readRx(self, port, dxl_id, length):
00313 result = COMM_TX_FAIL
00314 error = 0
00315
00316 rxpacket = None
00317 data = []
00318
00319 while True:
00320 rxpacket, result = self.rxPacket(port)
00321
00322 if result != COMM_SUCCESS or rxpacket[PKT_ID] == dxl_id:
00323 break
00324
00325 if result == COMM_SUCCESS and rxpacket[PKT_ID] == dxl_id:
00326 error = rxpacket[PKT_ERROR]
00327
00328 data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
00329
00330 return data, result, error
00331
00332 def readTxRx(self, port, dxl_id, address, length):
00333 txpacket = [0] * 8
00334 data = []
00335
00336 if dxl_id >= BROADCAST_ID:
00337 return data, COMM_NOT_AVAILABLE, 0
00338
00339 txpacket[PKT_ID] = dxl_id
00340 txpacket[PKT_LENGTH] = 4
00341 txpacket[PKT_INSTRUCTION] = INST_READ
00342 txpacket[PKT_PARAMETER0 + 0] = address
00343 txpacket[PKT_PARAMETER0 + 1] = length
00344
00345 rxpacket, result, error = self.txRxPacket(port, txpacket)
00346 if result == COMM_SUCCESS:
00347 error = rxpacket[PKT_ERROR]
00348
00349 data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
00350
00351 return data, result, error
00352
00353 def read1ByteTx(self, port, dxl_id, address):
00354 return self.readTx(port, dxl_id, address, 1)
00355
00356 def read1ByteRx(self, port, dxl_id):
00357 data, result, error = self.readRx(port, dxl_id, 1)
00358 data_read = data[0] if (result == COMM_SUCCESS) else 0
00359 return data_read, result, error
00360
00361 def read1ByteTxRx(self, port, dxl_id, address):
00362 data, result, error = self.readTxRx(port, dxl_id, address, 1)
00363 data_read = data[0] if (result == COMM_SUCCESS) else 0
00364 return data_read, result, error
00365
00366 def read2ByteTx(self, port, dxl_id, address):
00367 return self.readTx(port, dxl_id, address, 2)
00368
00369 def read2ByteRx(self, port, dxl_id):
00370 data, result, error = self.readRx(port, dxl_id, 2)
00371 data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00372 return data_read, result, error
00373
00374 def read2ByteTxRx(self, port, dxl_id, address):
00375 data, result, error = self.readTxRx(port, dxl_id, address, 2)
00376 data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00377 return data_read, result, error
00378
00379 def read4ByteTx(self, port, dxl_id, address):
00380 return self.readTx(port, dxl_id, address, 4)
00381
00382 def read4ByteRx(self, port, dxl_id):
00383 data, result, error = self.readRx(port, dxl_id, 4)
00384 data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00385 DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00386 return data_read, result, error
00387
00388 def read4ByteTxRx(self, port, dxl_id, address):
00389 data, result, error = self.readTxRx(port, dxl_id, address, 4)
00390 data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00391 DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00392 return data_read, result, error
00393
00394 def writeTxOnly(self, port, dxl_id, address, length, data):
00395 txpacket = [0] * (length + 7)
00396
00397 txpacket[PKT_ID] = dxl_id
00398 txpacket[PKT_LENGTH] = length + 3
00399 txpacket[PKT_INSTRUCTION] = INST_WRITE
00400 txpacket[PKT_PARAMETER0] = address
00401
00402 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00403
00404 result = self.txPacket(port, txpacket)
00405 port.is_using = False
00406
00407 return result
00408
00409 def writeTxRx(self, port, dxl_id, address, length, data):
00410 txpacket = [0] * (length + 7)
00411
00412 txpacket[PKT_ID] = dxl_id
00413 txpacket[PKT_LENGTH] = length + 3
00414 txpacket[PKT_INSTRUCTION] = INST_WRITE
00415 txpacket[PKT_PARAMETER0] = address
00416
00417 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00418 rxpacket, result, error = self.txRxPacket(port, txpacket)
00419
00420 return result, error
00421
00422 def write1ByteTxOnly(self, port, dxl_id, address, data):
00423 data_write = [data]
00424 return self.writeTxOnly(port, dxl_id, address, 1, data_write)
00425
00426 def write1ByteTxRx(self, port, dxl_id, address, data):
00427 data_write = [data]
00428 return self.writeTxRx(port, dxl_id, address, 1, data_write)
00429
00430 def write2ByteTxOnly(self, port, dxl_id, address, data):
00431 data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00432 return self.writeTxOnly(port, dxl_id, address, 2, data_write)
00433
00434 def write2ByteTxRx(self, port, dxl_id, address, data):
00435 data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00436 return self.writeTxRx(port, dxl_id, address, 2, data_write)
00437
00438 def write4ByteTxOnly(self, port, dxl_id, address, data):
00439 data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00440 DXL_HIBYTE(DXL_LOWORD(data)),
00441 DXL_LOBYTE(DXL_HIWORD(data)),
00442 DXL_HIBYTE(DXL_HIWORD(data))]
00443 return self.writeTxOnly(port, dxl_id, address, 4, data_write)
00444
00445 def write4ByteTxRx(self, port, dxl_id, address, data):
00446 data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00447 DXL_HIBYTE(DXL_LOWORD(data)),
00448 DXL_LOBYTE(DXL_HIWORD(data)),
00449 DXL_HIBYTE(DXL_HIWORD(data))]
00450 return self.writeTxRx(port, dxl_id, address, 4, data_write)
00451
00452 def regWriteTxOnly(self, port, dxl_id, address, length, data):
00453 txpacket = [0] * (length + 7)
00454
00455 txpacket[PKT_ID] = dxl_id
00456 txpacket[PKT_LENGTH] = length + 3
00457 txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00458 txpacket[PKT_PARAMETER0] = address
00459
00460 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00461
00462 result = self.txPacket(port, txpacket)
00463 port.is_using = False
00464
00465 return result
00466
00467 def regWriteTxRx(self, port, dxl_id, address, length, data):
00468 txpacket = [0] * (length + 7)
00469
00470 txpacket[PKT_ID] = dxl_id
00471 txpacket[PKT_LENGTH] = length + 3
00472 txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00473 txpacket[PKT_PARAMETER0] = address
00474
00475 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00476
00477 _, result, error = self.txRxPacket(port, txpacket)
00478
00479 return result, error
00480
00481 def syncReadTx(self, port, start_address, data_length, param, param_length):
00482 return COMM_NOT_AVAILABLE
00483
00484 def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
00485 txpacket = [0] * (param_length + 8)
00486
00487
00488 txpacket[PKT_ID] = BROADCAST_ID
00489 txpacket[PKT_LENGTH] = param_length + 4
00490 txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
00491 txpacket[PKT_PARAMETER0 + 0] = start_address
00492 txpacket[PKT_PARAMETER0 + 1] = data_length
00493
00494 txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + param_length] = param[0: param_length]
00495
00496 _, result, _ = self.txRxPacket(port, txpacket)
00497
00498 return result
00499
00500 def bulkReadTx(self, port, param, param_length):
00501 txpacket = [0] * (param_length + 7)
00502
00503
00504 txpacket[PKT_ID] = BROADCAST_ID
00505 txpacket[PKT_LENGTH] = param_length + 3
00506 txpacket[PKT_INSTRUCTION] = INST_BULK_READ
00507 txpacket[PKT_PARAMETER0 + 0] = 0x00
00508
00509 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + param_length] = param[0: param_length]
00510
00511 result = self.txPacket(port, txpacket)
00512 if result == COMM_SUCCESS:
00513 wait_length = 0
00514 i = 0
00515 while i < param_length:
00516 wait_length += param[i] + 7
00517 i += 3
00518 port.setPacketTimeout(wait_length)
00519
00520 return result
00521
00522 def bulkWriteTxOnly(self, port, param, param_length):
00523 return COMM_NOT_AVAILABLE