timeline_cache.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 
00034 import bisect
00035 try:
00036     from queue import Queue
00037 except ImportError:
00038     from Queue import Queue
00039 import threading
00040 import time
00041 
00042 
00043 class TimelineCache(threading.Thread):
00044     """
00045     Caches items for timeline renderers
00046     """
00047     def __init__(self, loader, listener=None, max_cache_size=100):
00048         threading.Thread.__init__(self)
00049 
00050         self.loader = loader
00051         self.listener = listener
00052         self.stop_flag = False
00053         self.lock = threading.RLock()
00054         self.items = {}  # topic -> [(timestamp, items), ...]
00055         self.last_accessed = {}  # topic -> [(access time, timestamp), ...]
00056         self.item_access = {}  # topic -> timestamp -> access time
00057         self.max_cache_size = max_cache_size  # max number of items to cache (per topic)
00058         self.queue = Queue()
00059         self.setDaemon(True)
00060         self.start()
00061 
00062     def run(self):
00063         while not self.stop_flag:
00064             # Get next item to load
00065             entry = self.queue.get()
00066             # self used to signal a change in stop_flag
00067             if entry == self:
00068                 continue
00069             # Check we haven't already cached it
00070             topic, stamp, time_threshold, item_details = entry
00071 
00072             if not self.get_item(topic, stamp, time_threshold):
00073                 # Load the item
00074                 msg_stamp, item = self.loader(topic, stamp, item_details)
00075                 if item:
00076                     # Store in the cache
00077                     self.cache_item(topic, msg_stamp, item)
00078 
00079                     if self.listener:
00080                         self.listener(topic, msg_stamp, item)
00081 #                else:
00082 #                    try:
00083 #                        qWarning('Failed to load:%s' % entry)
00084 #                    except:
00085 #                        qWarning('Failed to load cache item')
00086             self.queue.task_done()
00087 
00088     def enqueue(self, entry):
00089         self.queue.put(entry)
00090 
00091     def cache_item(self, topic, t, item):
00092         with self.lock:
00093             if topic not in self.items:
00094                 self.items[topic] = []
00095             topic_cache = self.items[topic]
00096 
00097             cache_entry = (t.to_sec(), item)
00098             cache_index = bisect.bisect_right(topic_cache, cache_entry)
00099             topic_cache.insert(cache_index, cache_entry)
00100 
00101             self._update_last_accessed(topic, t.to_sec())
00102 
00103             self._limit_cache()
00104 
00105     def get_item(self, topic, stamp, time_threshold):
00106         with self.lock:
00107             # Attempt to get a item from the cache that's within time_threshold secs from stamp
00108             topic_cache = self.items.get(topic)
00109             if topic_cache:
00110                 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00111 
00112                 if cache_index <= len(topic_cache) - 1:
00113                     # Get cache entry before (or at) timestamp, and entry after
00114                     (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00115                     if cache_index < len(topic_cache) - 1:
00116                         cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00117                     else:
00118                         cache_after_stamp = None
00119 
00120                     # Find closest entry
00121                     cache_before_dist = abs(stamp - cache_before_stamp)
00122                     if cache_after_stamp:
00123                         cache_after_dist = abs(cache_after_stamp - stamp)
00124 
00125                     if cache_after_stamp and cache_after_dist < cache_before_dist:
00126                         cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00127                     else:
00128                         cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00129 
00130                     # Check entry is close enough
00131                     if cache_dist <= time_threshold:
00132                         self._update_last_accessed(topic, cache_stamp)
00133                         return cache_item
00134             return None
00135 
00136     def _update_last_accessed(self, topic, stamp):
00137         """
00138         Maintains a sorted list of cache accesses by timestamp for each topic.
00139         """
00140         with self.lock:
00141             access_time = time.time()
00142 
00143             if topic not in self.last_accessed:
00144                 self.last_accessed[topic] = [(access_time, stamp)]
00145                 self.item_access[topic] = {stamp: access_time}
00146                 return
00147 
00148             topic_last_accessed = self.last_accessed[topic]
00149             topic_item_access = self.item_access[topic]
00150 
00151             if stamp in topic_item_access:
00152                 last_access = topic_item_access[stamp]
00153 
00154                 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00155                 assert(topic_last_accessed[index][1] == stamp)
00156 
00157                 del topic_last_accessed[index]
00158 
00159             topic_last_accessed.append((access_time, stamp))
00160             topic_item_access[stamp] = access_time
00161 
00162     def _limit_cache(self):
00163         """
00164         Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00165         """
00166         with self.lock:
00167             for topic, topic_cache in self.items.items():
00168                 while len(topic_cache) > self.max_cache_size:
00169                     lru_stamp = self.last_accessed[topic][0][1]
00170 
00171                     cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00172                     assert(topic_cache[cache_index][0] == lru_stamp)
00173 
00174                     del topic_cache[cache_index]
00175                     del self.last_accessed[topic][0]
00176                     del self.item_access[topic][lru_stamp]
00177 
00178     def stop(self):
00179         self.stop_flag = True
00180         self.queue.put(self)


rqt_bag
Author(s): Aaron Blasdel, Tim Field
autogenerated on Thu Aug 17 2017 02:19:27