22 from .robotis_def
import *
24 TXPACKET_MAX_LEN = 250
25 RXPACKET_MAX_LEN = 250
43 ERRBIT_INSTRUCTION = 64
51 if result == COMM_SUCCESS:
52 return "[TxRxResult] Communication success!" 53 elif result == COMM_PORT_BUSY:
54 return "[TxRxResult] Port is in use!" 55 elif result == COMM_TX_FAIL:
56 return "[TxRxResult] Failed transmit instruction packet!" 57 elif result == COMM_RX_FAIL:
58 return "[TxRxResult] Failed get status packet from device!" 59 elif result == COMM_TX_ERROR:
60 return "[TxRxResult] Incorrect instruction packet!" 61 elif result == COMM_RX_WAITING:
62 return "[TxRxResult] Now receiving status packet!" 63 elif result == COMM_RX_TIMEOUT:
64 return "[TxRxResult] There is no status packet!" 65 elif result == COMM_RX_CORRUPT:
66 return "[TxRxResult] Incorrect status packet!" 67 elif result == COMM_NOT_AVAILABLE:
68 return "[TxRxResult] Protocol does not support this function!" 73 if error & ERRBIT_VOLTAGE:
74 return "[RxPacketError] Input voltage error!" 76 if error & ERRBIT_ANGLE:
77 return "[RxPacketError] Angle limit error!" 79 if error & ERRBIT_OVERHEAT:
80 return "[RxPacketError] Overheat error!" 82 if error & ERRBIT_RANGE:
83 return "[RxPacketError] Out of range error!" 85 if error & ERRBIT_CHECKSUM:
86 return "[RxPacketError] Checksum error!" 88 if error & ERRBIT_OVERLOAD:
89 return "[RxPacketError] Overload error!" 91 if error & ERRBIT_INSTRUCTION:
92 return "[RxPacketError] Instruction code error!" 98 total_packet_length = txpacket[PKT_LENGTH] + 4
101 return COMM_PORT_BUSY
105 if total_packet_length > TXPACKET_MAX_LEN:
106 port.is_using =
False 110 txpacket[PKT_HEADER0] = 0xFF
111 txpacket[PKT_HEADER1] = 0xFF
114 for idx
in range(2, total_packet_length - 1):
115 checksum += txpacket[idx]
117 txpacket[total_packet_length - 1] = ~checksum & 0xFF
123 written_packet_length = port.writePort(txpacket)
124 if total_packet_length != written_packet_length:
125 port.is_using =
False 133 result = COMM_TX_FAIL
139 rxpacket.extend(port.readPort(wait_length - rx_length))
140 rx_length = len(rxpacket)
141 if rx_length >= wait_length:
143 for idx
in range(0, (rx_length - 1)):
144 if (rxpacket[idx] == 0xFF)
and (rxpacket[idx + 1] == 0xFF):
148 if (rxpacket[PKT_ID] > 0xFD)
or (rxpacket[PKT_LENGTH] > RXPACKET_MAX_LEN)
or (
149 rxpacket[PKT_ERROR] > 0x7F):
157 if wait_length != (rxpacket[PKT_LENGTH] + PKT_LENGTH + 1):
158 wait_length = rxpacket[PKT_LENGTH] + PKT_LENGTH + 1
161 if rx_length < wait_length:
163 if port.isPacketTimeout():
165 result = COMM_RX_TIMEOUT
167 result = COMM_RX_CORRUPT
173 for i
in range(2, wait_length - 1):
174 checksum += rxpacket[i]
175 checksum = ~checksum & 0xFF
178 if rxpacket[wait_length - 1] == checksum:
179 result = COMM_SUCCESS
181 result = COMM_RX_CORRUPT
191 if port.isPacketTimeout():
193 result = COMM_RX_TIMEOUT
195 result = COMM_RX_CORRUPT
198 port.is_using =
False 202 return rxpacket, result
210 result = self.
txPacket(port, txpacket)
211 if result != COMM_SUCCESS:
212 return rxpacket, result, error
215 if txpacket[PKT_INSTRUCTION] == INST_BULK_READ:
216 result = COMM_NOT_AVAILABLE
219 if (txpacket[PKT_ID] == BROADCAST_ID):
220 port.is_using =
False 221 return rxpacket, result, error
224 if txpacket[PKT_INSTRUCTION] == INST_READ:
225 port.setPacketTimeout(txpacket[PKT_PARAMETER0 + 1] + 6)
227 port.setPacketTimeout(6)
231 rxpacket, result = self.
rxPacket(port)
232 if result != COMM_SUCCESS
or txpacket[PKT_ID] == rxpacket[PKT_ID]:
235 if result == COMM_SUCCESS
and txpacket[PKT_ID] == rxpacket[PKT_ID]:
236 error = rxpacket[PKT_ERROR]
238 return rxpacket, result, error
246 if dxl_id >= BROADCAST_ID:
247 return model_number, COMM_NOT_AVAILABLE, error
249 txpacket[PKT_ID] = dxl_id
250 txpacket[PKT_LENGTH] = 2
251 txpacket[PKT_INSTRUCTION] = INST_PING
253 rxpacket, result, error = self.
txRxPacket(port, txpacket)
255 if result == COMM_SUCCESS:
256 data_read, result, error = self.
readTxRx(port, dxl_id, 0, 2)
257 if result == COMM_SUCCESS:
260 return model_number, result, error
264 return data_list, COMM_NOT_AVAILABLE
269 txpacket[PKT_ID] = dxl_id
270 txpacket[PKT_LENGTH] = 2
271 txpacket[PKT_INSTRUCTION] = INST_ACTION
273 _, result, _ = self.
txRxPacket(port, txpacket)
278 return COMM_NOT_AVAILABLE, 0
283 txpacket[PKT_ID] = dxl_id
284 txpacket[PKT_LENGTH] = 2
285 txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
287 _, result, error = self.
txRxPacket(port, txpacket)
291 def readTx(self, port, dxl_id, address, length):
295 if dxl_id >= BROADCAST_ID:
296 return COMM_NOT_AVAILABLE
298 txpacket[PKT_ID] = dxl_id
299 txpacket[PKT_LENGTH] = 4
300 txpacket[PKT_INSTRUCTION] = INST_READ
301 txpacket[PKT_PARAMETER0 + 0] = address
302 txpacket[PKT_PARAMETER0 + 1] = length
304 result = self.
txPacket(port, txpacket)
307 if result == COMM_SUCCESS:
308 port.setPacketTimeout(length + 6)
313 result = COMM_TX_FAIL
320 rxpacket, result = self.
rxPacket(port)
322 if result != COMM_SUCCESS
or rxpacket[PKT_ID] == dxl_id:
325 if result == COMM_SUCCESS
and rxpacket[PKT_ID] == dxl_id:
326 error = rxpacket[PKT_ERROR]
328 data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
330 return data, result, error
336 if dxl_id >= BROADCAST_ID:
337 return data, COMM_NOT_AVAILABLE, 0
339 txpacket[PKT_ID] = dxl_id
340 txpacket[PKT_LENGTH] = 4
341 txpacket[PKT_INSTRUCTION] = INST_READ
342 txpacket[PKT_PARAMETER0 + 0] = address
343 txpacket[PKT_PARAMETER0 + 1] = length
345 rxpacket, result, error = self.
txRxPacket(port, txpacket)
346 if result == COMM_SUCCESS:
347 error = rxpacket[PKT_ERROR]
349 data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
351 return data, result, error
354 return self.
readTx(port, dxl_id, address, 1)
357 data, result, error = self.
readRx(port, dxl_id, 1)
358 data_read = data[0]
if (result == COMM_SUCCESS)
else 0
359 return data_read, result, error
362 data, result, error = self.
readTxRx(port, dxl_id, address, 1)
363 data_read = data[0]
if (result == COMM_SUCCESS)
else 0
364 return data_read, result, error
367 return self.
readTx(port, dxl_id, address, 2)
370 data, result, error = self.
readRx(port, dxl_id, 2)
371 data_read =
DXL_MAKEWORD(data[0], data[1])
if (result == COMM_SUCCESS)
else 0
372 return data_read, result, error
375 data, result, error = self.
readTxRx(port, dxl_id, address, 2)
376 data_read =
DXL_MAKEWORD(data[0], data[1])
if (result == COMM_SUCCESS)
else 0
377 return data_read, result, error
380 return self.
readTx(port, dxl_id, address, 4)
383 data, result, error = self.
readRx(port, dxl_id, 4)
385 DXL_MAKEWORD(data[2], data[3]))
if (result == COMM_SUCCESS)
else 0
386 return data_read, result, error
389 data, result, error = self.
readTxRx(port, dxl_id, address, 4)
391 DXL_MAKEWORD(data[2], data[3]))
if (result == COMM_SUCCESS)
else 0
392 return data_read, result, error
395 txpacket = [0] * (length + 7)
397 txpacket[PKT_ID] = dxl_id
398 txpacket[PKT_LENGTH] = length + 3
399 txpacket[PKT_INSTRUCTION] = INST_WRITE
400 txpacket[PKT_PARAMETER0] = address
402 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
404 result = self.
txPacket(port, txpacket)
405 port.is_using =
False 409 def writeTxRx(self, port, dxl_id, address, length, data):
410 txpacket = [0] * (length + 7)
412 txpacket[PKT_ID] = dxl_id
413 txpacket[PKT_LENGTH] = length + 3
414 txpacket[PKT_INSTRUCTION] = INST_WRITE
415 txpacket[PKT_PARAMETER0] = address
417 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
418 rxpacket, result, error = self.
txRxPacket(port, txpacket)
424 return self.
writeTxOnly(port, dxl_id, address, 1, data_write)
428 return self.
writeTxRx(port, dxl_id, address, 1, data_write)
432 return self.
writeTxOnly(port, dxl_id, address, 2, data_write)
436 return self.
writeTxRx(port, dxl_id, address, 2, data_write)
443 return self.
writeTxOnly(port, dxl_id, address, 4, data_write)
450 return self.
writeTxRx(port, dxl_id, address, 4, data_write)
453 txpacket = [0] * (length + 7)
455 txpacket[PKT_ID] = dxl_id
456 txpacket[PKT_LENGTH] = length + 3
457 txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
458 txpacket[PKT_PARAMETER0] = address
460 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
462 result = self.
txPacket(port, txpacket)
463 port.is_using =
False 468 txpacket = [0] * (length + 7)
470 txpacket[PKT_ID] = dxl_id
471 txpacket[PKT_LENGTH] = length + 3
472 txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
473 txpacket[PKT_PARAMETER0] = address
475 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
477 _, result, error = self.
txRxPacket(port, txpacket)
481 def syncReadTx(self, port, start_address, data_length, param, param_length):
482 return COMM_NOT_AVAILABLE
485 txpacket = [0] * (param_length + 8)
488 txpacket[PKT_ID] = BROADCAST_ID
489 txpacket[PKT_LENGTH] = param_length + 4
490 txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
491 txpacket[PKT_PARAMETER0 + 0] = start_address
492 txpacket[PKT_PARAMETER0 + 1] = data_length
494 txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + param_length] = param[0: param_length]
496 _, result, _ = self.
txRxPacket(port, txpacket)
501 txpacket = [0] * (param_length + 7)
504 txpacket[PKT_ID] = BROADCAST_ID
505 txpacket[PKT_LENGTH] = param_length + 3
506 txpacket[PKT_INSTRUCTION] = INST_BULK_READ
507 txpacket[PKT_PARAMETER0 + 0] = 0x00
509 txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + param_length] = param[0: param_length]
511 result = self.
txPacket(port, txpacket)
512 if result == COMM_SUCCESS:
515 while i < param_length:
516 wait_length += param[i] + 7
518 port.setPacketTimeout(wait_length)
523 return COMM_NOT_AVAILABLE
def action(self, port, dxl_id)
def getRxPacketError(self, error)
#define DXL_MAKEDWORD(a, b)
def regWriteTxOnly(self, port, dxl_id, address, length, data)
def write1ByteTxOnly(self, port, dxl_id, address, data)
def write4ByteTxOnly(self, port, dxl_id, address, data)
def bulkWriteTxOnly(self, port, param, param_length)
def read1ByteTx(self, port, dxl_id, address)
#define DXL_MAKEWORD(a, b)
def read1ByteTxRx(self, port, dxl_id, address)
def getTxRxResult(self, result)
def write1ByteTxRx(self, port, dxl_id, address, data)
def txRxPacket(self, port, txpacket)
def read4ByteTx(self, port, dxl_id, address)
def read2ByteRx(self, port, dxl_id)
def readTxRx(self, port, dxl_id, address, length)
def read1ByteRx(self, port, dxl_id)
def syncWriteTxOnly(self, port, start_address, data_length, param, param_length)
def reboot(self, port, dxl_id)
def factoryReset(self, port, dxl_id)
def regWriteTxRx(self, port, dxl_id, address, length, data)
def bulkReadTx(self, port, param, param_length)
def syncReadTx(self, port, start_address, data_length, param, param_length)
def read4ByteTxRx(self, port, dxl_id, address)
def read4ByteRx(self, port, dxl_id)
def read2ByteTx(self, port, dxl_id, address)
def writeTxRx(self, port, dxl_id, address, length, data)
def readTx(self, port, dxl_id, address, length)
def broadcastPing(self, port)
def write2ByteTxOnly(self, port, dxl_id, address, data)
def writeTxOnly(self, port, dxl_id, address, length, data)
def write2ByteTxRx(self, port, dxl_id, address, data)
def txPacket(self, port, txpacket)
def read2ByteTxRx(self, port, dxl_id, address)
def getProtocolVersion(self)
def readRx(self, port, dxl_id, length)
def write4ByteTxRx(self, port, dxl_id, address, data)
def ping(self, port, dxl_id)