00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import fnmatch
00034 from threading import Lock
00035 from functools import partial
00036 from rospy import loginfo
00037 from rosbridge_library.capability import Capability
00038 from rosbridge_library.internal.subscribers import manager
00039 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00040 from rosbridge_library.internal.pngcompression import encode
00041
00042 try:
00043 from ujson import dumps
00044 except ImportError:
00045 try:
00046 from simplejson import dumps
00047 except ImportError:
00048 from json import dumps
00049
00050
00051 class Subscription():
00052 """ Keeps track of the clients multiple calls to subscribe.
00053
00054 Chooses the most appropriate settings to send messages """
00055
00056 def __init__(self, client_id, topic, publish):
00057 """ Create a subscription for the specified client on the specified
00058 topic, with callback publish
00059
00060 Keyword arguments:
00061 client_id -- the ID of the client making this subscription
00062 topic -- the name of the topic to subscribe to
00063 publish -- the callback function for incoming messages
00064
00065 """
00066 self.client_id = client_id
00067 self.topic = topic
00068 self.publish = publish
00069
00070 self.clients = {}
00071
00072 self.handler = MessageHandler(None, self._publish)
00073 self.handler_lock = Lock()
00074 self.update_params()
00075
00076 def unregister(self):
00077 """ Unsubscribes this subscription and cleans up resources """
00078 manager.unsubscribe(self.client_id, self.topic)
00079 with self.handler_lock:
00080 self.handler.finish()
00081 self.clients.clear()
00082
00083 def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
00084 queue_length=0, fragment_size=None, compression="none"):
00085 """ Add another client's subscription request
00086
00087 If there are multiple calls to subscribe, the values actually used for
00088 queue_length, fragment_size, compression and throttle_rate are
00089 chosen to encompass all subscriptions' requirements
00090
00091 Keyword arguments:
00092 sid -- the subscription id from the client
00093 msg_type -- the type of the message to subscribe to
00094 throttle_rate -- the minimum time (in ms) allowed between messages
00095 being sent. If multiple subscriptions, the lower of these is used
00096 queue_length -- the number of messages that can be buffered. If
00097 multiple subscriptions, the lower of these is used
00098 fragment_size -- None if no fragmentation, or the maximum length of
00099 allowed outgoing messages
00100 compression -- "none" if no compression, or some other value if
00101 compression is to be used (current valid values are 'png')
00102
00103 """
00104
00105 client_details = {
00106 "throttle_rate": throttle_rate,
00107 "queue_length": queue_length,
00108 "fragment_size": fragment_size,
00109 "compression": compression
00110 }
00111
00112 self.clients[sid] = client_details
00113
00114 self.update_params()
00115
00116
00117 manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)
00118
00119 def unsubscribe(self, sid=None):
00120 """ Unsubscribe this particular client's subscription
00121
00122 Keyword arguments:
00123 sid -- the individual subscription id. If None, all are unsubscribed
00124
00125 """
00126 if sid is None:
00127 self.clients.clear()
00128 elif sid in self.clients:
00129 del self.clients[sid]
00130
00131 if not self.is_empty():
00132 self.update_params()
00133
00134 def is_empty(self):
00135 """ Return true if there are no subscriptions currently """
00136 return len(self.clients) == 0
00137
00138 def _publish(self, message):
00139 """ Internal method to propagate published messages to the registered
00140 publish callback """
00141 self.publish(message, self.fragment_size, self.compression)
00142
00143 def on_msg(self, msg):
00144 """ Raw callback called by subscription manager for all incoming
00145 messages.
00146
00147 Incoming messages are passed to the message handler which may drop,
00148 buffer, or propagate the message
00149
00150 """
00151 with self.handler_lock:
00152 self.handler.handle_message(msg)
00153
00154 def update_params(self):
00155 """ Determine the 'lowest common denominator' params to satisfy all
00156 subscribed clients. """
00157 if len(self.clients) == 0:
00158 self.throttle_rate = 0
00159 self.queue_length = 0
00160 self.fragment_size = None
00161 self.compression = "none"
00162 return
00163
00164 def f(fieldname):
00165 return [x[fieldname] for x in self.clients.values()]
00166
00167 self.throttle_rate = min(f("throttle_rate"))
00168 self.queue_length = min(f("queue_length"))
00169 frags = [x for x in f("fragment_size") if x != None]
00170 if frags == []:
00171 self.fragment_size = None
00172 else:
00173 self.fragment_size = min(frags)
00174 self.compression = "png" if "png" in f("compression") else "none"
00175
00176 with self.handler_lock:
00177 self.handler = self.handler.set_throttle_rate(self.throttle_rate)
00178 self.handler = self.handler.set_queue_length(self.queue_length)
00179
00180
00181 class Subscribe(Capability):
00182
00183 subscribe_msg_fields = [(True, "topic", (str, unicode)), (False, "type", (str, unicode)),
00184 (False, "throttle_rate", int), (False, "fragment_size", int),
00185 (False, "queue_length", int), (False, "compression", (str, unicode))]
00186 unsubscribe_msg_fields = [(True, "topic", (str, unicode))]
00187
00188 topics_glob = None
00189
00190 def __init__(self, protocol):
00191
00192 Capability.__init__(self, protocol)
00193
00194
00195 protocol.register_operation("subscribe", self.subscribe)
00196 protocol.register_operation("unsubscribe", self.unsubscribe)
00197
00198 self._subscriptions = {}
00199
00200 def subscribe(self, msg):
00201
00202 sid = msg.get("id", None)
00203
00204
00205 self.basic_type_check(msg, self.subscribe_msg_fields)
00206
00207
00208 topic = msg["topic"]
00209
00210 if Subscribe.topics_glob is not None and Subscribe.topics_glob:
00211 self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00212 match = False
00213 for glob in Subscribe.topics_glob:
00214 if (fnmatch.fnmatch(topic, glob)):
00215 self.protocol.log("debug", "Found match with glob " + glob + ", continuing subscription...")
00216 match = True
00217 break
00218 if not match:
00219 self.protocol.log("warn", "No match found for topic, cancelling subscription to: " + topic)
00220 return
00221 else:
00222 self.protocol.log("debug", "No topic security glob, not checking subscription.")
00223
00224 if not topic in self._subscriptions:
00225 client_id = self.protocol.client_id
00226 cb = partial(self.publish, topic)
00227 self._subscriptions[topic] = Subscription(client_id, topic, cb)
00228
00229
00230 subscribe_args = {
00231 "sid": sid,
00232 "msg_type": msg.get("type", None),
00233 "throttle_rate": msg.get("throttle_rate", 0),
00234 "fragment_size": msg.get("fragment_size", None),
00235 "queue_length": msg.get("queue_length", 0),
00236 "compression": msg.get("compression", "none")
00237 }
00238 self._subscriptions[topic].subscribe(**subscribe_args)
00239
00240 self.protocol.log("info", "Subscribed to %s" % topic)
00241
00242 def unsubscribe(self, msg):
00243
00244 sid = msg.get("id", None)
00245
00246 self.basic_type_check(msg, self.unsubscribe_msg_fields)
00247
00248 topic = msg["topic"]
00249 if Subscribe.topics_glob is not None and Subscribe.topics_glob:
00250 self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00251 match = False
00252 for glob in Subscribe.topics_glob:
00253 if (fnmatch.fnmatch(topic, glob)):
00254 self.protocol.log("debug", "Found match with glob " + glob + ", continuing unsubscription...")
00255 match = True
00256 break
00257 if not match:
00258 self.protocol.log("warn", "No match found for topic, cancelling unsubscription from: " + topic)
00259 return
00260 else:
00261 self.protocol.log("debug", "No topic security glob, not checking unsubscription.")
00262
00263 if topic not in self._subscriptions:
00264 return
00265 self._subscriptions[topic].unsubscribe(sid)
00266
00267 if self._subscriptions[topic].is_empty():
00268 self._subscriptions[topic].unregister()
00269 del self._subscriptions[topic]
00270
00271 self.protocol.log("info", "Unsubscribed from %s" % topic)
00272
00273 def publish(self, topic, message, fragment_size=None, compression="none"):
00274 """ Publish a message to the client
00275
00276 Keyword arguments:
00277 topic -- the topic to publish the message on
00278 message -- a dict of key-value pairs. Will be wrapped in a message with
00279 opcode publish
00280 fragment_size -- (optional) fragment the serialized message into msgs
00281 with payloads not greater than this value
00282 compression -- (optional) compress the message. valid values are
00283 'png' and 'none'
00284
00285 """
00286
00287 if Subscribe.topics_glob and Subscribe.topics_glob:
00288 self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
00289 match = False
00290 for glob in Subscribe.topics_glob:
00291 if (fnmatch.fnmatch(topic, glob)):
00292 self.protocol.log("debug", "Found match with glob " + glob + ", continuing topic publish...")
00293 match = True
00294 break
00295 if not match:
00296 self.protocol.log("warn", "No match found for topic, cancelling topic publish to: " + topic)
00297 return
00298 else:
00299 self.protocol.log("debug", "No topic security glob, not checking topic publish.")
00300
00301 outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
00302 if compression == "png":
00303 outgoing_msg_dumped = dumps(outgoing_msg)
00304 outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
00305 self.protocol.send(outgoing_msg)
00306
00307 def finish(self):
00308 for subscription in self._subscriptions.values():
00309 subscription.unregister()
00310 self._subscriptions.clear()
00311 self.protocol.unregister_operation("subscribe")
00312 self.protocol.unregister_operation("unsubscribe")