Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import threading
00034
00035
class MessageLoaderThread(threading.Thread):
    """
    Waits for a new playhead position on the given topic, then loads the
    message at that position and notifies the view threads.

    One thread per topic.  Maintains a cache of recently loaded messages.

    The thread marks itself as a daemon and starts running as soon as it is
    constructed; call stop() to ask it to exit.
    """

    def __init__(self, timeline, topic):
        """
        Create the loader for one topic and start it immediately.

        @param timeline: object providing the per-topic condition variables
            (_playhead_positions_cvs, _messages_cvs), the shared
            _playhead_positions and _messages mappings, has_listeners() and
            read_message()
        @param topic: key used to index the timeline's per-topic mappings
        """
        threading.Thread.__init__(self)

        self.timeline = timeline
        self.topic = topic

        # Last (bag, position) pair picked up by run(); None until the first
        # position has been seen, or after reset().
        self.bag_playhead_position = None

        # FIFO cache: _message_cache maps key -> message data, while
        # _message_cache_keys remembers insertion order for eviction.
        self._message_cache_capacity = 50
        self._message_cache = {}
        self._message_cache_keys = []

        self._stop_flag = False

        # All state must be initialised before start(), because run() begins
        # executing on the new thread right away.
        self.daemon = True
        self.start()

    def reset(self):
        """Forget the last seen playhead position so the current one is treated as new."""
        self.bag_playhead_position = None

    def run(self):
        """
        Thread body: repeatedly wait for the topic's playhead position to
        change, load the message at the new position and publish it in
        timeline._messages, notifying everyone waiting on the topic's
        messages condition variable.
        """
        while not self._stop_flag:
            # Wait for a new entry
            cv = self.timeline._playhead_positions_cvs[self.topic]
            with cv:
                # The stop flag is tested *before* every wait while holding the
                # condition's lock.  stop() sets the flag and then notifies under
                # the same lock, so a stop request issued just before we got here
                # can no longer be missed (which would block this thread until the
                # next playhead update).
                while not self._stop_flag:
                    positions = self.timeline._playhead_positions
                    if self.topic in positions:
                        unchanged = (self.bag_playhead_position == positions[self.topic])
                        if not unchanged:
                            break
                    cv.wait()
                if self._stop_flag:
                    return
                bag, playhead_position = self.timeline._playhead_positions[self.topic]

            self.bag_playhead_position = (bag, playhead_position)

            # Don't bother loading the message if there are no listeners
            if not self.timeline.has_listeners(self.topic):
                continue

            # Load the message (a position of None is published as "no message")
            if playhead_position is None:
                msg_data = None
            else:
                msg_data = self._get_message(bag, playhead_position)

            # Inform the views
            messages_cv = self.timeline._messages_cvs[self.topic]
            with messages_cv:
                self.timeline._messages[self.topic] = (bag, msg_data)
                messages_cv.notify_all()

    def _get_message(self, bag, position):
        """
        Return the message data at the given position of the given bag,
        reading it through timeline.read_message() on a cache miss.

        @param bag: object with a 'filename' attribute identifying the bag
        @param position: position of the message within the bag
        @return: whatever timeline.read_message(bag, position) returned
        """
        # A tuple key keeps the two components separate.  Concatenating them
        # into a single string lets distinct pairs collide, e.g. ("a1", 2) and
        # ("a", 12) would both become "a12".
        key = (bag.filename, str(position))
        if key in self._message_cache:
            return self._message_cache[key]

        msg_data = self.timeline.read_message(bag, position)

        self._message_cache[key] = msg_data
        self._message_cache_keys.append(key)

        # Evict the oldest entries (first in, first out) once over capacity.
        while len(self._message_cache_keys) > self._message_cache_capacity:
            oldest_key = self._message_cache_keys.pop(0)
            del self._message_cache[oldest_key]

        return msg_data

    def stop(self):
        """Ask the thread to exit and wake it up if it is waiting for a playhead position."""
        self._stop_flag = True
        cv = self.timeline._playhead_positions_cvs[self.topic]
        with cv:
            cv.notify_all()