protocol1_packet_handler.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 ################################################################################
5 # Copyright 2017 ROBOTIS CO., LTD.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 ################################################################################
19 
20 # Author: Ryu Woon Jung (Leon)
21 
22 from .robotis_def import *
23 
24 TXPACKET_MAX_LEN = 250
25 RXPACKET_MAX_LEN = 250
26 
27 # for Protocol 1.0 Packet
28 PKT_HEADER0 = 0
29 PKT_HEADER1 = 1
30 PKT_ID = 2
31 PKT_LENGTH = 3
32 PKT_INSTRUCTION = 4
33 PKT_ERROR = 4
34 PKT_PARAMETER0 = 5
35 
36 # Protocol 1.0 Error bit
37 ERRBIT_VOLTAGE = 1 # Supplied voltage is out of the range (operating volatage set in the control table)
38 ERRBIT_ANGLE = 2 # Goal position is written out of the range (from CW angle limit to CCW angle limit)
39 ERRBIT_OVERHEAT = 4 # Temperature is out of the range (operating temperature set in the control table)
40 ERRBIT_RANGE = 8 # Command(setting value) is out of the range for use.
41 ERRBIT_CHECKSUM = 16 # Instruction packet checksum is incorrect.
42 ERRBIT_OVERLOAD = 32 # The current load cannot be controlled by the set torque.
43 ERRBIT_INSTRUCTION = 64 # Undefined instruction or delivering the action command without the reg_write command.
44 
45 
46 class Protocol1PacketHandler(object):
47  def getProtocolVersion(self):
48  return 1.0
49 
50  def getTxRxResult(self, result):
51  if result == COMM_SUCCESS:
52  return "[TxRxResult] Communication success!"
53  elif result == COMM_PORT_BUSY:
54  return "[TxRxResult] Port is in use!"
55  elif result == COMM_TX_FAIL:
56  return "[TxRxResult] Failed transmit instruction packet!"
57  elif result == COMM_RX_FAIL:
58  return "[TxRxResult] Failed get status packet from device!"
59  elif result == COMM_TX_ERROR:
60  return "[TxRxResult] Incorrect instruction packet!"
61  elif result == COMM_RX_WAITING:
62  return "[TxRxResult] Now receiving status packet!"
63  elif result == COMM_RX_TIMEOUT:
64  return "[TxRxResult] There is no status packet!"
65  elif result == COMM_RX_CORRUPT:
66  return "[TxRxResult] Incorrect status packet!"
67  elif result == COMM_NOT_AVAILABLE:
68  return "[TxRxResult] Protocol does not support this function!"
69  else:
70  return ""
71 
72  def getRxPacketError(self, error):
73  if error & ERRBIT_VOLTAGE:
74  return "[RxPacketError] Input voltage error!"
75 
76  if error & ERRBIT_ANGLE:
77  return "[RxPacketError] Angle limit error!"
78 
79  if error & ERRBIT_OVERHEAT:
80  return "[RxPacketError] Overheat error!"
81 
82  if error & ERRBIT_RANGE:
83  return "[RxPacketError] Out of range error!"
84 
85  if error & ERRBIT_CHECKSUM:
86  return "[RxPacketError] Checksum error!"
87 
88  if error & ERRBIT_OVERLOAD:
89  return "[RxPacketError] Overload error!"
90 
91  if error & ERRBIT_INSTRUCTION:
92  return "[RxPacketError] Instruction code error!"
93 
94  return ""
95 
96  def txPacket(self, port, txpacket):
97  checksum = 0
98  total_packet_length = txpacket[PKT_LENGTH] + 4 # 4: HEADER0 HEADER1 ID LENGTH
99 
100  if port.is_using:
101  return COMM_PORT_BUSY
102  port.is_using = True
103 
104  # check max packet length
105  if total_packet_length > TXPACKET_MAX_LEN:
106  port.is_using = False
107  return COMM_TX_ERROR
108 
109  # make packet header
110  txpacket[PKT_HEADER0] = 0xFF
111  txpacket[PKT_HEADER1] = 0xFF
112 
113  # add a checksum to the packet
114  for idx in range(2, total_packet_length - 1): # except header, checksum
115  checksum += txpacket[idx]
116 
117  txpacket[total_packet_length - 1] = ~checksum & 0xFF
118 
119  #print "[TxPacket] %r" % txpacket
120 
121  # tx packet
122  port.clearPort()
123  written_packet_length = port.writePort(txpacket)
124  if total_packet_length != written_packet_length:
125  port.is_using = False
126  return COMM_TX_FAIL
127 
128  return COMM_SUCCESS
129 
130  def rxPacket(self, port):
131  rxpacket = []
132 
133  result = COMM_TX_FAIL
134  checksum = 0
135  rx_length = 0
136  wait_length = 6 # minimum length (HEADER0 HEADER1 ID LENGTH ERROR CHKSUM)
137 
138  while True:
139  rxpacket.extend(port.readPort(wait_length - rx_length))
140  rx_length = len(rxpacket)
141  if rx_length >= wait_length:
142  # find packet header
143  for idx in range(0, (rx_length - 1)):
144  if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
145  break
146 
147  if idx == 0: # found at the beginning of the packet
148  if (rxpacket[PKT_ID] > 0xFD) or (rxpacket[PKT_LENGTH] > RXPACKET_MAX_LEN) or (
149  rxpacket[PKT_ERROR] > 0x7F):
150  # unavailable ID or unavailable Length or unavailable Error
151  # remove the first byte in the packet
152  del rxpacket[0]
153  rx_length -= 1
154  continue
155 
156  # re-calculate the exact length of the rx packet
157  if wait_length != (rxpacket[PKT_LENGTH] + PKT_LENGTH + 1):
158  wait_length = rxpacket[PKT_LENGTH] + PKT_LENGTH + 1
159  continue
160 
161  if rx_length < wait_length:
162  # check timeout
163  if port.isPacketTimeout():
164  if rx_length == 0:
165  result = COMM_RX_TIMEOUT
166  else:
167  result = COMM_RX_CORRUPT
168  break
169  else:
170  continue
171 
172  # calculate checksum
173  for i in range(2, wait_length - 1): # except header, checksum
174  checksum += rxpacket[i]
175  checksum = ~checksum & 0xFF
176 
177  # verify checksum
178  if rxpacket[wait_length - 1] == checksum:
179  result = COMM_SUCCESS
180  else:
181  result = COMM_RX_CORRUPT
182  break
183 
184  else:
185  # remove unnecessary packets
186  del rxpacket[0: idx]
187  rx_length -= idx
188 
189  else:
190  # check timeout
191  if port.isPacketTimeout():
192  if rx_length == 0:
193  result = COMM_RX_TIMEOUT
194  else:
195  result = COMM_RX_CORRUPT
196  break
197 
198  port.is_using = False
199 
200  #print "[RxPacket] %r" % rxpacket
201 
202  return rxpacket, result
203 
204  # NOT for BulkRead
205  def txRxPacket(self, port, txpacket):
206  rxpacket = None
207  error = 0
208 
209  # tx packet
210  result = self.txPacket(port, txpacket)
211  if result != COMM_SUCCESS:
212  return rxpacket, result, error
213 
214  # (Instruction == BulkRead) == this function is not available.
215  if txpacket[PKT_INSTRUCTION] == INST_BULK_READ:
216  result = COMM_NOT_AVAILABLE
217 
218  # (ID == Broadcast ID) == no need to wait for status packet or not available
219  if (txpacket[PKT_ID] == BROADCAST_ID):
220  port.is_using = False
221  return rxpacket, result, error
222 
223  # set packet timeout
224  if txpacket[PKT_INSTRUCTION] == INST_READ:
225  port.setPacketTimeout(txpacket[PKT_PARAMETER0 + 1] + 6)
226  else:
227  port.setPacketTimeout(6) # HEADER0 HEADER1 ID LENGTH ERROR CHECKSUM
228 
229  # rx packet
230  while True:
231  rxpacket, result = self.rxPacket(port)
232  if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
233  break
234 
235  if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
236  error = rxpacket[PKT_ERROR]
237 
238  return rxpacket, result, error
239 
240  def ping(self, port, dxl_id):
241  model_number = 0
242  error = 0
243 
244  txpacket = [0] * 6
245 
246  if dxl_id >= BROADCAST_ID:
247  return model_number, COMM_NOT_AVAILABLE, error
248 
249  txpacket[PKT_ID] = dxl_id
250  txpacket[PKT_LENGTH] = 2
251  txpacket[PKT_INSTRUCTION] = INST_PING
252 
253  rxpacket, result, error = self.txRxPacket(port, txpacket)
254 
255  if result == COMM_SUCCESS:
256  data_read, result, error = self.readTxRx(port, dxl_id, 0, 2) # Address 0 : Model Number
257  if result == COMM_SUCCESS:
258  model_number = DXL_MAKEWORD(data_read[0], data_read[1])
259 
260  return model_number, result, error
261 
262  def broadcastPing(self, port):
263  data_list = None
264  return data_list, COMM_NOT_AVAILABLE
265 
266  def action(self, port, dxl_id):
267  txpacket = [0] * 6
268 
269  txpacket[PKT_ID] = dxl_id
270  txpacket[PKT_LENGTH] = 2
271  txpacket[PKT_INSTRUCTION] = INST_ACTION
272 
273  _, result, _ = self.txRxPacket(port, txpacket)
274 
275  return result
276 
277  def reboot(self, port, dxl_id):
278  return COMM_NOT_AVAILABLE, 0
279 
280  def factoryReset(self, port, dxl_id):
281  txpacket = [0] * 6
282 
283  txpacket[PKT_ID] = dxl_id
284  txpacket[PKT_LENGTH] = 2
285  txpacket[PKT_INSTRUCTION] = INST_FACTORY_RESET
286 
287  _, result, error = self.txRxPacket(port, txpacket)
288 
289  return result, error
290 
291  def readTx(self, port, dxl_id, address, length):
292 
293  txpacket = [0] * 8
294 
295  if dxl_id >= BROADCAST_ID:
296  return COMM_NOT_AVAILABLE
297 
298  txpacket[PKT_ID] = dxl_id
299  txpacket[PKT_LENGTH] = 4
300  txpacket[PKT_INSTRUCTION] = INST_READ
301  txpacket[PKT_PARAMETER0 + 0] = address
302  txpacket[PKT_PARAMETER0 + 1] = length
303 
304  result = self.txPacket(port, txpacket)
305 
306  # set packet timeout
307  if result == COMM_SUCCESS:
308  port.setPacketTimeout(length + 6)
309 
310  return result
311 
312  def readRx(self, port, dxl_id, length):
313  result = COMM_TX_FAIL
314  error = 0
315 
316  rxpacket = None
317  data = []
318 
319  while True:
320  rxpacket, result = self.rxPacket(port)
321 
322  if result != COMM_SUCCESS or rxpacket[PKT_ID] == dxl_id:
323  break
324 
325  if result == COMM_SUCCESS and rxpacket[PKT_ID] == dxl_id:
326  error = rxpacket[PKT_ERROR]
327 
328  data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
329 
330  return data, result, error
331 
332  def readTxRx(self, port, dxl_id, address, length):
333  txpacket = [0] * 8
334  data = []
335 
336  if dxl_id >= BROADCAST_ID:
337  return data, COMM_NOT_AVAILABLE, 0
338 
339  txpacket[PKT_ID] = dxl_id
340  txpacket[PKT_LENGTH] = 4
341  txpacket[PKT_INSTRUCTION] = INST_READ
342  txpacket[PKT_PARAMETER0 + 0] = address
343  txpacket[PKT_PARAMETER0 + 1] = length
344 
345  rxpacket, result, error = self.txRxPacket(port, txpacket)
346  if result == COMM_SUCCESS:
347  error = rxpacket[PKT_ERROR]
348 
349  data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])
350 
351  return data, result, error
352 
353  def read1ByteTx(self, port, dxl_id, address):
354  return self.readTx(port, dxl_id, address, 1)
355 
356  def read1ByteRx(self, port, dxl_id):
357  data, result, error = self.readRx(port, dxl_id, 1)
358  data_read = data[0] if (result == COMM_SUCCESS) else 0
359  return data_read, result, error
360 
361  def read1ByteTxRx(self, port, dxl_id, address):
362  data, result, error = self.readTxRx(port, dxl_id, address, 1)
363  data_read = data[0] if (result == COMM_SUCCESS) else 0
364  return data_read, result, error
365 
366  def read2ByteTx(self, port, dxl_id, address):
367  return self.readTx(port, dxl_id, address, 2)
368 
369  def read2ByteRx(self, port, dxl_id):
370  data, result, error = self.readRx(port, dxl_id, 2)
371  data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
372  return data_read, result, error
373 
374  def read2ByteTxRx(self, port, dxl_id, address):
375  data, result, error = self.readTxRx(port, dxl_id, address, 2)
376  data_read = DXL_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
377  return data_read, result, error
378 
379  def read4ByteTx(self, port, dxl_id, address):
380  return self.readTx(port, dxl_id, address, 4)
381 
382  def read4ByteRx(self, port, dxl_id):
383  data, result, error = self.readRx(port, dxl_id, 4)
384  data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
385  DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
386  return data_read, result, error
387 
388  def read4ByteTxRx(self, port, dxl_id, address):
389  data, result, error = self.readTxRx(port, dxl_id, address, 4)
390  data_read = DXL_MAKEDWORD(DXL_MAKEWORD(data[0], data[1]),
391  DXL_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
392  return data_read, result, error
393 
394  def writeTxOnly(self, port, dxl_id, address, length, data):
395  txpacket = [0] * (length + 7)
396 
397  txpacket[PKT_ID] = dxl_id
398  txpacket[PKT_LENGTH] = length + 3
399  txpacket[PKT_INSTRUCTION] = INST_WRITE
400  txpacket[PKT_PARAMETER0] = address
401 
402  txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
403 
404  result = self.txPacket(port, txpacket)
405  port.is_using = False
406 
407  return result
408 
409  def writeTxRx(self, port, dxl_id, address, length, data):
410  txpacket = [0] * (length + 7)
411 
412  txpacket[PKT_ID] = dxl_id
413  txpacket[PKT_LENGTH] = length + 3
414  txpacket[PKT_INSTRUCTION] = INST_WRITE
415  txpacket[PKT_PARAMETER0] = address
416 
417  txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
418  rxpacket, result, error = self.txRxPacket(port, txpacket)
419 
420  return result, error
421 
422  def write1ByteTxOnly(self, port, dxl_id, address, data):
423  data_write = [data]
424  return self.writeTxOnly(port, dxl_id, address, 1, data_write)
425 
426  def write1ByteTxRx(self, port, dxl_id, address, data):
427  data_write = [data]
428  return self.writeTxRx(port, dxl_id, address, 1, data_write)
429 
430  def write2ByteTxOnly(self, port, dxl_id, address, data):
431  data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
432  return self.writeTxOnly(port, dxl_id, address, 2, data_write)
433 
434  def write2ByteTxRx(self, port, dxl_id, address, data):
435  data_write = [DXL_LOBYTE(data), DXL_HIBYTE(data)]
436  return self.writeTxRx(port, dxl_id, address, 2, data_write)
437 
438  def write4ByteTxOnly(self, port, dxl_id, address, data):
439  data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
440  DXL_HIBYTE(DXL_LOWORD(data)),
441  DXL_LOBYTE(DXL_HIWORD(data)),
442  DXL_HIBYTE(DXL_HIWORD(data))]
443  return self.writeTxOnly(port, dxl_id, address, 4, data_write)
444 
445  def write4ByteTxRx(self, port, dxl_id, address, data):
446  data_write = [DXL_LOBYTE(DXL_LOWORD(data)),
447  DXL_HIBYTE(DXL_LOWORD(data)),
448  DXL_LOBYTE(DXL_HIWORD(data)),
449  DXL_HIBYTE(DXL_HIWORD(data))]
450  return self.writeTxRx(port, dxl_id, address, 4, data_write)
451 
452  def regWriteTxOnly(self, port, dxl_id, address, length, data):
453  txpacket = [0] * (length + 7)
454 
455  txpacket[PKT_ID] = dxl_id
456  txpacket[PKT_LENGTH] = length + 3
457  txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
458  txpacket[PKT_PARAMETER0] = address
459 
460  txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
461 
462  result = self.txPacket(port, txpacket)
463  port.is_using = False
464 
465  return result
466 
467  def regWriteTxRx(self, port, dxl_id, address, length, data):
468  txpacket = [0] * (length + 7)
469 
470  txpacket[PKT_ID] = dxl_id
471  txpacket[PKT_LENGTH] = length + 3
472  txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
473  txpacket[PKT_PARAMETER0] = address
474 
475  txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
476 
477  _, result, error = self.txRxPacket(port, txpacket)
478 
479  return result, error
480 
481  def syncReadTx(self, port, start_address, data_length, param, param_length):
482  return COMM_NOT_AVAILABLE
483 
484  def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
485  txpacket = [0] * (param_length + 8)
486  # 8: HEADER0 HEADER1 ID LEN INST START_ADDR DATA_LEN ... CHKSUM
487 
488  txpacket[PKT_ID] = BROADCAST_ID
489  txpacket[PKT_LENGTH] = param_length + 4 # 4: INST START_ADDR DATA_LEN ... CHKSUM
490  txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
491  txpacket[PKT_PARAMETER0 + 0] = start_address
492  txpacket[PKT_PARAMETER0 + 1] = data_length
493 
494  txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + param_length] = param[0: param_length]
495 
496  _, result, _ = self.txRxPacket(port, txpacket)
497 
498  return result
499 
500  def bulkReadTx(self, port, param, param_length):
501  txpacket = [0] * (param_length + 7)
502  # 7: HEADER0 HEADER1 ID LEN INST 0x00 ... CHKSUM
503 
504  txpacket[PKT_ID] = BROADCAST_ID
505  txpacket[PKT_LENGTH] = param_length + 3 # 3: INST 0x00 ... CHKSUM
506  txpacket[PKT_INSTRUCTION] = INST_BULK_READ
507  txpacket[PKT_PARAMETER0 + 0] = 0x00
508 
509  txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + param_length] = param[0: param_length]
510 
511  result = self.txPacket(port, txpacket)
512  if result == COMM_SUCCESS:
513  wait_length = 0
514  i = 0
515  while i < param_length:
516  wait_length += param[i] + 7
517  i += 3
518  port.setPacketTimeout(wait_length)
519 
520  return result
521 
522  def bulkWriteTxOnly(self, port, param, param_length):
523  return COMM_NOT_AVAILABLE
#define DXL_MAKEDWORD(a, b)
def regWriteTxOnly(self, port, dxl_id, address, length, data)
#define DXL_MAKEWORD(a, b)
def syncWriteTxOnly(self, port, start_address, data_length, param, param_length)
def regWriteTxRx(self, port, dxl_id, address, length, data)
#define DXL_LOBYTE(w)
def syncReadTx(self, port, start_address, data_length, param, param_length)
#define DXL_HIWORD(l)
#define DXL_HIBYTE(w)
def writeTxRx(self, port, dxl_id, address, length, data)
def writeTxOnly(self, port, dxl_id, address, length, data)
#define DXL_LOWORD(l)


dynamixel_sdk
Author(s): Gilbert , Zerom , Darby Lim , Leon
autogenerated on Fri Apr 16 2021 02:25:55