36 from queue
import Queue
38 from Queue
import Queue
46 Caches items for timeline renderers 49 def __init__(self, loader, listener=None, max_cache_size=100):
50 threading.Thread.__init__(self)
55 self.
lock = threading.RLock()
67 entry = self.queue.get()
72 topic, stamp, time_threshold, item_details = entry
74 if not self.
get_item(topic, stamp, time_threshold):
76 msg_stamp, item = self.
loader(topic, stamp, item_details)
82 self.
listener(topic, msg_stamp, item)
88 self.queue.task_done()
95 if topic
not in self.
items:
96 self.
items[topic] = []
97 topic_cache = self.
items[topic]
99 cache_entry = (t.to_sec(), item)
100 cache_index = bisect.bisect_right(topic_cache, cache_entry)
101 topic_cache.insert(cache_index, cache_entry)
110 topic_cache = self.items.get(topic)
112 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp,
None)) - 1)
114 if cache_index <= len(topic_cache) - 1:
116 (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
117 if cache_index < len(topic_cache) - 1:
118 cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
120 cache_after_stamp =
None 123 cache_before_dist = abs(stamp - cache_before_stamp)
124 if cache_after_stamp:
125 cache_after_dist = abs(cache_after_stamp - stamp)
127 if cache_after_stamp
and cache_after_dist < cache_before_dist:
128 cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
130 cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
133 if cache_dist <= time_threshold:
140 Maintains a sorted list of cache accesses by timestamp for each topic. 143 access_time = time.time()
153 if stamp
in topic_item_access:
154 last_access = topic_item_access[stamp]
156 index = bisect.bisect_left(topic_last_accessed, (last_access,
None))
157 assert(topic_last_accessed[index][1] == stamp)
159 del topic_last_accessed[index]
161 topic_last_accessed.append((access_time, stamp))
162 topic_item_access[stamp] = access_time
166 Removes LRU's from cache until size of each topic's cache is <= max_cache_size. 169 for topic, topic_cache
in self.items.items():
173 cache_index = bisect.bisect_left(topic_cache, (lru_stamp,
None))
174 assert(topic_cache[cache_index][0] == lru_stamp)
176 del topic_cache[cache_index]
def get_item(self, topic, stamp, time_threshold)
def _update_last_accessed(self, topic, stamp)
def cache_item(self, topic, t, item)
def __init__(self, loader, listener=None, max_cache_size=100)