35 Author: Maxwell Svetlik. Code snippets used and modified where indicated. 44 from collections
import defaultdict, deque
45 from threading
import Thread
46 from mx_driver
import dynamixel_io
54 from std_msgs.msg
import Bool
55 from trajectory_msgs.msg
import JointTrajectoryPoint
56 from control_msgs.msg
import JointTrajectoryAction, JointTrajectoryGoal, FollowJointTrajectoryAction, FollowJointTrajectoryGoal
57 from svenzva_msgs.msg
import MotorState, MotorStateList, SvenzvaJointAction, SvenzvaJointResult
65 port_name=
'/dev/ttyUSB0',
66 port_namespace=
'revel',
74 rospy.init_node(
'svenzva_driver', anonymous=
False)
81 self.
update_rate = rospy.get_param(
'~update_rate', update_rate)
94 self.
connect(port_name, baud_rate,
False)
101 def connect(self, port_name, baud_rate, readback_echo):
103 self.
dxl_io = dynamixel_io.DynamixelIO(port_name, baud_rate, readback_echo)
104 except dynamixel_io.SerialOpenError, e:
105 rospy.logfatal(e.message)
114 Check if all motors are reachable on the serial port 125 result = self.dxl_io.ping(motor_id)
126 except Exception
as ex:
127 rospy.logerr(
'Exception thrown while pinging motor %d - %s' % (motor_id, ex))
130 self.motors.append(motor_id)
137 counts = defaultdict(int)
140 rospy.loginfo(
'%s, actuator initialization complete.' % status_str[:-2])
146 debug_polling_rate =
False 147 rates = deque([float(self.
update_rate)]*num_events, maxlen=num_events)
148 last_time = rospy.Time.now()
152 rad_per_tick = 6.2831853 / 4096.0
155 while not rospy.is_shutdown():
159 status_ar = self.dxl_io.get_sync_feedback(id_list)
161 for index, state
in enumerate(status_ar):
164 state[
'goal'] = self.
raw_to_rad(state[
'goal'] / gr[index])
165 state[
'position'] = self.
raw_to_rad(state[
'position'] / gr[index])
167 state[
'load'] = (state[
'load'] )
169 motor_states.append(MotorState(**state))
170 if dynamixel_io.exception:
raise dynamixel_io.exception
172 except dynamixel_io.FatalErrorCodeError, fece:
174 except dynamixel_io.NonfatalErrorCodeError, nfece:
176 rospy.logdebug(nfece)
177 except dynamixel_io.ChecksumError, cse:
180 except dynamixel_io.DroppedPacketError, dpe:
183 rospy.loginfo(dpe.message)
185 if ose.errno != errno.EAGAIN:
186 rospy.logfatal(errno.errorcode[ose.errno])
187 rospy.signal_shutdown(errno.errorcode[ose.errno])
192 rospy.logerr(
"Lost connectivitity to servo motors.")
193 rospy.logerr(
"Shutting down driver.")
194 rospy.signal_shutdown(
"Arm connection lost.")
197 msl = MotorStateList()
198 msl.motor_states = motor_states
199 self.motor_states_pub.publish(msl)
204 if debug_polling_rate:
205 current_time = rospy.Time.now()
206 rates.append(1.0 / (current_time - last_time).to_sec())
208 last_time = current_time
209 rospy.loginfo(
"Actual poling rate: %f", self.
actual_rate)
213 This enables velocity control mode. 214 Necessary for cartesian movement for remote controls 217 tup_list_dis = tuple(((1,0),(2,0),(3,0),(4,0),(5,0),(6,0),(7,0)))
218 self.dxl_io.sync_set_torque_enabled(tup_list_dis)
221 tup_list_op = tuple(((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,0)))
222 self.dxl_io.sync_set_operation_mode(tup_list_op)
224 tup_list_en = tuple(((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1)))
225 self.dxl_io.sync_set_torque_enabled(tup_list_en)
230 This enables the teaching mode of the Revel. Teaching mode senses outside forces and assists movement in the direction 236 tup_list_dis = tuple(((1,0),(2,0),(3,0),(4,0),(5,0),(6,0),(7,0)))
237 self.dxl_io.sync_set_torque_enabled(tup_list_dis)
240 tup_list_op = tuple(((1,0),(2,0),(3,0),(4,0),(5,0),(6,0),(7,0)))
241 self.dxl_io.sync_set_operation_mode(tup_list_op)
243 tup_list_en = tuple(((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1)))
244 self.dxl_io.sync_set_torque_enabled(tup_list_en)
248 Thread(target=self.compliance_controller.start).start()
254 tup_list_dis = tuple(((1,0),(2,0),(3,0),(4,0),(5,0),(6,0),(7,0)))
255 self.dxl_io.sync_set_torque_enabled(tup_list_dis)
257 tup_list_op = tuple(((1,5),(2,5),(3,5),(4,5),(5,5),(6,5),(7,0)))
258 self.dxl_io.sync_set_operation_mode(tup_list_op)
260 tup_list_en = tuple(((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1)))
261 self.dxl_io.sync_set_torque_enabled(tup_list_en)
267 open_close_gripper = rospy.get_param(
'~cycle_gripper_on_start',
False)
273 rospy.loginfo(
"Starting Revel trajectory action")
275 self.traj_client.wait_for_server()
278 rospy.loginfo(
"Starting Revel forward kinematics action")
280 self.fkine_action.start()
282 rospy.loginfo(
"Starting Revel Arm Services")
284 Thread(target=arm_utils.start).start()
289 gripper_server.start()
290 rospy.loginfo(
"Starting Revel gripper server.")
292 rospy.loginfo(
"Starting dynamic reconfigure.")
294 Thread(target=self.dynamic_reconfigure_srv.start).start()
297 rospy.loginfo(
"Starting Revel Cartesian controller.")
299 Thread(target=cart_server.loop).start()
301 compliance_demonstration =
False 302 if compliance_demonstration:
303 rospy.loginfo(
"Starting with experimental dynamic compliance.")
306 Thread(target=self.compliance_controller.start).start()
310 Initialize internal motor parameters that are reset when powered down. 316 mode = rospy.get_param(
'~mode',
"user_defined")
317 teaching_mode = mode ==
"gravity" 318 vel_mode = mode ==
"velocity" 329 self.dxl_io.set_goal_current(7, 0)
330 self.dxl_io.set_current_limit(7, 1900)
335 To increase reliability of packet transmission and reduce the number of packets required to fetch 336 motor status, set the indirect addresses on each motor. 337 This is REQUIRED to be called before starting any status based callbacks. 340 def set_indirect_address(self): 341 bulk write ( INDIR_ADDR_1, (1, MX_PRESENT_CURRENT), (2, MX_PRESENT_CURRENT), ... ) 342 bulk write ( INDIR_ADDR_1 + 2, (1, MX_PRESENT_CURRENT+1), (2, MX_PRESENT_CURRENT+1), ...) 350 Given an array of joint positions (in radians), send request to individual servos 353 goal = FollowJointTrajectoryGoal()
354 goal.trajectory.joint_names = [
'joint_1',
'joint_2',
'joint_3',
'joint_4',
'joint_5',
'joint_6']
355 point = JointTrajectoryPoint()
356 point.positions = data.positions
358 point.time_from_start = rospy.Duration(0.1)
359 goal.trajectory.points.append(point)
360 self.traj_client.send_goal_and_wait(goal)
361 res = SvenzvaJointResult()
363 self.fkine_action.set_succeeded(res)
368 return int(round( angle * 4096.0 / 6.2831853 ))
373 return raw * 6.2831853 / 4096.0
377 return max(1, int(round(vel / (RPM_TO_RADSEC * RPM_PER_TICK))))
381 return vel * RPM_PER_TICK * RPM_TO_RADSEC
383 if __name__ ==
'__main__':
388 except rospy.ROSInterruptException:
def initialze_motor_states(self)
def __update_motor_states(self)
def set_user_defined_mode(self)
def __init__(self, port_name='/dev/ttyUSB0', port_namespace='revel', baud_rate='115200', min_motor_id=1, max_motor_id=7, update_rate=10, diagnostics_rate=0, readback_echo=False)
def connect(self, port_name, baud_rate, readback_echo)