timeline_cache.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 
00034 import bisect
00035 try:
00036     from queue import Queue
00037 except ImportError:
00038     from Queue import Queue
00039 import threading
00040 import time
00041 
00042 
00043 class TimelineCache(threading.Thread):
00044 
00045     """
00046     Caches items for timeline renderers
00047     """
00048 
00049     def __init__(self, loader, listener=None, max_cache_size=100):
00050         threading.Thread.__init__(self)
00051 
00052         self.loader = loader
00053         self.listener = listener
00054         self.stop_flag = False
00055         self.lock = threading.RLock()
00056         self.items = {}  # topic -> [(timestamp, items), ...]
00057         self.last_accessed = {}  # topic -> [(access time, timestamp), ...]
00058         self.item_access = {}  # topic -> timestamp -> access time
00059         self.max_cache_size = max_cache_size  # max number of items to cache (per topic)
00060         self.queue = Queue()
00061         self.setDaemon(True)
00062         self.start()
00063 
00064     def run(self):
00065         while not self.stop_flag:
00066             # Get next item to load
00067             entry = self.queue.get()
00068             # self used to signal a change in stop_flag
00069             if entry == self:
00070                 continue
00071             # Check we haven't already cached it
00072             topic, stamp, time_threshold, item_details = entry
00073 
00074             if not self.get_item(topic, stamp, time_threshold):
00075                 # Load the item
00076                 msg_stamp, item = self.loader(topic, stamp, item_details)
00077                 if item:
00078                     # Store in the cache
00079                     self.cache_item(topic, msg_stamp, item)
00080 
00081                     if self.listener:
00082                         self.listener(topic, msg_stamp, item)
00083 #                else:
00084 #                    try:
00085 #                        qWarning('Failed to load:%s' % entry)
00086 #                    except:
00087 #                        qWarning('Failed to load cache item')
00088             self.queue.task_done()
00089 
00090     def enqueue(self, entry):
00091         self.queue.put(entry)
00092 
00093     def cache_item(self, topic, t, item):
00094         with self.lock:
00095             if topic not in self.items:
00096                 self.items[topic] = []
00097             topic_cache = self.items[topic]
00098 
00099             cache_entry = (t.to_sec(), item)
00100             cache_index = bisect.bisect_right(topic_cache, cache_entry)
00101             topic_cache.insert(cache_index, cache_entry)
00102 
00103             self._update_last_accessed(topic, t.to_sec())
00104 
00105             self._limit_cache()
00106 
00107     def get_item(self, topic, stamp, time_threshold):
00108         with self.lock:
00109             # Attempt to get a item from the cache that's within time_threshold secs from stamp
00110             topic_cache = self.items.get(topic)
00111             if topic_cache:
00112                 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00113 
00114                 if cache_index <= len(topic_cache) - 1:
00115                     # Get cache entry before (or at) timestamp, and entry after
00116                     (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00117                     if cache_index < len(topic_cache) - 1:
00118                         cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00119                     else:
00120                         cache_after_stamp = None
00121 
00122                     # Find closest entry
00123                     cache_before_dist = abs(stamp - cache_before_stamp)
00124                     if cache_after_stamp:
00125                         cache_after_dist = abs(cache_after_stamp - stamp)
00126 
00127                     if cache_after_stamp and cache_after_dist < cache_before_dist:
00128                         cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00129                     else:
00130                         cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00131 
00132                     # Check entry is close enough
00133                     if cache_dist <= time_threshold:
00134                         self._update_last_accessed(topic, cache_stamp)
00135                         return cache_item
00136             return None
00137 
00138     def _update_last_accessed(self, topic, stamp):
00139         """
00140         Maintains a sorted list of cache accesses by timestamp for each topic.
00141         """
00142         with self.lock:
00143             access_time = time.time()
00144 
00145             if topic not in self.last_accessed:
00146                 self.last_accessed[topic] = [(access_time, stamp)]
00147                 self.item_access[topic] = {stamp: access_time}
00148                 return
00149 
00150             topic_last_accessed = self.last_accessed[topic]
00151             topic_item_access = self.item_access[topic]
00152 
00153             if stamp in topic_item_access:
00154                 last_access = topic_item_access[stamp]
00155 
00156                 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00157                 assert(topic_last_accessed[index][1] == stamp)
00158 
00159                 del topic_last_accessed[index]
00160 
00161             topic_last_accessed.append((access_time, stamp))
00162             topic_item_access[stamp] = access_time
00163 
00164     def _limit_cache(self):
00165         """
00166         Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00167         """
00168         with self.lock:
00169             for topic, topic_cache in self.items.items():
00170                 while len(topic_cache) > self.max_cache_size:
00171                     lru_stamp = self.last_accessed[topic][0][1]
00172 
00173                     cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00174                     assert(topic_cache[cache_index][0] == lru_stamp)
00175 
00176                     del topic_cache[cache_index]
00177                     del self.last_accessed[topic][0]
00178                     del self.item_access[topic][lru_stamp]
00179 
00180     def stop(self):
00181         self.stop_flag = True
00182         self.queue.put(self)


rqt_bag
Author(s): Aaron Blasdel, Tim Field
autogenerated on Thu Jun 6 2019 18:52:48