33 from collections
import deque
34 from threading
import Thread, Condition
39 """ Sits between incoming messages from a subscription, and the outgoing 40 publish method. Provides throttling / buffering capabilities. 42 When the parameters change, the handler may transition to a different kind 48 def __init__(self, previous_handler=None, publish=None):
87 class ThrottleMessageHandler(MessageHandler):
91 MessageHandler.handle_message(self, msg)
105 class QueueMessageHandler(MessageHandler, Thread):
108 Thread.__init__(self)
109 MessageHandler.__init__(self, previous_handler)
118 should_notify = len(self.
queue) == 0
119 self.queue.append(msg)
132 old_queue = self.
queue 134 while len(old_queue) > 0:
135 self.queue.append(old_queue.popleft())
140 """ If throttle was set to 0, this pushes all buffered messages """ 152 if len(self.
queue) == 0:
157 msg = self.queue.popleft()
160 MessageHandler.handle_message(self, msg)
162 traceback.print_exc(file=sys.stderr)
165 MessageHandler.handle_message(self, self.queue.popleft())
167 traceback.print_exc(file=sys.stderr)
def __init__(self, previous_handler)
def handle_message(self, msg)
def set_throttle_rate(self, throttle_rate)
def handle_message(self, msg)
def __init__(self, previous_handler=None, publish=None)
def handle_message(self, msg)
def set_queue_length(self, queue_length)