00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from __future__ import with_statement
00034
00035 PKG = 'rxbag_plugins'
00036 import roslib; roslib.load_manifest(PKG)
00037 import rospy
00038
00039 import bisect
00040 import collections
00041 import csv
00042 import itertools
00043 import sys
00044 import threading
00045 import time
00046
00047 from dataset import DataSet
00048
00049 class PlotDataLoader(threading.Thread):
00050 def __init__(self, timeline, topic):
00051 threading.Thread.__init__(self, target=self._run)
00052
00053 self._timeline = timeline
00054 self._topic = topic
00055
00056 self._start_stamp = self._timeline.start_stamp
00057 self._end_stamp = self._timeline.end_stamp
00058 self._paths = []
00059 self._max_interval = 0.0
00060 self._extension_fraction = 0.1
00061 self._min_reload_secs = 0.4
00062 self._use_header_stamp = False
00063
00064 self._dirty = True
00065 self._dirty_cv = threading.Condition()
00066 self._last_reload = None
00067 self._data = {}
00068 self._load_complete = False
00069
00070 self._progress_listeners = []
00071 self._complete_listeners = []
00072
00073 self._stop_flag = False
00074 self.setDaemon(True)
00075
00076 @property
00077 def data(self): return self._data
00078
00079 @property
00080 def is_load_complete(self): return self._load_complete
00081
00082
00083
00084 def add_progress_listener(self, listener): self._progress_listeners.append(listener)
00085 def remove_progress_listener(self, listener): self._progress_listeners.remove(listener)
00086
00087 def add_complete_listener(self, listener): self._complete_listeners.append(listener)
00088 def remove_complete_listener(self, listener): self._complete_listeners.remove(listener)
00089
00090 def invalidate(self):
00091 with self._dirty_cv:
00092 self._dirty = True
00093 self._dirty_cv.notify()
00094
00095
00096
00097 def _get_start_stamp(self): return self._start_stamp
00098
00099 def _set_start_stamp(self, start_stamp):
00100 with self._dirty_cv:
00101 if start_stamp != self._start_stamp:
00102 self._start_stamp = start_stamp
00103 self._dirty = True
00104 self._dirty_cv.notify()
00105
00106 start_stamp = property(_get_start_stamp, _set_start_stamp)
00107
00108
00109
00110 def _get_end_stamp(self): return self._end_stamp
00111
00112 def _set_end_stamp(self, end_stamp):
00113 with self._dirty_cv:
00114 if end_stamp != self._end_stamp:
00115 self._end_stamp = end_stamp
00116 self._dirty = True
00117 self._dirty_cv.notify()
00118
00119 end_stamp = property(_get_end_stamp, _set_end_stamp)
00120
00121 def set_interval(self, start_stamp, end_stamp):
00122 with self._dirty_cv:
00123 updated = False
00124 if start_stamp != self._start_stamp:
00125 self._start_stamp = start_stamp
00126 updated = True
00127 if end_stamp != self._end_stamp:
00128 self._end_stamp = end_stamp
00129 updated = True
00130 if updated:
00131 self._dirty = True
00132 self._dirty_cv.notify()
00133
00134
00135
00136 def _get_paths(self): return self._paths
00137
00138 def _set_paths(self, paths):
00139 with self._dirty_cv:
00140 if set(paths) != set(self._paths):
00141 self._data = {}
00142 self._paths = paths
00143 self._dirty = True
00144 self._dirty_cv.notify()
00145
00146 paths = property(_get_paths, _set_paths)
00147
00148
00149
00150 def _get_max_interval(self): return self._max_interval
00151
00152 def _set_max_interval(self, max_interval):
00153 with self._dirty_cv:
00154 if max_interval != self._max_interval:
00155 self._max_interval = max_interval
00156 self._dirty = True
00157 self._dirty_cv.notify()
00158
00159 max_interval = property(_get_max_interval, _set_max_interval)
00160
00161 def stop(self):
00162 self._stop_flag = True
00163 with self._dirty_cv:
00164 self._dirty_cv.notify()
00165 self.join()
00166
00167
00168
00169 def _run(self):
00170 entry_queue = []
00171
00172 while not self._stop_flag:
00173
00174 with self._dirty_cv:
00175 if self._dirty and (self._last_reload is None or time.time() - self._last_reload >= self._min_reload_secs):
00176 self._trim_data(self._extension_fraction, self._max_interval)
00177
00178 entry_queue = self._get_entries_to_load(self._extension_fraction, self._max_interval)
00179
00180 self._last_reload = time.time()
00181 self._load_complete = False
00182 self._dirty = False
00183
00184
00185 if len(entry_queue) == 0 or len(self._paths) == 0:
00186 self._load_complete = True
00187 for listener in itertools.chain(self._progress_listeners, self._complete_listeners):
00188 listener()
00189
00190
00191 with self._dirty_cv:
00192 if not self._dirty:
00193 self._dirty_cv.wait()
00194
00195 continue
00196
00197
00198 try:
00199 bag, entry = entry_queue.pop()
00200
00201 self._process(bag, entry)
00202 except Exception:
00203 pass
00204
00205 def _trim_data(self, extension_fraction=None, max_interval=None):
00206 """
00207 Toss out data outside of (extended) view range, and closer than max_interval seconds apart.
00208 """
00209 if extension_fraction is None:
00210 start_stamp = self._start_stamp
00211 end_stamp = self._end_stamp
00212 else:
00213 extension = rospy.Duration((self._end_stamp - self._start_stamp).to_sec() * extension_fraction)
00214 if extension.to_sec() >= self._start_stamp.to_sec():
00215 start_stamp = rospy.Time(0, 1)
00216 else:
00217 start_stamp = self._start_stamp - extension
00218 end_stamp = self._end_stamp + extension
00219
00220 min_x = (start_stamp - self._timeline.start_stamp).to_sec()
00221 max_x = (end_stamp - self._timeline.start_stamp).to_sec()
00222
00223 for series in list(self._data.keys()):
00224 points = self._data[series].points
00225 num_points = len(points)
00226
00227 trimmed_points = []
00228
00229 if num_points > 0 and points[0][0] < max_x and points[-1][0] > min_x:
00230 first_index = None
00231 last_x = None
00232 for i, (x, y) in enumerate(points):
00233 if x >= min_x:
00234 trimmed_points.append((x, y))
00235 first_index = i
00236 last_x = x
00237 break
00238
00239 if first_index is not None:
00240 for i, (x, y) in enumerate(points[first_index + 1:]):
00241 if x > max_x:
00242 break
00243
00244 if (max_interval is None) or (x - last_x >= max_interval):
00245 trimmed_points.append((x, y))
00246 last_x = x
00247
00248 new_data = DataSet()
00249 new_data.set(trimmed_points)
00250
00251 self._data[series] = new_data
00252
00253 def _get_entries_to_load(self, extension_fraction=None, max_interval=None):
00254 """
00255 Returns a list of (Bag, IndexEntry) tuples to load.
00256
00257 @param extension: extra proportion of the view range to load
00258 @param max_interval: maximum
00259 """
00260
00261 if extension_fraction is None:
00262 start_stamp = self._start_stamp
00263 end_stamp = self._end_stamp
00264 else:
00265 extension = rospy.Duration((self._end_stamp - self._start_stamp).to_sec() * extension_fraction)
00266 if extension.to_sec() >= self._start_stamp.to_sec():
00267 start_stamp = rospy.Time(0, 1)
00268 else:
00269 start_stamp = self._start_stamp - extension
00270 end_stamp = self._end_stamp + extension
00271
00272
00273 view_entries = list(self._timeline.get_entries_with_bags(self._topic, start_stamp, end_stamp))
00274
00275
00276 if max_interval is not None:
00277 spaced_entries = []
00278 max_interval_duration = rospy.Duration(max_interval)
00279
00280 last_time = None
00281 for bag, entry in view_entries:
00282 if last_time is None or (entry.time - last_time) > max_interval_duration:
00283 spaced_entries.append((bag, entry))
00284 last_time = entry.time
00285 else:
00286 spaced_entries = view_entries
00287
00288
00289 if max_interval is not None:
00290 far_entries = []
00291 loaded_xs = set()
00292 for series in list(self._data.keys()):
00293 for x, y in self._data[series].points:
00294 loaded_xs.add(x)
00295 loaded_xs = list(loaded_xs)
00296 loaded_xs.sort()
00297 loaded_stamps = [self._timeline.start_stamp + rospy.Duration(x) for x in loaded_xs]
00298 for bag, entry in spaced_entries:
00299 closest_stamp_left_index = bisect.bisect_left(loaded_stamps, entry.time) - 1
00300 if closest_stamp_left_index >= 0 and abs((entry.time - loaded_stamps[closest_stamp_left_index]).to_sec()) < max_interval / 2:
00301 continue
00302 closest_stamp_right_index = closest_stamp_left_index + 1
00303 if closest_stamp_right_index < len(loaded_stamps) - 1 and abs((entry.time - loaded_stamps[closest_stamp_right_index]).to_sec()) < max_interval / 2:
00304 continue
00305 far_entries.append((bag, entry))
00306 else:
00307 far_entries = spaced_entries
00308
00309
00310 bin_search_entries = []
00311 if len(far_entries) > 0:
00312 for i in _subdivide(0, len(far_entries) - 1):
00313 bin_search_entries.append(far_entries[i])
00314
00315 entry_queue = list(reversed(bin_search_entries))
00316
00317 return entry_queue
00318
00319 def _process(self, bag, entry):
00320
00321 _, msg, msg_stamp = self._timeline.read_message(bag, entry.position)
00322 if not msg:
00323 return False
00324
00325
00326 for path in self._paths:
00327
00328 try:
00329 y = eval('msg.' + path)
00330 except Exception:
00331 continue
00332
00333
00334 if self._use_header_stamp:
00335 if msg.__class__._has_header:
00336 header = msg.header
00337 else:
00338 header = _get_header(msg, path)
00339
00340 stamp = header.stamp
00341 else:
00342 stamp = msg_stamp
00343 x = (stamp - self._timeline.start_stamp).to_sec()
00344
00345
00346 if path not in self._data:
00347 self._data[path] = DataSet()
00348 self._data[path].add(x, y)
00349
00350
00351 for listener in self._progress_listeners:
00352 listener()
00353
00354 return True
00355
00356 def _get_header(msg, path):
00357 fields = path.split('.')
00358 if len(fields) <= 1:
00359 return None
00360
00361 parent_path = '.'.join(fields[:-1])
00362
00363 parent = eval('msg.' + parent_path)
00364
00365 for slot in parent.__slots__:
00366 subobj = getattr(parent, slot)
00367 if subobj is not None and hasattr(subobj, '_type') and getattr(subobj, '_type') == 'roslib/Header':
00368 return subobj
00369
00370 return _get_header(msg, parent_path)
00371
00372 def _subdivide(left, right):
00373 if left == right:
00374 yield left
00375 return
00376
00377 yield left
00378 yield right
00379
00380 intervals = collections.deque([(left, right)])
00381 while True:
00382 try:
00383 (left, right) = intervals.popleft()
00384 except Exception:
00385 break
00386
00387 mid = (left + right) / 2
00388
00389 if right - left <= 1:
00390 continue
00391
00392 yield mid
00393
00394 if right - left <= 2:
00395 continue
00396
00397 intervals.append((left, mid))
00398 intervals.append((mid, right))