Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import threading
00034
00035
00036 class MessageLoaderThread(threading.Thread):
00037 """
00038 Waits for a new playhead position on the given topic, then loads the message at that position and notifies the view threads.
00039
00040 One thread per topic. Maintains a cache of recently loaded messages.
00041 """
00042 def __init__(self, timeline, topic):
00043 threading.Thread.__init__(self)
00044
00045 self.timeline = timeline
00046 self.topic = topic
00047
00048 self.bag_playhead_position = None
00049
00050 self._message_cache_capacity = 50
00051 self._message_cache = {}
00052 self._message_cache_keys = []
00053
00054 self._stop_flag = False
00055
00056 self.setDaemon(True)
00057 self.start()
00058
00059 def reset(self):
00060 self.bag_playhead_position = None
00061
00062 def run(self):
00063 while not self._stop_flag:
00064
00065 cv = self.timeline._playhead_positions_cvs[self.topic]
00066 with cv:
00067 while (self.topic not in self.timeline._playhead_positions) or (self.bag_playhead_position == self.timeline._playhead_positions[self.topic]):
00068 cv.wait()
00069 if self._stop_flag:
00070 return
00071 bag, playhead_position = self.timeline._playhead_positions[self.topic]
00072
00073 self.bag_playhead_position = (bag, playhead_position)
00074
00075
00076 if not self.timeline.has_listeners(self.topic):
00077 continue
00078
00079
00080 if playhead_position is None:
00081 msg_data = None
00082 else:
00083 msg_data = self._get_message(bag, playhead_position)
00084
00085
00086 messages_cv = self.timeline._messages_cvs[self.topic]
00087 with messages_cv:
00088 self.timeline._messages[self.topic] = (bag, msg_data)
00089 messages_cv.notify_all()
00090
00091 def _get_message(self, bag, position):
00092 key = '%s%s' % (bag.filename, str(position))
00093 if key in self._message_cache:
00094 return self._message_cache[key]
00095
00096 msg_data = self.timeline.read_message(bag, position)
00097
00098 self._message_cache[key] = msg_data
00099 self._message_cache_keys.append(key)
00100
00101 if len(self._message_cache) > self._message_cache_capacity:
00102 oldest_key = self._message_cache_keys[0]
00103 del self._message_cache[oldest_key]
00104 self._message_cache_keys.remove(oldest_key)
00105
00106 return msg_data
00107
00108 def stop(self):
00109 self._stop_flag = True
00110 cv = self.timeline._playhead_positions_cvs[self.topic]
00111 with cv:
00112 cv.notify_all()