mavwp.py
Go to the documentation of this file.
1 '''
2 module for loading/saving waypoints
3 '''
4 from __future__ import print_function
5 from builtins import range
6 from builtins import object
7 
8 import time, copy
9 import logging
10 from . import mavutil
11 try:
12  from google.protobuf import text_format
13  import mission_pb2
14  HAVE_PROTOBUF = True
15 except ImportError:
16  HAVE_PROTOBUF = False
17 
18 
19 class MAVWPError(Exception):
20  '''MAVLink WP error class'''
21  def __init__(self, msg):
22  Exception.__init__(self, msg)
23  self.message = msg
24 
25 
26 class MAVWPLoader(object):
27  '''MAVLink waypoint loader'''
28  def __init__(self, target_system=0, target_component=0):
29  self.wpoints = []
30  self.target_system = target_system
31  self.target_component = target_component
32  self.last_change = time.time()
34  mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION : (255,0,0),
35  mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION : (0,255,0)
36  }
37 
38  def count(self):
39  '''return number of waypoints'''
40  return len(self.wpoints)
41 
42  def wp(self, i):
43  '''return a waypoint'''
44  try:
45  the_wp = self.wpoints[i]
46  except:
47  the_wp = None
48 
49  return the_wp
50 
51  def wp_is_loiter(self, i):
52  '''return true if waypoint is a loiter waypoint'''
53  loiter_cmds = [mavutil.mavlink.MAV_CMD_NAV_LOITER_UNLIM,
54  mavutil.mavlink.MAV_CMD_NAV_LOITER_TURNS,
55  mavutil.mavlink.MAV_CMD_NAV_LOITER_TIME,
56  mavutil.mavlink.MAV_CMD_NAV_LOITER_TO_ALT]
57 
58  if (self.wpoints[i].command in loiter_cmds):
59  return True
60 
61  return False
62 
63  def add(self, w, comment=''):
64  '''add a waypoint'''
65  w = copy.copy(w)
66  if comment:
67  w.comment = comment
68  w.seq = self.count()
69  self.wpoints.append(w)
70  self.last_change = time.time()
71 
72  def insert(self, idx, w, comment=''):
73  '''insert a waypoint'''
74  if idx >= self.count():
75  self.add(w, comment)
76  return
77  if idx < 0:
78  return
79  w = copy.copy(w)
80  if comment:
81  w.comment = comment
82  w.seq = idx
83  self.wpoints.insert(idx, w)
84  self.last_change = time.time()
85  self.reindex()
86 
87  def reindex(self):
88  '''reindex waypoints'''
89  for i in range(self.count()):
90  w = self.wpoints[i]
91  w.seq = i
92  self.last_change = time.time()
93 
94  def add_latlonalt(self, lat, lon, altitude, terrain_alt=False):
95  '''add a point via latitude/longitude/altitude'''
96  if terrain_alt:
97  frame = mavutil.mavlink.MAV_FRAME_GLOBAL_TERRAIN_ALT
98  else:
99  frame = mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT
100  p = mavutil.mavlink.MAVLink_mission_item_message(self.target_system,
101  self.target_component,
102  0,
103  frame,
104  mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
105  0, 0, 0, 0, 0, 0,
106  lat, lon, altitude)
107  self.add(p)
108 
109  def set(self, w, idx):
110  '''set a waypoint'''
111  w.seq = idx
112  if w.seq == self.count():
113  return self.add(w)
114  if self.count() <= idx:
115  raise MAVWPError('adding waypoint at idx=%u past end of list (count=%u)' % (idx, self.count()))
116  self.wpoints[idx] = w
117  self.last_change = time.time()
118 
119  def remove(self, w):
120  '''remove a waypoint'''
121  self.wpoints.remove(w)
122  self.last_change = time.time()
123  self.reindex()
124 
125  def clear(self):
126  '''clear waypoint list'''
127  self.wpoints = []
128  self.last_change = time.time()
129 
130  def _read_waypoints_v100(self, file):
131  '''read a version 100 waypoint'''
132  cmdmap = {
133  2 : mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
134  3 : mavutil.mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH,
135  4 : mavutil.mavlink.MAV_CMD_NAV_LAND,
136  24: mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
137  26: mavutil.mavlink.MAV_CMD_NAV_LAND,
138  25: mavutil.mavlink.MAV_CMD_NAV_WAYPOINT ,
139  27: mavutil.mavlink.MAV_CMD_NAV_LOITER_UNLIM
140  }
141  comment = ''
142  for line in file:
143  if line.startswith('#'):
144  comment = line[1:].lstrip()
145  continue
146  line = line.strip()
147  if not line:
148  continue
149  a = line.split()
150  if len(a) != 13:
151  raise MAVWPError("invalid waypoint line with %u values" % len(a))
152  if mavutil.mavlink10():
153  fn = mavutil.mavlink.MAVLink_mission_item_message
154  else:
155  fn = mavutil.mavlink.MAVLink_waypoint_message
156  w = fn(self.target_system, self.target_component,
157  int(a[0]), # seq
158  int(a[1]), # frame
159  int(a[2]), # action
160  int(a[7]), # current
161  int(a[12]), # autocontinue
162  float(a[5]), # param1,
163  float(a[6]), # param2,
164  float(a[3]), # param3
165  float(a[4]), # param4
166  float(a[9]), # x, latitude
167  float(a[8]), # y, longitude
168  float(a[10]) # z
169  )
170  if not w.command in cmdmap:
171  raise MAVWPError("Unknown v100 waypoint action %u" % w.command)
172 
173  w.command = cmdmap[w.command]
174  self.add(w, comment)
175  comment = ''
176 
177  def _read_waypoints_v110(self, file):
178  '''read a version 110 waypoint'''
179  comment = ''
180  for line in file:
181  if line.startswith('#'):
182  comment = line[1:].lstrip()
183  continue
184  line = line.strip()
185  if not line:
186  continue
187  a = line.split()
188  if len(a) != 12:
189  raise MAVWPError("invalid waypoint line with %u values" % len(a))
190  if mavutil.mavlink10():
191  fn = mavutil.mavlink.MAVLink_mission_item_message
192  else:
193  fn = mavutil.mavlink.MAVLink_waypoint_message
194  w = fn(self.target_system, self.target_component,
195  int(a[0]), # seq
196  int(a[2]), # frame
197  int(a[3]), # command
198  int(a[1]), # current
199  int(a[11]), # autocontinue
200  float(a[4]), # param1,
201  float(a[5]), # param2,
202  float(a[6]), # param3
203  float(a[7]), # param4
204  float(a[8]), # x (latitude)
205  float(a[9]), # y (longitude)
206  float(a[10]) # z (altitude)
207  )
208  if w.command == 0 and w.seq == 0 and self.count() == 0:
209  # special handling for Mission Planner created home wp
210  w.command = mavutil.mavlink.MAV_CMD_NAV_WAYPOINT
211  self.add(w, comment)
212  comment = ''
213 
214  def _read_waypoints_pb_110(self, file):
215  if not HAVE_PROTOBUF:
216  raise MAVWPError(
217  'Cannot read mission file in protobuf format without protobuf '
218  'library. Try "easy_install protobuf".')
219  explicit_seq = False
220  warned_seq = False
221  mission = mission_pb2.Mission()
222  text_format.Merge(file.read(), mission)
223  defaults = mission_pb2.Waypoint()
224  # Set defaults (may be overriden in file).
225  defaults.current = False
226  defaults.autocontinue = True
227  defaults.param1 = 0.0
228  defaults.param2 = 0.0
229  defaults.param3 = 0.0
230  defaults.param4 = 0.0
231  defaults.x = 0.0
232  defaults.y = 0.0
233  defaults.z = 0.0
234  # Use defaults specified in mission file, if there are any.
235  if mission.defaults:
236  defaults.MergeFrom(mission.defaults)
237  for seq, waypoint in enumerate(mission.waypoint):
238  # Consecutive sequence numbers are automatically assigned
239  # UNLESS the mission file specifies sequence numbers of
240  # its own.
241  if waypoint.seq:
242  explicit_seq = True
243  else:
244  if explicit_seq and not warned_seq:
245  logging.warn(
246  'Waypoint file %s: mixes explicit and implicit '
247  'sequence numbers' % (file,))
248  warned_seq = True
249  # The first command has current=True, the rest have current=False.
250  if seq > 0:
251  current = defaults.current
252  else:
253  current = True
254  w = mavutil.mavlink.MAVLink_mission_item_message(
255  self.target_system, self.target_component,
256  waypoint.seq or seq,
257  waypoint.frame,
258  waypoint.command,
259  waypoint.current or current,
260  waypoint.autocontinue or defaults.autocontinue,
261  waypoint.param1 or defaults.param1,
262  waypoint.param2 or defaults.param2,
263  waypoint.param3 or defaults.param3,
264  waypoint.param4 or defaults.param4,
265  waypoint.x or defaults.x,
266  waypoint.y or defaults.y,
267  waypoint.z or defaults.z)
268  self.add(w)
269 
270  def load(self, filename):
271  '''load waypoints from a file.
272  returns number of waypoints loaded'''
273  f = open(filename, mode='r')
274  version_line = f.readline().strip()
275  if version_line == "QGC WPL 100":
276  readfn = self._read_waypoints_v100
277  elif version_line == "QGC WPL 110":
278  readfn = self._read_waypoints_v110
279  elif version_line == "QGC WPL PB 110":
280  readfn = self._read_waypoints_pb_110
281  else:
282  f.close()
283  raise MAVWPError("Unsupported waypoint format '%s'" % version_line)
284 
285  self.clear()
286  readfn(f)
287  f.close()
288 
289  return len(self.wpoints)
290 
291  def save_as_pb(self, filename):
292  mission = mission_pb2.Mission()
293  for w in self.wpoints:
294  waypoint = mission.waypoint.add()
295  waypoint.command = w.command
296  waypoint.frame = w.frame
297  waypoint.seq = w.seq
298  waypoint.current = w.current
299  waypoint.autocontinue = w.autocontinue
300  waypoint.param1 = w.param1
301  waypoint.param2 = w.param2
302  waypoint.param3 = w.param3
303  waypoint.param4 = w.param4
304  waypoint.x = w.x
305  waypoint.y = w.y
306  waypoint.z = w.z
307  with open(filename, 'w') as f:
308  f.write('QGC WPL PB 110\n')
309  f.write(text_format.MessageToString(mission))
310 
311  def save(self, filename):
312  '''save waypoints to a file'''
313  f = open(filename, mode='w')
314  f.write("QGC WPL 110\n")
315  for w in self.wpoints:
316  if getattr(w, 'comment', None):
317  f.write("# %s\n" % w.comment)
318  f.write("%u\t%u\t%u\t%u\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%u\n" % (
319  w.seq, w.current, w.frame, w.command,
320  w.param1, w.param2, w.param3, w.param4,
321  w.x, w.y, w.z, w.autocontinue))
322  f.close()
323 
324  def is_location_command(self, cmd):
325  '''see if cmd is a MAV_CMD with a latitide/longitude.
326  We check if it has Latitude and Longitude params in the right indexes'''
327  mav_cmd = mavutil.mavlink.enums['MAV_CMD']
328  if not cmd in mav_cmd:
329  return False
330  if not mav_cmd[cmd].param.get(5,'').startswith('Latitude'):
331  return False
332  if not mav_cmd[cmd].param.get(6,'').startswith('Longitude'):
333  return False
334  return True
335 
336 
337  def view_indexes(self, done=None):
338  '''return a list waypoint indexes in view order'''
339  ret = []
340  if done is None:
341  done = set()
342  idx = 0
343 
344  # find first point not done yet
345  while idx < self.count():
346  if not idx in done:
347  break
348  idx += 1
349 
350  exclusion_start = -1
351  exclusion_count = -1
352  inclusion_start = -1
353  inclusion_count = -1
354  while idx < self.count():
355  w = self.wp(idx)
356  if idx in done:
357  if w.x != 0 or w.y != 0:
358  ret.append(idx)
359  break
360  done.add(idx)
361  if w.command == mavutil.mavlink.MAV_CMD_DO_JUMP:
362  idx = int(w.param1)
363  w = self.wp(idx)
364  if w.x != 0 or w.y != 0:
365  ret.append(idx)
366  continue
367  # display loops for exclusion and inclusion zones
368  if w.command == mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION:
369  if exclusion_start == -1:
370  exclusion_count = int(w.param1)
371  exclusion_start = idx
372  if idx == exclusion_start + exclusion_count - 1:
373  ret.append(idx)
374  ret.append(exclusion_start)
375  return ret
376  if w.command == mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION:
377  if inclusion_start == -1:
378  inclusion_count = int(w.param1)
379  inclusion_start = idx
380  if idx == inclusion_start + inclusion_count - 1:
381  ret.append(idx)
382  ret.append(inclusion_start)
383  return ret
384  if (w.x != 0 or w.y != 0) and self.is_location_command(w.command):
385  ret.append(idx)
386  exc_zones = [mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION,
387  mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION]
388  w2 = self.wp(idx+1)
389  if w2 is not None and w.command not in exc_zones and w2.command in exc_zones:
390  # don't draw a line from last WP to first exc zone
391  return ret
392  idx += 1
393  return ret
394 
395  def polygon(self, done=None):
396  '''return a polygon for the waypoints'''
397  indexes = self.view_indexes(done)
398  points = []
399  for idx in indexes:
400  w = self.wp(idx)
401  if w.command in self.colour_for_polygon:
402  points.append((w.x, w.y, self.colour_for_polygon[w.command]))
403  else:
404  points.append((w.x, w.y))
405  return points
406 
407  def polygon_list(self):
408  '''return a list of polygons for the waypoints'''
409  done = set()
410  ret = []
411  while len(done) != self.count():
412  p = self.polygon(done)
413  if len(p) > 0:
414  ret.append(p)
415  return ret
416 
417  def view_list(self):
418  '''return a list of polygon indexes lists for the waypoints'''
419  done = set()
420  ret = []
421  while len(done) != self.count():
422  p = self.view_indexes(done)
423  if len(p) > 0:
424  ret.append(p)
425  return ret
426 
427 class MAVRallyError(Exception):
428  '''MAVLink rally point error class'''
429  def __init__(self, msg):
430  Exception.__init__(self, msg)
431  self.message = msg
432 
433 class MAVRallyLoader(object):
434  '''MAVLink Rally points and Rally Land ponts loader'''
435  def __init__(self, target_system=0, target_component=0):
436  self.rally_points = []
437  self.target_system = target_system
438  self.target_component = target_component
439  self.last_change = time.time()
440 
441  def rally_count(self):
442  '''return number of rally points'''
443  return len(self.rally_points)
444 
445  def rally_point(self, i):
446  '''return rally point i'''
447  return self.rally_points[i]
448 
449  def reindex(self):
450  '''reset counters and indexes'''
451  for i in range(self.rally_count()):
452  self.rally_points[i].count = self.rally_count()
453  self.rally_points[i].idx = i
454  self.last_change = time.time()
455 
456  def append_rally_point(self, p):
457  '''add rallypoint to end of list'''
458  if (self.rally_count() > 9):
459  print("Can't have more than 10 rally points, not adding.")
460  return
461 
462  self.rally_points.append(p)
463  self.reindex()
464 
465  def create_and_append_rally_point(self, lat, lon, alt, break_alt, land_dir, flags):
466  '''add a point via latitude/longitude'''
467  p = mavutil.mavlink.MAVLink_rally_point_message(self.target_system, self.target_component,
468  self.rally_count(), 0, lat, lon, alt, break_alt, land_dir, flags)
469  self.append_rally_point(p)
470 
471  def clear(self):
472  '''clear all point lists (rally and rally_land)'''
473  self.rally_points = []
474  self.last_change = time.time()
475 
476  def remove(self, i):
477  '''remove a rally point'''
478  if i < 1 or i > self.rally_count():
479  print("Invalid rally point number %u" % i)
480  self.rally_points.pop(i-1)
481  self.reindex()
482 
483  def move(self, i, lat, lng, change_time=True):
484  '''move a rally point'''
485  if i < 1 or i > self.rally_count():
486  print("Invalid rally point number %u" % i)
487  self.rally_points[i-1].lat = int(lat*1e7)
488  self.rally_points[i-1].lng = int(lng*1e7)
489  if change_time:
490  self.last_change = time.time()
491 
492  def set_alt(self, i, alt, break_alt=None, change_time=True):
493  '''set rally point altitude(s)'''
494  if i < 1 or i > self.rally_count():
495  print("Inavlid rally point number %u" % i)
496  return
497  self.rally_points[i-1].alt = int(alt)
498  if break_alt is not None:
499  self.rally_points[i-1].break_alt = break_alt
500  if change_time:
501  self.last_change = time.time()
502 
503  def load(self, filename):
504  '''load rally and rally_land points from a file.
505  returns number of points loaded'''
506  f = open(filename, mode='r')
507  self.clear()
508  for line in f:
509  if line.startswith('#'):
510  continue
511  line = line.strip()
512  if not line:
513  continue
514  a = line.split()
515  if len(a) != 7:
516  raise MAVRallyError("invalid rally file line: %s" % line)
517 
518  if (a[0].lower() == "rally"):
519  self.create_and_append_rally_point(float(a[1]) * 1e7, float(a[2]) * 1e7,
520  float(a[3]), float(a[4]), float(a[5]) * 100.0, int(a[6]))
521  f.close()
522  return len(self.rally_points)
523 
524  def save(self, filename):
525  '''save fence points to a file'''
526  f = open(filename, mode='w')
527  for p in self.rally_points:
528  f.write("RALLY %f\t%f\t%f\t%f\t%f\t%d\n" % (p.lat * 1e-7, p.lng * 1e-7, p.alt,
529  p.break_alt, p.land_dir, p.flags))
530  f.close()
531 
532 class MAVFenceError(Exception):
533  '''MAVLink fence error class'''
534  def __init__(self, msg):
535  Exception.__init__(self, msg)
536  self.message = msg
537 
538 class MAVFenceLoader(object):
539  '''MAVLink geo-fence loader'''
540  def __init__(self, target_system=0, target_component=0):
541  self.points = []
542  self.target_system = target_system
543  self.target_component = target_component
544  self.last_change = time.time()
545 
546  def count(self):
547  '''return number of points'''
548  return len(self.points)
549 
550  def point(self, i):
551  '''return a point'''
552  return self.points[i]
553 
554  def add(self, p):
555  '''add a point'''
556  self.points.append(p)
557  self.reindex()
558 
559  def reindex(self):
560  '''reindex waypoints'''
561  for i in range(self.count()):
562  w = self.points[i]
563  w.idx = i
564  w.count = self.count()
565  w.target_system = self.target_system
566  w.target_component = self.target_component
567  self.last_change = time.time()
568 
569  def add_latlon(self, lat, lon):
570  '''add a point via latitude/longitude'''
571  p = mavutil.mavlink.MAVLink_fence_point_message(self.target_system, self.target_component,
572  self.count(), 0, lat, lon)
573  self.add(p)
574 
575  def clear(self):
576  '''clear point list'''
577  self.points = []
578  self.last_change = time.time()
579 
580  def load(self, filename):
581  '''load points from a file.
582  returns number of points loaded'''
583  f = open(filename, mode='r')
584  self.clear()
585  for line in f:
586  if line.startswith('#'):
587  continue
588  line = line.strip()
589  if not line:
590  continue
591  a = line.split()
592  if len(a) != 2:
593  raise MAVFenceError("invalid fence point line: %s" % line)
594  self.add_latlon(float(a[0]), float(a[1]))
595  f.close()
596  return len(self.points)
597 
598  def save(self, filename):
599  '''save fence points to a file'''
600  f = open(filename, mode='w')
601  for p in self.points:
602  f.write("%f\t%f\n" % (p.lat, p.lng))
603  f.close()
604 
605  def move(self, i, lat, lng, change_time=True):
606  '''move a fence point'''
607  if i < 0 or i >= self.count():
608  print("Invalid fence point number %u" % i)
609  self.points[i].lat = lat
610  self.points[i].lng = lng
611  # ensure we close the polygon
612  if i == 1:
613  self.points[self.count()-1].lat = lat
614  self.points[self.count()-1].lng = lng
615  if i == self.count() - 1:
616  self.points[1].lat = lat
617  self.points[1].lng = lng
618  if change_time:
619  self.last_change = time.time()
620 
621  def remove(self, i, change_time=True):
622  '''remove a fence point'''
623  if i < 0 or i >= self.count():
624  print("Invalid fence point number %u" % i)
625  self.points.pop(i)
626  # ensure we close the polygon
627  if i == 1:
628  self.points[self.count()-1].lat = self.points[1].lat
629  self.points[self.count()-1].lng = self.points[1].lng
630  if i == self.count():
631  self.points[1].lat = self.points[self.count()-1].lat
632  self.points[1].lng = self.points[self.count()-1].lng
633  if change_time:
634  self.last_change = time.time()
635 
636  def polygon(self):
637  '''return a polygon for the fence'''
638  points = []
639  for fp in self.points[1:]:
640  points.append((fp.lat, fp.lng))
641  return points


mavlink
Author(s): Lorenz Meier
autogenerated on Sun Jul 7 2019 03:22:07