00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 from .robotis_def import *
00023
00024 TXPACKET_MAX_LEN = 4 * 1024
00025 RXPACKET_MAX_LEN = 4 * 1024
00026
00027
00028 PKT_HEADER0 = 0
00029 PKT_HEADER1 = 1
00030 PKT_HEADER2 = 2
00031 PKT_RESERVED = 3
00032 PKT_ID = 4
00033 PKT_LENGTH_L = 5
00034 PKT_LENGTH_H = 6
00035 PKT_INSTRUCTION = 7
00036 PKT_ERROR = 8
00037 PKT_PARAMETER0 = 8
00038
00039
00040 ERRNUM_RESULT_FAIL = 1
00041 ERRNUM_INSTRUCTION = 2
00042 ERRNUM_CRC = 3
00043 ERRNUM_DATA_RANGE = 4
00044 ERRNUM_DATA_LENGTH = 5
00045 ERRNUM_DATA_LIMIT = 6
00046 ERRNUM_ACCESS = 7
00047
00048 ERRBIT_ALERT = 128
00049
00050
00051 class Protocol2PacketHandler(object):
00052 def getProtocolVersion(self):
00053 return 2.0
00054
00055 def getTxRxResult(self, result):
00056 if result == COMM_SUCCESS:
00057 return "[TxRxResult] Communication success!"
00058 elif result == COMM_PORT_BUSY:
00059 return "[TxRxResult] Port is in use!"
00060 elif result == COMM_TX_FAIL:
00061 return "[TxRxResult] Failed transmit instruction packet!"
00062 elif result == COMM_RX_FAIL:
00063 return "[TxRxResult] Failed get status packet from device!"
00064 elif result == COMM_TX_ERROR:
00065 return "[TxRxResult] Incorrect instruction packet!"
00066 elif result == COMM_RX_WAITING:
00067 return "[TxRxResult] Now receiving status packet!"
00068 elif result == COMM_RX_TIMEOUT:
00069 return "[TxRxResult] There is no status packet!"
00070 elif result == COMM_RX_CORRUPT:
00071 return "[TxRxResult] Incorrect status packet!"
00072 elif result == COMM_NOT_AVAILABLE:
00073 return "[TxRxResult] Protocol does not support this function!"
00074 else:
00075 return ""
00076
00077 def getRxPacketError(self, error):
00078 if error & ERRBIT_ALERT:
00079 return "[RxPacketError] Hardware error occurred. Check the error at Control Table (Hardware Error Status)!"
00080
00081 not_alert_error = error & ~ERRBIT_ALERT
00082 if not_alert_error == 0:
00083 return ""
00084 elif not_alert_error == ERRNUM_RESULT_FAIL:
00085 return "[RxPacketError] Failed to process the instruction packet!"
00086
00087 elif not_alert_error == ERRNUM_INSTRUCTION:
00088 return "[RxPacketError] Undefined instruction or incorrect instruction!"
00089
00090 elif not_alert_error == ERRNUM_CRC:
00091 return "[RxPacketError] CRC doesn't match!"
00092
00093 elif not_alert_error == ERRNUM_DATA_RANGE:
00094 return "[RxPacketError] The data value is out of range!"
00095
00096 elif not_alert_error == ERRNUM_DATA_LENGTH:
00097 return "[RxPacketError] The data length does not match as expected!"
00098
00099 elif not_alert_error == ERRNUM_DATA_LIMIT:
00100 return "[RxPacketError] The data value exceeds the limit value!"
00101
00102 elif not_alert_error == ERRNUM_ACCESS:
00103 return "[RxPacketError] Writing or Reading is not available to target address!"
00104
00105 else:
00106 return "[RxPacketError] Unknown error code!"
00107
00108 def updateCRC(self, crc_accum, data_blk_ptr, data_blk_size):
00109 crc_table = [0x0000,
00110 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
00111 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027,
00112 0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D,
00113 0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A, 0x804B,
00114 0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC, 0x80C9,
00115 0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5, 0x80FF,
00116 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0, 0x80A5,
00117 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1, 0x8093,
00118 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
00119 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197,
00120 0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE,
00121 0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB,
00122 0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC, 0x81D9,
00123 0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145, 0x814F,
00124 0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173, 0x0176,
00125 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162, 0x8123,
00126 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
00127 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104,
00128 0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D,
00129 0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A, 0x832B,
00130 0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F, 0x036A,
00131 0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356, 0x035C,
00132 0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0, 0x83C5,
00133 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1, 0x83F3,
00134 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
00135 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7,
00136 0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E,
00137 0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A, 0x829B,
00138 0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC, 0x82B9,
00139 0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6, 0x02EC,
00140 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0, 0x82D5,
00141 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1, 0x8243,
00142 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
00143 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264,
00144 0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E,
00145 0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219, 0x0208,
00146 0x820D, 0x8207, 0x0202]
00147
00148 for j in range(0, data_blk_size):
00149 i = ((crc_accum >> 8) ^ data_blk_ptr[j]) & 0xFF
00150 crc_accum = ((crc_accum << 8) ^ crc_table[i]) & 0xFFFF
00151
00152 return crc_accum
00153
00154 def addStuffing(self, packet):
00155 packet_length_in = DXL_MAKEWORD(packet[PKT_LENGTH_L], packet[PKT_LENGTH_H])
00156 packet_length_out = packet_length_in
00157
00158 temp = [0] * TXPACKET_MAX_LEN
00159
00160
00161 temp[PKT_HEADER0: PKT_HEADER0 + PKT_LENGTH_H + 1] = packet[PKT_HEADER0: PKT_HEADER0 + PKT_LENGTH_H + 1]
00162
00163 index = PKT_INSTRUCTION
00164
00165 for i in range(0, packet_length_in - 2):
00166 temp[index] = packet[i + PKT_INSTRUCTION]
00167 index = index + 1
00168 if packet[i + PKT_INSTRUCTION] == 0xFD \
00169 and packet[i + PKT_INSTRUCTION - 1] == 0xFF \
00170 and packet[i + PKT_INSTRUCTION - 2] == 0xFF:
00171
00172 temp[index] = 0xFD
00173 index = index + 1
00174 packet_length_out = packet_length_out + 1
00175
00176 temp[index] = packet[PKT_INSTRUCTION + packet_length_in - 2]
00177 temp[index + 1] = packet[PKT_INSTRUCTION + packet_length_in - 1]
00178 index = index + 2
00179
00180 if packet_length_in != packet_length_out:
00181 packet = [0] * index
00182
00183 packet[0: index] = temp[0: index]
00184
00185 packet[PKT_LENGTH_L] = DXL_LOBYTE(packet_length_out)
00186 packet[PKT_LENGTH_H] = DXL_HIBYTE(packet_length_out)
00187
00188 return packet
00189
00190 def removeStuffing(self, packet):
00191 packet_length_in = DXL_MAKEWORD(packet[PKT_LENGTH_L], packet[PKT_LENGTH_H])
00192 packet_length_out = packet_length_in
00193
00194 index = PKT_INSTRUCTION
00195 for i in range(0, (packet_length_in - 2)):
00196 if (packet[i + PKT_INSTRUCTION] == 0xFD) and (packet[i + PKT_INSTRUCTION + 1] == 0xFD) and (
00197 packet[i + PKT_INSTRUCTION - 1] == 0xFF) and (packet[i + PKT_INSTRUCTION - 2] == 0xFF):
00198
00199 packet_length_out = packet_length_out - 1
00200 i += 1
00201
00202 packet[index] = packet[i + PKT_INSTRUCTION]
00203 index += 1
00204
00205 packet[index] = packet[PKT_INSTRUCTION + packet_length_in - 2]
00206 packet[index + 1] = packet[PKT_INSTRUCTION + packet_length_in - 1]
00207
00208 packet[PKT_LENGTH_L] = DXL_LOBYTE(packet_length_out)
00209 packet[PKT_LENGTH_H] = DXL_HIBYTE(packet_length_out)
00210
00211 return packet
00212
00213 def txPacket(self, port, txpacket):
00214 if port.is_using:
00215 return COMM_PORT_BUSY
00216 port.is_using = True
00217
00218
00219 self.addStuffing(txpacket)
00220
00221
00222 total_packet_length = DXL_MAKEWORD(txpacket[PKT_LENGTH_L], txpacket[PKT_LENGTH_H]) + 7
00223
00224
00225 if total_packet_length > TXPACKET_MAX_LEN:
00226 port.is_using = False
00227 return COMM_TX_ERROR
00228
00229
00230 txpacket[PKT_HEADER0] = 0xFF
00231 txpacket[PKT_HEADER1] = 0xFF
00232 txpacket[PKT_HEADER2] = 0xFD
00233 txpacket[PKT_RESERVED] = 0x00
00234
00235
00236 crc = self.updateCRC(0, txpacket, total_packet_length - 2)
00237
00238 txpacket[total_packet_length - 2] = DXL_LOBYTE(crc)
00239 txpacket[total_packet_length - 1] = DXL_HIBYTE(crc)
00240
00241
00242 port.clearPort()
00243 written_packet_length = port.writePort(txpacket)
00244 if total_packet_length != written_packet_length:
00245 port.is_using = False
00246 return COMM_TX_FAIL
00247
00248 return COMM_SUCCESS
00249
00250 def rxPacket(self, port):
00251 rxpacket = []
00252
00253 result = COMM_TX_FAIL
00254 rx_length = 0
00255 wait_length = 11
00256
00257 while True:
00258 rxpacket.extend(port.readPort(wait_length - rx_length))
00259 rx_length = len(rxpacket)
00260 if rx_length >= wait_length:
00261
00262 for idx in range(0, (rx_length - 3)):
00263 if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF) and (rxpacket[idx + 2] == 0xFD) and (
00264 rxpacket[idx + 3] != 0xFD):
00265 break
00266
00267 if idx == 0:
00268 if (rxpacket[PKT_RESERVED] != 0x00) or (rxpacket[PKT_ID] > 0xFC) or (
00269 DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) > RXPACKET_MAX_LEN) or (
00270 rxpacket[PKT_INSTRUCTION] != 0x55):
00271
00272 del rxpacket[0]
00273 rx_length -= 1
00274 continue
00275
00276 if wait_length != (DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) + PKT_LENGTH_H + 1):
00277 wait_length = DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) + PKT_LENGTH_H + 1
00278 continue
00279
00280 if rx_length < wait_length:
00281 if port.isPacketTimeout():
00282 if rx_length == 0:
00283 result = COMM_RX_TIMEOUT
00284 else:
00285 result = COMM_RX_CORRUPT
00286 break
00287 else:
00288 continue
00289
00290 crc = DXL_MAKEWORD(rxpacket[wait_length - 2], rxpacket[wait_length - 1])
00291
00292 if self.updateCRC(0, rxpacket, wait_length - 2) == crc:
00293 result = COMM_SUCCESS
00294 else:
00295 result = COMM_RX_CORRUPT
00296 break
00297
00298 else:
00299
00300 del rxpacket[0: idx]
00301 rx_length -= idx
00302
00303 else:
00304 if port.isPacketTimeout():
00305 if rx_length == 0:
00306 result = COMM_RX_TIMEOUT
00307 else:
00308 result = COMM_RX_CORRUPT
00309 break
00310
00311 port.is_using = False
00312
00313 if result == COMM_SUCCESS:
00314 rxpacket = self.removeStuffing(rxpacket)
00315
00316 return rxpacket, result
00317
00318
00319 def txRxPacket(self, port, txpacket):
00320 rxpacket = None
00321 error = 0
00322
00323
00324 result = self.txPacket(port, txpacket)
00325 if result != COMM_SUCCESS:
00326 return rxpacket, result, error
00327
00328
00329 if txpacket[PKT_INSTRUCTION] == INST_BULK_READ or txpacket[PKT_INSTRUCTION] == INST_SYNC_READ:
00330 result = COMM_NOT_AVAILABLE
00331
00332
00333
00334 if txpacket[PKT_ID] == BROADCAST_ID or txpacket[PKT_INSTRUCTION] == INST_ACTION:
00335 port.is_using = False
00336 return rxpacket, result, error
00337
00338
00339 if txpacket[PKT_INSTRUCTION] == INST_READ:
00340 port.setPacketTimeout(DXL_MAKEWORD(txpacket[PKT_PARAMETER0 + 2], txpacket[PKT_PARAMETER0 + 3]) + 11)
00341 else:
00342 port.setPacketTimeout(11)
00343
00344
00345
00346 while True:
00347 rxpacket, result = self.rxPacket(port)
00348 if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
00349 break
00350
00351 if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
00352 error = rxpacket[PKT_ERROR]
00353
00354 return rxpacket, result, error
00355
00356 def ping(self, port, dxl_id):
00357 model_number = 0
00358 error = 0
00359
00360 txpacket = [0] * 10
00361
00362 if dxl_id >= BROADCAST_ID:
00363 return model_number, COMM_NOT_AVAILABLE, error
00364
00365 txpacket[PKT_ID] = dxl_id
00366 txpacket[PKT_LENGTH_L] = 3
00367 txpacket[PKT_LENGTH_H] = 0
00368 txpacket[PKT_INSTRUCTION] = INST_PING
00369
00370 rxpacket, result, error = self.txRxPacket(port, txpacket)
00371 if result == COMM_SUCCESS:
00372 model_number = DXL_MAKEWORD(rxpacket[PKT_PARAMETER0 + 1], rxpacket[PKT_PARAMETER0 + 2])
00373
00374 return model_number, result, error
00375
00376 def broadcastPing(self, port):
00377 data_list = {}
00378
00379 STATUS_LENGTH = 14
00380
00381 rx_length = 0
00382 wait_length = STATUS_LENGTH * MAX_ID
00383
00384 txpacket = [0] * 10
00385 rxpacket = []
00386
00387 txpacket[PKT_ID] = BROADCAST_ID
00388 txpacket[PKT_LENGTH_L] = 3
00389 txpacket[PKT_LENGTH_H] = 0
00390 txpacket[PKT_INSTRUCTION] = INST_PING
00391
00392 result = self.txPacket(port, txpacket)
00393 if result != COMM_SUCCESS:
00394 port.is_using = False
00395 return data_list, result
00396
00397
00398 port.setPacketTimeout(wait_length * 1)
00399
00400 while True:
00401 rxpacket += port.readPort(wait_length - rx_length)
00402 rx_length = len(rxpacket)
00403
00404 if port.isPacketTimeout():
00405 break
00406
00407 port.is_using = False
00408
00409 if rx_length == 0:
00410 return data_list, COMM_RX_TIMEOUT
00411
00412 while True:
00413 if rx_length < STATUS_LENGTH:
00414 return data_list, COMM_RX_CORRUPT
00415
00416
00417 for idx in range(0, rx_length - 2):
00418 if rxpacket[idx] == 0xFF and rxpacket[idx + 1] == 0xFF and rxpacket[idx + 2] == 0xFD:
00419 break
00420
00421 if idx == 0:
00422
00423 crc = DXL_MAKEWORD(rxpacket[STATUS_LENGTH - 2], rxpacket[STATUS_LENGTH - 1])
00424
00425 if self.updateCRC(0, rxpacket, STATUS_LENGTH - 2) == crc:
00426 result = COMM_SUCCESS
00427
00428 data_list[rxpacket[PKT_ID]] = [
00429 DXL_MAKEWORD(rxpacket[PKT_PARAMETER0 + 1], rxpacket[PKT_PARAMETER0 + 2]),
00430 rxpacket[PKT_PARAMETER0 + 3]]
00431
00432 del rxpacket[0: STATUS_LENGTH]
00433 rx_length = rx_length - STATUS_LENGTH
00434
00435 if rx_length == 0:
00436 return data_list, result
00437
00438 else:
00439 result = COMM_RX_CORRUPT
00440
00441
00442 del rxpacket[0: 3]
00443 rx_length = rx_length - 3
00444
00445 else:
00446
00447 del rxpacket[0: idx]
00448 rx_length = rx_length - idx
00449
00450
00451 return data_list, result
00452
00453 def action(self, port, dxl_id):
00454 txpacket = [0] * 10
00455
00456 txpacket[PKT_ID] = dxl_id
00457 txpacket[PKT_LENGTH_L] = 3
00458 txpacket[PKT_LENGTH_H] = 0
00459 txpacket[PKT_INSTRUCTION] = INST_ACTION
00460
00461 _, result, _ = self.txRxPacket(port, txpacket)
00462 return result
00463
00464 def reboot(self, port, dxl_id):
00465 txpacket = [0] * 10
00466
00467 txpacket[PKT_ID] = dxl_id
00468 txpacket[PKT_LENGTH_L] = 3
00469 txpacket[PKT_LENGTH_H] = 0
00470 txpacket[PKT_INSTRUCTION] = INST_REBOOT
00471
00472 _, result, error = self.txRxPacket(port, txpacket)
00473 return result, error
00474
00475 def factoryReset(self, port, dxl_id, option):
00476 txpacket = [0] * 11
00477
00478 txpacket[PKT_ID] = dxl_id
00479 txpacket[PKT_LENGTH_L] = 4
00480 txpacket[PKT_LENGTH_H] = 0
00481 txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
00482 txpacket[PKT_PARAMETER0] = option
00483
00484 _, result, error = self.txRxPacket(port, txpacket)
00485 return result, error
00486
00487 def readTx(self, port, dxl_id, address, length):
00488 txpacket = [0] * 14
00489
00490 if dxl_id >= BROADCAST_ID:
00491 return COMM_NOT_AVAILABLE
00492
00493 txpacket[PKT_ID] = dxl_id
00494 txpacket[PKT_LENGTH_L] = 7
00495 txpacket[PKT_LENGTH_H] = 0
00496 txpacket[PKT_INSTRUCTION] = INST_READ
00497 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00498 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00499 txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(length)
00500 txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(length)
00501
00502 result = self.txPacket(port, txpacket)
00503
00504
00505 if result == COMM_SUCCESS:
00506 port.setPacketTimeout(length + 11)
00507
00508 return result
00509
00510 def readRx(self, port, dxl_id, length):
00511 result = COMM_TX_FAIL
00512 error = 0
00513
00514 rxpacket = None
00515 data = []
00516
00517 while True:
00518 rxpacket, result = self.rxPacket(port)
00519
00520 if result != COMM_SUCCESS or rxpacket[PKT_ID] == dxl_id:
00521 break
00522
00523 if result == COMM_SUCCESS and rxpacket[PKT_ID] == dxl_id:
00524 error = rxpacket[PKT_ERROR]
00525
00526 data.extend(rxpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length])
00527
00528 return data, result, error
00529
00530 def readTxRx(self, port, dxl_id, address, length):
00531 error = 0
00532
00533 txpacket = [0] * 14
00534 data = []
00535
00536 if dxl_id >= BROADCAST_ID:
00537 return data, COMM_NOT_AVAILABLE, error
00538
00539 txpacket[PKT_ID] = dxl_id
00540 txpacket[PKT_LENGTH_L] = 7
00541 txpacket[PKT_LENGTH_H] = 0
00542 txpacket[PKT_INSTRUCTION] = INST_READ
00543 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00544 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00545 txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(length)
00546 txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(length)
00547
00548 rxpacket, result, error = self.txRxPacket(port, txpacket)
00549 if result == COMM_SUCCESS:
00550 error = rxpacket[PKT_ERROR]
00551
00552 data.extend(rxpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length])
00553
00554 return data, result, error
00555
00556 def read1ByteTx(self, port, dxl_id, address):
00557 return self.readTx(port, dxl_id, address, 1)
00558
00559 def read1ByteRx(self, port, dxl_id):
00560 data, result, error = self.readRx(port, dxl_id, 1)
00561 data_read = data[0] if (result == COMM_SUCCESS) else 0
00562 return data_read, result, error
00563
00564 def read1ByteTxRx(self, port, dxl_id, address):
00565 data, result, error = self.readTxRx(port, dxl_id, address, 1)
00566 data_read = data[0] if (result == COMM_SUCCESS) else 0
00567 return data_read, result, error
00568
00569 def read2ByteTx(self, port, dxl_id, address):
00570 return self.readTx(port, dxl_id, address, 2)
00571
00572 def read2ByteRx(self, port, dxl_id):
00573 data, result, error = self.readRx(port, dxl_id, 2)
00574 data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00575 return data_read, result, error
00576
00577 def read2ByteTxRx(self, port, dxl_id, address):
00578 data, result, error = self.readTxRx(port, dxl_id, address, 2)
00579 data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00580 return data_read, result, error
00581
00582 def read4ByteTx(self, port, dxl_id, address):
00583 return self.readTx(port, dxl_id, address, 4)
00584
00585 def read4ByteRx(self, port, dxl_id):
00586 data, result, error = self.readRx(port, dxl_id, 4)
00587 data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00588 DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00589 return data_read, result, error
00590
00591 def read4ByteTxRx(self, port, dxl_id, address):
00592 data, result, error = self.readTxRx(port, dxl_id, address, 4)
00593 data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00594 DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00595 return data_read, result, error
00596
00597 def writeTxOnly(self, port, dxl_id, address, length, data):
00598 txpacket = [0] * (length + 12)
00599
00600 txpacket[PKT_ID] = dxl_id
00601 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00602 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00603 txpacket[PKT_INSTRUCTION] = INST_WRITE
00604 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00605 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00606
00607 txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00608
00609 result = self.txPacket(port, txpacket)
00610 port.is_using = False
00611
00612 return result
00613
00614 def writeTxRx(self, port, dxl_id, address, length, data):
00615 txpacket = [0] * (length + 12)
00616
00617 txpacket[PKT_ID] = dxl_id
00618 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00619 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00620 txpacket[PKT_INSTRUCTION] = INST_WRITE
00621 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00622 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00623
00624 txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00625 rxpacket, result, error = self.txRxPacket(port, txpacket)
00626
00627 return result, error
00628
00629 def write1ByteTxOnly(self, port, dxl_id, address, data):
00630 data_write = [data]
00631 return self.writeTxOnly(port, dxl_id, address, 1, data_write)
00632
00633 def write1ByteTxRx(self, port, dxl_id, address, data):
00634 data_write = [data]
00635 return self.writeTxRx(port, dxl_id, address, 1, data_write)
00636
00637 def write2ByteTxOnly(self, port, dxl_id, address, data):
00638 data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00639 return self.writeTxOnly(port, dxl_id, address, 2, data_write)
00640
00641 def write2ByteTxRx(self, port, dxl_id, address, data):
00642 data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00643 return self.writeTxRx(port, dxl_id, address, 2, data_write)
00644
00645 def write4ByteTxOnly(self, port, dxl_id, address, data):
00646 data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00647 DXL_HIBYTE(DXL_LOWORD(data)),
00648 DXL_LOBYTE(DXL_HIWORD(data)),
00649 DXL_HIBYTE(DXL_HIWORD(data))]
00650 return self.writeTxOnly(port, dxl_id, address, 4, data_write)
00651
00652 def write4ByteTxRx(self, port, dxl_id, address, data):
00653 data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00654 DXL_HIBYTE(DXL_LOWORD(data)),
00655 DXL_LOBYTE(DXL_HIWORD(data)),
00656 DXL_HIBYTE(DXL_HIWORD(data))]
00657 return self.writeTxRx(port, dxl_id, address, 4, data_write)
00658
00659 def regWriteTxOnly(self, port, dxl_id, address, length, data):
00660 txpacket = [0] * (length + 12)
00661
00662 txpacket[PKT_ID] = dxl_id
00663 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00664 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00665 txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00666 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00667 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00668
00669 txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00670
00671 result = self.txPacket(port, txpacket)
00672 port.is_using = False
00673
00674 return result
00675
00676 def regWriteTxRx(self, port, dxl_id, address, length, data):
00677 txpacket = [0] * (length + 12)
00678
00679 txpacket[PKT_ID] = dxl_id
00680 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00681 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00682 txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00683 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00684 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00685
00686 txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00687
00688 _, result, error = self.txRxPacket(port, txpacket)
00689
00690 return result, error
00691
00692 def syncReadTx(self, port, start_address, data_length, param, param_length):
00693 txpacket = [0] * (param_length + 14)
00694
00695
00696 txpacket[PKT_ID] = BROADCAST_ID
00697 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(
00698 param_length + 7)
00699 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(
00700 param_length + 7)
00701 txpacket[PKT_INSTRUCTION] = INST_SYNC_READ
00702 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(start_address)
00703 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(start_address)
00704 txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(data_length)
00705 txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(data_length)
00706
00707 txpacket[PKT_PARAMETER0 + 4: PKT_PARAMETER0 + 4 + param_length] = param[0: param_length]
00708
00709 result = self.txPacket(port, txpacket)
00710 if result == COMM_SUCCESS:
00711 port.setPacketTimeout((11 + data_length) * param_length)
00712
00713 return result
00714
00715 def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
00716 txpacket = [0] * (param_length + 14)
00717
00718
00719 txpacket[PKT_ID] = BROADCAST_ID
00720 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(
00721 param_length + 7)
00722 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(
00723 param_length + 7)
00724 txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
00725 txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(start_address)
00726 txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(start_address)
00727 txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(data_length)
00728 txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(data_length)
00729
00730 txpacket[PKT_PARAMETER0 + 4: PKT_PARAMETER0 + 4 + param_length] = param[0: param_length]
00731
00732 _, result, _ = self.txRxPacket(port, txpacket)
00733
00734 return result
00735
00736 def bulkReadTx(self, port, param, param_length):
00737 txpacket = [0] * (param_length + 10)
00738
00739
00740 txpacket[PKT_ID] = BROADCAST_ID
00741 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(param_length + 3)
00742 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(param_length + 3)
00743 txpacket[PKT_INSTRUCTION] = INST_BULK_READ
00744
00745 txpacket[PKT_PARAMETER0: PKT_PARAMETER0 + param_length] = param[0: param_length]
00746
00747 result = self.txPacket(port, txpacket)
00748 if result == COMM_SUCCESS:
00749 wait_length = 0
00750 i = 0
00751 while i < param_length:
00752 wait_length += DXL_MAKEWORD(param[i + 3], param[i + 4]) + 10
00753 i += 5
00754 port.setPacketTimeout(wait_length)
00755
00756 return result
00757
00758 def bulkWriteTxOnly(self, port, param, param_length):
00759 txpacket = [0] * (param_length + 10)
00760
00761
00762 txpacket[PKT_ID] = BROADCAST_ID
00763 txpacket[PKT_LENGTH_L] = DXL_LOBYTE(param_length + 3)
00764 txpacket[PKT_LENGTH_H] = DXL_HIBYTE(param_length + 3)
00765 txpacket[PKT_INSTRUCTION] = INST_BULK_WRITE
00766
00767 txpacket[PKT_PARAMETER0: PKT_PARAMETER0 + param_length] = param[0: param_length]
00768
00769 _, result, _ = self.txRxPacket(port, txpacket)
00770
00771 return result