7 from control_msgs.msg
import FollowJointTrajectoryAction, FollowJointTrajectoryResult
8 from sensor_msgs.msg
import JointState
9 from std_srvs.srv
import Trigger, TriggerResponse
16 params = rospy.get_param(
'~')
28 js.position = [0]*len(js.name)
29 js.velocity = [0]*len(js.name)
30 js.effort = [0]*len(js.name)
39 action_name =
"joint_trajectory_controller/follow_joint_trajectory" 42 rospy.loginfo(
"Emulation running for action %s of type FollowJointTrajectoryAction"%(action_name))
45 self.joint_states.position = [0.0] * len(self.joint_states.position)
46 self.joint_states.velocity = [0.0] * len(self.joint_states.velocity)
47 self.joint_states.effort = [0.0] * len(self.joint_states.effort)
49 return TriggerResponse(
51 message =
"Succesfully reset joint states" 57 fjta_joint_names = copy.deepcopy(goal.trajectory.joint_names)
58 fjta_joint_names.sort()
59 if joint_names == fjta_joint_names:
60 rospy.loginfo(
"got a new joint trajectory goal for %s", joint_names)
63 goal_sorted = copy.deepcopy(goal)
64 goal_sorted.trajectory.joint_names = self.
joint_names 67 latest_time_from_start = rospy.Duration(0)
70 time_since_start_of_previous_point = rospy.Duration(0)
75 for point
in goal_sorted.trajectory.points:
77 if len(point.positions) != nr_points_dof:
78 self.as_fjta.set_aborted()
81 point_time_delta = point.time_from_start - time_since_start_of_previous_point
83 rospy.logwarn(
"current trajectory point has time_delta smaller than sample_rate: {} < {}! Skipping".format(point_time_delta.to_sec(), self.
sample_rate_dur_secs))
89 idx = goal.trajectory.joint_names.index(joint_name)
90 positions_sorted.append(point.positions[idx])
91 point.positions = positions_sorted
99 t1 = point.time_from_start - time_since_start_of_previous_point
102 velocities = [0] * nr_points_dof
103 for i
in range(nr_points_dof):
104 if t1.to_sec() != 0.0:
105 velocities[i] = (point.positions[i] - joint_states_prev.position[i]) / float(t1.to_sec())
108 self.joint_states.velocity = velocities
111 while not rospy.is_shutdown()
and ((point.time_from_start - latest_time_from_start) > rospy.Duration(0)):
113 if self.as_fjta.is_preempt_requested():
114 rospy.loginfo(
"preempt requested")
115 self.as_fjta.set_preempted()
119 t0 = latest_time_from_start - time_since_start_of_previous_point
127 interpolated_positions = [0] * nr_points_dof
128 for i
in range(nr_points_dof):
129 interpolated_positions[i] = (1.0 - alpha) * joint_states_prev.position[i] + alpha * point.positions[i]
130 self.joint_states.position = interpolated_positions
133 self.sample_rate.sleep()
138 latest_time_from_start = point.time_from_start
139 self.joint_states.position = point.positions
141 time_since_start_of_previous_point = latest_time_from_start
144 self.joint_states.velocity = [0.0] * nr_points_dof
145 self.joint_states.effort = [0.0] * nr_points_dof
147 self.as_fjta.set_succeeded(FollowJointTrajectoryResult())
149 rospy.logerr(
"received unexpected joint names in goal")
150 self.as_fjta.set_aborted()
154 msg.header.stamp = rospy.Time.now()
155 self.pub_joint_states.publish(msg)
157 if __name__ ==
'__main__':
158 rospy.init_node(
'emulation_follow_joint_trajectory')
def publish_joint_states(self, event)