group_sync_read.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 
19 
20 # Author: Ryu Woon Jung (Leon)
21 
22 from .robotis_def import *
23 
24 
26  def __init__(self, port, ph, start_address, data_length):
27  self.port = port
28  self.ph = ph
29  self.start_address = start_address
30  self.data_length = data_length
31 
32  self.last_result = False
33  self.is_param_changed = False
34  self.param = []
35  self.data_dict = {}
36 
37  self.clearParam()
38 
39  def makeParam(self):
40  if self.ph.getProtocolVersion() == 1.0:
41  return
42 
43  if not self.data_dict: # len(self.data_dict.keys()) == 0:
44  return
45 
46  self.param = []
47 
48  for dxl_id in self.data_dict:
49  self.param.append(dxl_id)
50 
51  def addParam(self, dxl_id):
52  if self.ph.getProtocolVersion() == 1.0:
53  return False
54 
55  if dxl_id in self.data_dict: # dxl_id already exist
56  return False
57 
58  self.data_dict[dxl_id] = [] # [0] * self.data_length
59 
60  self.is_param_changed = True
61  return True
62 
63  def removeParam(self, dxl_id):
64  if self.ph.getProtocolVersion() == 1.0:
65  return
66 
67  if dxl_id not in self.data_dict: # NOT exist
68  return
69 
70  del self.data_dict[dxl_id]
71 
72  self.is_param_changed = True
73 
74  def clearParam(self):
75  if self.ph.getProtocolVersion() == 1.0:
76  return
77 
78  self.data_dict.clear()
79 
80  def txPacket(self):
81  if self.ph.getProtocolVersion() == 1.0 or len(self.data_dict.keys()) == 0:
82  return COMM_NOT_AVAILABLE
83 
84  if self.is_param_changed is True or not self.param:
85  self.makeParam()
86 
87  return self.ph.syncReadTx(self.port, self.start_address, self.data_length, self.param,
88  len(self.data_dict.keys()) * 1)
89 
90  def rxPacket(self):
91  self.last_result = False
92 
93  if self.ph.getProtocolVersion() == 1.0:
94  return COMM_NOT_AVAILABLE
95 
96  result = COMM_RX_FAIL
97 
98  if len(self.data_dict.keys()) == 0:
99  return COMM_NOT_AVAILABLE
100 
101  for dxl_id in self.data_dict:
102  self.data_dict[dxl_id], result, _ = self.ph.readRx(self.port, dxl_id, self.data_length)
103  if result != COMM_SUCCESS:
104  return result
105 
106  if result == COMM_SUCCESS:
107  self.last_result = True
108 
109  return result
110 
111  def txRxPacket(self):
112  if self.ph.getProtocolVersion() == 1.0:
113  return COMM_NOT_AVAILABLE
114 
115  result = self.txPacket()
116  if result != COMM_SUCCESS:
117  return result
118 
119  return self.rxPacket()
120 
121  def isAvailable(self, dxl_id, address, data_length):
122  if self.ph.getProtocolVersion() == 1.0 or self.last_result is False or dxl_id not in self.data_dict:
123  return False
124 
125  if (address < self.start_address) or (self.start_address + self.data_length - data_length < address):
126  return False
127 
128  return True
129 
130  def getData(self, dxl_id, address, data_length):
131  if not self.isAvailable(dxl_id, address, data_length):
132  return 0
133 
134  if data_length == 1:
135  return self.data_dict[dxl_id][address - self.start_address]
136  elif data_length == 2:
137  return DXL_MAKEWORD(self.data_dict[dxl_id][address - self.start_address],
138  self.data_dict[dxl_id][address - self.start_address + 1])
139  elif data_length == 4:
140  return DXL_MAKEDWORD(DXL_MAKEWORD(self.data_dict[dxl_id][address - self.start_address + 0],
141  self.data_dict[dxl_id][address - self.start_address + 1]),
142  DXL_MAKEWORD(self.data_dict[dxl_id][address - self.start_address + 2],
143  self.data_dict[dxl_id][address - self.start_address + 3]))
144  else:
145  return 0
dynamixel_sdk.group_sync_read.GroupSyncRead.is_param_changed
is_param_changed
Definition: group_sync_read.py:33
dynamixel_sdk.group_sync_read.GroupSyncRead.__init__
def __init__(self, port, ph, start_address, data_length)
Definition: group_sync_read.py:26
dynamixel_sdk.group_sync_read.GroupSyncRead.ph
ph
Definition: group_sync_read.py:28
dynamixel_sdk.group_sync_read.GroupSyncRead.param
param
Definition: group_sync_read.py:34
dynamixel_sdk.group_sync_read.GroupSyncRead
Definition: group_sync_read.py:25
dynamixel_sdk.group_sync_read.GroupSyncRead.data_dict
data_dict
Definition: group_sync_read.py:35
dynamixel_sdk.group_sync_read.GroupSyncRead.removeParam
def removeParam(self, dxl_id)
Definition: group_sync_read.py:63
dynamixel_sdk.group_sync_read.GroupSyncRead.last_result
last_result
Definition: group_sync_read.py:32
dynamixel_sdk.group_sync_read.GroupSyncRead.txRxPacket
def txRxPacket(self)
Definition: group_sync_read.py:111
dynamixel_sdk.group_sync_read.GroupSyncRead.getData
def getData(self, dxl_id, address, data_length)
Definition: group_sync_read.py:130
dynamixel_sdk.group_sync_read.GroupSyncRead.clearParam
def clearParam(self)
Definition: group_sync_read.py:74
dynamixel_sdk.group_sync_read.GroupSyncRead.addParam
def addParam(self, dxl_id)
Definition: group_sync_read.py:51
dynamixel_sdk.group_sync_read.GroupSyncRead.makeParam
def makeParam(self)
Definition: group_sync_read.py:39
dynamixel_sdk.group_sync_read.GroupSyncRead.start_address
start_address
Definition: group_sync_read.py:29
dynamixel_sdk.group_sync_read.GroupSyncRead.isAvailable
def isAvailable(self, dxl_id, address, data_length)
Definition: group_sync_read.py:121
DXL_MAKEDWORD
#define DXL_MAKEDWORD(a, b)
Definition: packet_handler.h:43
dynamixel_sdk.group_sync_read.GroupSyncRead.txPacket
def txPacket(self)
Definition: group_sync_read.py:80
DXL_MAKEWORD
#define DXL_MAKEWORD(a, b)
Definition: packet_handler.h:42
dynamixel_sdk.group_sync_read.GroupSyncRead.data_length
data_length
Definition: group_sync_read.py:30
dynamixel_sdk.group_sync_read.GroupSyncRead.rxPacket
def rxPacket(self)
Definition: group_sync_read.py:90
dynamixel_sdk.group_sync_read.GroupSyncRead.port
port
Definition: group_sync_read.py:27


dynamixel_sdk
Author(s): Gilbert , Zerom , Darby Lim , Leon
autogenerated on Wed Mar 2 2022 00:13:50