plot_data_loader.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 from __future__ import with_statement
00034 
00035 PKG = 'rxbag_plugins'
00036 import roslib; roslib.load_manifest(PKG)
00037 import rospy
00038 
00039 import bisect
00040 import collections
00041 import csv
00042 import itertools
00043 import sys
00044 import threading
00045 import time
00046 
00047 from dataset import DataSet
00048 
00049 class PlotDataLoader(threading.Thread):
00050     def __init__(self, timeline, topic):
00051         threading.Thread.__init__(self, target=self._run)
00052 
00053         self._timeline = timeline
00054         self._topic    = topic
00055 
00056         self._start_stamp        = self._timeline.start_stamp
00057         self._end_stamp          = self._timeline.end_stamp
00058         self._paths              = []
00059         self._max_interval       = 0.0
00060         self._extension_fraction = 0.1
00061         self._min_reload_secs    = 0.4                            # minimum time to wait before loading entries
00062         self._use_header_stamp   = False                          # use the timestamp from the header
00063 
00064         self._dirty         = True
00065         self._dirty_cv      = threading.Condition()
00066         self._last_reload   = None                           # time that entries were reloaded
00067         self._data          = {}
00068         self._load_complete = False
00069 
00070         self._progress_listeners = []
00071         self._complete_listeners = []
00072 
00073         self._stop_flag = False
00074         self.setDaemon(True)
00075 
00076     @property
00077     def data(self): return self._data
00078 
00079     @property
00080     def is_load_complete(self): return self._load_complete
00081 
00082     # listeners
00083 
00084     def add_progress_listener(self, listener):    self._progress_listeners.append(listener)
00085     def remove_progress_listener(self, listener): self._progress_listeners.remove(listener)
00086 
00087     def add_complete_listener(self, listener):    self._complete_listeners.append(listener)
00088     def remove_complete_listener(self, listener): self._complete_listeners.remove(listener)
00089 
00090     def invalidate(self):
00091         with self._dirty_cv:
00092             self._dirty = True
00093             self._dirty_cv.notify()
00094 
00095     # property: start_stamp
00096 
00097     def _get_start_stamp(self): return self._start_stamp
00098     
00099     def _set_start_stamp(self, start_stamp):
00100         with self._dirty_cv:
00101             if start_stamp != self._start_stamp:
00102                 self._start_stamp = start_stamp
00103                 self._dirty = True
00104                 self._dirty_cv.notify()
00105 
00106     start_stamp = property(_get_start_stamp, _set_start_stamp)
00107 
00108     # property: end_stamp
00109 
00110     def _get_end_stamp(self): return self._end_stamp
00111     
00112     def _set_end_stamp(self, end_stamp):
00113         with self._dirty_cv:
00114             if end_stamp != self._end_stamp:
00115                 self._end_stamp = end_stamp
00116                 self._dirty = True
00117                 self._dirty_cv.notify()
00118         
00119     end_stamp = property(_get_end_stamp, _set_end_stamp)
00120 
00121     def set_interval(self, start_stamp, end_stamp):
00122         with self._dirty_cv:
00123             updated = False
00124             if start_stamp != self._start_stamp:
00125                 self._start_stamp = start_stamp
00126                 updated = True
00127             if end_stamp != self._end_stamp:
00128                 self._end_stamp = end_stamp
00129                 updated = True
00130             if updated:
00131                 self._dirty = True
00132                 self._dirty_cv.notify()
00133 
00134     # property: paths
00135 
00136     def _get_paths(self): return self._paths
00137 
00138     def _set_paths(self, paths):
00139         with self._dirty_cv:
00140             if set(paths) != set(self._paths):
00141                 self._data = {}
00142                 self._paths = paths
00143                 self._dirty = True
00144                 self._dirty_cv.notify()
00145 
00146     paths = property(_get_paths, _set_paths)
00147 
00148     # property: max_interval
00149     
00150     def _get_max_interval(self): return self._max_interval
00151     
00152     def _set_max_interval(self, max_interval):
00153         with self._dirty_cv:
00154             if max_interval != self._max_interval:
00155                 self._max_interval = max_interval
00156                 self._dirty = True
00157                 self._dirty_cv.notify()
00158 
00159     max_interval = property(_get_max_interval, _set_max_interval)
00160 
00161     def stop(self):
00162         self._stop_flag = True
00163         with self._dirty_cv:
00164             self._dirty_cv.notify()
00165         self.join()
00166 
00167     ##
00168 
00169     def _run(self):
00170         entry_queue = []
00171         
00172         while not self._stop_flag:
00173             # If dirty, then throw away data which doesn't fit in current view, and regenerate the entries to load
00174             with self._dirty_cv:
00175                 if self._dirty and (self._last_reload is None or time.time() - self._last_reload >= self._min_reload_secs):
00176                     self._trim_data(self._extension_fraction, self._max_interval)
00177 
00178                     entry_queue = self._get_entries_to_load(self._extension_fraction, self._max_interval)
00179 
00180                     self._last_reload = time.time()
00181                     self._load_complete = False
00182                     self._dirty = False
00183 
00184             # Check to see if we've finished loading
00185             if len(entry_queue) == 0 or len(self._paths) == 0:
00186                 self._load_complete = True
00187                 for listener in itertools.chain(self._progress_listeners, self._complete_listeners):
00188                     listener()
00189 
00190                 # Wait for dirty flag to be set
00191                 with self._dirty_cv:
00192                     if not self._dirty:
00193                         self._dirty_cv.wait()
00194                         
00195                 continue
00196 
00197             # Pop the next entry to load, and process it (insert it into _data)
00198             try:
00199                 bag, entry = entry_queue.pop()
00200 
00201                 self._process(bag, entry)
00202             except Exception:
00203                 pass
00204 
00205     def _trim_data(self, extension_fraction=None, max_interval=None):
00206         """
00207         Toss out data outside of (extended) view range, and closer than max_interval seconds apart.
00208         """
00209         if extension_fraction is None:
00210             start_stamp = self._start_stamp
00211             end_stamp   = self._end_stamp
00212         else:
00213             extension = rospy.Duration((self._end_stamp - self._start_stamp).to_sec() * extension_fraction)
00214             if extension.to_sec() >= self._start_stamp.to_sec():
00215                 start_stamp = rospy.Time(0, 1)
00216             else:
00217                 start_stamp = self._start_stamp - extension
00218             end_stamp = self._end_stamp + extension
00219 
00220         min_x = (start_stamp - self._timeline.start_stamp).to_sec()
00221         max_x = (end_stamp   - self._timeline.start_stamp).to_sec()
00222 
00223         for series in list(self._data.keys()):
00224             points     = self._data[series].points
00225             num_points = len(points)
00226 
00227             trimmed_points = []
00228 
00229             if num_points > 0 and points[0][0] < max_x and points[-1][0] > min_x:
00230                 first_index = None
00231                 last_x = None
00232                 for i, (x, y) in enumerate(points):
00233                     if x >= min_x:
00234                         trimmed_points.append((x, y))
00235                         first_index = i
00236                         last_x = x
00237                         break
00238 
00239                 if first_index is not None:
00240                     for i, (x, y) in enumerate(points[first_index + 1:]):
00241                         if x > max_x:
00242                             break
00243 
00244                         if (max_interval is None) or (x - last_x >= max_interval):
00245                             trimmed_points.append((x, y))
00246                             last_x = x
00247 
00248             new_data = DataSet()
00249             new_data.set(trimmed_points)
00250 
00251             self._data[series] = new_data
00252 
00253     def _get_entries_to_load(self, extension_fraction=None, max_interval=None):
00254         """
00255         Returns a list of (Bag, IndexEntry) tuples to load.
00256 
00257         @param extension: extra proportion of the view range to load
00258         @param max_interval: maximum 
00259         """
00260         # Load data outside of the view range
00261         if extension_fraction is None:
00262                 start_stamp = self._start_stamp
00263                 end_stamp   = self._end_stamp
00264         else:
00265             extension = rospy.Duration((self._end_stamp - self._start_stamp).to_sec() * extension_fraction)
00266             if extension.to_sec() >= self._start_stamp.to_sec():
00267                 start_stamp = rospy.Time(0, 1)
00268             else:
00269                 start_stamp = self._start_stamp - extension
00270             end_stamp = self._end_stamp + extension
00271 
00272         # Get the entries
00273         view_entries = list(self._timeline.get_entries_with_bags(self._topic, start_stamp, end_stamp))
00274 
00275         # Toss out those entries too close to each other
00276         if max_interval is not None:
00277             spaced_entries = []
00278             max_interval_duration = rospy.Duration(max_interval)
00279             
00280             last_time = None
00281             for bag, entry in view_entries:
00282                 if last_time is None or (entry.time - last_time) > max_interval_duration:
00283                     spaced_entries.append((bag, entry))
00284                     last_time = entry.time
00285         else:
00286             spaced_entries = view_entries
00287 
00288         # Toss out entries too close to existing data
00289         if max_interval is not None:
00290             far_entries = []
00291             loaded_xs = set()
00292             for series in list(self._data.keys()):
00293                 for x, y in self._data[series].points:
00294                     loaded_xs.add(x)
00295             loaded_xs = list(loaded_xs)
00296             loaded_xs.sort()
00297             loaded_stamps = [self._timeline.start_stamp + rospy.Duration(x) for x in loaded_xs]
00298             for bag, entry in spaced_entries:
00299                 closest_stamp_left_index = bisect.bisect_left(loaded_stamps, entry.time) - 1
00300                 if closest_stamp_left_index >= 0 and abs((entry.time - loaded_stamps[closest_stamp_left_index]).to_sec()) < max_interval / 2:
00301                     continue
00302                 closest_stamp_right_index = closest_stamp_left_index + 1
00303                 if closest_stamp_right_index < len(loaded_stamps) - 1 and abs((entry.time - loaded_stamps[closest_stamp_right_index]).to_sec()) < max_interval / 2:
00304                     continue
00305                 far_entries.append((bag, entry))
00306         else:
00307             far_entries = spaced_entries
00308 
00309         # Reorder to load using bisection
00310         bin_search_entries = []
00311         if len(far_entries) > 0:
00312             for i in _subdivide(0, len(far_entries) - 1):
00313                 bin_search_entries.append(far_entries[i])
00314 
00315         entry_queue = list(reversed(bin_search_entries))  # reversed so that pop from front of list
00316         
00317         return entry_queue
00318 
00319     def _process(self, bag, entry):
00320         # Read the message from the bag file
00321         _, msg, msg_stamp = self._timeline.read_message(bag, entry.position)
00322         if not msg:
00323             return False
00324 
00325         # Extract the field data from the message
00326         for path in self._paths:
00327             # Get Y value
00328             try:
00329                 y = eval('msg.' + path)
00330             except Exception:
00331                 continue
00332 
00333             # Get X value
00334             if self._use_header_stamp:
00335                 if msg.__class__._has_header:
00336                     header = msg.header
00337                 else:
00338                     header = _get_header(msg, path)
00339 
00340                 stamp = header.stamp
00341             else:
00342                 stamp = msg_stamp
00343             x = (stamp - self._timeline.start_stamp).to_sec()
00344 
00345             # Store data
00346             if path not in self._data:
00347                 self._data[path] = DataSet()
00348             self._data[path].add(x, y)
00349 
00350         # Notify listeners of progress
00351         for listener in self._progress_listeners:
00352             listener()
00353 
00354         return True
00355 
00356 def _get_header(msg, path):
00357     fields = path.split('.')
00358     if len(fields) <= 1:
00359         return None
00360 
00361     parent_path = '.'.join(fields[:-1])
00362 
00363     parent = eval('msg.' + parent_path)
00364     
00365     for slot in parent.__slots__:
00366         subobj = getattr(parent, slot)
00367         if subobj is not None and hasattr(subobj, '_type') and getattr(subobj, '_type') == 'roslib/Header':
00368             return subobj
00369 
00370     return _get_header(msg, parent_path)
00371 
00372 def _subdivide(left, right):
00373     if left == right:
00374         yield left
00375         return
00376     
00377     yield left
00378     yield right
00379 
00380     intervals = collections.deque([(left, right)])
00381     while True:
00382         try:
00383             (left, right) = intervals.popleft()
00384         except Exception:
00385             break
00386 
00387         mid = (left + right) / 2
00388 
00389         if right - left <= 1:
00390             continue
00391 
00392         yield mid
00393 
00394         if right - left <= 2:
00395             continue
00396 
00397         intervals.append((left, mid))
00398         intervals.append((mid,  right))


rxbag_plugins
Author(s): Tim Field
autogenerated on Mon Jan 6 2014 11:54:21