waypoint_set.py
Go to the documentation of this file.
1 # Copyright (c) 2016-2019 The UUV Simulator Authors.
2 # All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 from __future__ import print_function
16 import rospy
17 import numpy as np
18 import os
19 import yaml
20 from .waypoint import Waypoint
21 from uuv_control_msgs.msg import WaypointSet as WaypointSetMessage
22 from visualization_msgs.msg import Marker, MarkerArray
23 from nav_msgs.msg import Path
24 from geometry_msgs.msg import PoseStamped
25 
26 
27 class WaypointSet(object):
28  """Set of waypoints.
29 
30  > *Attributes*
31 
32  * `FINAL_WAYPOINT_COLOR` (*type:* list of `float`, *value:* `[1.0, 0.5737, 0.0]`): RGB color for marker of the final waypoint in RViz
33  * `OK_WAYPOINT` (*type:* list of `float`, *value:* `[0.1216, 0.4157, 0.8863]`): RGB color for marker of a successful waypoint in RViz
34  * `FAILED_WAYPOINT` (*type:* list of `float`, *value:* `[1.0, 0.0, 0.0]`): RGB color for marker of a failed waypoint in RViz
35 
36  > *Input arguments*
37 
38  * `scale` (*type:* `float`, *default:* `0.1`): Scale of the spherical marker for waypoints
39  * `inertial_frame_id` (*type:* `str`, *default:* `'world'`): Name of the inertial reference frame, options are `world` and `world_ned` for `ENU` and `NED` inertial reference frames, respectively
40  * `max_surge_speed` (*type:* `float`, *default:* `None`): Max. surge speed in m/s associated with each waypoint
41 
42  """
43  FINAL_WAYPOINT_COLOR = [1.0, 0.5737, 0.0]
44  OK_WAYPOINT = [31. / 255, 106. / 255, 226. / 255]
45  FAILED_WAYPOINT = [1.0, 0.0, 0.0]
46 
47  def __init__(self, scale=0.1, inertial_frame_id='world', max_surge_speed=None):
48  assert inertial_frame_id in ['world', 'world_ned']
49  self._waypoints = list()
50  self._violates_constraint = False
51  self._scale = scale
52  self._inertial_frame_id = inertial_frame_id
53  self._max_surge_speed = max_surge_speed
54 
55  def __str__(self):
56  if self.num_waypoints:
57  msg = '================================\n'
58  msg += 'List of waypoints\n'
59  msg += '================================\n'
60  for i in range(self.num_waypoints):
61  msg += self.get_waypoint(i).__str__()
62  msg += '---\n'
63  msg += 'Number of waypoints = %d\n' % self.num_waypoints
64  msg += 'Number of valid waypoints = %d\n' % self.num_waypoints
65  msg += 'Inertial frame ID = %s\n' % self._inertial_frame_id
66  return msg
67  else:
68  return 'Waypoint set is empty'
69 
70  @property
71  def num_waypoints(self):
72  """`int`: Number of waypoints"""
73  return len(self._waypoints)
74 
75  @property
76  def x(self):
77  """`list`: List of the X-coordinates of all waypoints"""
78  return [wp.x for wp in self._waypoints]
79 
80  @property
81  def y(self):
82  """`list`: List of the Y-coordinates of all waypoints"""
83  return [wp.y for wp in self._waypoints]
84 
85  @property
86  def z(self):
87  """`list`: List of the Z-coordinates of all waypoints"""
88  return [wp.z for wp in self._waypoints]
89 
90  @property
91  def is_empty(self):
92  """`bool`: True if the list of waypoints is empty"""
93  return len(self._waypoints) == 0
94 
95  @property
96  def inertial_frame_id(self):
97  """`str`: Name of inertial reference frame"""
98  return self._inertial_frame_id
99 
100  @inertial_frame_id.setter
101  def inertial_frame_id(self, frame_id):
102  assert frame_id in ['world', 'world_ned'], \
103  'Inertial reference frame can only be either world or world_ned'
104  self._inertial_frame_id = frame_id
105 
106  def clear_waypoints(self):
107  """Clear the list of waypoints"""
108  self._waypoints = list()
109 
110  def set_constraint_status(self, index, flag):
111  """Set the flag violates_constraint to a waypoint
112 
113  > *Input arguments*
114 
115  * `index` (*type:* `int`): Index of the waypoints
116  * `flag` (*type:* `bool`): True, if waypoint violates a constraint
117 
118  > *Returns*
119 
120  `True` if successful, and `False` if the waypoint `index` is outsite of the list's range.
121  """
122  if index < 0 or index >= len(self._waypoints):
123  return False
124  self._waypoints[index].violates_constraint = flag
125  return True
126 
127  def get_waypoint(self, index):
128  """Return a waypoint
129 
130  > *Input arguments*
131 
132  * `index` (*type:* `int`): Index of the waypoint
133 
134  > *Returns*
135 
136  Return a waypoint as `uuv_waypoints.Waypoint` or `None` if `index` is outside of range.
137  """
138  if index < 0 or index >= len(self._waypoints):
139  return None
140  return self._waypoints[index]
141 
142  def add_waypoint(self, waypoint, add_to_beginning=False):
143  """Add a waypoint to the set
144 
145  > *Input arguments*
146 
147  * `waypoint` (*type:* `uuv_waypoints.Waypoint`): Waypoint object
148  * `add_to_beginning` (*type:* `bool`, *default:* `False`): If `True`, add the waypoint to the beginning of the list.
149 
150  > *Returns*
151 
152  `True` if waypoint was added to the set. `False` if a repeated waypoint is already found in the set.
153  """
154  if len(self._waypoints):
155  if self._waypoints[-1] != waypoint:
156  if not add_to_beginning:
157  self._waypoints.append(waypoint)
158  else:
159  self._waypoints = [waypoint] + self._waypoints
160  else:
161  print('Cannot add repeated waypoint')
162  return False
163  else:
164  if not add_to_beginning:
165  self._waypoints.append(waypoint)
166  else:
167  self._waypoints = [waypoint] + self._waypoints
168  return True
169 
170  def add_waypoint_from_msg(self, msg):
171  """Add waypoint from ROS `uuv_control_msgs/Waypoint` message
172 
173  > *Input arguments*
174 
175  * `msg` (*type:* `uuv_control_msgs/Waypoint`): Waypoint message
176 
177  > *Returns*
178 
179  `True`, if waypoint could be added to set, `False`, otherwise.
180  """
181  waypoint = Waypoint()
182  waypoint.from_message(msg)
183  return self.add_waypoint(waypoint)
184 
186  """Return the starting waypoint
187 
188  > *Returns*
189 
190  A `uuv_waypoints.Waypoint` object or None, if the list of waypoints is empty.
191  """
192  if len(self._waypoints):
193  return self._waypoints[0]
194  else:
195  return None
196 
197  def get_last_waypoint(self):
198  """Return the final waypoint
199 
200  > *Returns*
201 
202  A `uuv_waypoints.Waypoint` object or None, if the list of waypoints is empty.
203  """
204  if len(self._waypoints):
205  return self._waypoints[-1]
206  return None
207 
208  def remove_waypoint(self, waypoint):
209  """Remove waypoint from set.
210 
211  > *Input arguments*
212 
213  * `waypoint` (*type:* `uuv_waypoints.Waypoint`): Waypoint object
214  """
215  new_waypoints = list()
216  for point in self._waypoints:
217  if point == waypoint:
218  continue
219  new_waypoints.append(point)
220  self._waypoints = new_waypoints
221 
222  def read_from_file(self, filename):
223  """Read waypoint set from a YAML file.
224 
225  > *Input arguments*
226 
227  * `filename` (*type:* `str`): Filename of the waypoint set
228 
229  > *Returns*
230 
231  `True` if waypoint set file could be parsed, `False`, otherwise.
232  """
233  if not os.path.isfile(filename):
234  print('Invalid waypoint filename, filename={}'.format(filename))
235  return False
236  try:
237  self.clear_waypoints()
238  with open(filename, 'r') as wp_file:
239  wps = yaml.load(wp_file)
240  if isinstance(wps, list):
241  for wp_data in wps:
242  wp = Waypoint(
243  x=wp_data['point'][0],
244  y=wp_data['point'][1],
245  z=wp_data['point'][2],
246  max_forward_speed=wp_data['max_forward_speed'],
247  heading_offset=wp_data['heading'],
248  use_fixed_heading=wp_data['use_fixed_heading'],
249  inertial_frame_id='world')
250  self.add_waypoint(wp)
251  self._inertial_frame_id = 'world'
252  else:
253  assert 'inertial_frame_id' in wps, 'Waypoint input has no inertial_frame_id key'
254  assert 'waypoints' in wps
255  assert wps['inertial_frame_id'] in ['world', 'world_ned']
256  self._inertial_frame_id = wps['inertial_frame_id']
257  for wp_data in wps['waypoints']:
258  wp = Waypoint(
259  x=wp_data['point'][0],
260  y=wp_data['point'][1],
261  z=wp_data['point'][2],
262  max_forward_speed=wp_data['max_forward_speed'],
263  heading_offset=wp_data['heading'],
264  use_fixed_heading=wp_data['use_fixed_heading'],
265  inertial_frame_id=wps['inertial_frame_id'])
266  self.add_waypoint(wp)
267  except Exception as e:
268  print('Error while loading the file, message={}'.format(e))
269  return False
270  return True
271 
272  def export_to_file(self, path, filename):
273  """Export waypoint set to YAML file.
274 
275  > *Input arguments*
276 
277  * `path` (*type:* `str`): Path to the folder containing the file
278  * `filename` (*type:* `str`): Name of the YAML file.
279 
280  > *Returns*
281 
282  `True` is waypoints could be exported to file. `False`, otherwise.
283  """
284  try:
285  output = dict(inertial_frame_id=self._inertial_frame_id,
286  waypoints=list())
287  for wp in self._waypoints:
288  wp_elem = dict(point=[float(wp.x), float(wp.y), float(wp.z)],
289  max_forward_speed=float(wp._max_forward_speed),
290  heading=float(wp._heading_offset if wp._heading_offset is not None else 0.0),
291  use_fixed_heading=bool(wp._use_fixed_heading))
292  output['waypoints'].append(wp_elem)
293  with open(os.path.join(path, filename), 'w') as wp_file:
294  yaml.dump(output, wp_file, default_flow_style=False)
295  return True
296  except Exception as e:
297  print('Error occured while exporting waypoint file, message={}'.format(e))
298  return False
299 
300  def to_message(self):
301  """Convert waypoints set to message `uuv_control_msgs/WaypointSet`
302 
303  > *Returns*
304 
305  `uuv_control_msgs/WaypointSet` message object
306  """
307  msg = WaypointSetMessage()
308  msg.header.stamp = rospy.Time().now()
309  msg.header.frame_id = self._inertial_frame_id
310  msg.waypoints = list()
311  for wp in self._waypoints:
312  wp_msg = wp.to_message()
313  wp_msg.header.frame_id = self._inertial_frame_id
314  msg.waypoints.append(wp_msg)
315  return msg
316 
317  def from_message(self, msg):
318  """Convert `uuv_control_msgs/WaypointSet` message into `uuv_waypoints.WaypointSet`
319 
320  > *Input arguments*
321 
322  * `msg` (*type:* `uuv_control_msgs/WaypointSet` object): Waypoint set message
323  """
324  self.clear_waypoints()
325  self.inertial_frame_id = msg.header.frame_id
326  for pnt in msg.waypoints:
327  self.add_waypoint_from_msg(pnt)
328 
329  def dist_to_waypoint(self, pos, index=0):
330  """Compute the distance of a waypoint in the set to point
331 
332  > *Input arguments*
333 
334  * `pos` (*type:* list of `float`): 3D point as a list of coordinates
335  * `index` (*type:* `int`, *default:* `0`): Waypoint index in set
336 
337  > *Returns*
338 
339  Distance between `pos` and the waypoint in `index`. `None` if waypoint set is empty.
340  """
341  wp = self.get_waypoint(index)
342  if wp is not None:
343  return wp.dist(pos)
344  return None
345 
346  def set_radius_of_acceptance(self, index, radius):
347  """Set the radius of acceptance around each waypoint
348  inside which a vehicle is considered to have reached
349  a waypoint.
350 
351  > *Input arguments*
352 
353  * `index` (*type:* `int`): Index of the waypoint
354  * `radius` (*type:* `float`): Radius of the sphere representing the volume of acceptance
355  """
356  if index >= 0 and index < len(self._waypoints):
357  self._waypoints[index].radius_of_acceptance = radius
358 
359  def get_radius_of_acceptance(self, index):
360  """Return the radius of acceptance for a waypoint
361 
362  > *Input arguments*
363 
364  * `index` (*type:* `int`): Index of the waypoint
365 
366  > *Returns*
367 
368  Radius of acceptance for the waypoint in position
369  given by `index` as a `float`. `None` if waypoint
370  set is empty.
371  """
372  if index >= 0 and index < len(self._waypoints):
373  return self._waypoints[index].radius_of_acceptance
374  else:
375  return None
376 
377  def to_path_marker(self, clear=False):
378  """Return a `nav_msgs/Path` message with all waypoints in the set
379 
380  > *Input arguments*
381 
382  * `clear` (*type:* `bool`, *default:* `False`): Return an empty `nav_msgs/Path` message.
383 
384  > *Returns*
385 
386  `nav_msgs/Path` message
387  """
388  path = Path()
389  t = rospy.Time.now()
390  path.header.stamp = t
391  path.header.frame_id = self._inertial_frame_id
392  if self.num_waypoints > 1 and not clear:
393  for i in range(self.num_waypoints):
394  wp = self.get_waypoint(i)
395  pose = PoseStamped()
396  pose.header.stamp = rospy.Time(i)
397  pose.header.frame_id = self._inertial_frame_id
398  pose.pose.position.x = wp.x
399  pose.pose.position.y = wp.y
400  pose.pose.position.z = wp.z
401  path.poses.append(pose)
402  return path
403 
404  def to_marker_list(self, clear=False):
405  """Return waypoint set as a markers list message of type `visualization_msgs/MarkerArray`
406 
407  > *Input arguments*
408 
409  * `clear` (*type:* `bool`, *default:* `False`): Return an empty marker array message
410 
411  > *Returns*
412 
413  `visualization_msgs/MarkerArray` message
414  """
415  list_waypoints = MarkerArray()
416  t = rospy.Time.now()
417  if self.num_waypoints == 0 or clear:
418  marker = Marker()
419  marker.header.stamp = t
420  marker.header.frame_id = self._inertial_frame_id
421  marker.id = 0
422  marker.type = Marker.SPHERE
423  marker.action = 3
424  list_waypoints.markers.append(marker)
425  else:
426  for i in range(self.num_waypoints):
427  wp = self.get_waypoint(i)
428  marker = Marker()
429  marker.header.stamp = t
430  marker.header.frame_id = self._inertial_frame_id
431  marker.id = i
432  marker.type = Marker.SPHERE
433  marker.action = Marker.ADD
434  marker.pose.position.x = wp.x
435  marker.pose.position.y = wp.y
436  marker.pose.position.z = wp.z
437  marker.scale.x = self._scale
438  marker.scale.y = self._scale
439  marker.scale.z = self._scale
440  marker.color.a = 1.0
441  if wp == self.get_last_waypoint():
442  color = wp.get_final_color()
443  else:
444  color = wp.get_color()
445  marker.color.r = color[0]
446  marker.color.g = color[1]
447  marker.color.b = color[2]
448  list_waypoints.markers.append(marker)
449  return list_waypoints
450 
451  def generate_circle(self, radius, center, num_points, max_forward_speed,
452  theta_offset=0.0, heading_offset=0.0, append=False):
453  """Generate a set of waypoints describing a circle
454 
455  > *Input arguments*
456 
457  * `radius` (*type:* `float`): Radius of the circle in meters
458  * `num_points` (*type:* `int`): Number of waypoints to be generated
459  * `max_forward_speed` (*type:* `float`): Max. forward speed set to each waypoint in m/s
460  * `theta_offset` (*type:* `float`, *default:* `0`): Angle offset to start generating the waypoints in radians
461  * `heading_offset` (*type:* `float`, *default:* `0`): Heading offset set to the reference heading of the vehicle in radians
462  * `append` (*type:* `bool`, *default:* `False`): If `True`, append the generated waypoints to the existent waypoints in the set
463 
464  > *Returns*
465 
466  `True` if the circle was successfully generated, `False`, otherwise
467  """
468  if radius <= 0:
469  print('Invalid radius, value={}'.format(radius))
470  return False
471 
472  if num_points <= 0:
473  print('Invalid number of samples, value={}'.format(num_points))
474  return False
475 
476  if max_forward_speed <= 0:
477  print('Invalid absolute maximum velocity, value={}'.format(max_forward_speed))
478  return False
479 
480  if not append:
481  # Clear current list
482  self.clear_waypoints()
483 
484  step_theta = 2 * np.pi / num_points
485  for i in range(num_points):
486  angle = i * step_theta + theta_offset
487  x = np.cos(angle) * radius + center.x
488  y = np.sin(angle) * radius + center.y
489  z = center.z
490  wp = Waypoint(x, y, z, max_forward_speed,
491  heading_offset)
492  self.add_waypoint(wp)
493  return True
494 
495  def generate_helix(self, radius, center, num_points, max_forward_speed, delta_z,
496  num_turns, theta_offset=0.0, heading_offset=0.0,
497  append=False):
498  """Generate a set of waypoints describing a helix
499 
500  > *Input arguments*
501 
502  * `radius` (*type:* `float`): Radius of the circle in meters
503  * `num_points` (*type:* `int`): Number of waypoints to be generated
504  * `max_forward_speed` (*type:* `float`): Max. forward speed set to each waypoint in m/s
505  * `delta_z` (*type:* `float`): Step in the Z direction for each lap of the helix in meters
506  * `theta_offset` (*type:* `float`, *default:* `0`): Angle offset to start generating the waypoints in radians
507  * `heading_offset` (*type:* `float`, *default:* `0`): Heading offset set to the reference heading of the vehicle in radians
508  * `append` (*type:* `bool`, *default:* `False`): If `True`, append the generated waypoints to the existent waypoints in the set
509 
510  > *Returns*
511 
512  `True` if the circle was successfully generated, `False`, otherwise
513  """
514  if radius <= 0:
515  print('Invalid radius, value={}'.format(radius))
516  return False
517 
518  if num_points <= 0:
519  print('Invalid number of samples, value={}'.format(num_points))
520  return False
521 
522  if num_turns <= 0:
523  print('Invalid number of turns, value={}'.format(num_points))
524  return False
525 
526  if max_forward_speed <= 0:
527  print('Invalid absolute maximum velocity, value={}'.format(max_forward_speed))
528  return False
529 
530  if not append:
531  # Clear current list
532  self.clear_waypoints()
533 
534  total_angle = 2 * np.pi * num_turns
535  step_angle = total_angle / num_points
536  step_z = float(delta_z) / num_points
537  for i in range(num_points):
538  angle = theta_offset + i * step_angle
539  x = radius * np.cos(angle) + center.x
540  y = radius * np.sin(angle) + center.y
541  z = step_z * i + center.z
542 
543  wp = Waypoint(x, y, z, max_forward_speed,
544  heading_offset)
545  self.add_waypoint(wp)
546  return True
def to_marker_list(self, clear=False)
def dist_to_waypoint(self, pos, index=0)
def to_path_marker(self, clear=False)
def generate_helix(self, radius, center, num_points, max_forward_speed, delta_z, num_turns, theta_offset=0.0, heading_offset=0.0, append=False)
def set_constraint_status(self, index, flag)
def export_to_file(self, path, filename)
def generate_circle(self, radius, center, num_points, max_forward_speed, theta_offset=0.0, heading_offset=0.0, append=False)
def __init__(self, scale=0.1, inertial_frame_id='world', max_surge_speed=None)
Definition: waypoint_set.py:47
def add_waypoint(self, waypoint, add_to_beginning=False)
def set_radius_of_acceptance(self, index, radius)


uuv_trajectory_control
Author(s):
autogenerated on Thu Jun 18 2020 03:28:42