group_bulk_read.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 ################################################################################
5 # Copyright 2017 ROBOTIS CO., LTD.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 ################################################################################
19 
20 # Author: Ryu Woon Jung (Leon)
21 
22 from .robotis_def import *
23 
24 PARAM_NUM_DATA = 0
25 PARAM_NUM_ADDRESS = 1
26 PARAM_NUM_LENGTH = 2
27 
28 
30  def __init__(self, port, ph):
31  self.port = port
32  self.ph = ph
33 
34  self.last_result = False
35  self.is_param_changed = False
36  self.param = []
37  self.data_dict = {}
38 
39  self.clearParam()
40 
41  def makeParam(self):
42  if not self.data_dict:
43  return
44 
45  self.param = []
46 
47  for dxl_id in self.data_dict:
48  if self.ph.getProtocolVersion() == 1.0:
49  self.param.append(self.data_dict[dxl_id][2]) # LEN
50  self.param.append(dxl_id) # ID
51  self.param.append(self.data_dict[dxl_id][1]) # ADDR
52  else:
53  self.param.append(dxl_id) # ID
54  self.param.append(DXL_LOBYTE(self.data_dict[dxl_id][1])) # ADDR_L
55  self.param.append(DXL_HIBYTE(self.data_dict[dxl_id][1])) # ADDR_H
56  self.param.append(DXL_LOBYTE(self.data_dict[dxl_id][2])) # LEN_L
57  self.param.append(DXL_HIBYTE(self.data_dict[dxl_id][2])) # LEN_H
58 
59  def addParam(self, dxl_id, start_address, data_length):
60  if dxl_id in self.data_dict: # dxl_id already exist
61  return False
62 
63  data = [] # [0] * data_length
64  self.data_dict[dxl_id] = [data, start_address, data_length]
65 
66  self.is_param_changed = True
67  return True
68 
69  def removeParam(self, dxl_id):
70  if dxl_id not in self.data_dict: # NOT exist
71  return
72 
73  del self.data_dict[dxl_id]
74 
75  self.is_param_changed = True
76 
77  def clearParam(self):
78  self.data_dict.clear()
79  return
80 
81  def txPacket(self):
82  if len(self.data_dict.keys()) == 0:
83  return COMM_NOT_AVAILABLE
84 
85  if self.is_param_changed is True or not self.param:
86  self.makeParam()
87 
88  if self.ph.getProtocolVersion() == 1.0:
89  return self.ph.bulkReadTx(self.port, self.param, len(self.data_dict.keys()) * 3)
90  else:
91  return self.ph.bulkReadTx(self.port, self.param, len(self.data_dict.keys()) * 5)
92 
93  def rxPacket(self):
94  self.last_result = False
95 
96  result = COMM_RX_FAIL
97 
98  if len(self.data_dict.keys()) == 0:
99  return COMM_NOT_AVAILABLE
100 
101  for dxl_id in self.data_dict:
102  self.data_dict[dxl_id][PARAM_NUM_DATA], result, _ = self.ph.readRx(self.port, dxl_id,
103  self.data_dict[dxl_id][PARAM_NUM_LENGTH])
104  if result != COMM_SUCCESS:
105  return result
106 
107  if result == COMM_SUCCESS:
108  self.last_result = True
109 
110  return result
111 
112  def txRxPacket(self):
113  result = self.txPacket()
114  if result != COMM_SUCCESS:
115  return result
116 
117  return self.rxPacket()
118 
119  def isAvailable(self, dxl_id, address, data_length):
120  if self.last_result is False or dxl_id not in self.data_dict:
121  return False
122 
123  start_addr = self.data_dict[dxl_id][PARAM_NUM_ADDRESS]
124 
125  if (address < start_addr) or (start_addr + self.data_dict[dxl_id][PARAM_NUM_LENGTH] - data_length < address):
126  return False
127 
128  return True
129 
130  def getData(self, dxl_id, address, data_length):
131  if not self.isAvailable(dxl_id, address, data_length):
132  return 0
133 
134  start_addr = self.data_dict[dxl_id][PARAM_NUM_ADDRESS]
135 
136  if data_length == 1:
137  return self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr]
138  elif data_length == 2:
139  return DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr],
140  self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 1])
141  elif data_length == 4:
142  return DXL_MAKEDWORD(DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 0],
143  self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 1]),
144  DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 2],
145  self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 3]))
146  else:
147  return 0
#define DXL_MAKEDWORD(a, b)
def isAvailable(self, dxl_id, address, data_length)
#define DXL_MAKEWORD(a, b)
def getData(self, dxl_id, address, data_length)
#define DXL_LOBYTE(w)
def addParam(self, dxl_id, start_address, data_length)
#define DXL_HIBYTE(w)


dynamixel_sdk
Author(s): Gilbert , Zerom , Darby Lim , Leon
autogenerated on Fri Apr 16 2021 02:25:55