subscribe.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import sys
34 PYTHON2 = sys.version_info < (3, 0)
35 
36 import fnmatch
37 from threading import Lock
38 from functools import partial
39 from rospy import loginfo
40 from rosbridge_library.capability import Capability
43 from rosbridge_library.internal.pngcompression import encode as encode_png
44 
45 try:
46  from cbor import dumps as encode_cbor
47 except ImportError:
48  from rosbridge_library.util.cbor import dumps as encode_cbor
49 
50 try:
51  from ujson import dumps as encode_json
52 except ImportError:
53  try:
54  from simplejson import dumps as encode_json
55  except ImportError:
56  from json import dumps as encode_json
57 
58 from rosbridge_library.util import string_types
59 
60 
61 class Subscription():
62  """ Keeps track of the clients multiple calls to subscribe.
63 
64  Chooses the most appropriate settings to send messages """
65 
66  def __init__(self, client_id, topic, publish):
67  """ Create a subscription for the specified client on the specified
68  topic, with callback publish
69 
70  Keyword arguments:
71  client_id -- the ID of the client making this subscription
72  topic -- the name of the topic to subscribe to
73  publish -- the callback function for incoming messages
74 
75  """
76  self.client_id = client_id
77  self.topic = topic
78  self.publish = publish
79 
80  self.clients = {}
81 
82  self.handler = MessageHandler(None, self._publish)
83  self.handler_lock = Lock()
84  self.update_params()
85 
86  def unregister(self):
87  """ Unsubscribes this subscription and cleans up resources """
88  manager.unsubscribe(self.client_id, self.topic)
89  with self.handler_lock:
90  self.handler.finish()
91  self.clients.clear()
92 
93  def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
94  queue_length=0, fragment_size=None, compression="none"):
95  """ Add another client's subscription request
96 
97  If there are multiple calls to subscribe, the values actually used for
98  queue_length, fragment_size, compression and throttle_rate are
99  chosen to encompass all subscriptions' requirements
100 
101  Keyword arguments:
102  sid -- the subscription id from the client
103  msg_type -- the type of the message to subscribe to
104  throttle_rate -- the minimum time (in ms) allowed between messages
105  being sent. If multiple subscriptions, the lower of these is used
106  queue_length -- the number of messages that can be buffered. If
107  multiple subscriptions, the lower of these is used
108  fragment_size -- None if no fragmentation, or the maximum length of
109  allowed outgoing messages
110  compression -- "none" if no compression, or some other value if
111  compression is to be used (current valid values are 'png')
112 
113  """
114 
115  client_details = {
116  "throttle_rate": throttle_rate,
117  "queue_length": queue_length,
118  "fragment_size": fragment_size,
119  "compression": compression
120  }
121 
122  self.clients[sid] = client_details
123 
124  self.update_params()
125 
126  # Subscribe with the manager. This will propagate any exceptions
127  manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)
128 
129  def unsubscribe(self, sid=None):
130  """ Unsubscribe this particular client's subscription
131 
132  Keyword arguments:
133  sid -- the individual subscription id. If None, all are unsubscribed
134 
135  """
136  if sid is None:
137  self.clients.clear()
138  elif sid in self.clients:
139  del self.clients[sid]
140 
141  if not self.is_empty():
142  self.update_params()
143 
144  def is_empty(self):
145  """ Return true if there are no subscriptions currently """
146  return len(self.clients) == 0
147 
148  def _publish(self, message):
149  """ Internal method to propagate published messages to the registered
150  publish callback """
151  self.publish(message, self.fragment_size, self.compression)
152 
153  def on_msg(self, msg):
154  """ Raw callback called by subscription manager for all incoming
155  messages.
156 
157  Incoming messages are passed to the message handler which may drop,
158  buffer, or propagate the message
159 
160  """
161  with self.handler_lock:
162  self.handler.handle_message(msg)
163 
164  def update_params(self):
165  """ Determine the 'lowest common denominator' params to satisfy all
166  subscribed clients. """
167  if len(self.clients) == 0:
168  self.throttle_rate = 0
169  self.queue_length = 0
170  self.fragment_size = None
171  self.compression = "none"
172  return
173 
174  def f(fieldname):
175  return [x[fieldname] for x in self.clients.values()]
176 
177  self.throttle_rate = min(f("throttle_rate"))
178  self.queue_length = min(f("queue_length"))
179  frags = [x for x in f("fragment_size") if x != None]
180  if frags == []:
181  self.fragment_size = None
182  else:
183  self.fragment_size = min(frags)
184 
185  self.compression = "none"
186  if "png" in f("compression"):
187  self.compression = "png"
188  if "cbor" in f("compression"):
189  self.compression = "cbor"
190 
191  with self.handler_lock:
192  self.handler = self.handler.set_throttle_rate(self.throttle_rate)
193  self.handler = self.handler.set_queue_length(self.queue_length)
194 
195 
197 
198  subscribe_msg_fields = [(True, "topic", string_types), (False, "type", string_types),
199  (False, "throttle_rate", int), (False, "fragment_size", int),
200  (False, "queue_length", int), (False, "compression", string_types)]
201  unsubscribe_msg_fields = [(True, "topic", string_types)]
202 
203  topics_glob = None
204 
205  def __init__(self, protocol):
206  # Call superclass constructor
207  Capability.__init__(self, protocol)
208 
209  # Register the operations that this capability provides
210  protocol.register_operation("subscribe", self.subscribe)
211  protocol.register_operation("unsubscribe", self.unsubscribe)
212 
213  self._subscriptions = {}
214 
215  def subscribe(self, msg):
216  # Pull out the ID
217  sid = msg.get("id", None)
218 
219  # Check the args
220  self.basic_type_check(msg, self.subscribe_msg_fields)
221 
222  # Make the subscription
223  topic = msg["topic"]
224 
225  if Subscribe.topics_glob is not None and Subscribe.topics_glob:
226  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
227  match = False
228  for glob in Subscribe.topics_glob:
229  if (fnmatch.fnmatch(topic, glob)):
230  self.protocol.log("debug", "Found match with glob " + glob + ", continuing subscription...")
231  match = True
232  break
233  if not match:
234  self.protocol.log("warn", "No match found for topic, cancelling subscription to: " + topic)
235  return
236  else:
237  self.protocol.log("debug", "No topic security glob, not checking subscription.")
238 
239  if not topic in self._subscriptions:
240  client_id = self.protocol.client_id
241  cb = partial(self.publish, topic)
242  self._subscriptions[topic] = Subscription(client_id, topic, cb)
243 
244  # Register the subscriber
245  subscribe_args = {
246  "sid": sid,
247  "msg_type": msg.get("type", None),
248  "throttle_rate": msg.get("throttle_rate", 0),
249  "fragment_size": msg.get("fragment_size", None),
250  "queue_length": msg.get("queue_length", 0),
251  "compression": msg.get("compression", "none")
252  }
253  self._subscriptions[topic].subscribe(**subscribe_args)
254 
255  self.protocol.log("info", "Subscribed to %s" % topic)
256 
257  def unsubscribe(self, msg):
258  # Pull out the ID
259  sid = msg.get("id", None)
260 
262 
263  topic = msg["topic"]
264  if Subscribe.topics_glob is not None and Subscribe.topics_glob:
265  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
266  match = False
267  for glob in Subscribe.topics_glob:
268  if (fnmatch.fnmatch(topic, glob)):
269  self.protocol.log("debug", "Found match with glob " + glob + ", continuing unsubscription...")
270  match = True
271  break
272  if not match:
273  self.protocol.log("warn", "No match found for topic, cancelling unsubscription from: " + topic)
274  return
275  else:
276  self.protocol.log("debug", "No topic security glob, not checking unsubscription.")
277 
278  if topic not in self._subscriptions:
279  return
280  self._subscriptions[topic].unsubscribe(sid)
281 
282  if self._subscriptions[topic].is_empty():
283  self._subscriptions[topic].unregister()
284  del self._subscriptions[topic]
285 
286  self.protocol.log("info", "Unsubscribed from %s" % topic)
287 
288  def publish(self, topic, message, fragment_size=None, compression="none"):
289  """ Publish a message to the client
290 
291  Keyword arguments:
292  topic -- the topic to publish the message on
293  message -- a ROS message wrapped by OutgoingMessage
294  fragment_size -- (optional) fragment the serialized message into msgs
295  with payloads not greater than this value
296  compression -- (optional) compress the message. valid values are
297  'png' and 'none'
298 
299  """
300  # TODO: fragmentation, proper ids
301  if Subscribe.topics_glob and Subscribe.topics_glob:
302  self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
303  match = False
304  for glob in Subscribe.topics_glob:
305  if (fnmatch.fnmatch(topic, glob)):
306  self.protocol.log("debug", "Found match with glob " + glob + ", continuing topic publish...")
307  match = True
308  break
309  if not match:
310  self.protocol.log("warn", "No match found for topic, cancelling topic publish to: " + topic)
311  return
312  else:
313  self.protocol.log("debug", "No topic security glob, not checking topic publish.")
314 
315  if PYTHON2:
316  topic = unicode(topic)
317 
318  outgoing_msg = {u"op": u"publish", u"topic": topic}
319  if compression=="png":
320  outgoing_msg["msg"] = message.get_json_values()
321  outgoing_msg_dumped = encode_json(outgoing_msg)
322  outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)}
323  elif compression=="cbor":
324  outgoing_msg[u"msg"] = message.get_cbor_values()
325  outgoing_msg = bytearray(encode_cbor(outgoing_msg))
326  else:
327  outgoing_msg["msg"] = message.get_json_values()
328 
329  self.protocol.send(outgoing_msg)
330 
331  def finish(self):
332  for subscription in self._subscriptions.values():
333  subscription.unregister()
334  self._subscriptions.clear()
335  self.protocol.unregister_operation("subscribe")
336  self.protocol.unregister_operation("unsubscribe")
def __init__(self, client_id, topic, publish)
Definition: subscribe.py:66
def subscribe(self, sid=None, msg_type=None, throttle_rate=0, queue_length=0, fragment_size=None, compression="none")
Definition: subscribe.py:94
def publish(self, topic, message, fragment_size=None, compression="none")
Definition: subscribe.py:288
def basic_type_check(self, msg, types_info)
Definition: capability.py:76


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02