24 from std_msgs.msg
import Float64
25 from sensor_msgs.msg
import JointState
26 from sr_robot_msgs.srv
import ForceController
27 from sr_robot_msgs.msg
import EthercatDebug
28 from sr_utilities.hand_finder
import HandFinder
34 Useful python library to communicate with the etherCAT hand. 36 sensors = [
"FFJ1",
"FFJ2",
"FFJ3",
"FFJ4",
37 "MFJ1",
"MFJ2",
"MFJ3",
"MFJ4",
38 "RFJ1",
"RFJ2",
"RFJ3",
"RFJ4",
39 "LFJ1",
"LFJ2",
"LFJ3",
"LFJ4",
"LFJ5",
40 "THJ1",
"THJ2",
"THJ3",
"THJ4",
"THJ5A",
"THJ5B",
41 "WRJ1A",
"WRJ1B",
"WRJ2",
42 "ACCX",
"ACCY",
"ACCZ",
43 "GYRX",
"GYRY",
"GYRZ",
44 "AN0",
"AN1",
"AN2",
"AN3"]
48 Useful python library to communicate with the etherCAT hand. 53 if len(self.hand_params.mapping)
is not 0:
54 self.
hand_id = self.hand_params.mapping.itervalues().next()
66 self.msg_to_send.data = 0.0
71 joint_to_sensor_mapping = []
73 rospy.wait_for_message(self.
hand_id +
"/debug_etherCAT_data",
74 EthercatDebug, timeout=0.2)
76 joint_to_sensor_mapping \
77 = rospy.get_param(self.
hand_id +
"/joint_to_sensor_mapping")
79 rospy.logwarn(
"The parameter joint_to_sensor_mapping " 80 "was not found, you won't be able to get the " 81 "raw values from the EtherCAT compound sensors.")
85 for mapping
in joint_to_sensor_mapping:
87 if "THJ5" in mapping[1][0]:
88 self.
compounds[
"THJ5"] = [[
"THJ5A", mapping[1][1]],
89 [
"THJ5B", mapping[2][1]]]
90 elif "WRJ1" in mapping[1][0]:
91 self.
compounds[
"WRJ1"] = [[
"WRJ1A", mapping[1][1]],
92 [
"WRJ1B", mapping[2][1]]]
94 def sendupdate(self, joint_name, target, controller_type="effort"):
96 topic =
"sh_" + joint_name.lower() +
"_" + controller_type \
97 +
"_controller/command" 98 self.
publishers[joint_name] = rospy.Publisher(topic, Float64)
100 self.msg_to_send.data = math.radians(float(target))
111 m = re.match(
"(?P<finger>\w{2})J0", joint_name)
113 value = self.
positions[m.group(
"finger") +
"J1"] +\
126 m = re.match(
"(?P<finger>\w{2})J0", joint_name)
128 value = self.
velocities[m.group(
"finger") +
"J1"] + \
137 value = self.
efforts[joint_name]
143 m = re.match(
"(?P<finger>\w{2})J0", joint_name)
145 value = self.
efforts[m.group(
"finger") +
"J1"]
156 def set_pid(self, joint_name, pid_parameters):
160 service_name =
"sr_hand_robot/change_force_PID_" + joint_name
161 self.
pid_services[joint_name] = rospy.ServiceProxy(service_name,
164 self.
pid_services[joint_name](pid_parameters[
"max_pwm"],
165 pid_parameters[
"sgleftref"],
166 pid_parameters[
"sgrightref"],
171 pid_parameters[
"imax"],
172 pid_parameters[
"deadband"],
173 pid_parameters[
"sign"])
176 if not(all(v == 0
for v
in msg.sensors)):
180 for name, pos, vel, effort
in \
181 zip(msg.name, msg.position, msg.velocity, msg.effort):
193 rospy.logwarn(
"No values received from the etherCAT hand.")
197 if sensor_name
in self.compounds.keys():
198 for sub_compound
in self.
compounds[sensor_name]:
199 index = self.sensors.index(sub_compound[0])
200 value = value + (self.
raw_values[index] * sub_compound[1])
202 index = self.sensors.index(sensor_name)
211 Get the average raw value for the given sensor, average on 215 while len(tmp_raw_values) < number_of_samples:
217 if accept_zeros
or value > 0.0:
218 tmp_raw_values.append(value)
220 average = float(sum(tmp_raw_values)) / len(tmp_raw_values)
227 rospy.wait_for_message(self.
hand_id +
"/debug_etherCAT_data",
228 EthercatDebug, timeout=0.2)
229 rospy.wait_for_message(
"joint_states", JointState, timeout=0.2)
234 rospy.Subscriber(self.
hand_id +
"/debug_etherCAT_data",
237 rospy.Subscriber(
"joint_states",
245 rospy.wait_for_message(
"joint_states", JointState, timeout=0.2)
247 rospy.Subscriber(
"joint_states", JointState,
256 self.debug_subscriber.unregister()
260 self.joint_state_subscriber.unregister()
def get_effort(self, joint_name)
def get_position(self, joint_name)
def get_velocity(self, joint_name)
def get_raw_value(self, sensor_name)
def activate_joint_states(self)
def set_pid(self, joint_name, pid_parameters)
def get_average_raw_value(self, sensor_name, number_of_samples=10, accept_zeros=True)
def stop_record(self, joint_name)
def sendupdate(self, joint_name, target, controller_type="effort")
def debug_callback(self, msg)
def joint_state_callback(self, msg)
def start_record(self, joint_name, callback)