00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import bisect
00035 import Queue
00036 import threading
00037 import time
00038
00039
00040 class TimelineCache(threading.Thread):
00041 """
00042 Caches items for timeline renderers
00043 """
00044 def __init__(self, loader, listener=None, max_cache_size=100):
00045 threading.Thread.__init__(self)
00046
00047 self.loader = loader
00048 self.listener = listener
00049 self.stop_flag = False
00050 self.lock = threading.RLock()
00051 self.items = {}
00052 self.last_accessed = {}
00053 self.item_access = {}
00054 self.max_cache_size = max_cache_size
00055 self.queue = Queue.Queue()
00056 self.setDaemon(True)
00057 self.start()
00058
00059 def run(self):
00060 while not self.stop_flag:
00061
00062 entry = self.queue.get()
00063
00064 if entry == self:
00065 continue
00066
00067 topic, stamp, time_threshold, item_details = entry
00068
00069 if not self.get_item(topic, stamp, time_threshold):
00070
00071 msg_stamp, item = self.loader(topic, stamp, item_details)
00072 if item:
00073
00074 self.cache_item(topic, msg_stamp, item)
00075
00076 if self.listener:
00077 self.listener(topic, msg_stamp, item)
00078
00079
00080
00081
00082
00083 self.queue.task_done()
00084
00085 def enqueue(self, entry):
00086 self.queue.put(entry)
00087
00088 def cache_item(self, topic, t, item):
00089 with self.lock:
00090 if topic not in self.items:
00091 self.items[topic] = []
00092 topic_cache = self.items[topic]
00093
00094 cache_entry = (t.to_sec(), item)
00095 cache_index = bisect.bisect_right(topic_cache, cache_entry)
00096 topic_cache.insert(cache_index, cache_entry)
00097
00098 self._update_last_accessed(topic, t.to_sec())
00099
00100 self._limit_cache()
00101
00102 def get_item(self, topic, stamp, time_threshold):
00103 with self.lock:
00104
00105 topic_cache = self.items.get(topic)
00106 if topic_cache:
00107 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00108
00109 if cache_index <= len(topic_cache) - 1:
00110
00111 (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00112 if cache_index < len(topic_cache) - 1:
00113 cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00114 else:
00115 cache_after_stamp = None
00116
00117
00118 cache_before_dist = abs(stamp - cache_before_stamp)
00119 if cache_after_stamp:
00120 cache_after_dist = abs(cache_after_stamp - stamp)
00121
00122 if cache_after_stamp and cache_after_dist < cache_before_dist:
00123 cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00124 else:
00125 cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00126
00127
00128 if cache_dist <= time_threshold:
00129 self._update_last_accessed(topic, cache_stamp)
00130 return cache_item
00131 return None
00132
00133 def _update_last_accessed(self, topic, stamp):
00134 """
00135 Maintains a sorted list of cache accesses by timestamp for each topic.
00136 """
00137 with self.lock:
00138 access_time = time.time()
00139
00140 if topic not in self.last_accessed:
00141 self.last_accessed[topic] = [(access_time, stamp)]
00142 self.item_access[topic] = {stamp: access_time}
00143 return
00144
00145 topic_last_accessed = self.last_accessed[topic]
00146 topic_item_access = self.item_access[topic]
00147
00148 if stamp in topic_item_access:
00149 last_access = topic_item_access[stamp]
00150
00151 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00152 assert(topic_last_accessed[index][1] == stamp)
00153
00154 del topic_last_accessed[index]
00155
00156 topic_last_accessed.append((access_time, stamp))
00157 topic_item_access[stamp] = access_time
00158
00159 def _limit_cache(self):
00160 """
00161 Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00162 """
00163 with self.lock:
00164 for topic, topic_cache in self.items.items():
00165 while len(topic_cache) > self.max_cache_size:
00166 lru_stamp = self.last_accessed[topic][0][1]
00167
00168 cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00169 assert(topic_cache[cache_index][0] == lru_stamp)
00170
00171 del topic_cache[cache_index]
00172 del self.last_accessed[topic][0]
00173 del self.item_access[topic][lru_stamp]
00174
00175 def stop(self):
00176 self.stop_flag = True
00177 self.queue.put(self)