subscribe.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import sys
34 PYTHON2 = sys.version_info < (3, 0)
35 
36 import fnmatch
37 from threading import Lock
38 from functools import partial
39 from rospy import loginfo, get_rostime
40 from rosbridge_library.capability import Capability
43 from rosbridge_library.internal.pngcompression import encode as encode_png
44 
45 try:
46  from cbor import dumps as encode_cbor
47 except ImportError:
48  from rosbridge_library.util.cbor import dumps as encode_cbor
49 
50 try:
51  from ujson import dumps as encode_json
52 except ImportError:
53  try:
54  from simplejson import dumps as encode_json
55  except ImportError:
56  from json import dumps as encode_json
57 
58 from rosbridge_library.util import string_types
59 
60 
class Subscription():
    """Keeps track of one client's multiple calls to subscribe on a topic.

    Every call is remembered under its subscription id together with the
    settings it asked for.  The settings actually applied when forwarding
    messages are derived from all remembered calls (see update_params).
    """

    def __init__(self, client_id, topic, publish):
        """Create a subscription for the specified client on the specified
        topic, with callback publish.

        Keyword arguments:
        client_id -- the ID of the client making this subscription
        topic     -- the name of the topic to subscribe to
        publish   -- the callback function for incoming messages; it is
                     invoked as publish(message, fragment_size, compression)

        """
        self.client_id = client_id
        self.topic = topic
        self.publish = publish

        # Maps subscription id -> dict of the settings requested with it.
        self.clients = {}

        self.handler = MessageHandler(None, self._publish)
        # Taken whenever self.handler is used or replaced (on_msg,
        # update_params, unregister).
        self.handler_lock = Lock()
        self.update_params()

    def unregister(self):
        """Unsubscribe from the manager and clean up resources."""
        manager.unsubscribe(self.client_id, self.topic)
        with self.handler_lock:
            self.handler.finish()
        self.clients.clear()

    def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
                  queue_length=0, fragment_size=None, compression="none"):
        """Add another subscription request from the client.

        If there are multiple calls to subscribe, the values actually used
        for queue_length, fragment_size, compression and throttle_rate are
        chosen to encompass all subscriptions' requirements.

        Keyword arguments:
        sid           -- the subscription id from the client; a repeated id
                         overwrites the settings stored for it
        msg_type      -- the type of the message to subscribe to
        throttle_rate -- the minimum time (in ms) allowed between messages
                         being sent.  If multiple subscriptions, the lower
                         of these is used
        queue_length  -- the number of messages that can be buffered.  If
                         multiple subscriptions, the lower of these is used
        fragment_size -- None if no fragmentation, or the maximum length of
                         allowed outgoing messages
        compression   -- "none" if no compression; the other values
                         recognised here are "png", "cbor" and "cbor-raw"

        """
        self.clients[sid] = {
            "throttle_rate": throttle_rate,
            "queue_length": queue_length,
            "fragment_size": fragment_size,
            "compression": compression,
        }

        self.update_params()

        # "cbor-raw" subscriptions ignore the requested type and use the
        # special "__AnyMsg" type name instead.
        if compression == "cbor-raw":
            msg_type = "__AnyMsg"

        # Subscribe with the manager. This will propagate any exceptions
        manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)

    def unsubscribe(self, sid=None):
        """Remove one of the client's subscription requests.

        Keyword arguments:
        sid -- the individual subscription id. If None, all are unsubscribed

        """
        if sid is None:
            self.clients.clear()
        elif sid in self.clients:
            del self.clients[sid]

        # The combined parameters are only recomputed while at least one
        # request remains.
        if not self.is_empty():
            self.update_params()

    def is_empty(self):
        """Return True if there are no subscriptions currently."""
        return len(self.clients) == 0

    def _publish(self, message):
        """Internal method to propagate published messages to the registered
        publish callback, together with the current fragment size and
        compression."""
        self.publish(message, self.fragment_size, self.compression)

    def on_msg(self, msg):
        """Raw callback called by subscription manager for all incoming
        messages.

        Incoming messages are passed to the message handler which may drop,
        buffer, or propagate the message.

        """
        with self.handler_lock:
            self.handler.handle_message(msg)

    def update_params(self):
        """Determine the 'lowest common denominator' params to satisfy all
        subscribed clients.

        With no clients the defaults are stored and the handler is left
        untouched.
        """
        if not self.clients:
            self.throttle_rate = 0
            self.queue_length = 0
            self.fragment_size = None
            self.compression = "none"
            return

        def requested(fieldname):
            return [details[fieldname] for details in self.clients.values()]

        self.throttle_rate = min(requested("throttle_rate"))
        self.queue_length = min(requested("queue_length"))

        # None means "no fragmentation" and must not take part in min().
        frags = [size for size in requested("fragment_size")
                 if size is not None]
        self.fragment_size = min(frags) if frags else None

        # Schemes are listed in increasing precedence: a later one that any
        # client asked for overrides an earlier one.
        wanted = requested("compression")
        self.compression = "none"
        for scheme in ("png", "cbor", "cbor-raw"):
            if scheme in wanted:
                self.compression = scheme

        with self.handler_lock:
            # Each setter returns the handler to use from now on.
            self.handler = self.handler.set_throttle_rate(self.throttle_rate)
            self.handler = self.handler.set_queue_length(self.queue_length)
200 
202 
203  subscribe_msg_fields = [(True, "topic", string_types), (False, "type", string_types),
204  (False, "throttle_rate", int), (False, "fragment_size", int),
205  (False, "queue_length", int), (False, "compression", string_types)]
206  unsubscribe_msg_fields = [(True, "topic", string_types)]
207 
208  topics_glob = None
209 
210  def __init__(self, protocol):
211  # Call superclass constructor
212  Capability.__init__(self, protocol)
213 
214  # Register the operations that this capability provides
215  protocol.register_operation("subscribe", self.subscribe)
216  protocol.register_operation("unsubscribe", self.unsubscribe)
217 
218  self._subscriptions = {}
219 
220  def subscribe(self, msg):
221  # Pull out the ID
222  sid = msg.get("id", None)
223 
224  # Check the args
225  self.basic_type_check(msg, self.subscribe_msg_fields)
226 
227  # Make the subscription
228  topic = msg["topic"]
229 
230  if Subscribe.topics_glob is not None and Subscribe.topics_glob:
231  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
232  match = False
233  for glob in Subscribe.topics_glob:
234  if (fnmatch.fnmatch(topic, glob)):
235  self.protocol.log("debug", "Found match with glob " + glob + ", continuing subscription...")
236  match = True
237  break
238  if not match:
239  self.protocol.log("warn", "No match found for topic, cancelling subscription to: " + topic)
240  return
241  else:
242  self.protocol.log("debug", "No topic security glob, not checking subscription.")
243 
244  if not topic in self._subscriptions:
245  client_id = self.protocol.client_id
246  cb = partial(self.publish, topic)
247  self._subscriptions[topic] = Subscription(client_id, topic, cb)
248 
249  # Register the subscriber
250  subscribe_args = {
251  "sid": sid,
252  "msg_type": msg.get("type", None),
253  "throttle_rate": msg.get("throttle_rate", 0),
254  "fragment_size": msg.get("fragment_size", None),
255  "queue_length": msg.get("queue_length", 0),
256  "compression": msg.get("compression", "none")
257  }
258  self._subscriptions[topic].subscribe(**subscribe_args)
259 
260  self.protocol.log("info", "Subscribed to %s" % topic)
261 
262  def unsubscribe(self, msg):
263  # Pull out the ID
264  sid = msg.get("id", None)
265 
267 
268  topic = msg["topic"]
269  if Subscribe.topics_glob is not None and Subscribe.topics_glob:
270  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
271  match = False
272  for glob in Subscribe.topics_glob:
273  if (fnmatch.fnmatch(topic, glob)):
274  self.protocol.log("debug", "Found match with glob " + glob + ", continuing unsubscription...")
275  match = True
276  break
277  if not match:
278  self.protocol.log("warn", "No match found for topic, cancelling unsubscription from: " + topic)
279  return
280  else:
281  self.protocol.log("debug", "No topic security glob, not checking unsubscription.")
282 
283  if topic not in self._subscriptions:
284  return
285  self._subscriptions[topic].unsubscribe(sid)
286 
287  if self._subscriptions[topic].is_empty():
288  self._subscriptions[topic].unregister()
289  del self._subscriptions[topic]
290 
291  self.protocol.log("info", "Unsubscribed from %s" % topic)
292 
293  def publish(self, topic, message, fragment_size=None, compression="none"):
294  """ Publish a message to the client
295 
296  Keyword arguments:
297  topic -- the topic to publish the message on
298  message -- a ROS message wrapped by OutgoingMessage
299  fragment_size -- (optional) fragment the serialized message into msgs
300  with payloads not greater than this value
301  compression -- (optional) compress the message. valid values are
302  'png' and 'none'
303 
304  """
305  # TODO: fragmentation, proper ids
306  if Subscribe.topics_glob and Subscribe.topics_glob:
307  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
308  match = False
309  for glob in Subscribe.topics_glob:
310  if (fnmatch.fnmatch(topic, glob)):
311  self.protocol.log("debug", "Found match with glob " + glob + ", continuing topic publish...")
312  match = True
313  break
314  if not match:
315  self.protocol.log("warn", "No match found for topic, cancelling topic publish to: " + topic)
316  return
317  else:
318  self.protocol.log("debug", "No topic security glob, not checking topic publish.")
319 
320  if PYTHON2:
321  topic = unicode(topic)
322 
323  outgoing_msg = {u"op": u"publish", u"topic": topic}
324  if compression=="png":
325  outgoing_msg["msg"] = message.get_json_values()
326  outgoing_msg_dumped = encode_json(outgoing_msg)
327  outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)}
328  elif compression=="cbor":
329  outgoing_msg[u"msg"] = message.get_cbor_values()
330  outgoing_msg = bytearray(encode_cbor(outgoing_msg))
331  elif compression=="cbor-raw":
332  now = get_rostime()
333  outgoing_msg[u"msg"] = {
334  u"secs": now.secs,
335  u"nsecs": now.nsecs,
336  u"bytes": message._message._buff
337  }
338  outgoing_msg = bytearray(encode_cbor(outgoing_msg))
339  else:
340  outgoing_msg["msg"] = message.get_json_values()
341 
342  self.protocol.send(outgoing_msg)
343 
344  def finish(self):
345  for subscription in self._subscriptions.values():
346  subscription.unregister()
347  self._subscriptions.clear()
348  self.protocol.unregister_operation("subscribe")
349  self.protocol.unregister_operation("unsubscribe")
def __init__(self, client_id, topic, publish)
Definition: subscribe.py:66
def subscribe(self, sid=None, msg_type=None, throttle_rate=0, queue_length=0, fragment_size=None, compression="none")
Definition: subscribe.py:94
def publish(self, topic, message, fragment_size=None, compression="none")
Definition: subscribe.py:293
def basic_type_check(self, msg, types_info)
Definition: capability.py:76


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14