00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import rospy
00034
00035 import bisect
00036 import Queue
00037 import sys
00038 import threading
00039 import time
00040
00041 import wx
00042
00043 class TimelineCache(threading.Thread):
00044 def __init__(self, loader, listener=None, max_cache_size=100):
00045 threading.Thread.__init__(self)
00046
00047 self.loader = loader
00048 self.listener = listener
00049 self.stop_flag = False
00050
00051 self.lock = threading.RLock()
00052
00053 self.items = {}
00054 self.last_accessed = {}
00055 self.item_access = {}
00056
00057 self.max_cache_size = max_cache_size
00058
00059 self.queue = Queue.Queue()
00060
00061 self.setDaemon(True)
00062 self.start()
00063
00064 self._debugging = False
00065 if self._debugging:
00066 self._debug_frame = wx.Frame(None, -1, 'TimelineCache debug')
00067 self._debug_text = wx.TextCtrl(self._debug_frame, style=wx.TE_READONLY | wx.NO_BORDER | wx.TE_MULTILINE)
00068 self._debug_text.SetPosition((1, 0))
00069 self._debug_text.SetFont(wx.Font(7, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL))
00070 self._debug_frame.SetSize((400, 1200))
00071 self._debug_frame.Show()
00072
00073 def run(self):
00074 while not self.stop_flag:
00075
00076 entry = self.queue.get()
00077
00078
00079 if entry == self:
00080 continue
00081
00082
00083 if self._debugging:
00084 wx.CallAfter(self._update_debug)
00085
00086
00087 topic, stamp, time_threshold, item_details = entry
00088
00089 if not self.get_item(topic, stamp, time_threshold):
00090
00091 msg_stamp, item = self.loader(topic, stamp, item_details)
00092 if item:
00093
00094 self.cache_item(topic, msg_stamp, item)
00095
00096 if self.listener:
00097 self.listener(topic, msg_stamp, item)
00098 else:
00099 print 'Failed to load:', entry
00100
00101 self.queue.task_done()
00102
00103 def enqueue(self, entry):
00104 self.queue.put(entry)
00105
00106 def cache_item(self, topic, t, item):
00107 with self.lock:
00108 if topic not in self.items:
00109 self.items[topic] = []
00110 topic_cache = self.items[topic]
00111
00112 cache_entry = (t.to_sec(), item)
00113 cache_index = bisect.bisect_right(topic_cache, cache_entry)
00114 topic_cache.insert(cache_index, cache_entry)
00115
00116 self._update_last_accessed(topic, t.to_sec())
00117
00118 self._limit_cache()
00119
00120 def get_item(self, topic, stamp, time_threshold):
00121 with self.lock:
00122
00123 topic_cache = self.items.get(topic)
00124 if topic_cache:
00125 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00126
00127 if cache_index <= len(topic_cache) - 1:
00128
00129 (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00130 if cache_index < len(topic_cache) - 1:
00131 cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00132 else:
00133 cache_after_stamp = None
00134
00135
00136 cache_before_dist = abs(stamp - cache_before_stamp)
00137 if cache_after_stamp:
00138 cache_after_dist = abs(cache_after_stamp - stamp)
00139
00140 if cache_after_stamp and cache_after_dist < cache_before_dist:
00141 cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00142 else:
00143 cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00144
00145
00146 if cache_dist <= time_threshold:
00147 self._update_last_accessed(topic, cache_stamp)
00148
00149 return cache_item
00150
00151 return None
00152
00153 def _update_last_accessed(self, topic, stamp):
00154 """
00155 Maintains a sorted list of cache accesses by timestamp for each topic.
00156 """
00157 with self.lock:
00158 access_time = time.time()
00159
00160 if topic not in self.last_accessed:
00161 self.last_accessed[topic] = [(access_time, stamp)]
00162 self.item_access[topic] = { stamp: access_time }
00163 return
00164
00165 topic_last_accessed = self.last_accessed[topic]
00166 topic_item_access = self.item_access[topic]
00167
00168 if stamp in topic_item_access:
00169 last_access = topic_item_access[stamp]
00170
00171 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00172 assert(topic_last_accessed[index][1] == stamp)
00173
00174 del topic_last_accessed[index]
00175
00176 topic_last_accessed.append((access_time, stamp))
00177 topic_item_access[stamp] = access_time
00178
00179 def _limit_cache(self):
00180 """
00181 Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00182 """
00183 with self.lock:
00184 for topic, topic_cache in self.items.items():
00185 while len(topic_cache) > self.max_cache_size:
00186 lru_stamp = self.last_accessed[topic][0][1]
00187
00188 cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00189 assert(topic_cache[cache_index][0] == lru_stamp)
00190
00191 del topic_cache[cache_index]
00192 del self.last_accessed[topic][0]
00193 del self.item_access[topic][lru_stamp]
00194
00195 def stop(self):
00196 self.stop_flag = True
00197 self.queue.put(self)
00198
00199 def _update_debug(self):
00200 try:
00201 s = ''
00202
00203 start_stamp = self.timeline.start_stamp.to_sec()
00204
00205 for topic, topic_cache in self.items.items():
00206 s += topic + '\n'
00207 for t, _ in topic_cache:
00208 s += '%.5f\n' % (t - start_stamp)
00209
00210 s += 'Last Accessed:\n'
00211 topic_last_accessed = self.last_accessed[topic]
00212 for access_time, t in topic_last_accessed:
00213 s += '%.02f: %.5f\n' % (time.time() - access_time, t - start_stamp)
00214
00215 self._debug_text.SetValue(s)
00216
00217 except Exception:
00218 pass