subscription_modifiers.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 from collections import deque
34 from threading import Thread, Condition
35 from time import time
36 import traceback
37 import sys
38 
39 """ Sits between incoming messages from a subscription, and the outgoing
40 publish method. Provides throttling / buffering capabilities.
41 
42 When the throttle rate or queue length changes, the handler may transition
43 to a different kind of handler (pass-through, throttling, or queueing).
44 """
45 
46 
class MessageHandler(object):
    """Pass-through handler: every incoming message is published immediately.

    This is also the base class of the throttling and queueing handlers.
    Changing the throttle rate or queue length may return a different
    handler object (see ``transition``); callers should keep using the
    handler that the setter returns.
    """

    def __init__(self, previous_handler=None, publish=None):
        """Create a handler, optionally inheriting state from another one.

        Args:
            previous_handler: handler whose ``last_publish``,
                ``throttle_rate``, ``queue_length`` and ``publish`` are
                copied.  When given, the ``publish`` argument is ignored.
            publish: callable invoked with each message that is let through.
                Only used when ``previous_handler`` is None.
        """
        # Compare against None explicitly so that a handler object which
        # happens to be falsy is never mistaken for "no previous handler".
        if previous_handler is not None:
            self.last_publish = previous_handler.last_publish
            self.throttle_rate = previous_handler.throttle_rate
            self.queue_length = previous_handler.queue_length
            self.publish = previous_handler.publish
        else:
            self.last_publish = 0
            self.throttle_rate = 0
            self.queue_length = 0
            self.publish = publish

    def set_throttle_rate(self, throttle_rate):
        """Store a new throttle rate and return the handler to use from now on.

        The value is divided by 1000 before being stored so that it can be
        compared against ``time()`` (seconds); presumably callers pass
        milliseconds -- confirm against the caller.
        """
        self.throttle_rate = throttle_rate / 1000.0
        return self.transition()

    def set_queue_length(self, queue_length):
        """Store a new queue length and return the handler to use from now on."""
        self.queue_length = queue_length
        return self.transition()

    def time_remaining(self):
        """Return the seconds left until the next publish is allowed (>= 0)."""
        return max((self.last_publish + self.throttle_rate) - time(), 0)

    def handle_message(self, msg):
        """Record the publish time and pass ``msg`` to the publish callable."""
        self.last_publish = time()
        self.publish(msg)

    def transition(self):
        """Return the handler matching the current throttle/queue settings.

        Returns ``self`` when neither throttling nor queueing is requested,
        a ThrottleMessageHandler when only a throttle rate is set, and a
        QueueMessageHandler whenever the queue length is non-zero.
        """
        if self.throttle_rate == 0 and self.queue_length == 0:
            return self
        elif self.queue_length == 0:
            return ThrottleMessageHandler(self)
        else:
            return QueueMessageHandler(self)

    def finish(self):
        """Release resources held by the handler; nothing to do here."""
        pass
85 
86 
class ThrottleMessageHandler(MessageHandler):
    """Handler that drops messages arriving before the throttle period ends."""

    def handle_message(self, msg):
        """Publish ``msg`` only if the throttle period has elapsed."""
        if self.time_remaining() != 0:
            # Still inside the throttle window: the message is discarded.
            return
        MessageHandler.handle_message(self, msg)

    def transition(self):
        """Return the handler matching the current throttle/queue settings."""
        if self.queue_length != 0:
            return QueueMessageHandler(self)
        if self.throttle_rate == 0:
            # Neither throttling nor queueing: fall back to pass-through.
            return MessageHandler(self)
        return self

    def finish(self):
        """Nothing to clean up for a throttling handler."""
        return None
103 
104 
class QueueMessageHandler(MessageHandler, Thread):
    """Handler that buffers messages and publishes them from its own thread.

    Incoming messages are appended to a bounded deque (``maxlen`` is the
    queue length, so the oldest entry is discarded when it is full).  A
    daemon thread, started by the constructor, pops one message at a time
    and publishes it whenever ``time_remaining()`` reaches zero.
    """

    def __init__(self, previous_handler):
        """Copy state from ``previous_handler`` and start the worker thread."""
        Thread.__init__(self)
        MessageHandler.__init__(self, previous_handler)
        self.daemon = True
        self.queue = deque(maxlen=self.queue_length)
        # Guards self.queue and self.alive; also used to wake the worker.
        self.c = Condition()
        self.alive = True
        self.start()

    def handle_message(self, msg):
        """Enqueue ``msg``; wake the worker if the queue was empty."""
        with self.c:
            should_notify = len(self.queue) == 0
            self.queue.append(msg)
            # The worker only blocks without a timeout on an empty queue,
            # so a wake-up is needed only for the empty -> non-empty change.
            if should_notify:
                self.c.notify()

    def transition(self):
        """Return the handler matching the current throttle/queue settings.

        When queueing is switched off the worker thread is stopped first
        (see ``finish``) and a pass-through or throttling handler is
        returned.  Otherwise the buffer is rebuilt with the new maximum
        length, keeping the most recent messages, and ``self`` is returned.
        """
        if self.queue_length == 0:
            self.finish()
            if self.throttle_rate == 0:
                return MessageHandler(self)
            return ThrottleMessageHandler(self)

        with self.c:
            # deque(iterable, maxlen=n) keeps the last n items, which is
            # what re-appending the old items one by one would produce.
            self.queue = deque(self.queue, maxlen=self.queue_length)
            self.c.notify()
        return self

    def finish(self):
        """Stop the worker thread and wait for it to exit.

        Before exiting, the worker publishes buffered messages for as long
        as ``time_remaining()`` is zero; with a throttle rate of 0 that
        flushes the whole buffer.
        """
        # Notify the thread to finish
        with self.c:
            self.alive = False
            self.c.notify()

        self.join()

    def run(self):
        """Worker loop: publish queued messages, honouring the throttle rate."""
        while self.alive:
            msg = None
            with self.c:
                # Re-check under the lock: finish() may have cleared the
                # flag and sent its notify() between the loop test above
                # and acquiring the lock.  Waiting now would miss that
                # wake-up and block forever, hanging finish() in join().
                if not self.alive:
                    break
                if len(self.queue) == 0:
                    self.c.wait()
                else:
                    self.c.wait(self.time_remaining())
                if self.alive and self.time_remaining() == 0 and len(self.queue) > 0:
                    msg = self.queue.popleft()
            # Publish outside the lock so producers are not blocked.
            if msg is not None:
                try:
                    MessageHandler.handle_message(self, msg)
                except Exception:
                    # A failing publish must not kill the worker thread.
                    traceback.print_exc(file=sys.stderr)
        # Shutting down: flush whatever may be published right now.
        while self.time_remaining() == 0 and len(self.queue) > 0:
            try:
                MessageHandler.handle_message(self, self.queue.popleft())
            except Exception:
                traceback.print_exc(file=sys.stderr)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14