Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 from .robotis_def import *
00023
00024 PARAM_NUM_DATA = 0
00025 PARAM_NUM_ADDRESS = 1
00026 PARAM_NUM_LENGTH = 2
00027
00028
00029 class GroupBulkRead:
00030 def __init__(self, port, ph):
00031 self.port = port
00032 self.ph = ph
00033
00034 self.last_result = False
00035 self.is_param_changed = False
00036 self.param = []
00037 self.data_dict = {}
00038
00039 self.clearParam()
00040
00041 def makeParam(self):
00042 if not self.data_dict:
00043 return
00044
00045 self.param = []
00046
00047 for dxl_id in self.data_dict:
00048 if self.ph.getProtocolVersion() == 1.0:
00049 self.param.append(self.data_dict[dxl_id][2])
00050 self.param.append(dxl_id)
00051 self.param.append(self.data_dict[dxl_id][1])
00052 else:
00053 self.param.append(dxl_id)
00054 self.param.append(DXL_LOBYTE(self.data_dict[dxl_id][1]))
00055 self.param.append(DXL_HIBYTE(self.data_dict[dxl_id][1]))
00056 self.param.append(DXL_LOBYTE(self.data_dict[dxl_id][2]))
00057 self.param.append(DXL_HIBYTE(self.data_dict[dxl_id][2]))
00058
00059 def addParam(self, dxl_id, start_address, data_length):
00060 if dxl_id in self.data_dict:
00061 return False
00062
00063 data = []
00064 self.data_dict[dxl_id] = [data, start_address, data_length]
00065
00066 self.is_param_changed = True
00067 return True
00068
00069 def removeParam(self, dxl_id):
00070 if dxl_id not in self.data_dict:
00071 return
00072
00073 del self.data_dict[dxl_id]
00074
00075 self.is_param_changed = True
00076
00077 def clearParam(self):
00078 self.data_dict.clear()
00079 return
00080
00081 def txPacket(self):
00082 if len(self.data_dict.keys()) == 0:
00083 return COMM_NOT_AVAILABLE
00084
00085 if self.is_param_changed is True or not self.param:
00086 self.makeParam()
00087
00088 if self.ph.getProtocolVersion() == 1.0:
00089 return self.ph.bulkReadTx(self.port, self.param, len(self.data_dict.keys()) * 3)
00090 else:
00091 return self.ph.bulkReadTx(self.port, self.param, len(self.data_dict.keys()) * 5)
00092
00093 def rxPacket(self):
00094 self.last_result = False
00095
00096 result = COMM_RX_FAIL
00097
00098 if len(self.data_dict.keys()) == 0:
00099 return COMM_NOT_AVAILABLE
00100
00101 for dxl_id in self.data_dict:
00102 self.data_dict[dxl_id][PARAM_NUM_DATA], result, _ = self.ph.readRx(self.port, dxl_id,
00103 self.data_dict[dxl_id][PARAM_NUM_LENGTH])
00104 if result != COMM_SUCCESS:
00105 return result
00106
00107 if result == COMM_SUCCESS:
00108 self.last_result = True
00109
00110 return result
00111
00112 def txRxPacket(self):
00113 result = self.txPacket()
00114 if result != COMM_SUCCESS:
00115 return result
00116
00117 return self.rxPacket()
00118
00119 def isAvailable(self, dxl_id, address, data_length):
00120 if self.last_result is False or dxl_id not in self.data_dict:
00121 return False
00122
00123 start_addr = self.data_dict[dxl_id][PARAM_NUM_ADDRESS]
00124
00125 if (address < start_addr) or (start_addr + self.data_dict[dxl_id][PARAM_NUM_LENGTH] - data_length < address):
00126 return False
00127
00128 return True
00129
00130 def getData(self, dxl_id, address, data_length):
00131 if not self.isAvailable(dxl_id, address, data_length):
00132 return 0
00133
00134 start_addr = self.data_dict[dxl_id][PARAM_NUM_ADDRESS]
00135
00136 if data_length == 1:
00137 return self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr]
00138 elif data_length == 2:
00139 return DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr],
00140 self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 1])
00141 elif data_length == 4:
00142 return DXL_MAKEDWORD(DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 0],
00143 self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 1]),
00144 DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 2],
00145 self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 3]))
00146 else:
00147 return 0