joint_trajectory_action_server.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import rospy
4 
5 import actionlib
6 
7 import arm
8 
9 from control_msgs.msg import (
10  FollowJointTrajectoryAction,
11  FollowJointTrajectoryFeedback,
12  FollowJointTrajectoryResult,
13 )
14 
15 from trajectory_msgs.msg import (
16  JointTrajectoryPoint
17 )
18 
19 from math import pi
20 import time
21 import operator
22 import string
23 
24 arm = arm.Arm()
25 
26 GOAL_TOLERANCE = [0.05, 0.05, 0.05, 0.05, 0.05] #Tolerance joints must be within to report success, in radians
27 
28 RATIOS = [3640, 8400, 6000, 4000, 4500] #Joint motor counts corresponding to 90 degree rotation
29 
30 # Assumption: Strings end with a keyword, followed only by ASCII
31 # whitespace and >, which is to be stripped off.
32 OUTPUT_STRIP_CHARS = string.whitespace + '>'
33 
34 
36  '''Reorganise waist-to-wrist ordered joint counts into alphabetical angle values for ros publishing'''
37  out = [0, 0, 0, 0, 0]
38  out[0] = float(-(li[2]*pi/(RATIOS[2]*2)))
39  out[1] = float(-(li[3]*pi/(RATIOS[3]*2)))
40  out[2] = float(-(li[1]*pi/(RATIOS[1]*2)))
41  out[3] = float(-(li[0]*pi/(RATIOS[0]*2)))
42  out[4] = float((li[4]*pi/(RATIOS[4]*2)))
43  return(out)
44 
46  '''Reorganise alphabetical angle values into waist-to-wrist ordered joint counts for sending to arm'''
47  out = [0, 0, 0, 0, 0]
48  out[0] = int(round(-(li[3]*2*RATIOS[0]/pi))) #joint coordinates come as radians, sorted by joint name in alphabetical order; reoorganise and scale by joint count ratios
49  out[1] = int(round(-(li[2]*2*RATIOS[1]/pi)))
50  out[2] = int(round(-(li[0]*2*RATIOS[2]/pi)))
51  out[3] = int(round(-(li[1]*2*RATIOS[3]/pi)))
52  out[4] = int(round(li[4]*2*RATIOS[4]/pi))
53  return(out)
54 
56  '''Joint trajectory action server allowing joint position based control of r12 arm'''
57  def __init__(self, controller_name):
58  self._action_ns = controller_name + '/follow_joint_trajectory'
60  self._action_ns,
61  FollowJointTrajectoryAction,
62  execute_cb=self._execute_cb,
63  auto_start = False)
64  self._action_name = rospy.get_name()
65  self._as.start()
66  self._feedback = FollowJointTrajectoryFeedback()
67  self._result = FollowJointTrajectoryResult()
68  self._rate = rospy.Rate(20)
69  rospy.loginfo('Successful init')
70 
71  def _check_goal(self, pos, final_pos, tolerances):
72  '''Checks if all joints are within tolerance of final position, and returns true if so'''
73  errors = map(operator.sub,
74  pos,
75  final_pos
76  ) #obtains a list of differences between actual position and final desired position
77  errors_tolerances = zip(errors, tolerances)
78  for ele in errors_tolerances:
79  if (abs(ele[0]) > ele[1]) : #compares absolute angle difference from goal with tolerance for that joint, for each joint
80  return False
81  return True #if all joints within tolerance, goal achieved so return true
82 
83  def _update_feedback(self, joint_names, last_pos, time_elapsed):
84  '''Updates feedback with current joint positions and returns current position as angles'''
85  new_pos = joint_to_angle(arm.read_pos(angle_to_joint(last_pos)))
86  self._feedback.header.stamp = rospy.Duration.from_sec(rospy.get_time())
87  self._feedback.joint_names = joint_names
88  self._feedback.desired.positions = new_pos #joint positions are given as a list at the start- no way of checking current command point so just fake by using current position
89  self._feedback.desired.time_from_start = rospy.Duration.from_sec(time_elapsed)
90  self._feedback.actual.positions = new_pos
91  self._feedback.actual.time_from_start = rospy.Duration.from_sec(time_elapsed)
92  self._feedback.error.positions = map(operator.sub,
93  self._feedback.desired.positions,
94  self._feedback.actual.positions
95  )
96  self._feedback.error.time_from_start = rospy.Duration.from_sec(time_elapsed)
97  self._as.publish_feedback(self._feedback)
98  return new_pos
99 
100  def _execute_cb(self, goal):
101  '''Initialises and runs route, and updates feedback until complete or time duration violated'''
102  rospy.loginfo('Started cb')
103  start_time = rospy.get_time()
104  joint_names = goal.trajectory.joint_names
105  trajectory_points = goal.trajectory.points
106  if joint_to_angle(trajectory_points[0].positions) == joint_to_angle(trajectory_points[1].positions): #given trajectory sometimes has two identical points at start of route, which causes errors
107  trajectory_points = trajectory_points[1:]
108  arm.write('FIND TRAJECTORY .\r\n') #returns memory location of route TRAJECTORY if it exists in memory, else returns 0
109  route_exists_check = arm.read()
110  route_exists_check = route_exists_check.strip(OUTPUT_STRIP_CHARS)
111  if (route_exists_check[-1]) <> 0: #if FIND does not return 0, TRAJECTORY already exists in memory
112  arm.write('FORGET TRAJECTORY\r\n')
113  arm.write('ROUTE TRAJECTORY\r\n') #create a new route TRAJECTORY
114  arm.write('500 RESERVE\r\n') #reserve space in memory of route points
115  joint_values = [0, 0, 0, 0, 0] #initialise a list to hold joint values at each point in the trajectory
116  for point in trajectory_points: #for each point in the trajectory, create a string of the form a b c d e $JL where variables are joint counts for each joint
117  position_input = ''
118  joint_values = angle_to_joint(point.positions) #obtains joint counts from received joint angles
119  for i in range (4,-1,-1):
120  position_input += str(joint_values[i])
121  position_input += ' '
122  position_input += '$JL\r\n'
123  arm.write(position_input) #sends created string to arm, teaching point in trajectory
124  print(arm.read())
125  arm.write('$RUN\r\n') #run created route
126  end_time = trajectory_points[-1].time_from_start.to_sec() #get expected time for route to complete
127  time_elapsed = rospy.get_time() - start_time
128  pos = trajectory_points[0].positions #initialises current position to starting position
129  rospy.loginfo('Started route')
130  while (time_elapsed < end_time and not rospy.is_shutdown()): #loop until timeout
131  self._rate.sleep()
132  if self._as.is_preempt_requested():
133  rospy.loginfo("%s: Joint trajectory action preempted, stopping arm" % (self._action_name))
134  arm.stop_route() #sends stop command to arm
135  pos = self._update_feedback(joint_names, pos, time_elapsed) #updates feedback with final position
136  self._as.set_preempted() #cpnfirms action status as prempted and ends action
137  break
138  result = self._check_goal(pos, trajectory_points[-1].positions, GOAL_TOLERANCE)
139  pos = self._update_feedback(joint_names, pos, time_elapsed) #updates and publishes feedback, and sets pos to current position
140  if result:
141  rospy.loginfo("%s: Joint trajectory action succeeded" % (self._action_name))
142  self._result.error_code = self._result.SUCCESSFUL
143  self._as.set_succeeded(self._result)
144  break #on success, finish
145 
146 
147 if __name__ == '__main__':
148  rospy.init_node('r12_interface')
149  port = arm.connect() #Find r12 arm and connect to port where it is located
150  rospy.loginfo('Success: Connected to ${0}.'.format(port))
151  server = JointTrajectoryActionServer('r12_arm_controller')
152  rospy.spin()
153 
154 
def _update_feedback(self, joint_names, last_pos, time_elapsed)
Definition: arm.py:90


r12_hardware_interface
Author(s): Andreas Hogstrand
autogenerated on Mon Feb 28 2022 23:20:33