timeline_cache.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2009, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 
34 import bisect
35 try:
36  from queue import Queue
37 except ImportError:
38  from Queue import Queue
39 import threading
40 import time
41 
42 
class TimelineCache(threading.Thread):

    """
    Caches items for timeline renderers.

    A daemon worker thread (started by the constructor) takes load requests
    off a queue, asks ``loader`` for each item that is not already cached
    closely enough, stores the result and notifies ``listener``. Each topic
    keeps at most ``max_cache_size`` items; the least recently used ones are
    evicted first.
    """

    def __init__(self, loader, listener=None, max_cache_size=100):
        """
        :param loader: callable invoked as ``loader(topic, stamp, item_details)``;
            must return ``(msg_stamp, item)`` where ``msg_stamp`` provides
            ``to_sec()``. A falsy ``item`` means nothing is cached.
        :param listener: optional callable invoked as
            ``listener(topic, msg_stamp, item)`` after an item has been cached
        :param max_cache_size: max number of items to cache (per topic)
        """
        threading.Thread.__init__(self)

        self.loader = loader
        self.listener = listener
        self.stop_flag = False
        self.lock = threading.RLock()
        self.items = {}  # topic -> [(timestamp, item), ...] sorted by timestamp
        self.last_accessed = {}  # topic -> [(access time, timestamp), ...] oldest access first
        self.item_access = {}  # topic -> timestamp -> access time
        self.max_cache_size = max_cache_size  # max number of items to cache (per topic)
        self.queue = Queue()
        # Attribute form instead of the deprecated setDaemon()
        self.daemon = True
        self.start()

    def run(self):
        """
        Worker loop: process queued load requests until stop() is called.
        """
        while not self.stop_flag:
            # Get next item to load
            entry = self.queue.get()
            try:
                # stop() enqueues the cache object itself purely to wake this
                # loop up so it can notice the changed stop_flag
                if entry is not self:
                    self._load_entry(entry)
            finally:
                # Always balance get() so queue.join() cannot hang, even for
                # the stop sentinel or when the loader/listener raises
                self.queue.task_done()

    def _load_entry(self, entry):
        """
        Load, cache and announce one queued ``(topic, stamp, time_threshold,
        item_details)`` request, unless a close enough item is already cached.
        """
        topic, stamp, time_threshold, item_details = entry

        # Check we haven't already cached it
        if self.get_item(topic, stamp, time_threshold):
            return

        msg_stamp, item = self.loader(topic, stamp, item_details)
        if not item:
            # Nothing could be loaded for this request; skip it silently
            return

        self.cache_item(topic, msg_stamp, item)
        if self.listener:
            self.listener(topic, msg_stamp, item)

    def enqueue(self, entry):
        """
        Queue a ``(topic, stamp, time_threshold, item_details)`` load request
        for the worker thread.
        """
        self.queue.put(entry)

    def cache_item(self, topic, t, item):
        """
        Store ``item`` for ``topic`` at time ``t`` and evict LRU items if needed.

        :param t: time object providing ``to_sec()``
        """
        stamp = t.to_sec()
        with self.lock:
            topic_cache = self.items.setdefault(topic, [])

            # A 1-tuple probe orders on the stamp alone, so the cached items
            # themselves (which may not be comparable) are never compared
            cache_index = bisect.bisect_left(topic_cache, (stamp,))
            if cache_index < len(topic_cache) and topic_cache[cache_index][0] == stamp:
                # Same stamp already cached: replace it. A second entry would
                # have no LRU record of its own and break eviction.
                topic_cache[cache_index] = (stamp, item)
            else:
                topic_cache.insert(cache_index, (stamp, item))

            self._update_last_accessed(topic, stamp)

            self._limit_cache()

    def get_item(self, topic, stamp, time_threshold):
        """
        Return the cached item of ``topic`` closest to ``stamp``.

        :param stamp: time in seconds (same unit as the cached ``to_sec()`` values)
        :param time_threshold: max accepted distance, in seconds, from ``stamp``
        :returns: the closest item, or None if the topic has no item within
            ``time_threshold``. On an exact tie the earlier item wins.
        """
        with self.lock:
            topic_cache = self.items.get(topic)
            if not topic_cache:
                return None

            # Index of the first entry at or after stamp; the closest entry is
            # either that one or its predecessor
            after_index = bisect.bisect_left(topic_cache, (stamp,))
            before_index = max(0, after_index - 1)
            neighbours = topic_cache[before_index:after_index + 1]

            # min() keeps the first of equally distant entries, i.e. the earlier one
            nearest_stamp, nearest_item = min(
                neighbours, key=lambda cached: abs(cached[0] - stamp))

            # Check entry is close enough
            if abs(nearest_stamp - stamp) <= time_threshold:
                self._update_last_accessed(topic, nearest_stamp)
                return nearest_item
            return None

    def _update_last_accessed(self, topic, stamp):
        """
        Record that the item at ``stamp`` was just used, moving it to the
        most-recently-used end of the topic's access list.
        """
        with self.lock:
            access_time = time.time()

            topic_last_accessed = self.last_accessed.setdefault(topic, [])
            topic_item_access = self.item_access.setdefault(topic, {})

            previous_access = topic_item_access.get(stamp)
            if previous_access is not None:
                # Remove the exact (access time, stamp) pair: searching by
                # access time alone could hit another stamp's record when two
                # accesses share a clock reading or the clock steps backwards
                topic_last_accessed.remove((previous_access, stamp))

            topic_last_accessed.append((access_time, stamp))
            topic_item_access[stamp] = access_time

    def _limit_cache(self):
        """
        Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
        """
        with self.lock:
            for topic, topic_cache in self.items.items():
                while len(topic_cache) > self.max_cache_size:
                    # Head of the access list is the least recently used stamp
                    lru_stamp = self.last_accessed[topic][0][1]

                    cache_index = bisect.bisect_left(topic_cache, (lru_stamp,))
                    assert topic_cache[cache_index][0] == lru_stamp

                    del topic_cache[cache_index]
                    del self.last_accessed[topic][0]
                    del self.item_access[topic][lru_stamp]

    def stop(self):
        """
        Ask the worker thread to exit; returns without waiting for it.
        """
        self.stop_flag = True
        # Wake the worker in case it is blocked waiting on an empty queue
        self.queue.put(self)
def get_item(self, topic, stamp, time_threshold)
def _update_last_accessed(self, topic, stamp)
def cache_item(self, topic, t, item)
def __init__(self, loader, listener=None, max_cache_size=100)


rqt_bag
Author(s): Aaron Blasdel, Tim Field
autogenerated on Fri Jun 7 2019 22:05:54