00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from threading import Lock
00034 from functools import partial
00035 from rospy import loginfo
00036 from rosbridge_library.capability import Capability
00037 from rosbridge_library.internal.subscribers import manager
00038 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00039 from rosbridge_library.internal.pngcompression import encode
00040 try:
00041 from ujson import dumps
00042 except ImportError:
00043 try:
00044 from simplejson import dumps
00045 except ImportError:
00046 from json import dumps
00047
00048
class Subscription(object):
    """ Keeps track of the clients multiple calls to subscribe.

    Chooses the most appropriate settings to send messages """

    def __init__(self, client_id, topic, publish):
        """ Create a subscription for the specified client on the specified
        topic, with callback publish

        Keyword arguments:
        client_id -- the ID of the client making this subscription
        topic     -- the name of the topic to subscribe to
        publish   -- the callback function for incoming messages

        """
        self.client_id = client_id
        self.topic = topic
        self.publish = publish

        # Maps subscription id (sid) -> dict holding the settings requested
        # by that individual subscribe call
        self.clients = {}

        # Outgoing messages pass through the handler, which hands them to
        # self._publish; the lock guards every use of self.handler
        self.handler = MessageHandler(None, self._publish)
        self.handler_lock = Lock()
        self.update_params()

    def unregister(self):
        """ Unsubscribes this subscription and cleans up resources """
        manager.unsubscribe(self.client_id, self.topic)
        with self.handler_lock:
            self.handler.finish()
        self.clients.clear()

    def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
                  queue_length=0, fragment_size=None, compression="none"):
        """ Add another client's subscription request

        If there are multiple calls to subscribe, the values actually used for
        queue_length, fragment_size, compression and throttle_rate are
        chosen to encompass all subscriptions' requirements

        Keyword arguments:
        sid             -- the subscription id from the client
        msg_type        -- the type of the message to subscribe to
        throttle_rate   -- the minimum time (in ms) allowed between messages
        being sent.  If multiple subscriptions, the lower of these is used
        queue_length    -- the number of messages that can be buffered.  If
        multiple subscriptions, the lower of these is used
        fragment_size   -- None if no fragmentation, or the maximum length of
        allowed outgoing messages
        compression     -- "none" if no compression, or some other value if
        compression is to be used (current valid values are 'png')

        """
        # Record (or overwrite) the settings requested under this sid
        self.clients[sid] = {
            "throttle_rate": throttle_rate,
            "queue_length": queue_length,
            "fragment_size": fragment_size,
            "compression": compression
        }

        # Recompute the effective settings before subscribing
        self.update_params()

        # Incoming messages for the topic are delivered to self.on_msg
        manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)

    def unsubscribe(self, sid=None):
        """ Unsubscribe this particular client's subscription

        Keyword arguments:
        sid -- the individual subscription id.  If None, all are unsubscribed

        """
        if sid is None:
            self.clients.clear()
        else:
            # An unknown sid is silently ignored
            self.clients.pop(sid, None)

        # When nothing is left the effective settings are left untouched
        if not self.is_empty():
            self.update_params()

    def is_empty(self):
        """ Return true if there are no subscriptions currently """
        return not self.clients

    def _publish(self, message):
        """ Internal method to propagate published messages to the registered
        publish callback """
        self.publish(message, self.fragment_size, self.compression)

    def on_msg(self, msg):
        """ Raw callback called by subscription manager for all incoming
        messages.

        Incoming messages are passed to the message handler which may drop,
        buffer, or propagate the message

        """
        with self.handler_lock:
            self.handler.handle_message(msg)

    def update_params(self):
        """ Determine the 'lowest common denominator' params to satisfy all
        subscribed clients.

        Sets self.throttle_rate, self.queue_length, self.fragment_size and
        self.compression.  With no clients these are reset to their defaults
        and the message handler is left as it is.

        """
        if not self.clients:
            self.throttle_rate = 0
            self.queue_length = 0
            self.fragment_size = None
            self.compression = "none"
            return

        requested = list(self.clients.values())

        self.throttle_rate = min(c["throttle_rate"] for c in requested)
        self.queue_length = min(c["queue_length"] for c in requested)

        # Only subscriptions that asked for fragmentation constrain the size
        frags = [c["fragment_size"] for c in requested
                 if c["fragment_size"] is not None]
        self.fragment_size = min(frags) if frags else None

        # A single request for png switches compression on for everyone
        wants_png = any(c["compression"] == "png" for c in requested)
        self.compression = "png" if wants_png else "none"

        with self.handler_lock:
            # Each setter returns the handler to use from now on, so the
            # result is re-bound rather than discarded
            self.handler = self.handler.set_throttle_rate(self.throttle_rate)
            self.handler = self.handler.set_queue_length(self.queue_length)
00177
00178
class Subscribe(Capability):
    """ Capability providing the "subscribe" and "unsubscribe" operations.

    Keeps one Subscription object per topic and forwards the messages it
    delivers to the client through the protocol.

    """

    # `unicode` only exists on Python 2; fall back to plain `str` elsewhere so
    # that defining this class does not raise NameError on Python 3
    try:
        _string_types = (str, unicode)
    except NameError:
        _string_types = (str,)

    # Field specifications handed to basic_type_check; presumably each tuple
    # is (mandatory, field name, accepted type(s)) -- confirm in Capability
    subscribe_msg_fields = [(True, "topic", _string_types), (False, "type", _string_types),
                            (False, "throttle_rate", int), (False, "fragment_size", int),
                            (False, "queue_length", int), (False, "compression", _string_types)]
    unsubscribe_msg_fields = [(True, "topic", _string_types)]

    def __init__(self, protocol):
        """ Register the subscribe and unsubscribe operations on protocol

        Keyword arguments:
        protocol -- the protocol instance this capability is attached to

        """
        # Call superclass constructor
        Capability.__init__(self, protocol)

        # Register the operations that this capability provides
        protocol.register_operation("subscribe", self.subscribe)
        protocol.register_operation("unsubscribe", self.unsubscribe)

        # Maps topic name -> Subscription
        self._subscriptions = {}

    def subscribe(self, msg):
        """ Handle an incoming subscribe message

        Keyword arguments:
        msg -- the subscribe message, a dict.  "topic" is required; "id",
        "type", "throttle_rate", "fragment_size", "queue_length" and
        "compression" are optional

        """
        # Pull out the ID
        sid = msg.get("id", None)

        # Check the args
        self.basic_type_check(msg, self.subscribe_msg_fields)

        # Create the Subscription for this topic on first use
        topic = msg["topic"]
        if topic not in self._subscriptions:
            client_id = self.protocol.client_id
            cb = partial(self.publish, topic)
            self._subscriptions[topic] = Subscription(client_id, topic, cb)

        # Register this particular request, using defaults for absent fields
        subscribe_args = {
            "sid": sid,
            "msg_type": msg.get("type", None),
            "throttle_rate": msg.get("throttle_rate", 0),
            "fragment_size": msg.get("fragment_size", None),
            "queue_length": msg.get("queue_length", 0),
            "compression": msg.get("compression", "none")
        }
        self._subscriptions[topic].subscribe(**subscribe_args)

        self.protocol.log("info", "Subscribed to %s" % topic)

    def unsubscribe(self, msg):
        """ Handle an incoming unsubscribe message

        Does nothing if there is no subscription for the topic.  Once the
        last subscription id for a topic is removed, the topic's
        Subscription is unregistered and discarded.

        Keyword arguments:
        msg -- the unsubscribe message, a dict.  "topic" is required; "id"
        is optional and, when absent, all subscriptions on the topic are
        removed

        """
        # Pull out the ID
        sid = msg.get("id", None)

        # Check the args
        self.basic_type_check(msg, self.unsubscribe_msg_fields)

        topic = msg["topic"]
        if topic not in self._subscriptions:
            return
        self._subscriptions[topic].unsubscribe(sid)

        if self._subscriptions[topic].is_empty():
            self._subscriptions[topic].unregister()
            del self._subscriptions[topic]

        self.protocol.log("info", "Unsubscribed from %s" % topic)

    def publish(self, topic, message, fragment_size=None, compression="none"):
        """ Publish a message to the client

        Keyword arguments:
        topic   -- the topic to publish the message on
        message -- a dict of key-value pairs. Will be wrapped in a message with
        opcode publish
        fragment_size -- (optional) fragment the serialized message into msgs
        with payloads not greater than this value
        compression   -- (optional) compress the message. valid values are
        'png' and 'none'

        """
        # NOTE(review): fragment_size is accepted but not used in this method
        outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
        if compression == "png":
            # The serialized publish message becomes the payload of a png op
            outgoing_msg_dumped = dumps(outgoing_msg)
            outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
        self.protocol.send(outgoing_msg)

    def finish(self):
        """ Unregister every subscription and both operations """
        for subscription in self._subscriptions.values():
            subscription.unregister()
        self._subscriptions.clear()
        self.protocol.unregister_operation("subscribe")
        self.protocol.unregister_operation("unsubscribe")