Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from threading import Thread, Condition
00034 from time import time
00035
00036 """ Sits between incoming messages from a subscription, and the outgoing
00037 publish method. Provides throttling / buffering capabilities.
00038
00039 When the parameters change, the handler may transition to a different kind
00040 of handler
00041 """
00042
00043
00044 class MessageHandler():
00045 def __init__(self, previous_handler=None, publish=None):
00046 if previous_handler:
00047 self.last_publish = previous_handler.last_publish
00048 self.throttle_rate = previous_handler.throttle_rate
00049 self.queue_length = previous_handler.queue_length
00050 self.publish = previous_handler.publish
00051 else:
00052 self.last_publish = 0
00053 self.throttle_rate = 0
00054 self.queue_length = 0
00055 self.publish = publish
00056
00057 def set_throttle_rate(self, throttle_rate):
00058 self.throttle_rate = throttle_rate / 1000.0
00059 return self.transition()
00060
00061 def set_queue_length(self, queue_length):
00062 self.queue_length = queue_length
00063 return self.transition()
00064
00065 def time_remaining(self):
00066 return max((self.last_publish + self.throttle_rate) - time(), 0)
00067
00068 def handle_message(self, msg):
00069 self.last_publish = time()
00070 self.publish(msg)
00071
00072 def transition(self):
00073 if self.throttle_rate == 0:
00074 return self
00075 elif self.queue_length == 0:
00076 return ThrottleMessageHandler(self)
00077 else:
00078 return QueueMessageHandler(self)
00079
00080 def finish(self):
00081 pass
00082
00083
00084 class ThrottleMessageHandler(MessageHandler):
00085
00086 def handle_message(self, msg):
00087 if self.time_remaining() == 0:
00088 MessageHandler.handle_message(self, msg)
00089
00090 def transition(self):
00091 if self.throttle_rate == 0:
00092 return MessageHandler(self)
00093 elif self.queue_length == 0:
00094 return self
00095 else:
00096 return QueueMessageHandler(self)
00097
00098 def finish(self):
00099 pass
00100
00101
00102 class QueueMessageHandler(MessageHandler, Thread):
00103
00104 def __init__(self, previous_handler):
00105 Thread.__init__(self)
00106 MessageHandler.__init__(self, previous_handler)
00107 self.daemon = True
00108 self.queue = []
00109 self.c = Condition()
00110 self.alive = True
00111 self.start()
00112
00113 def handle_message(self, msg):
00114 with self.c:
00115 should_notify = len(self.queue) == 0
00116 self.queue.append(msg)
00117 if len(self.queue) > self.queue_length:
00118 del self.queue[0:len(self.queue) - self.queue_length]
00119 if should_notify:
00120 self.c.notify()
00121
00122 def transition(self):
00123 if self.throttle_rate == 0:
00124 self.finish()
00125 return MessageHandler(self)
00126 elif self.queue_length == 0:
00127 self.finish()
00128 return ThrottleMessageHandler(self)
00129 else:
00130 with self.c:
00131 if len(self.queue) > self.queue_length:
00132 del self.queue[0:len(self.queue) - self.queue_length]
00133 self.c.notify()
00134 return self
00135
00136 def finish(self):
00137 """ If throttle was set to 0, this pushes all buffered messages """
00138
00139 with self.c:
00140 self.alive = False
00141 self.c.notify()
00142
00143 self.join()
00144
00145 def run(self):
00146 while self.alive:
00147 with self.c:
00148 while self.alive and (self.time_remaining() > 0 or len(self.queue) == 0):
00149 if len(self.queue) == 0:
00150 self.c.wait()
00151 else:
00152 self.c.wait(self.time_remaining())
00153 if self.alive and self.time_remaining() == 0 and len(self.queue) > 0:
00154 try:
00155 MessageHandler.handle_message(self, self.queue[0])
00156 except:
00157 pass
00158 del self.queue[0]
00159 while self.time_remaining() == 0 and len(self.queue) > 0:
00160 try:
00161 MessageHandler.handle_message(self, self.queue[0])
00162 except:
00163 pass
00164 del self.queue[0]