protocol1_packet_handler.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 
00004 ################################################################################
00005 # Copyright 2017 ROBOTIS CO., LTD.
00006 #
00007 # Licensed under the Apache License, Version 2.0 (the "License");
00008 # you may not use this file except in compliance with the License.
00009 # You may obtain a copy of the License at
00010 #
00011 #     http://www.apache.org/licenses/LICENSE-2.0
00012 #
00013 # Unless required by applicable law or agreed to in writing, software
00014 # distributed under the License is distributed on an "AS IS" BASIS,
00015 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00016 # See the License for the specific language governing permissions and
00017 # limitations under the License.
00018 ################################################################################
00019 
00020 # Author: Ryu Woon Jung (Leon)
00021 
00022 from .robotis_def import *
00023 
00024 TXPACKET_MAX_LEN = 250
00025 RXPACKET_MAX_LEN = 250
00026 
00027 # for Protocol 1.0 Packet
00028 PKT_HEADER0 = 0
00029 PKT_HEADER1 = 1
00030 PKT_ID = 2
00031 PKT_LENGTH = 3
00032 PKT_INSTRUCTION = 4
00033 PKT_ERROR = 4
00034 PKT_PARAMETER0 = 5
00035 
00036 # Protocol 1.0 Error bit
00037 ERRBIT_VOLTAGE = 1  # Supplied voltage is out of the range (operating volatage set in the control table)
00038 ERRBIT_ANGLE = 2  # Goal position is written out of the range (from CW angle limit to CCW angle limit)
00039 ERRBIT_OVERHEAT = 4  # Temperature is out of the range (operating temperature set in the control table)
00040 ERRBIT_RANGE = 8  # Command(setting value) is out of the range for use.
00041 ERRBIT_CHECKSUM = 16  # Instruction packet checksum is incorrect.
00042 ERRBIT_OVERLOAD = 32  # The current load cannot be controlled by the set torque.
00043 ERRBIT_INSTRUCTION = 64  # Undefined instruction or delivering the action command without the reg_write command.
00044 
00045 
00046 class Protocol1PacketHandler(object):
00047     def getProtocolVersion(self):
00048         return 1.0
00049 
00050     def getTxRxResult(self, result):
00051         if result == COMM_SUCCESS:
00052             return "[TxRxResult] Communication success!"
00053         elif result == COMM_PORT_BUSY:
00054             return "[TxRxResult] Port is in use!"
00055         elif result == COMM_TX_FAIL:
00056             return "[TxRxResult] Failed transmit instruction packet!"
00057         elif result == COMM_RX_FAIL:
00058             return "[TxRxResult] Failed get status packet from device!"
00059         elif result == COMM_TX_ERROR:
00060             return "[TxRxResult] Incorrect instruction packet!"
00061         elif result == COMM_RX_WAITING:
00062             return "[TxRxResult] Now receiving status packet!"
00063         elif result == COMM_RX_TIMEOUT:
00064             return "[TxRxResult] There is no status packet!"
00065         elif result == COMM_RX_CORRUPT:
00066             return "[TxRxResult] Incorrect status packet!"
00067         elif result == COMM_NOT_AVAILABLE:
00068             return "[TxRxResult] Protocol does not support this function!"
00069         else:
00070             return ""
00071 
00072     def getRxPacketError(self, error):
00073         if error & ERRBIT_VOLTAGE:
00074             return "[RxPacketError] Input voltage error!"
00075 
00076         if error & ERRBIT_ANGLE:
00077             return "[RxPacketError] Angle limit error!"
00078 
00079         if error & ERRBIT_OVERHEAT:
00080             return "[RxPacketError] Overheat error!"
00081 
00082         if error & ERRBIT_RANGE:
00083             return "[RxPacketError] Out of range error!"
00084 
00085         if error & ERRBIT_CHECKSUM:
00086             return "[RxPacketError] Checksum error!"
00087 
00088         if error & ERRBIT_OVERLOAD:
00089             return "[RxPacketError] Overload error!"
00090 
00091         if error & ERRBIT_INSTRUCTION:
00092             return "[RxPacketError] Instruction code error!"
00093 
00094         return ""
00095 
00096     def txPacket(self, port, txpacket):
00097         checksum = 0
00098         total_packet_length = txpacket[PKT_LENGTH] + 4  # 4: HEADER0 HEADER1 ID LENGTH
00099 
00100         if port.is_using:
00101             return COMM_PORT_BUSY
00102         port.is_using = True
00103 
00104         # check max packet length
00105         if total_packet_length > TXPACKET_MAX_LEN:
00106             port.is_using = False
00107             return COMM_TX_ERROR
00108 
00109         # make packet header
00110         txpacket[PKT_HEADER0] = 0xFF
00111         txpacket[PKT_HEADER1] = 0xFF
00112 
00113         # add a checksum to the packet
00114         for idx in range(2, total_packet_length - 1):  # except header, checksum
00115             checksum += txpacket[idx]
00116 
00117         txpacket[total_packet_length - 1] = ~checksum & 0xFF
00118 
00119         #print "[TxPacket] %r" % txpacket
00120 
00121         # tx packet
00122         port.clearPort()
00123         written_packet_length = port.writePort(txpacket)
00124         if total_packet_length != written_packet_length:
00125             port.is_using = False
00126             return COMM_TX_FAIL
00127 
00128         return COMM_SUCCESS
00129 
00130     def rxPacket(self, port):
00131         rxpacket = []
00132 
00133         result = COMM_TX_FAIL
00134         checksum = 0
00135         rx_length = 0
00136         wait_length = 6  # minimum length (HEADER0 HEADER1 ID LENGTH ERROR CHKSUM)
00137 
00138         while True:
00139             rxpacket.extend(port.readPort(wait_length - rx_length))
00140             rx_length = len(rxpacket)
00141             if rx_length >= wait_length:
00142                 # find packet header
00143                 for idx in range(0, (rx_length - 1)):
00144                     if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
00145                         break
00146 
00147                 if idx == 0:  # found at the beginning of the packet
00148                     if (rxpacket[PKT_ID] > 0xFD) or (rxpacket[PKT_LENGTH] > RXPACKET_MAX_LEN) or (
00149                             rxpacket[PKT_ERROR] > 0x7F):
00150                         # unavailable ID or unavailable Length or unavailable Error
00151                         # remove the first byte in the packet
00152                         del rxpacket[0]
00153                         rx_length -= 1
00154                         continue
00155 
00156                     # re-calculate the exact length of the rx packet
00157                     if wait_length != (rxpacket[PKT_LENGTH] + PKT_LENGTH + 1):
00158                         wait_length = rxpacket[PKT_LENGTH] + PKT_LENGTH + 1
00159                         continue
00160 
00161                     if rx_length < wait_length:
00162                         # check timeout
00163                         if port.isPacketTimeout():
00164                             if rx_length == 0:
00165                                 result = COMM_RX_TIMEOUT
00166                             else:
00167                                 result = COMM_RX_CORRUPT
00168                             break
00169                         else:
00170                             continue
00171 
00172                     # calculate checksum
00173                     for i in range(2, wait_length - 1):  # except header, checksum
00174                         checksum += rxpacket[i]
00175                     checksum = ~checksum & 0xFF
00176 
00177                     # verify checksum
00178                     if rxpacket[wait_length - 1] == checksum:
00179                         result = COMM_SUCCESS
00180                     else:
00181                         result = COMM_RX_CORRUPT
00182                     break
00183 
00184                 else:
00185                     # remove unnecessary packets
00186                     del rxpacket[0: idx]
00187                     rx_length -= idx
00188 
00189             else:
00190                 # check timeout
00191                 if port.isPacketTimeout():
00192                     if rx_length == 0:
00193                         result = COMM_RX_TIMEOUT
00194                     else:
00195                         result = COMM_RX_CORRUPT
00196                     break
00197 
00198         port.is_using = False
00199 
00200         #print "[RxPacket] %r" % rxpacket
00201 
00202         return rxpacket, result
00203 
00204     # NOT for BulkRead
00205     def txRxPacket(self, port, txpacket):
00206         rxpacket = None
00207         error = 0
00208 
00209         # tx packet
00210         result = self.txPacket(port, txpacket)
00211         if result != COMM_SUCCESS:
00212             return rxpacket, result, error
00213 
00214         # (Instruction == BulkRead) == this function is not available.
00215         if txpacket[PKT_INSTRUCTION] == INST_BULK_READ:
00216             result = COMM_NOT_AVAILABLE
00217 
00218         # (ID == Broadcast ID) == no need to wait for status packet or not available
00219         if (txpacket[PKT_ID] == BROADCAST_ID):
00220             port.is_using = False
00221             return rxpacket, result, error
00222 
00223         # set packet timeout
00224         if txpacket[PKT_INSTRUCTION] == INST_READ:
00225             port.setPacketTimeout(txpacket[PKT_PARAMETER0 + 1] + 6)
00226         else:
00227             port.setPacketTimeout(6)  # HEADER0 HEADER1 ID LENGTH ERROR CHECKSUM
00228 
00229         # rx packet
00230         while True:
00231             rxpacket, result = self.rxPacket(port)
00232             if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
00233                 break
00234 
00235         if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
00236             error = rxpacket[PKT_ERROR]
00237 
00238         return rxpacket, result, error
00239 
00240     def ping(self, port, dxl_id):
00241         model_number = 0
00242         error = 0
00243 
00244         txpacket = [0] * 6
00245 
00246         if dxl_id >= BROADCAST_ID:
00247             return model_number, COMM_NOT_AVAILABLE, error
00248 
00249         txpacket[PKT_ID] = dxl_id
00250         txpacket[PKT_LENGTH] = 2
00251         txpacket[PKT_INSTRUCTION] = INST_PING
00252 
00253         rxpacket, result, error = self.txRxPacket(port, txpacket)
00254 
00255         if result == COMM_SUCCESS:
00256             data_read, result, error = self.readTxRx(port, dxl_id, 0, 2)  # Address 0 : Model Number
00257             if result == COMM_SUCCESS:
00258                 model_number = DXL_MAKEWORD(data_read[0], data_read[1])
00259 
00260         return model_number, result, error
00261 
00262     def broadcastPing(self, port):
00263         data_list = None
00264         return data_list, COMM_NOT_AVAILABLE
00265 
00266     def action(self, port, dxl_id):
00267         txpacket = [0] * 6
00268 
00269         txpacket[PKT_ID] = dxl_id
00270         txpacket[PKT_LENGTH] = 2
00271         txpacket[PKT_INSTRUCTION] = INST_ACTION
00272 
00273         _, result, _ = self.txRxPacket(port, txpacket)
00274 
00275         return result
00276 
00277     def reboot(self, port, dxl_id):
00278         return COMM_NOT_AVAILABLE, 0
00279 
00280     def factoryReset(self, port, dxl_id):
00281         txpacket = [0] * 6
00282 
00283         txpacket[PKT_ID] = dxl_id
00284         txpacket[PKT_LENGTH] = 2
00285         txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
00286 
00287         _, result, error = self.txRxPacket(port, txpacket)
00288 
00289         return result, error
00290 
00291     def readTx(self, port, dxl_id, address, length):
00292 
00293         txpacket = [0] * 8
00294 
00295         if dxl_id >= BROADCAST_ID:
00296             return COMM_NOT_AVAILABLE
00297 
00298         txpacket[PKT_ID] = dxl_id
00299         txpacket[PKT_LENGTH] = 4
00300         txpacket[PKT_INSTRUCTION] = INST_READ
00301         txpacket[PKT_PARAMETER0 + 0] = address
00302         txpacket[PKT_PARAMETER0 + 1] = length
00303 
00304         result = self.txPacket(port, txpacket)
00305 
00306         # set packet timeout
00307         if result == COMM_SUCCESS:
00308             port.setPacketTimeout(length + 6)
00309 
00310         return result
00311 
00312     def readRx(self, port, dxl_id, length):
00313         result = COMM_TX_FAIL
00314         error = 0
00315 
00316         rxpacket = None
00317         data = []
00318 
00319         while True:
00320             rxpacket, result = self.rxPacket(port)
00321 
00322             if result != COMM_SUCCESS or rxpacket[PKT_ID] == dxl_id:
00323                 break
00324 
00325         if result == COMM_SUCCESS and rxpacket[PKT_ID] == dxl_id:
00326             error = rxpacket[PKT_ERROR]
00327 
00328             data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
00329 
00330         return data, result, error
00331 
00332     def readTxRx(self, port, dxl_id, address, length):
00333         txpacket = [0] * 8
00334         data = []
00335 
00336         if dxl_id >= BROADCAST_ID:
00337             return data, COMM_NOT_AVAILABLE, 0
00338 
00339         txpacket[PKT_ID] = dxl_id
00340         txpacket[PKT_LENGTH] = 4
00341         txpacket[PKT_INSTRUCTION] = INST_READ
00342         txpacket[PKT_PARAMETER0 + 0] = address
00343         txpacket[PKT_PARAMETER0 + 1] = length
00344 
00345         rxpacket, result, error = self.txRxPacket(port, txpacket)
00346         if result == COMM_SUCCESS:
00347             error = rxpacket[PKT_ERROR]
00348 
00349             data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
00350 
00351         return data, result, error
00352 
00353     def read1ByteTx(self, port, dxl_id, address):
00354         return self.readTx(port, dxl_id, address, 1)
00355 
00356     def read1ByteRx(self, port, dxl_id):
00357         data, result, error = self.readRx(port, dxl_id, 1)
00358         data_read = data[0] if (result == COMM_SUCCESS) else 0
00359         return data_read, result, error
00360 
00361     def read1ByteTxRx(self, port, dxl_id, address):
00362         data, result, error = self.readTxRx(port, dxl_id, address, 1)
00363         data_read = data[0] if (result == COMM_SUCCESS) else 0
00364         return data_read, result, error
00365 
00366     def read2ByteTx(self, port, dxl_id, address):
00367         return self.readTx(port, dxl_id, address, 2)
00368 
00369     def read2ByteRx(self, port, dxl_id):
00370         data, result, error = self.readRx(port, dxl_id, 2)
00371         data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00372         return data_read, result, error
00373 
00374     def read2ByteTxRx(self, port, dxl_id, address):
00375         data, result, error = self.readTxRx(port, dxl_id, address, 2)
00376         data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
00377         return data_read, result, error
00378 
00379     def read4ByteTx(self, port, dxl_id, address):
00380         return self.readTx(port, dxl_id, address, 4)
00381 
00382     def read4ByteRx(self, port, dxl_id):
00383         data, result, error = self.readRx(port, dxl_id, 4)
00384         data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00385                                   DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00386         return data_read, result, error
00387 
00388     def read4ByteTxRx(self, port, dxl_id, address):
00389         data, result, error = self.readTxRx(port, dxl_id, address, 4)
00390         data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
00391                                   DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
00392         return data_read, result, error
00393 
00394     def writeTxOnly(self, port, dxl_id, address, length, data):
00395         txpacket = [0] * (length + 7)
00396 
00397         txpacket[PKT_ID] = dxl_id
00398         txpacket[PKT_LENGTH] = length + 3
00399         txpacket[PKT_INSTRUCTION] = INST_WRITE
00400         txpacket[PKT_PARAMETER0] = address
00401 
00402         txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00403 
00404         result = self.txPacket(port, txpacket)
00405         port.is_using = False
00406 
00407         return result
00408 
00409     def writeTxRx(self, port, dxl_id, address, length, data):
00410         txpacket = [0] * (length + 7)
00411 
00412         txpacket[PKT_ID] = dxl_id
00413         txpacket[PKT_LENGTH] = length + 3
00414         txpacket[PKT_INSTRUCTION] = INST_WRITE
00415         txpacket[PKT_PARAMETER0] = address
00416 
00417         txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00418         rxpacket, result, error = self.txRxPacket(port, txpacket)
00419 
00420         return result, error
00421 
00422     def write1ByteTxOnly(self, port, dxl_id, address, data):
00423         data_write = [data]
00424         return self.writeTxOnly(port, dxl_id, address, 1, data_write)
00425 
00426     def write1ByteTxRx(self, port, dxl_id, address, data):
00427         data_write = [data]
00428         return self.writeTxRx(port, dxl_id, address, 1, data_write)
00429 
00430     def write2ByteTxOnly(self, port, dxl_id, address, data):
00431         data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00432         return self.writeTxOnly(port, dxl_id, address, 2, data_write)
00433 
00434     def write2ByteTxRx(self, port, dxl_id, address, data):
00435         data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
00436         return self.writeTxRx(port, dxl_id, address, 2, data_write)
00437 
00438     def write4ByteTxOnly(self, port, dxl_id, address, data):
00439         data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00440                       DXL_HIBYTE(DXL_LOWORD(data)),
00441                       DXL_LOBYTE(DXL_HIWORD(data)),
00442                       DXL_HIBYTE(DXL_HIWORD(data))]
00443         return self.writeTxOnly(port, dxl_id, address, 4, data_write)
00444 
00445     def write4ByteTxRx(self, port, dxl_id, address, data):
00446         data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
00447                       DXL_HIBYTE(DXL_LOWORD(data)),
00448                       DXL_LOBYTE(DXL_HIWORD(data)),
00449                       DXL_HIBYTE(DXL_HIWORD(data))]
00450         return self.writeTxRx(port, dxl_id, address, 4, data_write)
00451 
00452     def regWriteTxOnly(self, port, dxl_id, address, length, data):
00453         txpacket = [0] * (length + 7)
00454 
00455         txpacket[PKT_ID] = dxl_id
00456         txpacket[PKT_LENGTH] = length + 3
00457         txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00458         txpacket[PKT_PARAMETER0] = address
00459 
00460         txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00461 
00462         result = self.txPacket(port, txpacket)
00463         port.is_using = False
00464 
00465         return result
00466 
00467     def regWriteTxRx(self, port, dxl_id, address, length, data):
00468         txpacket = [0] * (length + 7)
00469 
00470         txpacket[PKT_ID] = dxl_id
00471         txpacket[PKT_LENGTH] = length + 3
00472         txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
00473         txpacket[PKT_PARAMETER0] = address
00474 
00475         txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
00476 
00477         _, result, error = self.txRxPacket(port, txpacket)
00478 
00479         return result, error
00480 
00481     def syncReadTx(self, port, start_address, data_length, param, param_length):
00482         return COMM_NOT_AVAILABLE
00483 
00484     def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
00485         txpacket = [0] * (param_length + 8)
00486         # 8: HEADER0 HEADER1 ID LEN INST START_ADDR DATA_LEN ... CHKSUM
00487 
00488         txpacket[PKT_ID] = BROADCAST_ID
00489         txpacket[PKT_LENGTH] = param_length + 4  # 4: INST START_ADDR DATA_LEN ... CHKSUM
00490         txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
00491         txpacket[PKT_PARAMETER0 + 0] = start_address
00492         txpacket[PKT_PARAMETER0 + 1] = data_length
00493 
00494         txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + param_length] = param[0: param_length]
00495 
00496         _, result, _ = self.txRxPacket(port, txpacket)
00497 
00498         return result
00499 
00500     def bulkReadTx(self, port, param, param_length):
00501         txpacket = [0] * (param_length + 7)
00502         # 7: HEADER0 HEADER1 ID LEN INST 0x00 ... CHKSUM
00503 
00504         txpacket[PKT_ID] = BROADCAST_ID
00505         txpacket[PKT_LENGTH] = param_length + 3  # 3: INST 0x00 ... CHKSUM
00506         txpacket[PKT_INSTRUCTION] = INST_BULK_READ
00507         txpacket[PKT_PARAMETER0 + 0] = 0x00
00508 
00509         txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + param_length] = param[0: param_length]
00510 
00511         result = self.txPacket(port, txpacket)
00512         if result == COMM_SUCCESS:
00513             wait_length = 0
00514             i = 0
00515             while i < param_length:
00516                 wait_length += param[i] + 7
00517                 i += 3
00518             port.setPacketTimeout(wait_length)
00519 
00520         return result
00521 
00522     def bulkWriteTxOnly(self, port, param, param_length):
00523         return COMM_NOT_AVAILABLE


ros
Author(s): Pyo , Zerom , Leon
autogenerated on Sat Jun 8 2019 18:32:11