33 from collections
import deque
34 from threading
import Thread, Condition
39 """ Sits between incoming messages from a subscription, and the outgoing
40 publish method. Provides throttling / buffering capabilities.
42 When the parameters change, the handler may transition to a different kind
48 def __init__(self, previous_handler=None, publish=None):
87 class ThrottleMessageHandler(MessageHandler):
91 MessageHandler.handle_message(self, msg)
105 class QueueMessageHandler(MessageHandler, Thread):
108 Thread.__init__(self)
109 MessageHandler.__init__(self, previous_handler)
121 should_notify = len(self.
queue) == 0
122 self.
queue.append(msg)
135 old_queue = self.
queue
137 while len(old_queue) > 0:
138 self.
queue.append(old_queue.popleft())
143 """ If throttle was set to 0, this pushes all buffered messages """
156 if len(self.
queue) == 0:
161 msg = self.
queue.popleft()
164 MessageHandler.handle_message(self, msg)
166 traceback.print_exc(file=sys.stderr)
169 MessageHandler.handle_message(self, self.
queue.popleft())
171 traceback.print_exc(file=sys.stderr)