00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import fnmatch
00034 from threading import Lock
00035 from functools import partial
00036 from rospy import loginfo
00037 from rosbridge_library.capability import Capability
00038 from rosbridge_library.internal.subscribers import manager
00039 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00040 from rosbridge_library.internal.pngcompression import encode
00041
00042 try:
00043 from ujson import dumps
00044 except ImportError:
00045 try:
00046 from simplejson import dumps
00047 except ImportError:
00048 from json import dumps
00049
00050 from rosbridge_library.util import string_types
00051
00052
00053 class Subscription():
00054 """ Keeps track of the clients multiple calls to subscribe.
00055
00056 Chooses the most appropriate settings to send messages """
00057
00058 def __init__(self, client_id, topic, publish):
00059 """ Create a subscription for the specified client on the specified
00060 topic, with callback publish
00061
00062 Keyword arguments:
00063 client_id -- the ID of the client making this subscription
00064 topic -- the name of the topic to subscribe to
00065 publish -- the callback function for incoming messages
00066
00067 """
00068 self.client_id = client_id
00069 self.topic = topic
00070 self.publish = publish
00071
00072 self.clients = {}
00073
00074 self.handler = MessageHandler(None, self._publish)
00075 self.handler_lock = Lock()
00076 self.update_params()
00077
00078 def unregister(self):
00079 """ Unsubscribes this subscription and cleans up resources """
00080 manager.unsubscribe(self.client_id, self.topic)
00081 with self.handler_lock:
00082 self.handler.finish()
00083 self.clients.clear()
00084
00085 def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
00086 queue_length=0, fragment_size=None, compression="none"):
00087 """ Add another client's subscription request
00088
00089 If there are multiple calls to subscribe, the values actually used for
00090 queue_length, fragment_size, compression and throttle_rate are
00091 chosen to encompass all subscriptions' requirements
00092
00093 Keyword arguments:
00094 sid -- the subscription id from the client
00095 msg_type -- the type of the message to subscribe to
00096 throttle_rate -- the minimum time (in ms) allowed between messages
00097 being sent. If multiple subscriptions, the lower of these is used
00098 queue_length -- the number of messages that can be buffered. If
00099 multiple subscriptions, the lower of these is used
00100 fragment_size -- None if no fragmentation, or the maximum length of
00101 allowed outgoing messages
00102 compression -- "none" if no compression, or some other value if
00103 compression is to be used (current valid values are 'png')
00104
00105 """
00106
00107 client_details = {
00108 "throttle_rate": throttle_rate,
00109 "queue_length": queue_length,
00110 "fragment_size": fragment_size,
00111 "compression": compression
00112 }
00113
00114 self.clients[sid] = client_details
00115
00116 self.update_params()
00117
00118
00119 manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)
00120
00121 def unsubscribe(self, sid=None):
00122 """ Unsubscribe this particular client's subscription
00123
00124 Keyword arguments:
00125 sid -- the individual subscription id. If None, all are unsubscribed
00126
00127 """
00128 if sid is None:
00129 self.clients.clear()
00130 elif sid in self.clients:
00131 del self.clients[sid]
00132
00133 if not self.is_empty():
00134 self.update_params()
00135
00136 def is_empty(self):
00137 """ Return true if there are no subscriptions currently """
00138 return len(self.clients) == 0
00139
00140 def _publish(self, message):
00141 """ Internal method to propagate published messages to the registered
00142 publish callback """
00143 self.publish(message, self.fragment_size, self.compression)
00144
00145 def on_msg(self, msg):
00146 """ Raw callback called by subscription manager for all incoming
00147 messages.
00148
00149 Incoming messages are passed to the message handler which may drop,
00150 buffer, or propagate the message
00151
00152 """
00153 with self.handler_lock:
00154 self.handler.handle_message(msg)
00155
00156 def update_params(self):
00157 """ Determine the 'lowest common denominator' params to satisfy all
00158 subscribed clients. """
00159 if len(self.clients) == 0:
00160 self.throttle_rate = 0
00161 self.queue_length = 0
00162 self.fragment_size = None
00163 self.compression = "none"
00164 return
00165
00166 def f(fieldname):
00167 return [x[fieldname] for x in self.clients.values()]
00168
00169 self.throttle_rate = min(f("throttle_rate"))
00170 self.queue_length = min(f("queue_length"))
00171 frags = [x for x in f("fragment_size") if x != None]
00172 if frags == []:
00173 self.fragment_size = None
00174 else:
00175 self.fragment_size = min(frags)
00176 self.compression = "png" if "png" in f("compression") else "none"
00177
00178 with self.handler_lock:
00179 self.handler = self.handler.set_throttle_rate(self.throttle_rate)
00180 self.handler = self.handler.set_queue_length(self.queue_length)
00181
00182
00183 class Subscribe(Capability):
00184
00185 subscribe_msg_fields = [(True, "topic", string_types), (False, "type", string_types),
00186 (False, "throttle_rate", int), (False, "fragment_size", int),
00187 (False, "queue_length", int), (False, "compression", string_types)]
00188 unsubscribe_msg_fields = [(True, "topic", string_types)]
00189
00190 topics_glob = None
00191
00192 def __init__(self, protocol):
00193
00194 Capability.__init__(self, protocol)
00195
00196
00197 protocol.register_operation("subscribe", self.subscribe)
00198 protocol.register_operation("unsubscribe", self.unsubscribe)
00199
00200 self._subscriptions = {}
00201
00202 def subscribe(self, msg):
00203
00204 sid = msg.get("id", None)
00205
00206
00207 self.basic_type_check(msg, self.subscribe_msg_fields)
00208
00209
00210 topic = msg["topic"]
00211
00212 if Subscribe.topics_glob is not None and Subscribe.topics_glob:
00213 self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00214 match = False
00215 for glob in Subscribe.topics_glob:
00216 if (fnmatch.fnmatch(topic, glob)):
00217 self.protocol.log("debug", "Found match with glob " + glob + ", continuing subscription...")
00218 match = True
00219 break
00220 if not match:
00221 self.protocol.log("warn", "No match found for topic, cancelling subscription to: " + topic)
00222 return
00223 else:
00224 self.protocol.log("debug", "No topic security glob, not checking subscription.")
00225
00226 if not topic in self._subscriptions:
00227 client_id = self.protocol.client_id
00228 cb = partial(self.publish, topic)
00229 self._subscriptions[topic] = Subscription(client_id, topic, cb)
00230
00231
00232 subscribe_args = {
00233 "sid": sid,
00234 "msg_type": msg.get("type", None),
00235 "throttle_rate": msg.get("throttle_rate", 0),
00236 "fragment_size": msg.get("fragment_size", None),
00237 "queue_length": msg.get("queue_length", 0),
00238 "compression": msg.get("compression", "none")
00239 }
00240 self._subscriptions[topic].subscribe(**subscribe_args)
00241
00242 self.protocol.log("info", "Subscribed to %s" % topic)
00243
00244 def unsubscribe(self, msg):
00245
00246 sid = msg.get("id", None)
00247
00248 self.basic_type_check(msg, self.unsubscribe_msg_fields)
00249
00250 topic = msg["topic"]
00251 if Subscribe.topics_glob is not None and Subscribe.topics_glob:
00252 self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00253 match = False
00254 for glob in Subscribe.topics_glob:
00255 if (fnmatch.fnmatch(topic, glob)):
00256 self.protocol.log("debug", "Found match with glob " + glob + ", continuing unsubscription...")
00257 match = True
00258 break
00259 if not match:
00260 self.protocol.log("warn", "No match found for topic, cancelling unsubscription from: " + topic)
00261 return
00262 else:
00263 self.protocol.log("debug", "No topic security glob, not checking unsubscription.")
00264
00265 if topic not in self._subscriptions:
00266 return
00267 self._subscriptions[topic].unsubscribe(sid)
00268
00269 if self._subscriptions[topic].is_empty():
00270 self._subscriptions[topic].unregister()
00271 del self._subscriptions[topic]
00272
00273 self.protocol.log("info", "Unsubscribed from %s" % topic)
00274
00275 def publish(self, topic, message, fragment_size=None, compression="none"):
00276 """ Publish a message to the client
00277
00278 Keyword arguments:
00279 topic -- the topic to publish the message on
00280 message -- a dict of key-value pairs. Will be wrapped in a message with
00281 opcode publish
00282 fragment_size -- (optional) fragment the serialized message into msgs
00283 with payloads not greater than this value
00284 compression -- (optional) compress the message. valid values are
00285 'png' and 'none'
00286
00287 """
00288
00289 if Subscribe.topics_glob and Subscribe.topics_glob:
00290 self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00291 match = False
00292 for glob in Subscribe.topics_glob:
00293 if (fnmatch.fnmatch(topic, glob)):
00294 self.protocol.log("debug", "Found match with glob " + glob + ", continuing topic publish...")
00295 match = True
00296 break
00297 if not match:
00298 self.protocol.log("warn", "No match found for topic, cancelling topic publish to: " + topic)
00299 return
00300 else:
00301 self.protocol.log("debug", "No topic security glob, not checking topic publish.")
00302
00303 outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
00304 if compression == "png":
00305 outgoing_msg_dumped = dumps(outgoing_msg)
00306 outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
00307 self.protocol.send(outgoing_msg)
00308
00309 def finish(self):
00310 for subscription in self._subscriptions.values():
00311 subscription.unregister()
00312 self._subscriptions.clear()
00313 self.protocol.unregister_operation("subscribe")
00314 self.protocol.unregister_operation("unsubscribe")