$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2009, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 00033 from __future__ import with_statement 00034 00035 PKG = 'rxbag_plugins' 00036 import roslib; roslib.load_manifest(PKG) 00037 import rospy 00038 00039 import bisect 00040 import collections 00041 import csv 00042 import itertools 00043 import sys 00044 import threading 00045 import time 00046 00047 from dataset import DataSet 00048 00049 class PlotDataLoader(threading.Thread): 00050 def __init__(self, timeline, topic): 00051 threading.Thread.__init__(self, target=self._run) 00052 00053 self._timeline = timeline 00054 self._topic = topic 00055 00056 self._start_stamp = self._timeline.start_stamp 00057 self._end_stamp = self._timeline.end_stamp 00058 self._paths = [] 00059 self._max_interval = 0.0 00060 self._extension_fraction = 0.1 00061 self._min_reload_secs = 0.4 # minimum time to wait before loading entries 00062 self._use_header_stamp = False # use the timestamp from the header 00063 00064 self._dirty = True 00065 self._dirty_cv = threading.Condition() 00066 self._last_reload = None # time that entries were reloaded 00067 self._data = {} 00068 self._load_complete = False 00069 00070 self._progress_listeners = [] 00071 self._complete_listeners = [] 00072 00073 self._stop_flag = False 00074 self.setDaemon(True) 00075 00076 @property 00077 def data(self): return self._data 00078 00079 @property 00080 def is_load_complete(self): return self._load_complete 00081 00082 # listeners 00083 00084 def add_progress_listener(self, listener): self._progress_listeners.append(listener) 00085 def remove_progress_listener(self, listener): self._progress_listeners.remove(listener) 00086 00087 def add_complete_listener(self, listener): self._complete_listeners.append(listener) 00088 def remove_complete_listener(self, listener): self._complete_listeners.remove(listener) 00089 00090 def invalidate(self): 00091 with self._dirty_cv: 00092 self._dirty = True 00093 self._dirty_cv.notify() 00094 00095 # property: start_stamp 00096 00097 def _get_start_stamp(self): return self._start_stamp 00098 00099 def _set_start_stamp(self, start_stamp): 00100 with self._dirty_cv: 00101 if start_stamp != self._start_stamp: 00102 self._start_stamp = start_stamp 00103 self._dirty = True 00104 self._dirty_cv.notify() 00105 00106 start_stamp = property(_get_start_stamp, _set_start_stamp) 00107 00108 # property: end_stamp 00109 00110 def _get_end_stamp(self): return self._end_stamp 00111 00112 def _set_end_stamp(self, end_stamp): 00113 with self._dirty_cv: 00114 if end_stamp != self._end_stamp: 00115 self._end_stamp = end_stamp 00116 self._dirty = True 00117 self._dirty_cv.notify() 00118 00119 end_stamp = property(_get_end_stamp, _set_end_stamp) 00120 00121 def set_interval(self, start_stamp, end_stamp): 00122 with self._dirty_cv: 00123 updated = False 00124 if start_stamp != self._start_stamp: 00125 self._start_stamp = start_stamp 00126 updated = True 00127 if end_stamp != self._end_stamp: 00128 self._end_stamp = end_stamp 00129 updated = True 00130 if updated: 00131 self._dirty = True 00132 self._dirty_cv.notify() 00133 00134 # property: paths 00135 00136 def _get_paths(self): return self._paths 00137 00138 def _set_paths(self, paths): 00139 with self._dirty_cv: 00140 if set(paths) != set(self._paths): 00141 self._data = {} 00142 self._paths = paths 00143 self._dirty = True 00144 self._dirty_cv.notify() 00145 00146 paths = property(_get_paths, _set_paths) 00147 00148 # property: max_interval 00149 00150 def _get_max_interval(self): return self._max_interval 00151 00152 def _set_max_interval(self, max_interval): 00153 with self._dirty_cv: 00154 if max_interval != self._max_interval: 00155 self._max_interval = max_interval 00156 self._dirty = True 00157 self._dirty_cv.notify() 00158 00159 max_interval = property(_get_max_interval, _set_max_interval) 00160 00161 def stop(self): 00162 self._stop_flag = True 00163 with self._dirty_cv: 00164 self._dirty_cv.notify() 00165 self.join() 00166 00167 ## 00168 00169 def _run(self): 00170 entry_queue = [] 00171 00172 while not self._stop_flag: 00173 # If dirty, then throw away data which doesn't fit in current view, and regenerate the entries to load 00174 with self._dirty_cv: 00175 if self._dirty and (self._last_reload is None or time.time() - self._last_reload >= self._min_reload_secs): 00176 self._trim_data(self._extension_fraction, self._max_interval) 00177 00178 entry_queue = self._get_entries_to_load(self._extension_fraction, self._max_interval) 00179 00180 self._last_reload = time.time() 00181 self._load_complete = False 00182 self._dirty = False 00183 00184 # Check to see if we've finished loading 00185 if len(entry_queue) == 0 or len(self._paths) == 0: 00186 self._load_complete = True 00187 for listener in itertools.chain(self._progress_listeners, self._complete_listeners): 00188 listener() 00189 00190 # Wait for dirty flag to be set 00191 with self._dirty_cv: 00192 if not self._dirty: 00193 self._dirty_cv.wait() 00194 00195 continue 00196 00197 # Pop the next entry to load, and process it (insert it into _data) 00198 try: 00199 bag, entry = entry_queue.pop() 00200 00201 self._process(bag, entry) 00202 except Exception: 00203 pass 00204 00205 def _trim_data(self, extension_fraction=None, max_interval=None): 00206 """ 00207 Toss out data outside of (extended) view range, and closer than max_interval seconds apart. 00208 """ 00209 if extension_fraction is None: 00210 start_stamp = self._start_stamp 00211 end_stamp = self._end_stamp 00212 else: 00213 extension = rospy.Duration((self._end_stamp - self._start_stamp).to_sec() * extension_fraction) 00214 if extension.to_sec() >= self._start_stamp.to_sec(): 00215 start_stamp = rospy.Time(0, 1) 00216 else: 00217 start_stamp = self._start_stamp - extension 00218 end_stamp = self._end_stamp + extension 00219 00220 min_x = (start_stamp - self._timeline.start_stamp).to_sec() 00221 max_x = (end_stamp - self._timeline.start_stamp).to_sec() 00222 00223 for series in list(self._data.keys()): 00224 points = self._data[series].points 00225 num_points = len(points) 00226 00227 trimmed_points = [] 00228 00229 if num_points > 0 and points[0][0] < max_x and points[-1][0] > min_x: 00230 first_index = None 00231 last_x = None 00232 for i, (x, y) in enumerate(points): 00233 if x >= min_x: 00234 trimmed_points.append((x, y)) 00235 first_index = i 00236 last_x = x 00237 break 00238 00239 if first_index is not None: 00240 for i, (x, y) in enumerate(points[first_index + 1:]): 00241 if x > max_x: 00242 break 00243 00244 if (max_interval is None) or (x - last_x >= max_interval): 00245 trimmed_points.append((x, y)) 00246 last_x = x 00247 00248 new_data = DataSet() 00249 new_data.set(trimmed_points) 00250 00251 self._data[series] = new_data 00252 00253 def _get_entries_to_load(self, extension_fraction=None, max_interval=None): 00254 """ 00255 Returns a list of (Bag, IndexEntry) tuples to load. 00256 00257 @param extension: extra proportion of the view range to load 00258 @param max_interval: maximum 00259 """ 00260 # Load data outside of the view range 00261 if extension_fraction is None: 00262 start_stamp = self._start_stamp 00263 end_stamp = self._end_stamp 00264 else: 00265 extension = rospy.Duration((self._end_stamp - self._start_stamp).to_sec() * extension_fraction) 00266 if extension.to_sec() >= self._start_stamp.to_sec(): 00267 start_stamp = rospy.Time(0, 1) 00268 else: 00269 start_stamp = self._start_stamp - extension 00270 end_stamp = self._end_stamp + extension 00271 00272 # Get the entries 00273 view_entries = list(self._timeline.get_entries_with_bags(self._topic, start_stamp, end_stamp)) 00274 00275 # Toss out those entries too close to each other 00276 if max_interval is not None: 00277 spaced_entries = [] 00278 max_interval_duration = rospy.Duration(max_interval) 00279 00280 last_time = None 00281 for bag, entry in view_entries: 00282 if last_time is None or (entry.time - last_time) > max_interval_duration: 00283 spaced_entries.append((bag, entry)) 00284 last_time = entry.time 00285 else: 00286 spaced_entries = view_entries 00287 00288 # Toss out entries too close to existing data 00289 if max_interval is not None: 00290 far_entries = [] 00291 loaded_xs = set() 00292 for series in list(self._data.keys()): 00293 for x, y in self._data[series].points: 00294 loaded_xs.add(x) 00295 loaded_xs = list(loaded_xs) 00296 loaded_xs.sort() 00297 loaded_stamps = [self._timeline.start_stamp + rospy.Duration(x) for x in loaded_xs] 00298 for bag, entry in spaced_entries: 00299 closest_stamp_left_index = bisect.bisect_left(loaded_stamps, entry.time) - 1 00300 if closest_stamp_left_index >= 0 and abs((entry.time - loaded_stamps[closest_stamp_left_index]).to_sec()) < max_interval / 2: 00301 continue 00302 closest_stamp_right_index = closest_stamp_left_index + 1 00303 if closest_stamp_right_index < len(loaded_stamps) - 1 and abs((entry.time - loaded_stamps[closest_stamp_right_index]).to_sec()) < max_interval / 2: 00304 continue 00305 far_entries.append((bag, entry)) 00306 else: 00307 far_entries = spaced_entries 00308 00309 # Reorder to load using bisection 00310 bin_search_entries = [] 00311 if len(far_entries) > 0: 00312 for i in _subdivide(0, len(far_entries) - 1): 00313 bin_search_entries.append(far_entries[i]) 00314 00315 entry_queue = list(reversed(bin_search_entries)) # reversed so that pop from front of list 00316 00317 return entry_queue 00318 00319 def _process(self, bag, entry): 00320 # Read the message from the bag file 00321 _, msg, msg_stamp = self._timeline.read_message(bag, entry.position) 00322 if not msg: 00323 return False 00324 00325 # Extract the field data from the message 00326 for path in self._paths: 00327 # Get Y value 00328 try: 00329 y = eval('msg.' + path) 00330 except Exception: 00331 continue 00332 00333 # Get X value 00334 if self._use_header_stamp: 00335 if msg.__class__._has_header: 00336 header = msg.header 00337 else: 00338 header = _get_header(msg, path) 00339 00340 stamp = header.stamp 00341 else: 00342 stamp = msg_stamp 00343 x = (stamp - self._timeline.start_stamp).to_sec() 00344 00345 # Store data 00346 if path not in self._data: 00347 self._data[path] = DataSet() 00348 self._data[path].add(x, y) 00349 00350 # Notify listeners of progress 00351 for listener in self._progress_listeners: 00352 listener() 00353 00354 return True 00355 00356 def _get_header(msg, path): 00357 fields = path.split('.') 00358 if len(fields) <= 1: 00359 return None 00360 00361 parent_path = '.'.join(fields[:-1]) 00362 00363 parent = eval('msg.' + parent_path) 00364 00365 for slot in parent.__slots__: 00366 subobj = getattr(parent, slot) 00367 if subobj is not None and hasattr(subobj, '_type') and getattr(subobj, '_type') == 'roslib/Header': 00368 return subobj 00369 00370 return _get_header(msg, parent_path) 00371 00372 def _subdivide(left, right): 00373 if left == right: 00374 yield left 00375 return 00376 00377 yield left 00378 yield right 00379 00380 intervals = collections.deque([(left, right)]) 00381 while True: 00382 try: 00383 (left, right) = intervals.popleft() 00384 except Exception: 00385 break 00386 00387 mid = (left + right) / 2 00388 00389 if right - left <= 1: 00390 continue 00391 00392 yield mid 00393 00394 if right - left <= 2: 00395 continue 00396 00397 intervals.append((left, mid)) 00398 intervals.append((mid, right))