group_sync_read.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 ################################################################################
5 # Copyright 2017 ROBOTIS CO., LTD.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 ################################################################################
19 
20 # Author: Ryu Woon Jung (Leon)
21 
22 from .robotis_def import *
23 
24 
26  def __init__(self, port, ph, start_address, data_length):
27  self.port = port
28  self.ph = ph
29  self.start_address = start_address
30  self.data_length = data_length
31 
32  self.last_result = False
33  self.is_param_changed = False
34  self.param = []
35  self.data_dict = {}
36 
37  self.clearParam()
38 
39  def makeParam(self):
40  if self.ph.getProtocolVersion() == 1.0:
41  return
42 
43  if not self.data_dict: # len(self.data_dict.keys()) == 0:
44  return
45 
46  self.param = []
47 
48  for dxl_id in self.data_dict:
49  self.param.append(dxl_id)
50 
51  def addParam(self, dxl_id):
52  if self.ph.getProtocolVersion() == 1.0:
53  return False
54 
55  if dxl_id in self.data_dict: # dxl_id already exist
56  return False
57 
58  self.data_dict[dxl_id] = [] # [0] * self.data_length
59 
60  self.is_param_changed = True
61  return True
62 
63  def removeParam(self, dxl_id):
64  if self.ph.getProtocolVersion() == 1.0:
65  return
66 
67  if dxl_id not in self.data_dict: # NOT exist
68  return
69 
70  del self.data_dict[dxl_id]
71 
72  self.is_param_changed = True
73 
74  def clearParam(self):
75  if self.ph.getProtocolVersion() == 1.0:
76  return
77 
78  self.data_dict.clear()
79 
80  def txPacket(self):
81  if self.ph.getProtocolVersion() == 1.0 or len(self.data_dict.keys()) == 0:
82  return COMM_NOT_AVAILABLE
83 
84  if self.is_param_changed is True or not self.param:
85  self.makeParam()
86 
87  return self.ph.syncReadTx(self.port, self.start_address, self.data_length, self.param,
88  len(self.data_dict.keys()) * 1)
89 
90  def rxPacket(self):
91  self.last_result = False
92 
93  if self.ph.getProtocolVersion() == 1.0:
94  return COMM_NOT_AVAILABLE
95 
96  result = COMM_RX_FAIL
97 
98  if len(self.data_dict.keys()) == 0:
99  return COMM_NOT_AVAILABLE
100 
101  for dxl_id in self.data_dict:
102  self.data_dict[dxl_id], result, _ = self.ph.readRx(self.port, dxl_id, self.data_length)
103  if result != COMM_SUCCESS:
104  return result
105 
106  if result == COMM_SUCCESS:
107  self.last_result = True
108 
109  return result
110 
111  def txRxPacket(self):
112  if self.ph.getProtocolVersion() == 1.0:
113  return COMM_NOT_AVAILABLE
114 
115  result = self.txPacket()
116  if result != COMM_SUCCESS:
117  return result
118 
119  return self.rxPacket()
120 
121  def isAvailable(self, dxl_id, address, data_length):
122  if self.ph.getProtocolVersion() == 1.0 or self.last_result is False or dxl_id not in self.data_dict:
123  return False
124 
125  if (address < self.start_address) or (self.start_address + self.data_length - data_length < address):
126  return False
127 
128  return True
129 
130  def getData(self, dxl_id, address, data_length):
131  if not self.isAvailable(dxl_id, address, data_length):
132  return 0
133 
134  if data_length == 1:
135  return self.data_dict[dxl_id][address - self.start_address]
136  elif data_length == 2:
137  return DXL_MAKEWORD(self.data_dict[dxl_id][address - self.start_address],
138  self.data_dict[dxl_id][address - self.start_address + 1])
139  elif data_length == 4:
140  return DXL_MAKEDWORD(DXL_MAKEWORD(self.data_dict[dxl_id][address - self.start_address + 0],
141  self.data_dict[dxl_id][address - self.start_address + 1]),
142  DXL_MAKEWORD(self.data_dict[dxl_id][address - self.start_address + 2],
143  self.data_dict[dxl_id][address - self.start_address + 3]))
144  else:
145  return 0
#define DXL_MAKEDWORD(a, b)
def getData(self, dxl_id, address, data_length)
#define DXL_MAKEWORD(a, b)
def isAvailable(self, dxl_id, address, data_length)
def __init__(self, port, ph, start_address, data_length)


dynamixel_sdk
Author(s): Gilbert , Zerom , Darby Lim , Leon
autogenerated on Fri Apr 16 2021 02:25:55