subscription_modifiers.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 from collections import deque
34 from threading import Thread, Condition
35 from time import time
36 import traceback
37 import sys
38 
39 """ Sits between incoming messages from a subscription, and the outgoing
40 publish method. Provides throttling / buffering capabilities.
41 
42 When the parameters change (throttle rate or queue length), the handler may
43 transition to a different kind of handler.
44 """
45 
46 
class MessageHandler(object):
    """Pass-through handler: every incoming message is published immediately.

    This is the base of the handler family.  It stores the shared state
    (``last_publish``, ``throttle_rate``, ``queue_length``, ``publish``) and
    decides, via ``transition()``, which handler type matches the current
    parameters.
    """

    def __init__(self, previous_handler=None, publish=None):
        """Create a handler, optionally inheriting state from another one.

        previous_handler -- handler whose ``last_publish``, ``throttle_rate``,
                            ``queue_length`` and ``publish`` are copied.  When
                            given, the ``publish`` argument is ignored.
        publish          -- callable invoked with each outgoing message; only
                            used when no ``previous_handler`` is supplied.
        """
        if previous_handler:
            self.last_publish = previous_handler.last_publish
            self.throttle_rate = previous_handler.throttle_rate
            self.queue_length = previous_handler.queue_length
            self.publish = previous_handler.publish
        else:
            self.last_publish = 0
            self.throttle_rate = 0
            self.queue_length = 0
            self.publish = publish

    def set_throttle_rate(self, throttle_rate):
        """Set the throttle rate and return the handler to use from now on.

        ``throttle_rate`` is divided by 1000 before being stored, so the
        stored value is directly comparable with ``time()`` (seconds); the
        argument is therefore expected in milliseconds.
        """
        self.throttle_rate = throttle_rate / 1000.0
        return self.transition()

    def set_queue_length(self, queue_length):
        """Set the queue length and return the handler to use from now on."""
        self.queue_length = queue_length
        return self.transition()

    def time_remaining(self):
        """Return how long until the next publish is allowed (never negative)."""
        return max((self.last_publish + self.throttle_rate) - time(), 0)

    def handle_message(self, msg):
        """Record the publish time, then pass ``msg`` to ``publish``."""
        self.last_publish = time()
        self.publish(msg)

    def transition(self):
        """Return the handler matching the current parameters.

        Returns ``self`` when neither throttling nor queueing is requested, a
        ``ThrottleMessageHandler`` when only throttling is requested, and a
        ``QueueMessageHandler`` whenever ``queue_length`` is non-zero.
        """
        if self.throttle_rate == 0 and self.queue_length == 0:
            return self
        elif self.queue_length == 0:
            return ThrottleMessageHandler(self)
        else:
            return QueueMessageHandler(self)

    def finish(self, block=True):
        """Nothing to clean up for a pass-through handler."""
        pass
85 
86 
class ThrottleMessageHandler(MessageHandler):
    """Handler that discards messages arriving before the throttle interval ends."""

    def handle_message(self, msg):
        """Publish ``msg`` only if no throttle time remains; otherwise drop it."""
        if self.time_remaining() != 0:
            # Still inside the throttle window: the message is discarded.
            return
        MessageHandler.handle_message(self, msg)

    def transition(self):
        """Return the handler matching the current parameters.

        A non-zero ``queue_length`` selects a ``QueueMessageHandler``; with no
        queue and no throttle a plain ``MessageHandler`` is returned; otherwise
        this handler remains in use.
        """
        if self.queue_length != 0:
            return QueueMessageHandler(self)
        if self.throttle_rate == 0:
            return MessageHandler(self)
        return self

    def finish(self, block=True):
        """Nothing to clean up for a throttling handler."""
        return None
103 
104 
class QueueMessageHandler(MessageHandler, Thread):
    """Handler that buffers messages and publishes them from its own thread.

    Incoming messages are appended to a bounded ``deque`` (``maxlen`` is
    ``queue_length``, so the oldest entries are discarded when it is full).
    The thread started in ``__init__`` pops messages and publishes them,
    waiting ``time_remaining()`` between publishes.
    """

    def __init__(self, previous_handler):
        """Copy state from ``previous_handler`` and start the worker thread."""
        Thread.__init__(self)
        MessageHandler.__init__(self, previous_handler)
        self.daemon = True
        self.queue = deque(maxlen=self.queue_length)
        # Condition guarding `queue` and `alive`; also used to wake the thread.
        self.c = Condition()
        self.alive = True
        self.start()

    def handle_message(self, msg):
        """Enqueue ``msg`` for the worker thread; ignored once finished."""
        with self.c:
            if not self.alive:
                return

            # The worker only waits without a timeout while the queue is
            # empty, so a wake-up is only needed on the empty -> non-empty
            # transition.
            should_notify = len(self.queue) == 0
            self.queue.append(msg)
            if should_notify:
                self.c.notify()

    def transition(self):
        """Return the handler matching the current parameters.

        When queueing is no longer requested the worker thread is stopped
        (blocking until it exits) and a ``MessageHandler`` or
        ``ThrottleMessageHandler`` is returned.  Otherwise the queue is rebuilt
        with the current ``queue_length`` and this handler remains in use.
        """
        if self.throttle_rate == 0 and self.queue_length == 0:
            self.finish()
            return MessageHandler(self)
        elif self.queue_length == 0:
            self.finish()
            return ThrottleMessageHandler(self)
        else:
            with self.c:
                # Re-create the deque so the new maxlen applies; if it shrank,
                # only the most recent messages are retained.
                self.queue = deque(self.queue, maxlen=self.queue_length)
                self.c.notify()
            return self

    def finish(self, block=True):
        """Stop the worker thread.

        If throttle was set to 0, this pushes all buffered messages.

        block -- when true, wait for the worker thread to exit.
        """
        # Notify the thread to finish
        with self.c:
            self.alive = False
            self.c.notify()

        if block:
            self.join()

    def _publish_safely(self, msg):
        """Publish ``msg``; print any exception so the worker thread survives."""
        try:
            MessageHandler.handle_message(self, msg)
        except Exception:
            traceback.print_exc(file=sys.stderr)

    def run(self):
        """Worker loop: publish queued messages, respecting the throttle rate."""
        while self.alive:
            msg = None
            with self.c:
                # Re-check `alive` under the lock.  finish() may have cleared
                # it and notified between the loop test above and acquiring
                # the lock; waiting now would miss that notification and, with
                # an empty queue, block forever (and hang join()).
                if self.alive:
                    if len(self.queue) == 0:
                        self.c.wait()
                    else:
                        self.c.wait(self.time_remaining())
                if self.alive and self.time_remaining() == 0 and len(self.queue) > 0:
                    msg = self.queue.popleft()
            # Publish outside the lock so handle_message() is not blocked
            # while the publish callback runs.
            if msg is not None:
                self._publish_safely(msg)
        # Stopped: flush whatever may still be published immediately.
        while self.time_remaining() == 0 and len(self.queue) > 0:
            self._publish_safely(self.queue.popleft())
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.c
c
Definition: subscription_modifiers.py:112
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.daemon
daemon
Definition: subscription_modifiers.py:110
rosbridge_library.internal.subscription_modifiers.ThrottleMessageHandler.finish
def finish(self, block=True)
Definition: subscription_modifiers.py:101
rosbridge_library.internal.subscription_modifiers.MessageHandler
Definition: subscription_modifiers.py:47
rosbridge_library.internal.subscription_modifiers.ThrottleMessageHandler
Definition: subscription_modifiers.py:87
rosbridge_library.internal.subscription_modifiers.MessageHandler.set_queue_length
def set_queue_length(self, queue_length)
Definition: subscription_modifiers.py:64
rosbridge_library.internal.subscription_modifiers.MessageHandler.set_throttle_rate
def set_throttle_rate(self, throttle_rate)
Definition: subscription_modifiers.py:60
rosbridge_library.internal.subscription_modifiers.MessageHandler.finish
def finish(self, block=True)
Definition: subscription_modifiers.py:83
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.handle_message
def handle_message(self, msg)
Definition: subscription_modifiers.py:116
rosbridge_library.internal.subscription_modifiers.MessageHandler.throttle_rate
throttle_rate
Definition: subscription_modifiers.py:51
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.transition
def transition(self)
Definition: subscription_modifiers.py:126
rosbridge_library.internal.subscription_modifiers.ThrottleMessageHandler.handle_message
def handle_message(self, msg)
Definition: subscription_modifiers.py:89
rosbridge_library.internal.subscription_modifiers.MessageHandler.handle_message
def handle_message(self, msg)
Definition: subscription_modifiers.py:71
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler
Definition: subscription_modifiers.py:105
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.__init__
def __init__(self, previous_handler)
Definition: subscription_modifiers.py:107
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.finish
def finish(self, block=True)
Definition: subscription_modifiers.py:142
rosbridge_library.internal.subscription_modifiers.MessageHandler.last_publish
last_publish
Definition: subscription_modifiers.py:50
rosbridge_library.internal.subscription_modifiers.ThrottleMessageHandler.transition
def transition(self)
Definition: subscription_modifiers.py:93
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.alive
alive
Definition: subscription_modifiers.py:113
rosbridge_library.internal.subscription_modifiers.MessageHandler.time_remaining
def time_remaining(self)
Definition: subscription_modifiers.py:68
rosbridge_library.internal.subscription_modifiers.MessageHandler.__init__
def __init__(self, previous_handler=None, publish=None)
Definition: subscription_modifiers.py:48
rosbridge_library.internal.subscription_modifiers.MessageHandler.queue_length
queue_length
Definition: subscription_modifiers.py:52
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.queue
queue
Definition: subscription_modifiers.py:111
rosbridge_library.internal.subscription_modifiers.MessageHandler.transition
def transition(self)
Definition: subscription_modifiers.py:75
rosbridge_library.internal.subscription_modifiers.QueueMessageHandler.run
def run(self)
Definition: subscription_modifiers.py:152
rosbridge_library.internal.subscription_modifiers.MessageHandler.publish
publish
Definition: subscription_modifiers.py:53


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45