protocol2_packet_handler.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 
00004 ################################################################################
00005 # Copyright 2017 ROBOTIS CO., LTD.
00006 #
00007 # Licensed under the Apache License, Version 2.0 (the "License");
00008 # you may not use this file except in compliance with the License.
00009 # You may obtain a copy of the License at
00010 #
00011 #     http://www.apache.org/licenses/LICENSE-2.0
00012 #
00013 # Unless required by applicable law or agreed to in writing, software
00014 # distributed under the License is distributed on an "AS IS" BASIS,
00015 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00016 # See the License for the specific language governing permissions and
00017 # limitations under the License.
00018 ################################################################################
00019 
00020 # Author: Ryu Woon Jung (Leon)
00021 
00022 from .robotis_def import *
00023 
00024 TXPACKET_MAX_LEN = 4 * 1024
00025 RXPACKET_MAX_LEN = 4 * 1024
00026 
00027 # for Protocol 2.0 Packet
00028 PKT_HEADER0 = 0
00029 PKT_HEADER1 = 1
00030 PKT_HEADER2 = 2
00031 PKT_RESERVED = 3
00032 PKT_ID = 4
00033 PKT_LENGTH_L = 5
00034 PKT_LENGTH_H = 6
00035 PKT_INSTRUCTION = 7
00036 PKT_ERROR = 8
00037 PKT_PARAMETER0 = 8
00038 
00039 # Protocol 2.0 Error bit
00040 ERRNUM_RESULT_FAIL = 1  # Failed to process the instruction packet.
00041 ERRNUM_INSTRUCTION = 2  # Instruction error
00042 ERRNUM_CRC = 3  # CRC check error
00043 ERRNUM_DATA_RANGE = 4  # Data range error
00044 ERRNUM_DATA_LENGTH = 5  # Data length error
00045 ERRNUM_DATA_LIMIT = 6  # Data limit error
00046 ERRNUM_ACCESS = 7  # Access error
00047 
00048 ERRBIT_ALERT = 128  # When the device has a problem, this bit is set to 1. Check "Device Status Check" value.
00049 
00050 
00051 class Protocol2PacketHandler(object):
00052     def getProtocolVersion(self):
00053         return 2.0
00054 
00055     def getTxRxResult(self, result):
00056         if result == COMM_SUCCESS:
00057             return "[TxRxResult] Communication success!"
00058         elif result == COMM_PORT_BUSY:
00059             return "[TxRxResult] Port is in use!"
00060         elif result == COMM_TX_FAIL:
00061             return "[TxRxResult] Failed transmit instruction packet!"
00062         elif result == COMM_RX_FAIL:
00063             return "[TxRxResult] Failed get status packet from device!"
00064         elif result == COMM_TX_ERROR:
00065             return "[TxRxResult] Incorrect instruction packet!"
00066         elif result == COMM_RX_WAITING:
00067             return "[TxRxResult] Now receiving status packet!"
00068         elif result == COMM_RX_TIMEOUT:
00069             return "[TxRxResult] There is no status packet!"
00070         elif result == COMM_RX_CORRUPT:
00071             return "[TxRxResult] Incorrect status packet!"
00072         elif result == COMM_NOT_AVAILABLE:
00073             return "[TxRxResult] Protocol does not support this function!"
00074         else:
00075             return ""
00076 
00077     def getRxPacketError(self, error):
00078         if error & ERRBIT_ALERT:
00079             return "[RxPacketError] Hardware error occurred. Check the error at Control Table (Hardware Error Status)!"
00080 
00081         not_alert_error = error & ~ERRBIT_ALERT
00082         if not_alert_error == 0:
00083             return ""
00084         elif not_alert_error == ERRNUM_RESULT_FAIL:
00085             return "[RxPacketError] Failed to process the instruction packet!"
00086 
00087         elif not_alert_error == ERRNUM_INSTRUCTION:
00088             return "[RxPacketError] Undefined instruction or incorrect instruction!"
00089 
00090         elif not_alert_error == ERRNUM_CRC:
00091             return "[RxPacketError] CRC doesn't match!"
00092 
00093         elif not_alert_error == ERRNUM_DATA_RANGE:
00094             return "[RxPacketError] The data value is out of range!"
00095 
00096         elif not_alert_error == ERRNUM_DATA_LENGTH:
00097             return "[RxPacketError] The data length does not match as expected!"
00098 
00099         elif not_alert_error == ERRNUM_DATA_LIMIT:
00100             return "[RxPacketError] The data value exceeds the limit value!"
00101 
00102         elif not_alert_error == ERRNUM_ACCESS:
00103             return "[RxPacketError] Writing or Reading is not available to target address!"
00104 
00105         else:
00106             return "[RxPacketError] Unknown error code!"
00107 
00108     def updateCRC(self, crc_accum, data_blk_ptr, data_blk_size):
00109         crc_table = [0x0000,
00110                      0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
00111                      0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027,
00112                      0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D,
00113                      0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A, 0x804B,
00114                      0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC, 0x80C9,
00115                      0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5, 0x80FF,
00116                      0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0, 0x80A5,
00117                      0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1, 0x8093,
00118                      0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
00119                      0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197,
00120                      0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE,
00121                      0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB,
00122                      0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC, 0x81D9,
00123                      0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145, 0x814F,
00124                      0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173, 0x0176,
00125                      0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162, 0x8123,
00126                      0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
00127                      0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104,
00128                      0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D,
00129                      0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A, 0x832B,
00130                      0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F, 0x036A,
00131                      0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356, 0x035C,
00132                      0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0, 0x83C5,
00133                      0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1, 0x83F3,
00134                      0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
00135                      0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7,
00136                      0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E,
00137                      0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A, 0x829B,
00138                      0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC, 0x82B9,
00139                      0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6, 0x02EC,
00140                      0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0, 0x82D5,
00141                      0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1, 0x8243,
00142                      0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
00143                      0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264,
00144                      0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E,
00145                      0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219, 0x0208,
00146                      0x820D, 0x8207, 0x0202]
00147 
00148         for j in range(0, data_blk_size):
00149             i = ((crc_accum >> 8) ^ data_blk_ptr[j]) & 0xFF
00150             crc_accum = ((crc_accum << 8) ^ crc_table[i]) & 0xFFFF
00151 
00152         return crc_accum
00153 
00154     def addStuffing(self, packet):
00155         packet_length_in = DXL_MAKEWORD(packet[PKT_LENGTH_L], packet[PKT_LENGTH_H])
00156         packet_length_out = packet_length_in
00157 
00158         temp = [0] * TXPACKET_MAX_LEN
00159 
00160         # FF FF FD XX ID LEN_L LEN_H
00161         temp[PKT_HEADER0: PKT_HEADER0 + PKT_LENGTH_H + 1] = packet[PKT_HEADER0: PKT_HEADER0 + PKT_LENGTH_H + 1]
00162 
00163         index = PKT_INSTRUCTION
00164 
00165         for i in range(0, packet_length_in - 2):  # except CRC
00166             temp[index] = packet[i + PKT_INSTRUCTION]
00167             index = index + 1
00168             if packet[i + PKT_INSTRUCTION] == 0xFD \
00169                     and packet[i + PKT_INSTRUCTION - 1] == 0xFF \
00170                     and packet[i + PKT_INSTRUCTION - 2] == 0xFF:
00171                 # FF FF FD
00172                 temp[index] = 0xFD
00173                 index = index + 1
00174                 packet_length_out = packet_length_out + 1
00175 
00176         temp[index] = packet[PKT_INSTRUCTION + packet_length_in - 2]
00177         temp[index + 1] = packet[PKT_INSTRUCTION + packet_length_in - 1]
00178         index = index + 2
00179 
00180         if packet_length_in != packet_length_out:
00181             packet = [0] * index
00182 
00183         packet[0: index] = temp[0: index]
00184 
00185         packet[PKT_LENGTH_L] = DXL_LOBYTE(packet_length_out)
00186         packet[PKT_LENGTH_H] = DXL_HIBYTE(packet_length_out)
00187 
00188         return packet
00189 
00190     def removeStuffing(self, packet):
00191         packet_length_in = DXL_MAKEWORD(packet[PKT_LENGTH_L], packet[PKT_LENGTH_H])
00192         packet_length_out = packet_length_in
00193 
00194         index = PKT_INSTRUCTION
00195         for i in range(0, (packet_length_in - 2)):  # except CRC
00196             if (packet[i + PKT_INSTRUCTION] == 0xFD) and (packet[i + PKT_INSTRUCTION + 1] == 0xFD) and (
00197                     packet[i + PKT_INSTRUCTION - 1] == 0xFF) and (packet[i + PKT_INSTRUCTION - 2] == 0xFF):
00198                 # FF FF FD FD
00199                 packet_length_out = packet_length_out - 1
00200                 i += 1
00201 
00202             packet[index] = packet[i + PKT_INSTRUCTION]
00203             index += 1
00204 
00205         packet[index] = packet[PKT_INSTRUCTION + packet_length_in - 2]
00206         packet[index + 1] = packet[PKT_INSTRUCTION + packet_length_in - 1]
00207 
00208         packet[PKT_LENGTH_L] = DXL_LOBYTE(packet_length_out)
00209         packet[PKT_LENGTH_H] = DXL_HIBYTE(packet_length_out)
00210 
00211         return packet
00212 
00213     def txPacket(self, port, txpacket):
00214         if port.is_using:
00215             return COMM_PORT_BUSY
00216         port.is_using = True
00217 
00218         # byte stuffing for header
00219         self.addStuffing(txpacket)
00220 
00221         # check max packet length
00222         total_packet_length = DXL_MAKEWORD(txpacket[PKT_LENGTH_L], txpacket[PKT_LENGTH_H]) + 7
00223         # 7: HEADER0 HEADER1 HEADER2 RESERVED ID LENGTH_L LENGTH_H
00224 
00225         if total_packet_length > TXPACKET_MAX_LEN:
00226             port.is_using = False
00227             return COMM_TX_ERROR
00228 
00229         # make packet header
00230         txpacket[PKT_HEADER0] = 0xFF
00231         txpacket[PKT_HEADER1] = 0xFF
00232         txpacket[PKT_HEADER2] = 0xFD
00233         txpacket[PKT_RESERVED] = 0x00
00234 
00235         # add CRC16
00236         crc = self.updateCRC(0, txpacket, total_packet_length - 2)  # 2: CRC16
00237 
00238         txpacket[total_packet_length - 2] = DXL_LOBYTE(crc)
00239         txpacket[total_packet_length - 1] = DXL_HIBYTE(crc)
00240 
00241         # tx packet
00242         port.clearPort()
00243         written_packet_length = port.writePort(txpacket)
00244         if total_packet_length != written_packet_length:
00245             port.is_using = False
00246             return COMM_TX_FAIL
00247 
00248         return COMM_SUCCESS
00249 
00250     def rxPacket(self, port):
00251         rxpacket = []
00252 
00253         result = COMM_TX_FAIL
00254         rx_length = 0
00255         wait_length = 11  # minimum length (HEADER0 HEADER1 HEADER2 RESERVED ID LENGTH_L LENGTH_H INST ERROR CRC16_L CRC16_H)
00256 
00257         while True:
00258             rxpacket.extend(port.readPort(wait_length - rx_length))
00259             rx_length = len(rxpacket)
00260             if rx_length >= wait_length:
00261                 # find packet header
00262                 for idx in range(0, (rx_length - 3)):
00263                     if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF) and (rxpacket[idx + 2] == 0xFD) and (
00264                             rxpacket[idx + 3] != 0xFD):
00265                         break
00266 
00267                 if idx == 0:
00268                     if (rxpacket[PKT_RESERVED] != 0x00) or (rxpacket[PKT_ID] > 0xFC) or (
00269                             DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) > RXPACKET_MAX_LEN) or (
00270                             rxpacket[PKT_INSTRUCTION] != 0x55):
00271                         # remove the first byte in the packet
00272                         del rxpacket[0]
00273                         rx_length -= 1
00274                         continue
00275 
00276                     if wait_length != (DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) + PKT_LENGTH_H + 1):
00277                         wait_length = DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) + PKT_LENGTH_H + 1
00278                         continue
00279 
00280                     if rx_length < wait_length:
00281                         if port.isPacketTimeout():
00282                             if rx_length == 0:
00283                                 result = COMM_RX_TIMEOUT
00284                             else:
00285                                 result = COMM_RX_CORRUPT
00286                             break
00287                         else:
00288                             continue
00289 
00290                     crc = DXL_MAKEWORD(rxpacket[wait_length - 2], rxpacket[wait_length - 1])
00291 
00292                     if self.updateCRC(0, rxpacket, wait_length - 2) == crc:
00293                         result = COMM_SUCCESS
00294                     else:
00295                         result = COMM_RX_CORRUPT
00296                     break
00297 
00298                 else:
00299                     # remove unnecessary packets
00300                     del rxpacket[0: idx]
00301                     rx_length -= idx
00302 
00303             else:
00304                 if port.isPacketTimeout():
00305                     if rx_length == 0:
00306                         result = COMM_RX_TIMEOUT
00307                     else:
00308                         result = COMM_RX_CORRUPT
00309                     break
00310 
00311         port.is_using = False
00312 
00313         if result == COMM_SUCCESS:
00314             rxpacket = self.removeStuffing(rxpacket)
00315 
00316         return rxpacket, result
00317 
00318     # NOT for BulkRead / SyncRead instruction
00319     def txRxPacket(self, port, txpacket):
00320         rxpacket = None
00321         error = 0
00322 
00323         # tx packet
00324         result = self.txPacket(port, txpacket)
00325         if result != COMM_SUCCESS:
00326             return rxpacket, result, error
00327 
00328         # (Instruction == BulkRead or SyncRead) == this function is not available.
00329         if txpacket[PKT_INSTRUCTION] == INST_BULK_READ or txpacket[PKT_INSTRUCTION] == INST_SYNC_READ:
00330             result = COMM_NOT_AVAILABLE
00331 
00332         # (ID == Broadcast ID) == no need to wait for status packet or not available.
00333         # (Instruction == action) == no need to wait for status packet
00334         if txpacket[PKT_ID] == BROADCAST_ID or txpacket[PKT_INSTRUCTION] == INST_ACTION:
00335             port.is_using = False
00336             return rxpacket, result, error
00337 
00338         # set packet timeout
00339         if txpacket[PKT_INSTRUCTION] == INST_READ:
00340             port.setPacketTimeout(DXL_MAKEWORD(txpacket[PKT_PARAMETER0 + 2], txpacket[PKT_PARAMETER0 + 3]) + 11)
00341         else:
00342             port.setPacketTimeout(11)
00343             # HEADER0 HEADER1 HEADER2 RESERVED ID LENGTH_L LENGTH_H INST ERROR CRC16_L CRC16_H
00344 
00345         # rx packet
00346         while True:
00347             rxpacket, result = self.rxPacket(port)
00348             if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
00349                 break
00350 
00351         if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
00352             error = rxpacket[PKT_ERROR]
00353 
00354         return rxpacket, result, error
00355 
00356     def ping(self, port, dxl_id):
00357         model_number = 0
00358         error = 0
00359 
00360         txpacket = [0] * 10
00361 
00362         if dxl_id >= BROADCAST_ID:
00363             return model_number, COMM_NOT_AVAILABLE, error
00364 
00365         txpacket[PKT_ID] = dxl_id
00366         txpacket[PKT_LENGTH_L] = 3
00367         txpacket[PKT_LENGTH_H] = 0
00368         txpacket[PKT_INSTRUCTION] = INST_PING
00369 
00370         rxpacket, result, error = self.txRxPacket(port, txpacket)
00371         if result == COMM_SUCCESS:
00372             model_number = DXL_MAKEWORD(rxpacket[PKT_PARAMETER0 + 1], rxpacket[PKT_PARAMETER0 + 2])
00373 
00374         return model_number, result, error
00375 
00376     def broadcastPing(self, port):
00377         data_list = {}
00378 
00379         STATUS_LENGTH = 14
00380 
00381         rx_length = 0
00382         wait_length = STATUS_LENGTH * MAX_ID
00383 
00384         txpacket = [0] * 10
00385         rxpacket = []
00386 
00387         txpacket[PKT_ID] = BROADCAST_ID
00388         txpacket[PKT_LENGTH_L] = 3
00389         txpacket[PKT_LENGTH_H] = 0
00390         txpacket[PKT_INSTRUCTION] = INST_PING
00391 
00392         result = self.txPacket(port, txpacket)
00393         if result != COMM_SUCCESS:
00394             port.is_using = False
00395             return data_list, result
00396 
00397         # set rx timeout
00398         port.setPacketTimeout(wait_length * 1)
00399 
00400         while True:
00401             rxpacket += port.readPort(wait_length - rx_length)
00402             rx_length = len(rxpacket)
00403 
00404             if port.isPacketTimeout():  # or rx_length >= wait_length
00405                 break
00406 
00407         port.is_using = False
00408 
00409         if rx_length == 0:
00410             return data_list, COMM_RX_TIMEOUT
00411 
00412         while True:
00413             if rx_length < STATUS_LENGTH:
00414                 return data_list, COMM_RX_CORRUPT
00415 
00416             # find packet header
00417             for idx in range(0, rx_length - 2):
00418                 if rxpacket[idx] == 0xFF and rxpacket[idx + 1] == 0xFF and rxpacket[idx + 2] == 0xFD:
00419                     break
00420 
00421             if idx == 0:  # found at the beginning of the packet
00422                 # verify CRC16
00423                 crc = DXL_MAKEWORD(rxpacket[STATUS_LENGTH - 2], rxpacket[STATUS_LENGTH - 1])
00424 
00425                 if self.updateCRC(0, rxpacket, STATUS_LENGTH - 2) == crc:
00426                     result = COMM_SUCCESS
00427 
00428                     data_list[rxpacket[PKT_ID]] = [
00429                         DXL_MAKEWORD(rxpacket[PKT_PARAMETER0 + 1], rxpacket[PKT_PARAMETER0 + 2]),
00430                         rxpacket[PKT_PARAMETER0 + 3]]
00431 
00432                     del rxpacket[0: STATUS_LENGTH]
00433                     rx_length = rx_length - STATUS_LENGTH
00434 
00435                     if rx_length == 0:
00436                         return data_list, result
00437 
00438                 else:
00439                     result = COMM_RX_CORRUPT
00440 
00441                     # remove header (0xFF 0xFF 0xFD)
00442                     del rxpacket[0: 3]
00443                     rx_length = rx_length - 3
00444 
00445             else:
00446                 # remove unnecessary packets
00447                 del rxpacket[0: idx]
00448                 rx_length = rx_length - idx
00449 
00450         # FIXME: unreachable code
00451         return data_list, result
00452 
00453     def action(self, port, dxl_id):
00454         txpacket = [0] * 10
00455 
00456         txpacket[PKT_ID] = dxl_id
00457         txpacket[PKT_LENGTH_L] = 3
00458         txpacket[PKT_LENGTH_H] = 0
00459         txpacket[PKT_INSTRUCTION] = INST_ACTION
00460 
00461         _, result, _ = self.txRxPacket(port, txpacket)
00462         return result
00463 
00464     def reboot(self, port, dxl_id):
00465         txpacket = [0] * 10
00466 
00467         txpacket[PKT_ID] = dxl_id
00468         txpacket[PKT_LENGTH_L] = 3
00469         txpacket[PKT_LENGTH_H] = 0
00470         txpacket[PKT_INSTRUCTION] = INST_REBOOT
00471 
00472         _, result, error = self.txRxPacket(port, txpacket)
00473         return result, error
00474 
00475     def factoryReset(self, port, dxl_id, option):
00476         txpacket = [0] * 11
00477 
00478         txpacket[PKT_ID] = dxl_id
00479         txpacket[PKT_LENGTH_L] = 4
00480         txpacket[PKT_LENGTH_H] = 0
00481         txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
00482         txpacket[PKT_PARAMETER0] = option
00483 
00484         _, result, error = self.txRxPacket(port, txpacket)
00485         return result, error
00486 
00487     def readTx(self, port, dxl_id, address, length):
00488         txpacket = [0] * 14
00489 
00490         if dxl_id >= BROADCAST_ID:
00491             return COMM_NOT_AVAILABLE
00492 
00493         txpacket[PKT_ID] = dxl_id
00494         txpacket[PKT_LENGTH_L] = 7
00495         txpacket[PKT_LENGTH_H] = 0
00496         txpacket[PKT_INSTRUCTION] = INST_READ
00497         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00498         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00499         txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(length)
00500         txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(length)
00501 
00502         result = self.txPacket(port, txpacket)
00503 
00504         # set packet timeout
00505         if result == COMM_SUCCESS:
00506             port.setPacketTimeout(length + 11)
00507 
00508         return result
00509 
00510     def readRx(self, port, dxl_id, length):
00511         result = COMM_TX_FAIL
00512         error = 0
00513 
00514         rxpacket = None
00515         data = []
00516 
00517         while True:
00518             rxpacket, result = self.rxPacket(port)
00519 
00520             if result != COMM_SUCCESS or rxpacket[PKT_ID] == dxl_id:
00521                 break
00522 
00523         if result == COMM_SUCCESS and rxpacket[PKT_ID] == dxl_id:
00524             error = rxpacket[PKT_ERROR]
00525 
00526             data.extend(rxpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length])
00527 
00528         return data, result, error
00529 
00530     def readTxRx(self, port, dxl_id, address, length):
00531         error = 0
00532 
00533         txpacket = [0] * 14
00534         data = []
00535 
00536         if dxl_id >= BROADCAST_ID:
00537             return data, COMM_NOT_AVAILABLE, error
00538 
00539         txpacket[PKT_ID] = dxl_id
00540         txpacket[PKT_LENGTH_L] = 7
00541         txpacket[PKT_LENGTH_H] = 0
00542         txpacket[PKT_INSTRUCTION] = INST_READ
00543         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00544         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00545         txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(length)
00546         txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(length)
00547 
00548         rxpacket, result, error = self.txRxPacket(port, txpacket)
00549         if result == COMM_SUCCESS:
00550             error = rxpacket[PKT_ERROR]
00551 
00552             data.extend(rxpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length])
00553 
00554         return data, result, error
00555 
00556     def read1ByteTx(self, port, dxl_id, address):
00557         return self.readTx(port, dxl_id, address, 1)
00558 
00559     def read1ByteRx(self, port, dxl_id):
00560         data, result, error = self.readRx(port, dxl_id, 1)
00561         data_read = data[0] if (result == COMM_SUCCESS) else 0
00562         return data_read, result, error
00563 
00564     def read1ByteTxRx(self, port, dxl_id, address):
00565         data, result, error = self.readTxRx(port, dxl_id, address, 1)
00566         data_read = data[0] if (result == COMM_SUCCESS) else 0
00567         return data_read, result, error
00568 
00569     def read2ByteTx(self, port, dxl_id, address):
00570         return self.readTx(port, dxl_id, address, 2)
00571 
00572     def read2ByteRx(self, port, dxl_id):
00573         data, result, error = self.readRx(port, dxl_id, 2)
00574         data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00575         return data_read, result, error
00576 
00577     def read2ByteTxRx(self, port, dxl_id, address):
00578         data, result, error = self.readTxRx(port, dxl_id, address, 2)
00579         data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00580         return data_read, result, error
00581 
00582     def read4ByteTx(self, port, dxl_id, address):
00583         return self.readTx(port, dxl_id, address, 4)
00584 
00585     def read4ByteRx(self, port, dxl_id):
00586         data, result, error = self.readRx(port, dxl_id, 4)
00587         data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00588                                   DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00589         return data_read, result, error
00590 
00591     def read4ByteTxRx(self, port, dxl_id, address):
00592         data, result, error = self.readTxRx(port, dxl_id, address, 4)
00593         data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00594                                   DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00595         return data_read, result, error
00596 
00597     def writeTxOnly(self, port, dxl_id, address, length, data):
00598         txpacket = [0] * (length + 12)
00599 
00600         txpacket[PKT_ID] = dxl_id
00601         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00602         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00603         txpacket[PKT_INSTRUCTION] = INST_WRITE
00604         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00605         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00606 
00607         txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00608 
00609         result = self.txPacket(port, txpacket)
00610         port.is_using = False
00611 
00612         return result
00613 
00614     def writeTxRx(self, port, dxl_id, address, length, data):
00615         txpacket = [0] * (length + 12)
00616 
00617         txpacket[PKT_ID] = dxl_id
00618         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00619         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00620         txpacket[PKT_INSTRUCTION] = INST_WRITE
00621         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00622         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00623 
00624         txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00625         rxpacket, result, error = self.txRxPacket(port, txpacket)
00626 
00627         return result, error
00628 
00629     def write1ByteTxOnly(self, port, dxl_id, address, data):
00630         data_write = [data]
00631         return self.writeTxOnly(port, dxl_id, address, 1, data_write)
00632 
00633     def write1ByteTxRx(self, port, dxl_id, address, data):
00634         data_write = [data]
00635         return self.writeTxRx(port, dxl_id, address, 1, data_write)
00636 
00637     def write2ByteTxOnly(self, port, dxl_id, address, data):
00638         data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00639         return self.writeTxOnly(port, dxl_id, address, 2, data_write)
00640 
00641     def write2ByteTxRx(self, port, dxl_id, address, data):
00642         data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00643         return self.writeTxRx(port, dxl_id, address, 2, data_write)
00644 
00645     def write4ByteTxOnly(self, port, dxl_id, address, data):
00646         data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00647                       DXL_HIBYTE(DXL_LOWORD(data)),
00648                       DXL_LOBYTE(DXL_HIWORD(data)),
00649                       DXL_HIBYTE(DXL_HIWORD(data))]
00650         return self.writeTxOnly(port, dxl_id, address, 4, data_write)
00651 
00652     def write4ByteTxRx(self, port, dxl_id, address, data):
00653         data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00654                       DXL_HIBYTE(DXL_LOWORD(data)),
00655                       DXL_LOBYTE(DXL_HIWORD(data)),
00656                       DXL_HIBYTE(DXL_HIWORD(data))]
00657         return self.writeTxRx(port, dxl_id, address, 4, data_write)
00658 
00659     def regWriteTxOnly(self, port, dxl_id, address, length, data):
00660         txpacket = [0] * (length + 12)
00661 
00662         txpacket[PKT_ID] = dxl_id
00663         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00664         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00665         txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00666         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00667         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00668 
00669         txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00670 
00671         result = self.txPacket(port, txpacket)
00672         port.is_using = False
00673 
00674         return result
00675 
00676     def regWriteTxRx(self, port, dxl_id, address, length, data):
00677         txpacket = [0] * (length + 12)
00678 
00679         txpacket[PKT_ID] = dxl_id
00680         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
00681         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
00682         txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00683         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
00684         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
00685 
00686         txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
00687 
00688         _, result, error = self.txRxPacket(port, txpacket)
00689 
00690         return result, error
00691 
00692     def syncReadTx(self, port, start_address, data_length, param, param_length):
00693         txpacket = [0] * (param_length + 14)
00694         # 14: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
00695 
00696         txpacket[PKT_ID] = BROADCAST_ID
00697         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(
00698             param_length + 7)  # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
00699         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(
00700             param_length + 7)  # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
00701         txpacket[PKT_INSTRUCTION] = INST_SYNC_READ
00702         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(start_address)
00703         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(start_address)
00704         txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(data_length)
00705         txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(data_length)
00706 
00707         txpacket[PKT_PARAMETER0 + 4: PKT_PARAMETER0 + 4 + param_length] = param[0: param_length]
00708 
00709         result = self.txPacket(port, txpacket)
00710         if result == COMM_SUCCESS:
00711             port.setPacketTimeout((11 + data_length) * param_length)
00712 
00713         return result
00714 
00715     def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
00716         txpacket = [0] * (param_length + 14)
00717         # 14: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
00718 
00719         txpacket[PKT_ID] = BROADCAST_ID
00720         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(
00721             param_length + 7)  # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
00722         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(
00723             param_length + 7)  # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
00724         txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
00725         txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(start_address)
00726         txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(start_address)
00727         txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(data_length)
00728         txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(data_length)
00729 
00730         txpacket[PKT_PARAMETER0 + 4: PKT_PARAMETER0 + 4 + param_length] = param[0: param_length]
00731 
00732         _, result, _ = self.txRxPacket(port, txpacket)
00733 
00734         return result
00735 
00736     def bulkReadTx(self, port, param, param_length):
00737         txpacket = [0] * (param_length + 10)
00738         # 10: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST CRC16_L CRC16_H
00739 
00740         txpacket[PKT_ID] = BROADCAST_ID
00741         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(param_length + 3)  # 3: INST CRC16_L CRC16_H
00742         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(param_length + 3)  # 3: INST CRC16_L CRC16_H
00743         txpacket[PKT_INSTRUCTION] = INST_BULK_READ
00744 
00745         txpacket[PKT_PARAMETER0: PKT_PARAMETER0 + param_length] = param[0: param_length]
00746 
00747         result = self.txPacket(port, txpacket)
00748         if result == COMM_SUCCESS:
00749             wait_length = 0
00750             i = 0
00751             while i < param_length:
00752                 wait_length += DXL_MAKEWORD(param[i + 3], param[i + 4]) + 10
00753                 i += 5
00754             port.setPacketTimeout(wait_length)
00755 
00756         return result
00757 
00758     def bulkWriteTxOnly(self, port, param, param_length):
00759         txpacket = [0] * (param_length + 10)
00760         # 10: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST CRC16_L CRC16_H
00761 
00762         txpacket[PKT_ID] = BROADCAST_ID
00763         txpacket[PKT_LENGTH_L] = DXL_LOBYTE(param_length + 3)  # 3: INST CRC16_L CRC16_H
00764         txpacket[PKT_LENGTH_H] = DXL_HIBYTE(param_length + 3)  # 3: INST CRC16_L CRC16_H
00765         txpacket[PKT_INSTRUCTION] = INST_BULK_WRITE
00766 
00767         txpacket[PKT_PARAMETER0: PKT_PARAMETER0 + param_length] = param[0: param_length]
00768 
00769         _, result, _ = self.txRxPacket(port, txpacket)
00770 
00771         return result


ros
Author(s): Pyo , Zerom , Leon
autogenerated on Sat Jun 8 2019 18:32:11