subscribe.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import sys
34 PYTHON2 = sys.version_info < (3, 0)
35 
36 import fnmatch
37 from threading import Lock
38 from functools import partial
39 from rosbridge_library.capability import Capability
42 from rosbridge_library.internal.pngcompression import encode as encode_png
43 
44 try:
45  from ujson import dumps as encode_json
46 except ImportError:
47  try:
48  from simplejson import dumps as encode_json
49  except ImportError:
50  from json import dumps as encode_json
51 
52 from rosbridge_library.util import string_types
53 
54 
55 class Subscription():
56  """ Keeps track of the clients multiple calls to subscribe.
57 
58  Chooses the most appropriate settings to send messages """
59 
60  def __init__(self, client_id, topic, publish):
61  """ Create a subscription for the specified client on the specified
62  topic, with callback publish
63 
64  Keyword arguments:
65  client_id -- the ID of the client making this subscription
66  topic -- the name of the topic to subscribe to
67  publish -- the callback function for incoming messages
68 
69  """
70  self.client_id = client_id
71  self.topic = topic
72  self.publish = publish
73 
74  self.clients = {}
75 
76  self.handler = MessageHandler(None, self._publish)
77  self.handler_lock = Lock()
78  self.update_params()
79 
80  def unregister(self):
81  """ Unsubscribes this subscription and cleans up resources """
82  manager.unsubscribe(self.client_id, self.topic)
83  with self.handler_lock:
84  self.handler.finish(block=False)
85  self.clients.clear()
86 
87  def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
88  queue_length=0, fragment_size=None, compression="none"):
89  """ Add another client's subscription request
90 
91  If there are multiple calls to subscribe, the values actually used for
92  queue_length, fragment_size, compression and throttle_rate are
93  chosen to encompass all subscriptions' requirements
94 
95  Keyword arguments:
96  sid -- the subscription id from the client
97  msg_type -- the type of the message to subscribe to
98  throttle_rate -- the minimum time (in ms) allowed between messages
99  being sent. If multiple subscriptions, the lower of these is used
100  queue_length -- the number of messages that can be buffered. If
101  multiple subscriptions, the lower of these is used
102  fragment_size -- None if no fragmentation, or the maximum length of
103  allowed outgoing messages
104  compression -- "none" if no compression, or some other value if
105  compression is to be used (current valid values are 'png')
106 
107  """
108 
109  client_details = {
110  "throttle_rate": throttle_rate,
111  "queue_length": queue_length,
112  "fragment_size": fragment_size,
113  "compression": compression
114  }
115 
116  self.clients[sid] = client_details
117 
118  self.update_params()
119 
120  if compression == "cbor-raw":
121  msg_type = "__AnyMsg"
122 
123  # Subscribe with the manager. This will propagate any exceptions
124  manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)
125 
126  def unsubscribe(self, sid=None):
127  """ Unsubscribe this particular client's subscription
128 
129  Keyword arguments:
130  sid -- the individual subscription id. If None, all are unsubscribed
131 
132  """
133  if sid is None:
134  self.clients.clear()
135  elif sid in self.clients:
136  del self.clients[sid]
137 
138  if not self.is_empty():
139  self.update_params()
140 
141  def is_empty(self):
142  """ Return true if there are no subscriptions currently """
143  return len(self.clients) == 0
144 
145  def _publish(self, message):
146  """ Internal method to propagate published messages to the registered
147  publish callback """
148  self.publish(message, self.fragment_size, self.compression)
149 
150  def on_msg(self, msg):
151  """ Raw callback called by subscription manager for all incoming
152  messages.
153 
154  Incoming messages are passed to the message handler which may drop,
155  buffer, or propagate the message
156 
157  """
158  with self.handler_lock:
159  self.handler.handle_message(msg)
160 
161  def update_params(self):
162  """ Determine the 'lowest common denominator' params to satisfy all
163  subscribed clients. """
164  if len(self.clients) == 0:
165  self.throttle_rate = 0
166  self.queue_length = 0
167  self.fragment_size = None
168  self.compression = "none"
169  return
170 
171  def f(fieldname):
172  return [x[fieldname] for x in self.clients.values()]
173 
174  self.throttle_rate = min(f("throttle_rate"))
175  self.queue_length = min(f("queue_length"))
176  frags = [x for x in f("fragment_size") if x != None]
177  if frags == []:
178  self.fragment_size = None
179  else:
180  self.fragment_size = min(frags)
181 
182  self.compression = "none"
183  if "png" in f("compression"):
184  self.compression = "png"
185  if "cbor" in f("compression"):
186  self.compression = "cbor"
187  if "cbor-raw" in f("compression"):
188  self.compression = "cbor-raw"
189 
190  with self.handler_lock:
191  self.handler = self.handler.set_throttle_rate(self.throttle_rate)
192  self.handler = self.handler.set_queue_length(self.queue_length)
193 
194 
196 
197  subscribe_msg_fields = [(True, "topic", string_types), (False, "type", string_types),
198  (False, "throttle_rate", int), (False, "fragment_size", int),
199  (False, "queue_length", int), (False, "compression", string_types)]
200  unsubscribe_msg_fields = [(True, "topic", string_types)]
201 
202  topics_glob = None
203 
204  def __init__(self, protocol):
205  # Call superclass constructor
206  Capability.__init__(self, protocol)
207 
208  # Register the operations that this capability provides
209  protocol.register_operation("subscribe", self.subscribe)
210  protocol.register_operation("unsubscribe", self.unsubscribe)
211 
212  self._subscriptions = {}
213 
214  def subscribe(self, msg):
215  # Pull out the ID
216  sid = msg.get("id", None)
217 
218  # Check the args
219  self.basic_type_check(msg, self.subscribe_msg_fields)
220 
221  # Make the subscription
222  topic = msg["topic"]
223 
224  if Subscribe.topics_glob is not None and Subscribe.topics_glob:
225  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
226  match = False
227  for glob in Subscribe.topics_glob:
228  if (fnmatch.fnmatch(topic, glob)):
229  self.protocol.log("debug", "Found match with glob " + glob + ", continuing subscription...")
230  match = True
231  break
232  if not match:
233  self.protocol.log("warn", "No match found for topic, cancelling subscription to: " + topic)
234  return
235  else:
236  self.protocol.log("debug", "No topic security glob, not checking subscription.")
237 
238  if not topic in self._subscriptions:
239  client_id = self.protocol.client_id
240  cb = partial(self.publish, topic)
241  self._subscriptions[topic] = Subscription(client_id, topic, cb)
242 
243  # Register the subscriber
244  subscribe_args = {
245  "sid": sid,
246  "msg_type": msg.get("type", None),
247  "throttle_rate": msg.get("throttle_rate", 0),
248  "fragment_size": msg.get("fragment_size", None),
249  "queue_length": msg.get("queue_length", 0),
250  "compression": msg.get("compression", "none")
251  }
252  self._subscriptions[topic].subscribe(**subscribe_args)
253 
254  self.protocol.log("info", "Subscribed to %s" % topic)
255 
256  def unsubscribe(self, msg):
257  # Pull out the ID
258  sid = msg.get("id", None)
259 
261 
262  topic = msg["topic"]
263 
264  if topic not in self._subscriptions:
265  return
266  self._subscriptions[topic].unsubscribe(sid)
267 
268  if self._subscriptions[topic].is_empty():
269  self._subscriptions[topic].unregister()
270  del self._subscriptions[topic]
271 
272  self.protocol.log("info", "Unsubscribed from %s" % topic)
273 
274  def publish(self, topic, message, fragment_size=None, compression="none"):
275  """ Publish a message to the client
276 
277  Keyword arguments:
278  topic -- the topic to publish the message on
279  message -- a ROS message wrapped by OutgoingMessage
280  fragment_size -- (optional) fragment the serialized message into msgs
281  with payloads not greater than this value
282  compression -- (optional) compress the message. valid values are
283  'png' and 'none'
284 
285  """
286  # TODO: fragmentation, proper ids
287 
288  outgoing_msg = {u"op": u"publish", u"topic": topic}
289  if compression=="png":
290  outgoing_msg["msg"] = message.get_json_values()
291  outgoing_msg_dumped = encode_json(outgoing_msg)
292  outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)}
293  elif compression=="cbor":
294  outgoing_msg = message.get_cbor(outgoing_msg)
295  elif compression=="cbor-raw":
296  outgoing_msg = message.get_cbor_raw(outgoing_msg)
297  else:
298  outgoing_msg["msg"] = message.get_json_values()
299 
300  self.protocol.send(outgoing_msg, compression=compression)
301 
302  def finish(self):
303  for subscription in self._subscriptions.values():
304  subscription.unregister()
305  self._subscriptions.clear()
306  self.protocol.unregister_operation("subscribe")
307  self.protocol.unregister_operation("unsubscribe")
def __init__(self, client_id, topic, publish)
Definition: subscribe.py:60
def subscribe(self, sid=None, msg_type=None, throttle_rate=0, queue_length=0, fragment_size=None, compression="none")
Definition: subscribe.py:88
def publish(self, topic, message, fragment_size=None, compression="none")
Definition: subscribe.py:274
def basic_type_check(self, msg, types_info)
Definition: capability.py:76


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18