emulation_follow_joint_trajectory.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import copy
4 
5 import rospy
6 import actionlib
7 from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryResult
8 from sensor_msgs.msg import JointState
9 from std_srvs.srv import Trigger, TriggerResponse
10 
12  def __init__(self):
13  # TODO
14  # - speed factor
15 
16  params = rospy.get_param('~')
17  self.joint_names = params['joint_names']
18  # fixed frequency to control the granularity of the sampling resolution
19  self.sample_rate_hz = rospy.get_param("~sample_rate_hz", 10)
20  # duration from one sample to the next
21  self.sample_rate_dur_secs = (1.0 / float(self.sample_rate_hz))
22  # rospy loop rate
23  self.sample_rate = rospy.Rate(self.sample_rate_hz)
24 
25  # joint_states publisher
26  js = JointState()
27  js.name = copy.deepcopy(self.joint_names)
28  js.position = [0]*len(js.name)
29  js.velocity = [0]*len(js.name)
30  js.effort = [0]*len(js.name)
31  self.joint_states = js
32  self.pub_joint_states = rospy.Publisher("joint_states", JointState, queue_size=1)
33  self.js_timer = rospy.Timer(rospy.Duration(self.sample_rate_dur_secs), self.publish_joint_states)
34 
35  # reset service
36  self.service_reset_fjta = rospy.Service("reset_joint_states", Trigger, self.reset_cb)
37 
38  # follow_joint_trajectory action
39  action_name = "joint_trajectory_controller/follow_joint_trajectory"
40  self.as_fjta = actionlib.SimpleActionServer(action_name, FollowJointTrajectoryAction, execute_cb=self.fjta_cb, auto_start = False)
41  self.as_fjta.start()
42  rospy.loginfo("Emulation running for action %s of type FollowJointTrajectoryAction"%(action_name))
43 
44  def reset_cb(self, req):
45  self.joint_states.position = [0.0] * len(self.joint_states.position)
46  self.joint_states.velocity = [0.0] * len(self.joint_states.velocity)
47  self.joint_states.effort = [0.0] * len(self.joint_states.effort)
48 
49  return TriggerResponse(
50  success = True,
51  message = "Succesfully reset joint states"
52  )
53 
54  def fjta_cb(self, goal):
55  joint_names = copy.deepcopy(self.joint_names)
56  joint_names.sort()
57  fjta_joint_names = copy.deepcopy(goal.trajectory.joint_names)
58  fjta_joint_names.sort()
59  if joint_names == fjta_joint_names:
60  rospy.loginfo("got a new joint trajectory goal for %s", joint_names)
61  # sort goal to fit joint_names order in joint_states
62 
63  goal_sorted = copy.deepcopy(goal)
64  goal_sorted.trajectory.joint_names = self.joint_names
65 
66  # keep track of the current time
67  latest_time_from_start = rospy.Duration(0)
68  # the point in time of a previous computation.
69  # This is used to compute the interpolation weight as a function of current and goal time point
70  time_since_start_of_previous_point = rospy.Duration(0)
71 
72  nr_points_dof = len(self.joint_names)
73 
74  # for all points in the desired trajectory
75  for point in goal_sorted.trajectory.points:
76 
77  if len(point.positions) != nr_points_dof:
78  self.as_fjta.set_aborted()
79  return
80 
81  point_time_delta = point.time_from_start - time_since_start_of_previous_point
82  if point_time_delta.to_sec() < self.sample_rate_dur_secs:
83  rospy.logwarn("current trajectory point has time_delta smaller than sample_rate: {} < {}! Skipping".format(point_time_delta.to_sec(), self.sample_rate_dur_secs))
84  continue
85 
86  # we need to resort the positions array because moveit sorts alphabetically but all other ROS components sort in the URDF order
87  positions_sorted = []
88  for joint_name in self.joint_names:
89  idx = goal.trajectory.joint_names.index(joint_name)
90  positions_sorted.append(point.positions[idx])
91  point.positions = positions_sorted
92 
93  joint_states_prev = copy.deepcopy(self.joint_states)
94 
95  # linear interpolation of the given trajectory samples is used
96  # to compute smooth intermediate joints positions at a fixed resolution
97 
98  # upper bound of local duration segment
99  t1 = point.time_from_start - time_since_start_of_previous_point
100  # compute velocity as the fraction of distance from prev point to next point in trajectory
101  # and the corresponding time t1
102  velocities = [0] * nr_points_dof
103  for i in range(nr_points_dof):
104  if t1.to_sec() != 0.0:
105  velocities[i] = (point.positions[i] - joint_states_prev.position[i]) / float(t1.to_sec())
106  else:
107  velocities[i] = 0.0
108  self.joint_states.velocity = velocities
109 
110  # this loop samples the time segment from the current states to the next goal state in "points"
111  while not rospy.is_shutdown() and ((point.time_from_start - latest_time_from_start) > rospy.Duration(0)):
112  # check that preempt has not been requested by the client
113  if self.as_fjta.is_preempt_requested():
114  rospy.loginfo("preempt requested")
115  self.as_fjta.set_preempted()
116  return
117 
118  # current time passed in local duration segment
119  t0 = latest_time_from_start - time_since_start_of_previous_point
120  # compute the interpolation weight as a fraction of passed time and upper bound time in this local segment
121  if t1 != 0.0:
122  alpha = t0 / t1
123  else:
124  alpha = 0.0
125 
126  # interpolate linearly (lerp) each component
127  interpolated_positions = [0] * nr_points_dof
128  for i in range(nr_points_dof):
129  interpolated_positions[i] = (1.0 - alpha) * joint_states_prev.position[i] + alpha * point.positions[i]
130  self.joint_states.position = interpolated_positions
131 
132  # sleep until next sample update
133  self.sample_rate.sleep()
134  # increment passed time
135  latest_time_from_start += rospy.Duration(self.sample_rate_dur_secs)
136 
137  # ensure that the goal and time point is always exactly reached
138  latest_time_from_start = point.time_from_start
139  self.joint_states.position = point.positions
140  # set lower time bound for the next point
141  time_since_start_of_previous_point = latest_time_from_start
142 
143  # set joint velocities to zero after the robot stopped moving (reaching final point of trajectory)
144  self.joint_states.velocity = [0.0] * nr_points_dof
145  self.joint_states.effort = [0.0] * nr_points_dof
146 
147  self.as_fjta.set_succeeded(FollowJointTrajectoryResult())
148  else:
149  rospy.logerr("received unexpected joint names in goal")
150  self.as_fjta.set_aborted()
151 
152  def publish_joint_states(self, event):
153  msg = copy.deepcopy(self.joint_states)
154  msg.header.stamp = rospy.Time.now() # update to current time stamp
155  self.pub_joint_states.publish(msg)
156 
157 if __name__ == '__main__':
158  rospy.init_node('emulation_follow_joint_trajectory')
159  emulation_follow_joint_trajectory = EmulationFollowJointTrajectory()
160  rospy.spin()


cob_hardware_emulation
Author(s): Florian Weisshardt
autogenerated on Thu Apr 8 2021 02:39:41