trajectory_generator.py
Go to the documentation of this file.
1 # Copyright (c) 2016-2019 The UUV Simulator Authors.
2 # All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import rospy
16 import numpy as np
17 from copy import deepcopy
18 from geometry_msgs.msg import Vector3, PoseStamped, Quaternion
19 import uuv_control_msgs.msg as uuv_control_msgs
20 from nav_msgs.msg import Path
21 from uuv_waypoints import WaypointSet
22 from .wp_trajectory_generator import WPTrajectoryGenerator
23 from .trajectory_point import TrajectoryPoint
24 from tf_quaternion.transformations import euler_from_quaternion
25 from ._log import get_logger
26 
27 
28 class TrajectoryGenerator(object):
29  """Trajectory generator based on waypoint and trajectory interpolation.
30 
31  > *Input arguments*
32 
33  * `full_dof` (*type:* `bool`, *default:* `False`): If `True`, generate
34  the trajectory in 6 DoFs, otherwise `roll` and `pitch` are set to zero.
35  * `stamped_pose_only` (*type:* `bool`, *default:* `False`): If `True`
36  the output trajectory will set velocity and acceleration references
37  as zero.
38  """
39  def __init__(self, full_dof=False, stamped_pose_only=False):
41  self._points = None
42  self._time = None
43  self._this_pnt = None
44  self._is_full_dof = full_dof
45  self._stamped_pose_only = stamped_pose_only
46  self._wp_interp_on = False
47  self._wp_interp = WPTrajectoryGenerator(
48  full_dof=full_dof, stamped_pose_only=stamped_pose_only)
49 
50  self._has_started = False
51  self._is_finished = False
52 
53  @property
54  def points(self):
55  """List of `uuv_trajectory_generator.TrajectoryPoint`: List of trajectory points"""
56  if self._wp_interp_on:
57  return self._wp_interp.get_samples(0.001)
58  else:
59  return self._points
60 
61  @property
62  def time(self):
63  """List of `float`: List of timestamps"""
64  return self._time
65 
66  def use_finite_diff(self, flag):
67  self._wp_interp.use_finite_diff = flag
68 
70  return self._wp_interp.use_finite_diff
71 
72  def set_stamped_pose_only(self, flag):
73  """Set flag to enable or disable computation of trajectory
74  points
75 
76  > *Input arguments*
77 
78  * `flag` (*type:* `bool`): Parameter description
79 
80  > *Returns*
81 
82  Description of return values
83  """
84  self._wp_interp.stamped_pose_only = flag
85 
87  return self._wp_interp.stamped_pose_only
88 
89  def set_interp_method(self, method):
90  return self._wp_interp.set_interpolation_method(method)
91 
92  def get_interp_method(self):
93  return self._wp_interp.get_interpolation_method()
94 
96  return self._wp_interp.interpolator_tags
97 
98  def set_interpolator_parameters(self, method, params):
99  return self._wp_interp.set_interpolator_parameters(method, params)
100 
102  if self._wp_interp_on:
103  return self._wp_interp.get_visual_markers()
104  else:
105  return None
106 
107  def _reset(self):
108  self._points = None
109  self._time = None
110  self._this_pnt = None
111  self._has_started = False
112  self._is_finished = False
113 
115  """
116  Return the trajectory points as a Trajectory type message. If waypoints
117  are currently in use, then sample the interpolated path and return the
118  poses only.
119  """
120 
121  if self.points is None:
122  return None
123  msg = uuv_control_msgs.Trajectory()
124  try:
125  msg.header.stamp = rospy.Time.now()
126  except:
127  self._logger.warning(
128  'A ROS node was not initialized, no '
129  'timestamp can be set to Trajectory message')
130  msg.header.frame_id = 'world'
131  for pnt in self.points:
132  # FIXME Sometimes the time stamp of the point is NaN
133  msg.points.append(pnt.to_message())
134  return msg
135 
137  """Return true if the waypoint interpolation is being used."""
138  return self._wp_interp_on
139 
140  def set_waypoints(self, waypoints, init_rot=(0, 0, 0, 1)):
141  """Initializes the waypoint interpolator with a set of waypoints."""
142  self._logger.info('Initial rotation vector (quat)=%s', str(init_rot))
143  self._logger.info('Initial rotation vector (rpy)=%s',
144  str(euler_from_quaternion(init_rot)))
145  if self._wp_interp.init_waypoints(waypoints, init_rot):
146  self._wp_interp_on = True
147  return True
148  else:
149  return False
150 
151  def get_waypoints(self):
152  """
153  Return the waypoints used by the waypoint interpolator,
154  if any exist.
155  """
156 
157  if not self.is_using_waypoints():
158  self._logger.info('NOT USING WAYPOINTS')
159  return None
160  return self._wp_interp.get_waypoints()
161 
162  def add_waypoint(self, waypoint, add_to_beginning=False):
163  """
164  Add waypoint to the current waypoint set, if one has been initialized.
165  """
166  if not self._wp_interp_on:
167  return False
168  self._wp_interp.add_waypoint(waypoint, add_to_beginning)
169  return True
170 
171  def add_trajectory_point(self, pnt):
172  """
173  If a trajectory set is currently being used in the interpolation
174  process, add a trajectory point to the set.
175  """
176 
177  if not self._wp_interp_on:
178  if self._points is None:
179  self._points = list()
180  self._time = list()
181  if len(self._points) > 1:
182  if pnt.t <= self._points[-1].t:
183  return False
184  self._points.append(pnt)
185  self._time.append(pnt.t)
186  return True
187  else:
188  self._logger.error('Cannot add trajectory point! Generator is in '
189  'waypoint interpolation mode!')
190  return False
191 
193  if not self._wp_interp_on:
194  pnt = TrajectoryPoint()
195  if pnt.from_message(msg):
196  self.add_trajectory_point(pnt)
197  return True
198  else:
199  self._logger.error('Error converting message to trajectory point')
200  return False
201  else:
202  self._logger.error('Cannot add trajectory point! Generator is in '
203  'waypoint interpolation mode!')
204  return False
205 
206  def set_duration(self, t):
207  if not self._wp_interp_on:
208  self._logger.error('Waypoint interpolation is not activated')
209  return False
210  else:
211  return self._wp_interp.set_duration(t)
212 
213  def get_max_time(self):
214  if self._points is None and not self._wp_interp_on:
215  return None
216  if self._wp_interp_on:
217  return self._wp_interp.get_max_time()
218  else:
219  return self._points[-1].t
220 
221  def is_running(self):
222  return self._has_started and not self._is_finished
223 
224  def has_started(self):
225  if self._wp_interp_on:
226  return self._wp_interp.started
227  else:
228  return self._has_started
229 
230  def has_finished(self):
231  if self._wp_interp_on:
232  return self._wp_interp.is_finished()
233  else:
234  return self._is_finished
235 
237  self._wp_interp_on = False
238  self._points = None
239  self._time = None
240 
241  last_t = None
242  for p_msg in msg.points:
243  t = p_msg.header.stamp.to_sec()
244 
245  if last_t is None:
246  last_t = t
247  else:
248  if t <= last_t:
249  self._logger.error('Trajectory should be given a growing value '
250  'of time')
251  self._reset()
252  return False
254  self._has_started = True
255  return True
256 
258  wp_set = WaypointSet.from_message(msg)
259  self._wp_interp.init_waypoints(wp_set)
260  self._wp_interp_on = True
261 
262  def init_from_waypoint_file(self, filename):
263  wp_set = WaypointSet()
264  success = wp_set.read_from_file(filename)
265  if success:
266  self._wp_interp.init_waypoints(wp_set)
267  self._wp_interp_on = True
268  return success
269 
271  if self._points is None:
272  return None
273  msg = uuv_control_msgs.Trajectory()
274  try:
275  msg.header.stamp = rospy.Time.now()
276  set_timestamps = True
277  except:
278  set_timestamps = False
279  self._logger.warning(
280  'ROS node was not initialized, no timestamp '
281  'can be assigned to trajectory message')
282  msg.header.frame_id = 'world'
283  if not self.is_using_waypoints:
284  for p in self._points:
285  p_msg = uuv_control_msgs.TrajectoryPoint()
286  if set_timestamps:
287  p_msg.header.stamp = rospy.Time(p.t)
288  p_msg.pose.position = Vector3(*p.p)
289  p_msg.pose.orientation = Quaternion(*p.q)
290  p_msg.velocity.linear = Vector3(*p.v)
291  p_msg.velocity.angular = Vector3(*p.w)
292  p_msg.acceleration.linear = Vector3(*p.a)
293  p_msg.acceleration.angular = Vector3(*p.alpha)
294  msg.points.append(p_msg)
295  else:
296  dt = 0.05 * self._wp_interp.get_max_time()
297  for ti in np.arange(0, self._wp_interp.get_max_time(), dt):
298  pnt = self._wp_interp.interpolate(ti)
299  p_msg = uuv_control_msgs.TrajectoryPoint()
300  if set_timestamps:
301  p_msg.header.stamp = rospy.Time(pnt.t)
302  p_msg.pose.position = Vector3(*pnt.p)
303  p_msg.pose.orientation = Quaternion(*pnt.q)
304  p_msg.velocity.linear = Vector3(*pnt.v)
305  p_msg.velocity.angular = Vector3(*pnt.w)
306  p_msg.acceleration.linear = Vector3(*pnt.a)
307  p_msg.acceleration.angular = Vector3(*pnt.alpha)
308  msg.points.append(p_msg)
309  return msg
310 
311  def get_path_message(self):
312  path_msg = Path()
313  try:
314  path_msg.header.stamp = rospy.Time.now()
315  set_timestamps = True
316  except:
317  set_timestamps = False
318  self._logger.warning(
319  'ROS node was not initialized, no timestamp '
320  'can be assigned to path message')
321  path_msg.header.frame_id = 'world'
322  path_msg.poses = list()
323 
324  for point in self._local_planner.points:
325  pose = PoseStamped()
326  if set_timestamps:
327  pose.header.stamp = rospy.Time(point.t)
328  pose.pose.position = Vector3(*point.p)
329  pose.pose.orientation = Quaternion(*point.q)
330  path_msg.poses.append(pose)
331 
332  def set_start_time(self, t):
333  if self._wp_interp_on:
334  self._wp_interp.set_start_time(t)
335  return True
336  return False
337 
338  def generate_reference(self, t, *args):
339  if self._wp_interp_on:
340  return self._wp_interp.generate_reference(t, *args)
341  else:
342  return None
343 
344  def interpolate(self, t, *args):
345  if not self._wp_interp_on:
346  self._this_pnt = TrajectoryPoint()
347  if self._points is None:
348  return None
349  if len(self._points) == 0:
350  return None
351 
352  if type(self._time) == list:
353  self._time = np.array(self._time)
354 
355  # Interpolate the given trajectory
356  self._this_pnt.t = t
357  if t <= self._points[0].t:
358  self._this_pnt.pos = deepcopy(self._points[0].pos)
359  self._this_pnt.rotq = deepcopy(self._points[0].rotq)
360  self._has_started = False
361  self._has_finished = False
362  if t >= self._points[-1].t:
363  self._this_pnt.pos = deepcopy(self._points[-1].pos)
364  self._this_pnt.rotq = deepcopy(self._points[-1].rotq)
365  self._has_started
366  self._is_finished = True
367  else:
368  self._has_started = True
369  self._has_finished = False
370  idx = np.argmin(np.abs(self._time - t))
371  if idx == 0:
372  self._this_pnt = deepcopy(self._points[0])
373  else:
374  if t < self._points[idx].t:
375  p_this = self._points[idx]
376  p_last = self._points[idx - 1]
377  else:
378  p_this = self._points[idx + 1]
379  p_last = self._points[idx]
380  dt = p_this.t - p_last.t
381  w1 = (t - p_last.t) / dt
382  w0 = (p_this.t - t) / dt
383  self._this_pnt.pos = w0*p_last.pos + w1*p_this.pos
384  self._this_pnt.rotq = w0*p_last.rotq + w1*p_this.rotq
385  self._this_pnt.vel = w0*p_last.vel + w1*p_this.vel
386  self._this_pnt.acc = w0*p_last.acc + w1*p_this.acc
387  else:
388  self._this_pnt = self._wp_interp.interpolate(t, *args)
389 
390  return self._this_pnt
def __init__(self, full_dof=False, stamped_pose_only=False)


uuv_trajectory_control
Author(s):
autogenerated on Thu Jun 18 2020 03:28:42