protocol2_packet_handler.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 ################################################################################
5 # Copyright 2017 ROBOTIS CO., LTD.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 ################################################################################
19 
20 # Author: Ryu Woon Jung (Leon)
21 
22 from .robotis_def import *
23 
24 TXPACKET_MAX_LEN = 1 * 1024
25 RXPACKET_MAX_LEN = 1 * 1024
26 
27 # for Protocol 2.0 Packet
28 PKT_HEADER0 = 0
29 PKT_HEADER1 = 1
30 PKT_HEADER2 = 2
31 PKT_RESERVED = 3
32 PKT_ID = 4
33 PKT_LENGTH_L = 5
34 PKT_LENGTH_H = 6
35 PKT_INSTRUCTION = 7
36 PKT_ERROR = 8
37 PKT_PARAMETER0 = 8
38 
39 # Protocol 2.0 Error bit
40 ERRNUM_RESULT_FAIL = 1 # Failed to process the instruction packet.
41 ERRNUM_INSTRUCTION = 2 # Instruction error
42 ERRNUM_CRC = 3 # CRC check error
43 ERRNUM_DATA_RANGE = 4 # Data range error
44 ERRNUM_DATA_LENGTH = 5 # Data length error
45 ERRNUM_DATA_LIMIT = 6 # Data limit error
46 ERRNUM_ACCESS = 7 # Access error
47 
48 ERRBIT_ALERT = 128 # When the device has a problem, this bit is set to 1. Check "Device Status Check" value.
49 
50 
51 class Protocol2PacketHandler(object):
52  def getProtocolVersion(self):
53  return 2.0
54 
55  def getTxRxResult(self, result):
56  if result == COMM_SUCCESS:
57  return "[TxRxResult] Communication success!"
58  elif result == COMM_PORT_BUSY:
59  return "[TxRxResult] Port is in use!"
60  elif result == COMM_TX_FAIL:
61  return "[TxRxResult] Failed transmit instruction packet!"
62  elif result == COMM_RX_FAIL:
63  return "[TxRxResult] Failed get status packet from device!"
64  elif result == COMM_TX_ERROR:
65  return "[TxRxResult] Incorrect instruction packet!"
66  elif result == COMM_RX_WAITING:
67  return "[TxRxResult] Now receiving status packet!"
68  elif result == COMM_RX_TIMEOUT:
69  return "[TxRxResult] There is no status packet!"
70  elif result == COMM_RX_CORRUPT:
71  return "[TxRxResult] Incorrect status packet!"
72  elif result == COMM_NOT_AVAILABLE:
73  return "[TxRxResult] Protocol does not support this function!"
74  else:
75  return ""
76 
77  def getRxPacketError(self, error):
78  if error & ERRBIT_ALERT:
79  return "[RxPacketError] Hardware error occurred. Check the error at Control Table (Hardware Error Status)!"
80 
81  not_alert_error = error & ~ERRBIT_ALERT
82  if not_alert_error == 0:
83  return ""
84  elif not_alert_error == ERRNUM_RESULT_FAIL:
85  return "[RxPacketError] Failed to process the instruction packet!"
86 
87  elif not_alert_error == ERRNUM_INSTRUCTION:
88  return "[RxPacketError] Undefined instruction or incorrect instruction!"
89 
90  elif not_alert_error == ERRNUM_CRC:
91  return "[RxPacketError] CRC doesn't match!"
92 
93  elif not_alert_error == ERRNUM_DATA_RANGE:
94  return "[RxPacketError] The data value is out of range!"
95 
96  elif not_alert_error == ERRNUM_DATA_LENGTH:
97  return "[RxPacketError] The data length does not match as expected!"
98 
99  elif not_alert_error == ERRNUM_DATA_LIMIT:
100  return "[RxPacketError] The data value exceeds the limit value!"
101 
102  elif not_alert_error == ERRNUM_ACCESS:
103  return "[RxPacketError] Writing or Reading is not available to target address!"
104 
105  else:
106  return "[RxPacketError] Unknown error code!"
107 
108  def updateCRC(self, crc_accum, data_blk_ptr, data_blk_size):
109  crc_table = [0x0000,
110  0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
111  0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027,
112  0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D,
113  0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A, 0x804B,
114  0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC, 0x80C9,
115  0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5, 0x80FF,
116  0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0, 0x80A5,
117  0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1, 0x8093,
118  0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
119  0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197,
120  0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE,
121  0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB,
122  0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC, 0x81D9,
123  0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145, 0x814F,
124  0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173, 0x0176,
125  0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162, 0x8123,
126  0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
127  0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104,
128  0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D,
129  0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A, 0x832B,
130  0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F, 0x036A,
131  0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356, 0x035C,
132  0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0, 0x83C5,
133  0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1, 0x83F3,
134  0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
135  0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7,
136  0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E,
137  0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A, 0x829B,
138  0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC, 0x82B9,
139  0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6, 0x02EC,
140  0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0, 0x82D5,
141  0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1, 0x8243,
142  0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
143  0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264,
144  0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E,
145  0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219, 0x0208,
146  0x820D, 0x8207, 0x0202]
147 
148  for j in range(0, data_blk_size):
149  i = ((crc_accum >> 8) ^ data_blk_ptr[j]) & 0xFF
150  crc_accum = ((crc_accum << 8) ^ crc_table[i]) & 0xFFFF
151 
152  return crc_accum
153 
154  def addStuffing(self, packet):
155  packet_length_in = DXL_MAKEWORD(packet[PKT_LENGTH_L], packet[PKT_LENGTH_H])
156  packet_length_out = packet_length_in
157 
158  temp = [0] * TXPACKET_MAX_LEN
159 
160  # FF FF FD XX ID LEN_L LEN_H
161  temp[PKT_HEADER0: PKT_HEADER0 + PKT_LENGTH_H + 1] = packet[PKT_HEADER0: PKT_HEADER0 + PKT_LENGTH_H + 1]
162 
163  index = PKT_INSTRUCTION
164 
165  for i in range(0, packet_length_in - 2): # except CRC
166  temp[index] = packet[i + PKT_INSTRUCTION]
167  index = index + 1
168  if packet[i + PKT_INSTRUCTION] == 0xFD \
169  and packet[i + PKT_INSTRUCTION - 1] == 0xFF \
170  and packet[i + PKT_INSTRUCTION - 2] == 0xFF:
171  # FF FF FD
172  temp[index] = 0xFD
173  index = index + 1
174  packet_length_out = packet_length_out + 1
175 
176  temp[index] = packet[PKT_INSTRUCTION + packet_length_in - 2]
177  temp[index + 1] = packet[PKT_INSTRUCTION + packet_length_in - 1]
178  index = index + 2
179 
180  if packet_length_in != packet_length_out:
181  packet = [0] * index
182 
183  packet[0: index] = temp[0: index]
184 
185  packet[PKT_LENGTH_L] = DXL_LOBYTE(packet_length_out)
186  packet[PKT_LENGTH_H] = DXL_HIBYTE(packet_length_out)
187 
188  return packet
189 
190  def removeStuffing(self, packet):
191  packet_length_in = DXL_MAKEWORD(packet[PKT_LENGTH_L], packet[PKT_LENGTH_H])
192  packet_length_out = packet_length_in
193 
194  index = PKT_INSTRUCTION
195  for i in range(0, (packet_length_in - 2)): # except CRC
196  if (packet[i + PKT_INSTRUCTION] == 0xFD) and (packet[i + PKT_INSTRUCTION + 1] == 0xFD) and (
197  packet[i + PKT_INSTRUCTION - 1] == 0xFF) and (packet[i + PKT_INSTRUCTION - 2] == 0xFF):
198  # FF FF FD FD
199  packet_length_out = packet_length_out - 1
200  else:
201  packet[index] = packet[i + PKT_INSTRUCTION]
202  index += 1
203 
204  packet[index] = packet[PKT_INSTRUCTION + packet_length_in - 2]
205  packet[index + 1] = packet[PKT_INSTRUCTION + packet_length_in - 1]
206 
207  packet[PKT_LENGTH_L] = DXL_LOBYTE(packet_length_out)
208  packet[PKT_LENGTH_H] = DXL_HIBYTE(packet_length_out)
209 
210  return packet
211 
212  def txPacket(self, port, txpacket):
213  if port.is_using:
214  return COMM_PORT_BUSY
215  port.is_using = True
216 
217  # byte stuffing for header
218  self.addStuffing(txpacket)
219 
220  # check max packet length
221  total_packet_length = DXL_MAKEWORD(txpacket[PKT_LENGTH_L], txpacket[PKT_LENGTH_H]) + 7
222  # 7: HEADER0 HEADER1 HEADER2 RESERVED ID LENGTH_L LENGTH_H
223 
224  if total_packet_length > TXPACKET_MAX_LEN:
225  port.is_using = False
226  return COMM_TX_ERROR
227 
228  # make packet header
229  txpacket[PKT_HEADER0] = 0xFF
230  txpacket[PKT_HEADER1] = 0xFF
231  txpacket[PKT_HEADER2] = 0xFD
232  txpacket[PKT_RESERVED] = 0x00
233 
234  # add CRC16
235  crc = self.updateCRC(0, txpacket, total_packet_length - 2) # 2: CRC16
236 
237  txpacket[total_packet_length - 2] = DXL_LOBYTE(crc)
238  txpacket[total_packet_length - 1] = DXL_HIBYTE(crc)
239 
240  # tx packet
241  port.clearPort()
242  written_packet_length = port.writePort(txpacket)
243  if total_packet_length != written_packet_length:
244  port.is_using = False
245  return COMM_TX_FAIL
246 
247  return COMM_SUCCESS
248 
249  def rxPacket(self, port):
250  rxpacket = []
251 
252  result = COMM_TX_FAIL
253  rx_length = 0
254  wait_length = 11 # minimum length (HEADER0 HEADER1 HEADER2 RESERVED ID LENGTH_L LENGTH_H INST ERROR CRC16_L CRC16_H)
255 
256  while True:
257  rxpacket.extend(port.readPort(wait_length - rx_length))
258  rx_length = len(rxpacket)
259  if rx_length >= wait_length:
260  # find packet header
261  for idx in range(0, (rx_length - 3)):
262  if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF) and (rxpacket[idx + 2] == 0xFD) and (
263  rxpacket[idx + 3] != 0xFD):
264  break
265 
266  if idx == 0:
267  if (rxpacket[PKT_RESERVED] != 0x00) or (rxpacket[PKT_ID] > 0xFC) or (
268  DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) > RXPACKET_MAX_LEN) or (
269  rxpacket[PKT_INSTRUCTION] != 0x55):
270  # remove the first byte in the packet
271  del rxpacket[0]
272  rx_length -= 1
273  continue
274 
275  if wait_length != (DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) + PKT_LENGTH_H + 1):
276  wait_length = DXL_MAKEWORD(rxpacket[PKT_LENGTH_L], rxpacket[PKT_LENGTH_H]) + PKT_LENGTH_H + 1
277  continue
278 
279  if rx_length < wait_length:
280  if port.isPacketTimeout():
281  if rx_length == 0:
282  result = COMM_RX_TIMEOUT
283  else:
284  result = COMM_RX_CORRUPT
285  break
286  else:
287  continue
288 
289  crc = DXL_MAKEWORD(rxpacket[wait_length - 2], rxpacket[wait_length - 1])
290 
291  if self.updateCRC(0, rxpacket, wait_length - 2) == crc:
292  result = COMM_SUCCESS
293  else:
294  result = COMM_RX_CORRUPT
295  break
296 
297  else:
298  # remove unnecessary packets
299  del rxpacket[0: idx]
300  rx_length -= idx
301 
302  else:
303  if port.isPacketTimeout():
304  if rx_length == 0:
305  result = COMM_RX_TIMEOUT
306  else:
307  result = COMM_RX_CORRUPT
308  break
309 
310  port.is_using = False
311 
312  if result == COMM_SUCCESS:
313  rxpacket = self.removeStuffing(rxpacket)
314 
315  return rxpacket, result
316 
317  # NOT for BulkRead / SyncRead instruction
318  def txRxPacket(self, port, txpacket):
319  rxpacket = None
320  error = 0
321 
322  # tx packet
323  result = self.txPacket(port, txpacket)
324  if result != COMM_SUCCESS:
325  return rxpacket, result, error
326 
327  # (Instruction == BulkRead or SyncRead) == this function is not available.
328  if txpacket[PKT_INSTRUCTION] == INST_BULK_READ or txpacket[PKT_INSTRUCTION] == INST_SYNC_READ:
329  result = COMM_NOT_AVAILABLE
330 
331  # (ID == Broadcast ID) == no need to wait for status packet or not available.
332  # (Instruction == action) == no need to wait for status packet
333  if txpacket[PKT_ID] == BROADCAST_ID or txpacket[PKT_INSTRUCTION] == INST_ACTION:
334  port.is_using = False
335  return rxpacket, result, error
336 
337  # set packet timeout
338  if txpacket[PKT_INSTRUCTION] == INST_READ:
339  port.setPacketTimeout(DXL_MAKEWORD(txpacket[PKT_PARAMETER0 + 2], txpacket[PKT_PARAMETER0 + 3]) + 11)
340  else:
341  port.setPacketTimeout(11)
342  # HEADER0 HEADER1 HEADER2 RESERVED ID LENGTH_L LENGTH_H INST ERROR CRC16_L CRC16_H
343 
344  # rx packet
345  while True:
346  rxpacket, result = self.rxPacket(port)
347  if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
348  break
349 
350  if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
351  error = rxpacket[PKT_ERROR]
352 
353  return rxpacket, result, error
354 
355  def ping(self, port, dxl_id):
356  model_number = 0
357  error = 0
358 
359  txpacket = [0] * 10
360 
361  if dxl_id >= BROADCAST_ID:
362  return model_number, COMM_NOT_AVAILABLE, error
363 
364  txpacket[PKT_ID] = dxl_id
365  txpacket[PKT_LENGTH_L] = 3
366  txpacket[PKT_LENGTH_H] = 0
367  txpacket[PKT_INSTRUCTION] = INST_PING
368 
369  rxpacket, result, error = self.txRxPacket(port, txpacket)
370  if result == COMM_SUCCESS:
371  model_number = DXL_MAKEWORD(rxpacket[PKT_PARAMETER0 + 1], rxpacket[PKT_PARAMETER0 + 2])
372 
373  return model_number, result, error
374 
375  def broadcastPing(self, port):
376  data_list = {}
377 
378  STATUS_LENGTH = 14
379 
380  rx_length = 0
381  wait_length = STATUS_LENGTH * MAX_ID
382 
383  txpacket = [0] * 10
384  rxpacket = []
385 
386  tx_time_per_byte = (1000.0 / port.getBaudRate()) *10.0;
387 
388  txpacket[PKT_ID] = BROADCAST_ID
389  txpacket[PKT_LENGTH_L] = 3
390  txpacket[PKT_LENGTH_H] = 0
391  txpacket[PKT_INSTRUCTION] = INST_PING
392 
393  result = self.txPacket(port, txpacket)
394  if result != COMM_SUCCESS:
395  port.is_using = False
396  return data_list, result
397 
398  # set rx timeout
399  #port.setPacketTimeout(wait_length * 1)
400  port.setPacketTimeoutMillis((wait_length * tx_time_per_byte) + (3.0 * MAX_ID) + 16.0);
401 
402  while True:
403  rxpacket += port.readPort(wait_length - rx_length)
404  rx_length = len(rxpacket)
405 
406  if port.isPacketTimeout(): # or rx_length >= wait_length
407  break
408 
409  port.is_using = False
410 
411  if rx_length == 0:
412  return data_list, COMM_RX_TIMEOUT
413 
414  while True:
415  if rx_length < STATUS_LENGTH:
416  return data_list, COMM_RX_CORRUPT
417 
418  # find packet header
419  for idx in range(0, rx_length - 2):
420  if rxpacket[idx] == 0xFF and rxpacket[idx + 1] == 0xFF and rxpacket[idx + 2] == 0xFD:
421  break
422 
423  if idx == 0: # found at the beginning of the packet
424  # verify CRC16
425  crc = DXL_MAKEWORD(rxpacket[STATUS_LENGTH - 2], rxpacket[STATUS_LENGTH - 1])
426 
427  if self.updateCRC(0, rxpacket, STATUS_LENGTH - 2) == crc:
428  result = COMM_SUCCESS
429 
430  data_list[rxpacket[PKT_ID]] = [
431  DXL_MAKEWORD(rxpacket[PKT_PARAMETER0 + 1], rxpacket[PKT_PARAMETER0 + 2]),
432  rxpacket[PKT_PARAMETER0 + 3]]
433 
434  del rxpacket[0: STATUS_LENGTH]
435  rx_length = rx_length - STATUS_LENGTH
436 
437  if rx_length == 0:
438  return data_list, result
439 
440  else:
441  result = COMM_RX_CORRUPT
442 
443  # remove header (0xFF 0xFF 0xFD)
444  del rxpacket[0: 3]
445  rx_length = rx_length - 3
446 
447  else:
448  # remove unnecessary packets
449  del rxpacket[0: idx]
450  rx_length = rx_length - idx
451 
452  # FIXME: unreachable code
453  return data_list, result
454 
455  def action(self, port, dxl_id):
456  txpacket = [0] * 10
457 
458  txpacket[PKT_ID] = dxl_id
459  txpacket[PKT_LENGTH_L] = 3
460  txpacket[PKT_LENGTH_H] = 0
461  txpacket[PKT_INSTRUCTION] = INST_ACTION
462 
463  _, result, _ = self.txRxPacket(port, txpacket)
464  return result
465 
466  def reboot(self, port, dxl_id):
467  txpacket = [0] * 10
468 
469  txpacket[PKT_ID] = dxl_id
470  txpacket[PKT_LENGTH_L] = 3
471  txpacket[PKT_LENGTH_H] = 0
472  txpacket[PKT_INSTRUCTION] = INST_REBOOT
473 
474  _, result, error = self.txRxPacket(port, txpacket)
475  return result, error
476 
477  def clearMultiTurn(self, port, dxl_id):
478  txpacket = [0] * 15
479 
480  txpacket[PKT_ID] = dxl_id
481  txpacket[PKT_LENGTH_L] = 8
482  txpacket[PKT_LENGTH_H] = 0
483  txpacket[PKT_INSTRUCTION] = INST_CLEAR
484  txpacket[PKT_PARAMETER0 + 0] = 0x01
485  txpacket[PKT_PARAMETER0 + 1] = 0x44
486  txpacket[PKT_PARAMETER0 + 2] = 0x58
487  txpacket[PKT_PARAMETER0 + 3] = 0x4C
488  txpacket[PKT_PARAMETER0 + 4] = 0x22
489 
490  _, result, error = self.txRxPacket(port, txpacket)
491  return result, error
492 
493  def factoryReset(self, port, dxl_id, option):
494  txpacket = [0] * 11
495 
496  txpacket[PKT_ID] = dxl_id
497  txpacket[PKT_LENGTH_L] = 4
498  txpacket[PKT_LENGTH_H] = 0
499  txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
500  txpacket[PKT_PARAMETER0] = option
501 
502  _, result, error = self.txRxPacket(port, txpacket)
503  return result, error
504 
505  def readTx(self, port, dxl_id, address, length):
506  txpacket = [0] * 14
507 
508  if dxl_id >= BROADCAST_ID:
509  return COMM_NOT_AVAILABLE
510 
511  txpacket[PKT_ID] = dxl_id
512  txpacket[PKT_LENGTH_L] = 7
513  txpacket[PKT_LENGTH_H] = 0
514  txpacket[PKT_INSTRUCTION] = INST_READ
515  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
516  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
517  txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(length)
518  txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(length)
519 
520  result = self.txPacket(port, txpacket)
521 
522  # set packet timeout
523  if result == COMM_SUCCESS:
524  port.setPacketTimeout(length + 11)
525 
526  return result
527 
528  def readRx(self, port, dxl_id, length):
529  result = COMM_TX_FAIL
530  error = 0
531 
532  rxpacket = None
533  data = []
534 
535  while True:
536  rxpacket, result = self.rxPacket(port)
537 
538  if result != COMM_SUCCESS or rxpacket[PKT_ID] == dxl_id:
539  break
540 
541  if result == COMM_SUCCESS and rxpacket[PKT_ID] == dxl_id:
542  error = rxpacket[PKT_ERROR]
543 
544  data.extend(rxpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length])
545 
546  return data, result, error
547 
548  def readTxRx(self, port, dxl_id, address, length):
549  error = 0
550 
551  txpacket = [0] * 14
552  data = []
553 
554  if dxl_id >= BROADCAST_ID:
555  return data, COMM_NOT_AVAILABLE, error
556 
557  txpacket[PKT_ID] = dxl_id
558  txpacket[PKT_LENGTH_L] = 7
559  txpacket[PKT_LENGTH_H] = 0
560  txpacket[PKT_INSTRUCTION] = INST_READ
561  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
562  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
563  txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(length)
564  txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(length)
565 
566  rxpacket, result, error = self.txRxPacket(port, txpacket)
567  if result == COMM_SUCCESS:
568  error = rxpacket[PKT_ERROR]
569 
570  data.extend(rxpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length])
571 
572  return data, result, error
573 
574  def read1ByteTx(self, port, dxl_id, address):
575  return self.readTx(port, dxl_id, address, 1)
576 
577  def read1ByteRx(self, port, dxl_id):
578  data, result, error = self.readRx(port, dxl_id, 1)
579  data_read = data[0] if (result == COMM_SUCCESS) else 0
580  return data_read, result, error
581 
582  def read1ByteTxRx(self, port, dxl_id, address):
583  data, result, error = self.readTxRx(port, dxl_id, address, 1)
584  data_read = data[0] if (result == COMM_SUCCESS) else 0
585  return data_read, result, error
586 
587  def read2ByteTx(self, port, dxl_id, address):
588  return self.readTx(port, dxl_id, address, 2)
589 
590  def read2ByteRx(self, port, dxl_id):
591  data, result, error = self.readRx(port, dxl_id, 2)
592  data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
593  return data_read, result, error
594 
595  def read2ByteTxRx(self, port, dxl_id, address):
596  data, result, error = self.readTxRx(port, dxl_id, address, 2)
597  data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
598  return data_read, result, error
599 
600  def read4ByteTx(self, port, dxl_id, address):
601  return self.readTx(port, dxl_id, address, 4)
602 
603  def read4ByteRx(self, port, dxl_id):
604  data, result, error = self.readRx(port, dxl_id, 4)
605  data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
606  DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
607  return data_read, result, error
608 
609  def read4ByteTxRx(self, port, dxl_id, address):
610  data, result, error = self.readTxRx(port, dxl_id, address, 4)
611  data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
612  DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
613  return data_read, result, error
614 
615  def writeTxOnly(self, port, dxl_id, address, length, data):
616  txpacket = [0] * (length + 12)
617 
618  txpacket[PKT_ID] = dxl_id
619  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
620  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
621  txpacket[PKT_INSTRUCTION] = INST_WRITE
622  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
623  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
624 
625  txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
626 
627  result = self.txPacket(port, txpacket)
628  port.is_using = False
629 
630  return result
631 
632  def writeTxRx(self, port, dxl_id, address, length, data):
633  txpacket = [0] * (length + 12)
634 
635  txpacket[PKT_ID] = dxl_id
636  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
637  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
638  txpacket[PKT_INSTRUCTION] = INST_WRITE
639  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
640  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
641 
642  txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
643  rxpacket, result, error = self.txRxPacket(port, txpacket)
644 
645  return result, error
646 
647  def write1ByteTxOnly(self, port, dxl_id, address, data):
648  data_write = [data]
649  return self.writeTxOnly(port, dxl_id, address, 1, data_write)
650 
651  def write1ByteTxRx(self, port, dxl_id, address, data):
652  data_write = [data]
653  return self.writeTxRx(port, dxl_id, address, 1, data_write)
654 
655  def write2ByteTxOnly(self, port, dxl_id, address, data):
656  data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
657  return self.writeTxOnly(port, dxl_id, address, 2, data_write)
658 
659  def write2ByteTxRx(self, port, dxl_id, address, data):
660  data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
661  return self.writeTxRx(port, dxl_id, address, 2, data_write)
662 
663  def write4ByteTxOnly(self, port, dxl_id, address, data):
664  data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
665  DXL_HIBYTE(DXL_LOWORD(data)),
666  DXL_LOBYTE(DXL_HIWORD(data)),
667  DXL_HIBYTE(DXL_HIWORD(data))]
668  return self.writeTxOnly(port, dxl_id, address, 4, data_write)
669 
670  def write4ByteTxRx(self, port, dxl_id, address, data):
671  data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
672  DXL_HIBYTE(DXL_LOWORD(data)),
673  DXL_LOBYTE(DXL_HIWORD(data)),
674  DXL_HIBYTE(DXL_HIWORD(data))]
675  return self.writeTxRx(port, dxl_id, address, 4, data_write)
676 
677  def regWriteTxOnly(self, port, dxl_id, address, length, data):
678  txpacket = [0] * (length + 12)
679 
680  txpacket[PKT_ID] = dxl_id
681  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
682  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
683  txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
684  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
685  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
686 
687  txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
688 
689  result = self.txPacket(port, txpacket)
690  port.is_using = False
691 
692  return result
693 
694  def regWriteTxRx(self, port, dxl_id, address, length, data):
695  txpacket = [0] * (length + 12)
696 
697  txpacket[PKT_ID] = dxl_id
698  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(length + 5)
699  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(length + 5)
700  txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
701  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(address)
702  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(address)
703 
704  txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + length] = data[0: length]
705 
706  _, result, error = self.txRxPacket(port, txpacket)
707 
708  return result, error
709 
710  def syncReadTx(self, port, start_address, data_length, param, param_length):
711  txpacket = [0] * (param_length + 14)
712  # 14: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
713 
714  txpacket[PKT_ID] = BROADCAST_ID
715  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(
716  param_length + 7) # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
717  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(
718  param_length + 7) # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
719  txpacket[PKT_INSTRUCTION] = INST_SYNC_READ
720  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(start_address)
721  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(start_address)
722  txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(data_length)
723  txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(data_length)
724 
725  txpacket[PKT_PARAMETER0 + 4: PKT_PARAMETER0 + 4 + param_length] = param[0: param_length]
726 
727  result = self.txPacket(port, txpacket)
728  if result == COMM_SUCCESS:
729  port.setPacketTimeout((11 + data_length) * param_length)
730 
731  return result
732 
733  def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
734  txpacket = [0] * (param_length + 14)
735  # 14: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
736 
737  txpacket[PKT_ID] = BROADCAST_ID
738  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(
739  param_length + 7) # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
740  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(
741  param_length + 7) # 7: INST START_ADDR_L START_ADDR_H DATA_LEN_L DATA_LEN_H CRC16_L CRC16_H
742  txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
743  txpacket[PKT_PARAMETER0 + 0] = DXL_LOBYTE(start_address)
744  txpacket[PKT_PARAMETER0 + 1] = DXL_HIBYTE(start_address)
745  txpacket[PKT_PARAMETER0 + 2] = DXL_LOBYTE(data_length)
746  txpacket[PKT_PARAMETER0 + 3] = DXL_HIBYTE(data_length)
747 
748  txpacket[PKT_PARAMETER0 + 4: PKT_PARAMETER0 + 4 + param_length] = param[0: param_length]
749 
750  _, result, _ = self.txRxPacket(port, txpacket)
751 
752  return result
753 
754  def bulkReadTx(self, port, param, param_length):
755  txpacket = [0] * (param_length + 10)
756  # 10: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST CRC16_L CRC16_H
757 
758  txpacket[PKT_ID] = BROADCAST_ID
759  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(param_length + 3) # 3: INST CRC16_L CRC16_H
760  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(param_length + 3) # 3: INST CRC16_L CRC16_H
761  txpacket[PKT_INSTRUCTION] = INST_BULK_READ
762 
763  txpacket[PKT_PARAMETER0: PKT_PARAMETER0 + param_length] = param[0: param_length]
764 
765  result = self.txPacket(port, txpacket)
766  if result == COMM_SUCCESS:
767  wait_length = 0
768  i = 0
769  while i < param_length:
770  wait_length += DXL_MAKEWORD(param[i + 3], param[i + 4]) + 10
771  i += 5
772  port.setPacketTimeout(wait_length)
773 
774  return result
775 
776  def bulkWriteTxOnly(self, port, param, param_length):
777  txpacket = [0] * (param_length + 10)
778  # 10: HEADER0 HEADER1 HEADER2 RESERVED ID LEN_L LEN_H INST CRC16_L CRC16_H
779 
780  txpacket[PKT_ID] = BROADCAST_ID
781  txpacket[PKT_LENGTH_L] = DXL_LOBYTE(param_length + 3) # 3: INST CRC16_L CRC16_H
782  txpacket[PKT_LENGTH_H] = DXL_HIBYTE(param_length + 3) # 3: INST CRC16_L CRC16_H
783  txpacket[PKT_INSTRUCTION] = INST_BULK_WRITE
784 
785  txpacket[PKT_PARAMETER0: PKT_PARAMETER0 + param_length] = param[0: param_length]
786 
787  _, result, _ = self.txRxPacket(port, txpacket)
788 
789  return result
#define DXL_MAKEDWORD(a, b)
def writeTxRx(self, port, dxl_id, address, length, data)
def regWriteTxOnly(self, port, dxl_id, address, length, data)
def regWriteTxRx(self, port, dxl_id, address, length, data)
#define DXL_MAKEWORD(a, b)
def syncWriteTxOnly(self, port, start_address, data_length, param, param_length)
def updateCRC(self, crc_accum, data_blk_ptr, data_blk_size)
#define DXL_LOBYTE(w)
#define DXL_HIWORD(l)
def syncReadTx(self, port, start_address, data_length, param, param_length)
#define DXL_HIBYTE(w)
#define DXL_LOWORD(l)
def writeTxOnly(self, port, dxl_id, address, length, data)


dynamixel_sdk
Author(s): Gilbert , Zerom , Darby Lim , Leon
autogenerated on Fri Apr 16 2021 02:25:55