33 from collections
import deque
34 from threading
import Thread, Condition
39 """ Sits between incoming messages from a subscription, and the outgoing 40 publish method. Provides throttling / buffering capabilities. 42 When the parameters change, the handler may transition to a different kind 48 def __init__(self, previous_handler=None, publish=None):
87 class ThrottleMessageHandler(MessageHandler):
91 MessageHandler.handle_message(self, msg)
105 class QueueMessageHandler(MessageHandler, Thread):
108 Thread.__init__(self)
109 MessageHandler.__init__(self, previous_handler)
121 should_notify = len(self.
queue) == 0
122 self.
queue.append(msg)
135 old_queue = self.
queue 137 while len(old_queue) > 0:
138 self.
queue.append(old_queue.popleft())
143 """ If throttle was set to 0, this pushes all buffered messages """ 156 if len(self.
queue) == 0:
161 msg = self.
queue.popleft()
164 MessageHandler.handle_message(self, msg)
166 traceback.print_exc(file=sys.stderr)
169 MessageHandler.handle_message(self, self.
queue.popleft())
171 traceback.print_exc(file=sys.stderr)
def finish(self, block=True)
def finish(self, block=True)
def finish(self, block=True)
def __init__(self, previous_handler)
def handle_message(self, msg)
def set_throttle_rate(self, throttle_rate)
def handle_message(self, msg)
def __init__(self, previous_handler=None, publish=None)
def handle_message(self, msg)
def set_queue_length(self, queue_length)