timeline_cache.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 
00034 import bisect
00035 import Queue
00036 import threading
00037 import time
00038 
00039 
class TimelineCache(threading.Thread):
    """
    Caches items for timeline renderers.

    A daemon worker thread pulls load requests off an internal queue, asks
    ``loader`` for the item, stores the result in a per-topic cache and then
    notifies ``listener``. Each topic's cache is bounded by ``max_cache_size``;
    the least recently accessed entries are evicted first.
    """
    def __init__(self, loader, listener=None, max_cache_size=100):
        """
        @param loader: callable invoked as loader(topic, stamp, item_details);
            must return a (msg_stamp, item) pair, where msg_stamp provides
            to_sec(). A falsy item is not cached.
        @param listener: optional callable invoked as
            listener(topic, msg_stamp, item) after an item has been cached
        @param max_cache_size: max number of items to cache (per topic)
        """
        threading.Thread.__init__(self)

        self.loader = loader
        self.listener = listener
        self.stop_flag = False
        self.lock = threading.RLock()
        self.items = {}  # topic -> [(timestamp, item), ...] sorted by timestamp
        self.last_accessed = {}  # topic -> [(access time, timestamp), ...] oldest access first
        self.item_access = {}  # topic -> timestamp -> access time
        self.max_cache_size = max_cache_size  # max number of items to cache (per topic)
        self.queue = Queue.Queue()
        self.daemon = True
        self.start()

    def run(self):
        """
        Worker loop: processes queued load requests until stop_flag is set.
        """
        while not self.stop_flag:
            # Get next item to load
            entry = self.queue.get()
            try:
                # self is used as a sentinel to signal a change in stop_flag
                if entry is self:
                    continue

                topic, stamp, time_threshold, item_details = entry

                # Check we haven't already cached it
                if self.get_item(topic, stamp, time_threshold) is not None:
                    continue

                # Load the item
                msg_stamp, item = self.loader(topic, stamp, item_details)
                if item:
                    # Store in the cache
                    self.cache_item(topic, msg_stamp, item)

                    if self.listener:
                        self.listener(topic, msg_stamp, item)
            finally:
                # Every get() must be matched by task_done() (sentinel and
                # failed loads included), otherwise queue.join() never returns.
                self.queue.task_done()

    def enqueue(self, entry):
        """
        Queues a load request for the worker thread.

        @param entry: tuple of (topic, stamp, time_threshold, item_details)
        """
        self.queue.put(entry)

    @staticmethod
    def _stamp_index(topic_cache, stamp):
        """
        Returns the index of the first entry in topic_cache whose timestamp is
        >= stamp (len(topic_cache) if there is none).

        Only timestamps are compared, never the cached payloads, so items do
        not need to be orderable.
        """
        lo, hi = 0, len(topic_cache)
        while lo < hi:
            mid = (lo + hi) // 2
            if topic_cache[mid][0] < stamp:
                lo = mid + 1
            else:
                hi = mid
        return lo

    def cache_item(self, topic, t, item):
        """
        Stores item under topic at time t, then enforces the cache size limit.

        @param topic: topic the item belongs to
        @param t: timestamp object; t.to_sec() is used as the cache key
        @param item: the item to cache
        """
        stamp = t.to_sec()
        with self.lock:
            topic_cache = self.items.setdefault(topic, [])

            index = self._stamp_index(topic_cache, stamp)
            if index < len(topic_cache) and topic_cache[index][0] == stamp:
                # Replace rather than duplicate: the access bookkeeping holds
                # exactly one record per timestamp.
                topic_cache[index] = (stamp, item)
            else:
                topic_cache.insert(index, (stamp, item))

            self._update_last_accessed(topic, stamp)

            self._limit_cache()

    def get_item(self, topic, stamp, time_threshold):
        """
        Returns the cached item of topic closest to stamp, provided it lies
        within time_threshold secs of stamp; otherwise returns None.

        A hit is recorded as an access for the purposes of LRU eviction.
        """
        with self.lock:
            topic_cache = self.items.get(topic)
            if not topic_cache:
                return None

            # The closest entry is either the last one before stamp or the
            # first one at/after it.
            index = self._stamp_index(topic_cache, stamp)
            neighbours = topic_cache[max(0, index - 1):index + 1]

            # min() keeps the first of equally distant entries, i.e. the
            # earlier one wins ties.
            cache_stamp, item = min(neighbours, key=lambda e: abs(e[0] - stamp))

            # Check entry is close enough
            if abs(cache_stamp - stamp) <= time_threshold:
                self._update_last_accessed(topic, cache_stamp)
                return item
            return None

    def _update_last_accessed(self, topic, stamp):
        """
        Records an access to (topic, stamp), keeping the topic's access list
        ordered from least to most recently accessed.
        """
        with self.lock:
            access_time = time.time()

            topic_last_accessed = self.last_accessed.setdefault(topic, [])
            topic_item_access = self.item_access.setdefault(topic, {})

            if stamp in topic_item_access:
                # Drop the previous record by exact match, so that equal
                # access times (coarse clocks) cannot select another stamp.
                try:
                    topic_last_accessed.remove((topic_item_access[stamp], stamp))
                except ValueError:
                    # Record already missing: nothing to drop, re-add below.
                    pass

            topic_last_accessed.append((access_time, stamp))
            topic_item_access[stamp] = access_time

    def _limit_cache(self):
        """
        Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
        """
        with self.lock:
            for topic, topic_cache in self.items.items():
                topic_last_accessed = self.last_accessed.get(topic, [])
                topic_item_access = self.item_access.get(topic, {})

                # Stop if the access list runs dry instead of indexing past it.
                while len(topic_cache) > self.max_cache_size and topic_last_accessed:
                    lru_stamp = topic_last_accessed.pop(0)[1]
                    topic_item_access.pop(lru_stamp, None)

                    index = self._stamp_index(topic_cache, lru_stamp)
                    if index < len(topic_cache) and topic_cache[index][0] == lru_stamp:
                        del topic_cache[index]

    def stop(self):
        """
        Asks the worker thread to exit, waking it if it is blocked on the queue.
        """
        self.stop_flag = True
        self.queue.put(self)


rqt_bag
Author(s): Aaron Blasdel
autogenerated on Fri Jan 3 2014 11:55:06