timeline_cache.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 import rospy
00034 
00035 import bisect
00036 import Queue
00037 import sys
00038 import threading
00039 import time
00040 
00041 import wx
00042 
00043 class TimelineCache(threading.Thread):
00044     def __init__(self, loader, listener=None, max_cache_size=100):
00045         threading.Thread.__init__(self)
00046 
00047         self.loader    = loader
00048         self.listener  = listener
00049         self.stop_flag = False
00050 
00051         self.lock = threading.RLock()
00052 
00053         self.items         = {}     # topic -> [(timestamp, items), ...]
00054         self.last_accessed = {}     # topic -> [(access time, timestamp), ...]
00055         self.item_access   = {}     # topic -> timestamp -> access time
00056 
00057         self.max_cache_size = max_cache_size    # max number of items to cache (per topic)
00058 
00059         self.queue = Queue.Queue()
00060 
00061         self.setDaemon(True)
00062         self.start()
00063 
00064         self._debugging = False
00065         if self._debugging:
00066             self._debug_frame = wx.Frame(None, -1, 'TimelineCache debug')
00067             self._debug_text = wx.TextCtrl(self._debug_frame, style=wx.TE_READONLY | wx.NO_BORDER | wx.TE_MULTILINE)
00068             self._debug_text.SetPosition((1, 0))
00069             self._debug_text.SetFont(wx.Font(7, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL))
00070             self._debug_frame.SetSize((400, 1200))
00071             self._debug_frame.Show()
00072 
00073     def run(self):
00074         while not self.stop_flag:
00075             # Get next item to load
00076             entry = self.queue.get()
00077             
00078             # self used to signal a change in stop_flag
00079             if entry == self:
00080                 continue
00081 
00082             # Update debugging info (if enabled)
00083             if self._debugging:
00084                 wx.CallAfter(self._update_debug)
00085 
00086             # Check we haven't already cached it
00087             topic, stamp, time_threshold, item_details = entry
00088 
00089             if not self.get_item(topic, stamp, time_threshold):
00090                 # Load the item
00091                 msg_stamp, item = self.loader(topic, stamp, item_details)
00092                 if item:
00093                     # Store in the cache
00094                     self.cache_item(topic, msg_stamp, item)
00095 
00096                     if self.listener:
00097                         self.listener(topic, msg_stamp, item)
00098                 else:
00099                     print 'Failed to load:', entry
00100 
00101             self.queue.task_done()
00102         
00103     def enqueue(self, entry):
00104         self.queue.put(entry)
00105 
00106     def cache_item(self, topic, t, item):
00107         with self.lock:
00108             if topic not in self.items:
00109                 self.items[topic] = []
00110             topic_cache = self.items[topic]
00111     
00112             cache_entry = (t.to_sec(), item)
00113             cache_index = bisect.bisect_right(topic_cache, cache_entry)
00114             topic_cache.insert(cache_index, cache_entry)
00115 
00116             self._update_last_accessed(topic, t.to_sec())
00117     
00118             self._limit_cache()
00119 
00120     def get_item(self, topic, stamp, time_threshold):
00121         with self.lock:
00122             # Attempt to get a item from the cache that's within time_threshold secs from stamp
00123             topic_cache = self.items.get(topic)
00124             if topic_cache:
00125                 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00126                 
00127                 if cache_index <= len(topic_cache) - 1:
00128                     # Get cache entry before (or at) timestamp, and entry after
00129                     (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00130                     if cache_index < len(topic_cache) - 1:
00131                         cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00132                     else:
00133                         cache_after_stamp = None
00134     
00135                     # Find closest entry
00136                     cache_before_dist = abs(stamp - cache_before_stamp)
00137                     if cache_after_stamp:
00138                         cache_after_dist = abs(cache_after_stamp - stamp)
00139                         
00140                     if cache_after_stamp and cache_after_dist < cache_before_dist:
00141                         cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00142                     else:
00143                         cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00144     
00145                     # Check entry is close enough
00146                     if cache_dist <= time_threshold:
00147                         self._update_last_accessed(topic, cache_stamp)
00148                         
00149                         return cache_item
00150     
00151             return None
00152 
00153     def _update_last_accessed(self, topic, stamp):
00154         """
00155         Maintains a sorted list of cache accesses by timestamp for each topic.
00156         """
00157         with self.lock:
00158             access_time = time.time()
00159     
00160             if topic not in self.last_accessed:
00161                 self.last_accessed[topic] = [(access_time, stamp)]
00162                 self.item_access[topic] = { stamp: access_time }
00163                 return
00164     
00165             topic_last_accessed = self.last_accessed[topic]
00166             topic_item_access = self.item_access[topic]
00167     
00168             if stamp in topic_item_access:
00169                 last_access = topic_item_access[stamp]
00170                 
00171                 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00172                 assert(topic_last_accessed[index][1] == stamp)
00173     
00174                 del topic_last_accessed[index]
00175     
00176             topic_last_accessed.append((access_time, stamp))
00177             topic_item_access[stamp] = access_time
00178 
00179     def _limit_cache(self):
00180         """
00181         Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00182         """
00183         with self.lock:
00184             for topic, topic_cache in self.items.items():
00185                 while len(topic_cache) > self.max_cache_size:
00186                     lru_stamp = self.last_accessed[topic][0][1]
00187                     
00188                     cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00189                     assert(topic_cache[cache_index][0] == lru_stamp)
00190     
00191                     del topic_cache[cache_index]
00192                     del self.last_accessed[topic][0]
00193                     del self.item_access[topic][lru_stamp]
00194 
00195     def stop(self):
00196         self.stop_flag = True
00197         self.queue.put(self)
00198 
00199     def _update_debug(self):
00200         try:
00201             s = ''
00202     
00203             start_stamp = self.timeline.start_stamp.to_sec()
00204     
00205             for topic, topic_cache in self.items.items():
00206                 s += topic + '\n'
00207                 for t, _ in topic_cache:
00208                     s += '%.5f\n' % (t - start_stamp)
00209 
00210                 s += 'Last Accessed:\n'
00211                 topic_last_accessed = self.last_accessed[topic]
00212                 for access_time, t in topic_last_accessed:
00213                     s += '%.02f: %.5f\n' % (time.time() - access_time, t - start_stamp)
00214     
00215             self._debug_text.SetValue(s)
00216             
00217         except Exception:
00218             pass


rxbag
Author(s): Tim Field
autogenerated on Mon Oct 6 2014 07:26:07