subscription_modifiers.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 from threading import Thread, Condition
34 from time import time
35 
36 """ Sits between incoming messages from a subscription, and the outgoing
37 publish method. Provides throttling / buffering capabilities.
38 
39 When the parameters change, the handler may transition to a different kind
40 of handler
41 """
42 
43 
class MessageHandler(object):
    """Pass-through handler: every incoming message is published immediately.

    This is the base of the handler family.  Each handler carries the
    throttle/queue settings plus the outgoing ``publish`` callable, and
    ``transition()`` picks the handler kind that matches the current
    settings.
    """

    def __init__(self, previous_handler=None, publish=None):
        """Create a handler, optionally inheriting state from another one.

        previous_handler -- handler whose ``last_publish``, ``throttle_rate``,
                            ``queue_length`` and ``publish`` are copied; when
                            given, the ``publish`` argument is ignored.
        publish          -- callable invoked with each outgoing message; only
                            used when no previous handler is supplied.
        """
        if previous_handler is not None:
            self.last_publish = previous_handler.last_publish
            self.throttle_rate = previous_handler.throttle_rate
            self.queue_length = previous_handler.queue_length
            self.publish = previous_handler.publish
        else:
            self.last_publish = 0
            self.throttle_rate = 0
            self.queue_length = 0
            self.publish = publish

    def set_throttle_rate(self, throttle_rate):
        """Set the throttle rate and return the handler to use from now on.

        The argument is divided by 1000.0 before being stored, so it is
        given in milliseconds while ``self.throttle_rate`` is kept in the
        same unit as ``time()`` (seconds).
        """
        self.throttle_rate = throttle_rate / 1000.0
        return self.transition()

    def set_queue_length(self, queue_length):
        """Set the queue length and return the handler to use from now on."""
        self.queue_length = queue_length
        return self.transition()

    def time_remaining(self):
        """Return the seconds left until the next publish is allowed.

        Never negative: 0 means a message may be published right now.
        """
        return max((self.last_publish + self.throttle_rate) - time(), 0)

    def handle_message(self, msg):
        """Record the publish time and forward ``msg`` to ``publish``."""
        # The timestamp is taken before publishing, so the throttle window
        # starts when the publish begins, not when it returns.
        self.last_publish = time()
        self.publish(msg)

    def transition(self):
        """Return the handler matching the current throttle/queue settings.

        Returns ``self`` when neither throttling nor queueing is requested,
        a ThrottleMessageHandler when only throttling is requested, and a
        QueueMessageHandler whenever the queue length is non-zero.
        """
        if self.throttle_rate == 0 and self.queue_length == 0:
            return self
        elif self.queue_length == 0:
            return ThrottleMessageHandler(self)
        else:
            return QueueMessageHandler(self)

    def finish(self):
        """Release handler resources; the pass-through handler has none."""
        pass
82 
83 
class ThrottleMessageHandler(MessageHandler):
    """Handler that drops messages arriving inside the throttle window.

    A message is forwarded only when ``time_remaining()`` is 0; anything
    arriving earlier is discarded rather than buffered.
    """

    def handle_message(self, msg):
        """Publish ``msg`` if the throttle window has elapsed, else drop it."""
        if self.time_remaining() != 0:
            return
        MessageHandler.handle_message(self, msg)

    def transition(self):
        """Return the handler matching the current throttle/queue settings."""
        # Any non-zero queue length means queueing, whatever the throttle is.
        if self.queue_length != 0:
            return QueueMessageHandler(self)
        # No queue and no throttle: fall back to the plain pass-through.
        if self.throttle_rate == 0:
            return MessageHandler(self)
        return self

    def finish(self):
        """Nothing to clean up for a throttling handler."""
        pass
100 
101 
class QueueMessageHandler(MessageHandler, Thread):
    """Handler that buffers messages and publishes them from its own thread.

    Incoming messages are appended to ``self.queue``, which is capped at
    ``queue_length`` entries by discarding the oldest ones.  The thread
    started in ``__init__`` publishes the oldest buffered message each time
    ``time_remaining()`` reaches 0.  ``self.c`` guards the queue and is used
    to wake the worker.
    """

    def __init__(self, previous_handler):
        """Copy settings from ``previous_handler`` and start the worker."""
        Thread.__init__(self)
        MessageHandler.__init__(self, previous_handler)
        self.daemon = True
        self.queue = []
        self.c = Condition()
        self.alive = True
        self.start()

    def _trim_queue(self):
        """Drop the oldest entries beyond ``queue_length`` (lock must be held)."""
        overflow = len(self.queue) - self.queue_length
        if overflow > 0:
            del self.queue[:overflow]

    def _publish_quietly(self, msg):
        """Publish ``msg``, ignoring ordinary errors raised by the publisher."""
        try:
            MessageHandler.handle_message(self, msg)
        except Exception:
            # Deliberately best-effort: an error from one publish must not
            # kill the worker thread.  Only Exception is caught, so
            # SystemExit / KeyboardInterrupt are no longer swallowed.
            pass

    def handle_message(self, msg):
        """Buffer ``msg`` for the worker, discarding the oldest on overflow."""
        with self.c:
            was_empty = not self.queue
            self.queue.append(msg)
            self._trim_queue()
            # The worker only waits without a timeout while the queue is
            # empty, so it needs a wake-up only on the empty -> non-empty
            # edge.
            if was_empty:
                self.c.notify()

    def transition(self):
        """Return the handler matching the current throttle/queue settings.

        When queueing is no longer requested the worker is stopped via
        ``finish()`` and a replacement handler is returned; otherwise the
        queue is trimmed to the current length and ``self`` is returned.
        """
        if self.throttle_rate == 0 and self.queue_length == 0:
            self.finish()
            return MessageHandler(self)
        elif self.queue_length == 0:
            self.finish()
            return ThrottleMessageHandler(self)
        else:
            with self.c:
                self._trim_queue()
                # Wake the worker so it re-evaluates its wait with the
                # current settings.
                self.c.notify()
            return self

    def finish(self):
        """Stop the worker thread and wait for it to exit.

        Before exiting, the worker publishes buffered messages for as long
        as ``time_remaining()`` is 0; with a throttle rate of 0 that
        flushes everything still buffered.
        """
        # Notify the thread to finish
        with self.c:
            self.alive = False
            self.c.notify()

        self.join()

    def run(self):
        """Worker loop: publish buffered messages as the throttle allows."""
        while self.alive:
            have_msg = False
            msg = None
            with self.c:
                # Sleep until stopped, or until there is a message AND the
                # throttle window has elapsed.
                while self.alive and (not self.queue or self.time_remaining() > 0):
                    if self.queue:
                        self.c.wait(self.time_remaining())
                    else:
                        self.c.wait()
                if self.alive and self.queue and self.time_remaining() == 0:
                    msg = self.queue.pop(0)
                    have_msg = True
            # Publish outside the lock so a slow publish does not block
            # producers calling handle_message().
            if have_msg:
                self._publish_quietly(msg)

        # Stopped: drain whatever may still be published immediately.
        while True:
            with self.c:
                if not self.queue or self.time_remaining() != 0:
                    break
                msg = self.queue.pop(0)
            self._publish_quietly(msg)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02