00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from __future__ import with_statement
00034
00035 PKG = 'rxbag'
00036 import roslib; roslib.load_manifest(PKG)
00037 import rospy
00038
00039 import bisect
00040 import Queue
00041 import sys
00042 import threading
00043 import time
00044
00045 import wx
00046
00047 class TimelineCache(threading.Thread):
00048 def __init__(self, loader, listener=None, max_cache_size=100):
00049 threading.Thread.__init__(self)
00050
00051 self.loader = loader
00052 self.listener = listener
00053 self.stop_flag = False
00054
00055 self.lock = threading.RLock()
00056
00057 self.items = {}
00058 self.last_accessed = {}
00059 self.item_access = {}
00060
00061 self.max_cache_size = max_cache_size
00062
00063 self.queue = Queue.Queue()
00064
00065 self.setDaemon(True)
00066 self.start()
00067
00068 self._debugging = False
00069 if self._debugging:
00070 self._debug_frame = wx.Frame(None, -1, 'TimelineCache debug')
00071 self._debug_text = wx.TextCtrl(self._debug_frame, style=wx.TE_READONLY | wx.NO_BORDER | wx.TE_MULTILINE)
00072 self._debug_text.SetPosition((1, 0))
00073 self._debug_text.SetFont(wx.Font(7, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL))
00074 self._debug_frame.SetSize((400, 1200))
00075 self._debug_frame.Show()
00076
00077 def run(self):
00078 while not self.stop_flag:
00079
00080 entry = self.queue.get()
00081
00082
00083 if entry == self:
00084 continue
00085
00086
00087 if self._debugging:
00088 wx.CallAfter(self._update_debug)
00089
00090
00091 topic, stamp, time_threshold, item_details = entry
00092
00093 if not self.get_item(topic, stamp, time_threshold):
00094
00095 msg_stamp, item = self.loader(topic, stamp, item_details)
00096 if item:
00097
00098 self.cache_item(topic, msg_stamp, item)
00099
00100 if self.listener:
00101 self.listener(topic, msg_stamp, item)
00102 else:
00103 print 'Failed to load:', entry
00104
00105 self.queue.task_done()
00106
00107 def enqueue(self, entry):
00108 self.queue.put(entry)
00109
00110 def cache_item(self, topic, t, item):
00111 with self.lock:
00112 if topic not in self.items:
00113 self.items[topic] = []
00114 topic_cache = self.items[topic]
00115
00116 cache_entry = (t.to_sec(), item)
00117 cache_index = bisect.bisect_right(topic_cache, cache_entry)
00118 topic_cache.insert(cache_index, cache_entry)
00119
00120 self._update_last_accessed(topic, t.to_sec())
00121
00122 self._limit_cache()
00123
00124 def get_item(self, topic, stamp, time_threshold):
00125 with self.lock:
00126
00127 topic_cache = self.items.get(topic)
00128 if topic_cache:
00129 cache_index = max(0, bisect.bisect_right(topic_cache, (stamp, None)) - 1)
00130
00131 if cache_index <= len(topic_cache) - 1:
00132
00133 (cache_before_stamp, cache_before_item) = topic_cache[cache_index]
00134 if cache_index < len(topic_cache) - 1:
00135 cache_after_stamp, cache_after_item = topic_cache[cache_index + 1]
00136 else:
00137 cache_after_stamp = None
00138
00139
00140 cache_before_dist = abs(stamp - cache_before_stamp)
00141 if cache_after_stamp:
00142 cache_after_dist = abs(cache_after_stamp - stamp)
00143
00144 if cache_after_stamp and cache_after_dist < cache_before_dist:
00145 cache_dist, cache_stamp, cache_item = cache_after_dist, cache_after_stamp, cache_after_item
00146 else:
00147 cache_dist, cache_stamp, cache_item = cache_before_dist, cache_before_stamp, cache_before_item
00148
00149
00150 if cache_dist <= time_threshold:
00151 self._update_last_accessed(topic, cache_stamp)
00152
00153 return cache_item
00154
00155 return None
00156
00157 def _update_last_accessed(self, topic, stamp):
00158 """
00159 Maintains a sorted list of cache accesses by timestamp for each topic.
00160 """
00161 with self.lock:
00162 access_time = time.time()
00163
00164 if topic not in self.last_accessed:
00165 self.last_accessed[topic] = [(access_time, stamp)]
00166 self.item_access[topic] = { stamp: access_time }
00167 return
00168
00169 topic_last_accessed = self.last_accessed[topic]
00170 topic_item_access = self.item_access[topic]
00171
00172 if stamp in topic_item_access:
00173 last_access = topic_item_access[stamp]
00174
00175 index = bisect.bisect_left(topic_last_accessed, (last_access, None))
00176 assert(topic_last_accessed[index][1] == stamp)
00177
00178 del topic_last_accessed[index]
00179
00180 topic_last_accessed.append((access_time, stamp))
00181 topic_item_access[stamp] = access_time
00182
00183 def _limit_cache(self):
00184 """
00185 Removes LRU's from cache until size of each topic's cache is <= max_cache_size.
00186 """
00187 with self.lock:
00188 for topic, topic_cache in self.items.items():
00189 while len(topic_cache) > self.max_cache_size:
00190 lru_stamp = self.last_accessed[topic][0][1]
00191
00192 cache_index = bisect.bisect_left(topic_cache, (lru_stamp, None))
00193 assert(topic_cache[cache_index][0] == lru_stamp)
00194
00195 del topic_cache[cache_index]
00196 del self.last_accessed[topic][0]
00197 del self.item_access[topic][lru_stamp]
00198
00199 def stop(self):
00200 self.stop_flag = True
00201 self.queue.put(self)
00202
00203 def _update_debug(self):
00204 try:
00205 s = ''
00206
00207 start_stamp = self.timeline.start_stamp.to_sec()
00208
00209 for topic, topic_cache in self.items.items():
00210 s += topic + '\n'
00211 for t, _ in topic_cache:
00212 s += '%.5f\n' % (t - start_stamp)
00213
00214 s += 'Last Accessed:\n'
00215 topic_last_accessed = self.last_accessed[topic]
00216 for access_time, t in topic_last_accessed:
00217 s += '%.02f: %.5f\n' % (time.time() - access_time, t - start_stamp)
00218
00219 self._debug_text.SetValue(s)
00220
00221 except Exception:
00222 pass