# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from __future__ import with_statement

PKG = 'rxbag'
import roslib; roslib.load_manifest(PKG)
import rospy

import bisect
import Queue
import sys
import threading
import time

import wx

class TimelineCache(threading.Thread):
    def __init__(self, loader, listener=None, max_cache_size=100):
        threading.Thread.__init__(self)

        self.loader    = loader
        self.listener  = listener
        self.stop_flag = False

        self.lock = threading.RLock()

        self.items         = {}    # topic -> [(timestamp, item), ...]
        self.last_accessed = {}    # topic -> [(access time, timestamp), ...]
        self.item_access   = {}    # topic -> timestamp -> access time

        self.max_cache_size = max_cache_size    # max number of items to cache (per topic)

        self.queue = Queue.Queue()

        # Initialize debugging state before the worker thread starts, so that
        # run() never reads the attribute before it exists
        self._debugging = False
        if self._debugging:
            self._debug_frame = wx.Frame(None, -1, 'TimelineCache debug')
            self._debug_text = wx.TextCtrl(self._debug_frame, style=wx.TE_READONLY | wx.NO_BORDER | wx.TE_MULTILINE)
            self._debug_text.SetPosition((1, 0))
            self._debug_text.SetFont(wx.Font(7, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL))
            self._debug_frame.SetSize((400, 1200))
            self._debug_frame.Show()

        self.setDaemon(True)
        self.start()

    def run(self):
        while not self.stop_flag:
            # Get the next item to load
            entry = self.queue.get()

            # An entry of self is used to signal a change in stop_flag
            if entry == self:
                continue

            # Update debugging info (if enabled)
            if self._debugging:
                wx.CallAfter(self._update_debug)

            # Check we haven't already cached it
            topic, stamp, time_threshold, item_details = entry

            if not self.get_item(topic, stamp, time_threshold):
                # Load the item
                msg_stamp, item = self.loader(topic, stamp, item_details)
                if item:
                    # Store in the cache
                    self.cache_item(topic, msg_stamp, item)

                    if self.listener:
                        self.listener(topic, msg_stamp, item)
                else:
                    print 'Failed to load:', entry

            self.queue.task_done()

    def enqueue(self, entry):
        self.queue.put(entry)

    def cache_item(self, topic, t, item):
        with self.lock:
            if topic not in self.items:
                self.items[topic] = []
            topic_cache = self.items[topic]

            cache_entry = (t.to_sec(), item)
            cache_index = bisect.bisect_right(topic_cache, cache_entry)
            topic_cache.insert(cache_index, cache_entry)

            self._update_last_accessed(topic, t.to_sec())

            self._limit_cache()

    def get_item(self, topic, stamp, time_threshold):
        with self.lock:
            # Attempt to get an item from the cache that's within time_threshold secs of stamp
            topic_cache = self.items.get(topic)
            if topic_cache:
                cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)

                if cache_index <= len(topic_cache) - 1:
                    # Get the cache entry before (or at) the timestamp, and the entry after
                    (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
                    if cache_index < len(topic_cache) - 1:
                        cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
                    else:
                        cache_after_stamp = None

                    # Find the closest entry
                    cache_before_dist = abs(stamp - cache_before_stamp)
                    if cache_after_stamp:
                        cache_after_dist = abs(cache_after_stamp - stamp)

                    if cache_after_stamp and cache_after_dist < cache_before_dist:
                        cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
                    else:
                        cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item

                    # Check that the entry is close enough
                    if cache_dist <= time_threshold:
                        self._update_last_accessed(topic, cache_stamp)

                        return cache_item

        return None

    def _update_last_accessed(self, topic, stamp):
        """
        Maintains a sorted list of cache accesses by timestamp for each topic.
        """
        with self.lock:
            access_time = time.time()

            if topic not in self.last_accessed:
                self.last_accessed[topic] = [(access_time, stamp)]
                self.item_access[topic] = { stamp: access_time }
                return

            topic_last_accessed = self.last_accessed[topic]
            topic_item_access   = self.item_access[topic]

            if stamp in topic_item_access:
                # Remove the previous access record for this timestamp
                last_access = topic_item_access[stamp]

                index = bisect.bisect_left(topic_last_accessed, (last_access, None))
                assert(topic_last_accessed[index][1] == stamp)

                del topic_last_accessed[index]

            topic_last_accessed.append((access_time, stamp))
            topic_item_access[stamp] = access_time

    def _limit_cache(self):
        """
        Removes least-recently-used entries until each topic's cache holds at most max_cache_size items.
        """
        with self.lock:
            for topic, topic_cache in self.items.items():
                while len(topic_cache) > self.max_cache_size:
                    lru_stamp = self.last_accessed[topic][0][1]

                    cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
                    assert(topic_cache[cache_index][0] == lru_stamp)

                    del topic_cache[cache_index]
                    del self.last_accessed[topic][0]
                    del self.item_access[topic][lru_stamp]

    def stop(self):
        self.stop_flag = True
        self.queue.put(self)

    def _update_debug(self):
        # Debug-only; relies on a timeline attribute being set externally.
        # Any failure is silently swallowed below.
        try:
            s = ''

            start_stamp = self.timeline.start_stamp.to_sec()

            for topic, topic_cache in self.items.items():
                s += topic + '\n'
                for t, _ in topic_cache:
                    s += '%.5f\n' % (t - start_stamp)

                s += 'Last Accessed:\n'
                topic_last_accessed = self.last_accessed[topic]
                for access_time, t in topic_last_accessed:
                    s += '%.02f: %.5f\n' % (time.time() - access_time, t - start_stamp)

            self._debug_text.SetValue(s)

        except Exception:
            pass
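
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original rxbag module). The
# loader callback, topic name, and thumbnail details below are hypothetical;
# TimelineCache only requires that the loader return a (timestamp, item) pair
# whose timestamp provides to_sec() (e.g. rospy.Time), and get_item() compares
# stamps as floating-point seconds.
#
#   def load_thumbnail(topic, stamp, item_details):
#       # stamp arrives as seconds (float); return a rospy.Time plus the item
#       width, height = item_details
#       image = render_frame_near(topic, stamp, width, height)  # hypothetical helper
#       return rospy.Time.from_sec(stamp), image
#
#   cache = TimelineCache(load_thumbnail, max_cache_size=50)
#   cache.enqueue(('/camera/image_raw', 1234.5, 0.25, (80, 60)))
#   thumbnail = cache.get_item('/camera/image_raw', 1234.5, 0.25)  # None until the worker has loaded it
#   cache.stop()
# ---------------------------------------------------------------------------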