00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import bisect
00035 try:
00036 from queue import Queue
00037 except ImportError:
00038 from Queue import Queue
00039 import threading
00040 import time
00041
00042
00043 class TimelineCache(threading.Thread):
00044
00045 """
00046 Caches items for timeline renderers
00047 """
00048
00049 def __init__(self, loader, listener=None, max_cache_size=100):
00050 threading.Thread.__init__(self)
00051
00052 self.loader = loader
00053 self.listener = listener
00054 self.stop_flag = False
00055 self.lock = threading.RLock()
00056 self.items = {}
00057 self.last_accessed = {}
00058 self.item_access = {}
00059 self.max_cache_size = max_cache_size
00060 self.queue = Queue()
00061 self.setDaemon(True)
00062 self.start()
00063
00064 def run(self):
00065 while not self.stop_flag:
00066
00067 entry = self.queue.get()
00068
00069 if entry == self:
00070 continue
00071
00072 topic, stamp, time_threshold, item_details = entry
00073
00074 if not self.get_item(topic, stamp, time_threshold):
00075
00076 msg_stamp, item = self.loader(topic, stamp, item_details)
00077 if item:
00078
00079 self.cache_item(topic, msg_stamp, item)
00080
00081 if self.listener:
00082 self.listener(topic, msg_stamp, item)
00083
00084
00085
00086
00087
00088 self.queue.task_done()
00089
00090 def enqueue(self, entry):
00091 self.queue.put(entry)
00092
00093 def cache_item(self, topic, t, item):
00094 with self.lock:
00095 if topic not in self.items:
00096 self.items[topic] = []
00097 topic_cache = self.items[topic]
00098
00099 cache_entry = (t.to_sec(), item)
00100 cache_index = bisect.bisect_right(topic_cache, cache_entry)
00101 topic_cache.insert(cache_index, cache_entry)
00102
00103 self._update_last_accessed(topic, t.to_sec())
00104
00105 self._limit_cache()
00106
00107 def get_item(self, topic, stamp, time_threshold):
00108 with self.lock:
00109
00110 topic_cache = self.items.get(topic)
00111 if topic_cache:
00112 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00113
00114 if cache_index <= len(topic_cache) - 1:
00115
00116 (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00117 if cache_index < len(topic_cache) - 1:
00118 cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00119 else:
00120 cache_after_stamp = None
00121
00122
00123 cache_before_dist = abs(stamp - cache_before_stamp)
00124 if cache_after_stamp:
00125 cache_after_dist = abs(cache_after_stamp - stamp)
00126
00127 if cache_after_stamp and cache_after_dist < cache_before_dist:
00128 cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00129 else:
00130 cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00131
00132
00133 if cache_dist <= time_threshold:
00134 self._update_last_accessed(topic, cache_stamp)
00135 return cache_item
00136 return None
00137
00138 def _update_last_accessed(self, topic, stamp):
00139 """
00140 Maintains a sorted list of cache accesses by timestamp for each topic.
00141 """
00142 with self.lock:
00143 access_time = time.time()
00144
00145 if topic not in self.last_accessed:
00146 self.last_accessed[topic] = [(access_time, stamp)]
00147 self.item_access[topic] = {stamp: access_time}
00148 return
00149
00150 topic_last_accessed = self.last_accessed[topic]
00151 topic_item_access = self.item_access[topic]
00152
00153 if stamp in topic_item_access:
00154 last_access = topic_item_access[stamp]
00155
00156 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00157 assert(topic_last_accessed[index][1] == stamp)
00158
00159 del topic_last_accessed[index]
00160
00161 topic_last_accessed.append((access_time, stamp))
00162 topic_item_access[stamp] = access_time
00163
00164 def _limit_cache(self):
00165 """
00166 Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00167 """
00168 with self.lock:
00169 for topic, topic_cache in self.items.items():
00170 while len(topic_cache) > self.max_cache_size:
00171 lru_stamp = self.last_accessed[topic][0][1]
00172
00173 cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00174 assert(topic_cache[cache_index][0] == lru_stamp)
00175
00176 del topic_cache[cache_index]
00177 del self.last_accessed[topic][0]
00178 del self.item_access[topic][lru_stamp]
00179
00180 def stop(self):
00181 self.stop_flag = True
00182 self.queue.put(self)