00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import bisect
00035 try:
00036 from queue import Queue
00037 except ImportError:
00038 from Queue import Queue
00039 import threading
00040 import time
00041
00042
00043 class TimelineCache(threading.Thread):
00044 """
00045 Caches items for timeline renderers
00046 """
00047 def __init__(self, loader, listener=None, max_cache_size=100):
00048 threading.Thread.__init__(self)
00049
00050 self.loader = loader
00051 self.listener = listener
00052 self.stop_flag = False
00053 self.lock = threading.RLock()
00054 self.items = {}
00055 self.last_accessed = {}
00056 self.item_access = {}
00057 self.max_cache_size = max_cache_size
00058 self.queue = Queue()
00059 self.setDaemon(True)
00060 self.start()
00061
00062 def run(self):
00063 while not self.stop_flag:
00064
00065 entry = self.queue.get()
00066
00067 if entry == self:
00068 continue
00069
00070 topic, stamp, time_threshold, item_details = entry
00071
00072 if not self.get_item(topic, stamp, time_threshold):
00073
00074 msg_stamp, item = self.loader(topic, stamp, item_details)
00075 if item:
00076
00077 self.cache_item(topic, msg_stamp, item)
00078
00079 if self.listener:
00080 self.listener(topic, msg_stamp, item)
00081
00082
00083
00084
00085
00086 self.queue.task_done()
00087
00088 def enqueue(self, entry):
00089 self.queue.put(entry)
00090
00091 def cache_item(self, topic, t, item):
00092 with self.lock:
00093 if topic not in self.items:
00094 self.items[topic] = []
00095 topic_cache = self.items[topic]
00096
00097 cache_entry = (t.to_sec(), item)
00098 cache_index = bisect.bisect_right(topic_cache, cache_entry)
00099 topic_cache.insert(cache_index, cache_entry)
00100
00101 self._update_last_accessed(topic, t.to_sec())
00102
00103 self._limit_cache()
00104
00105 def get_item(self, topic, stamp, time_threshold):
00106 with self.lock:
00107
00108 topic_cache = self.items.get(topic)
00109 if topic_cache:
00110 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00111
00112 if cache_index <= len(topic_cache) - 1:
00113
00114 (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00115 if cache_index < len(topic_cache) - 1:
00116 cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00117 else:
00118 cache_after_stamp = None
00119
00120
00121 cache_before_dist = abs(stamp - cache_before_stamp)
00122 if cache_after_stamp:
00123 cache_after_dist = abs(cache_after_stamp - stamp)
00124
00125 if cache_after_stamp and cache_after_dist < cache_before_dist:
00126 cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00127 else:
00128 cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00129
00130
00131 if cache_dist <= time_threshold:
00132 self._update_last_accessed(topic, cache_stamp)
00133 return cache_item
00134 return None
00135
00136 def _update_last_accessed(self, topic, stamp):
00137 """
00138 Maintains a sorted list of cache accesses by timestamp for each topic.
00139 """
00140 with self.lock:
00141 access_time = time.time()
00142
00143 if topic not in self.last_accessed:
00144 self.last_accessed[topic] = [(access_time, stamp)]
00145 self.item_access[topic] = {stamp: access_time}
00146 return
00147
00148 topic_last_accessed = self.last_accessed[topic]
00149 topic_item_access = self.item_access[topic]
00150
00151 if stamp in topic_item_access:
00152 last_access = topic_item_access[stamp]
00153
00154 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00155 assert(topic_last_accessed[index][1] == stamp)
00156
00157 del topic_last_accessed[index]
00158
00159 topic_last_accessed.append((access_time, stamp))
00160 topic_item_access[stamp] = access_time
00161
00162 def _limit_cache(self):
00163 """
00164 Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00165 """
00166 with self.lock:
00167 for topic, topic_cache in self.items.items():
00168 while len(topic_cache) > self.max_cache_size:
00169 lru_stamp = self.last_accessed[topic][0][1]
00170
00171 cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00172 assert(topic_cache[cache_index][0] == lru_stamp)
00173
00174 del topic_cache[cache_index]
00175 del self.last_accessed[topic][0]
00176 del self.item_access[topic][lru_stamp]
00177
00178 def stop(self):
00179 self.stop_flag = True
00180 self.queue.put(self)