00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from threading import Lock
00034 from functools import partial
00035 from rospy import loginfo
00036 from rosbridge_library.capability import Capability
00037 from rosbridge_library.internal.subscribers import manager
00038 from rosbridge_library.internal.subscription_modifiers import MessageHandler
00039 from rosbridge_library.internal.pngcompression import encode
00040 try:
00041 from ujson import dumps
00042 except ImportError:
00043 try:
00044 from simplejson import dumps
00045 except ImportError:
00046 from json import dumps
00047
00048
class Subscription(object):
    """ Keeps track of the clients multiple calls to subscribe.

    Chooses the most appropriate settings to send messages """

    def __init__(self, client_id, topic, publish):
        """ Create a subscription for the specified client on the specified
        topic, with callback publish

        Keyword arguments:
        client_id -- the ID of the client making this subscription
        topic     -- the name of the topic to subscribe to
        publish   -- the callback function for incoming messages; it is
        called as publish(message, fragment_size, compression)

        """
        self.client_id = client_id
        self.topic = topic
        self.publish = publish

        # Maps subscription id -> dict of the parameters requested by that
        # subscribe call
        self.clients = {}

        self.handler = MessageHandler(None, self._publish)
        self.handler_lock = Lock()
        # No clients yet, so this only initialises the default parameters
        self.update_params()

    def unregister(self):
        """ Unsubscribes this subscription and cleans up resources """
        manager.unsubscribe(self.client_id, self.topic)
        with self.handler_lock:
            self.handler.finish()
        self.clients.clear()

    def subscribe(self, sid=None, msg_type=None, throttle_rate=0,
                  queue_length=0, fragment_size=None, compression="none"):
        """ Add another client's subscription request

        If there are multiple calls to subscribe, the values actually used for
        queue_length, fragment_size, compression and throttle_rate are
        chosen to encompass all subscriptions' requirements

        Keyword arguments:
        sid           -- the subscription id from the client
        msg_type      -- the type of the message to subscribe to
        throttle_rate -- the minimum time (in ms) allowed between messages
        being sent.  If multiple subscriptions, the lower of these is used
        queue_length  -- the number of messages that can be buffered.  If
        multiple subscriptions, the lower of these is used
        fragment_size -- None if no fragmentation, or the maximum length of
        allowed outgoing messages
        compression   -- "none" if no compression, or some other value if
        compression is to be used (current valid values are 'png')

        """
        manager.subscribe(self.client_id, self.topic, self.on_msg, msg_type)

        # A repeated sid replaces the parameters stored for that sid
        self.clients[sid] = {
            "throttle_rate": throttle_rate,
            "queue_length": queue_length,
            "fragment_size": fragment_size,
            "compression": compression,
        }

        self.update_params()

    def unsubscribe(self, sid=None):
        """ Unsubscribe this particular client's subscription

        Keyword arguments:
        sid -- the individual subscription id.  If None, all are unsubscribed.
        An unknown sid is ignored.

        """
        if sid is None:
            self.clients.clear()
        else:
            self.clients.pop(sid, None)

        # Parameters are only recomputed while somebody is still subscribed
        if not self.is_empty():
            self.update_params()

    def is_empty(self):
        """ Return true if there are no subscriptions currently """
        return not self.clients

    def _publish(self, message):
        """ Internal method to propagate published messages to the registered
        publish callback, together with the current fragment size and
        compression settings """
        self.publish(message, self.fragment_size, self.compression)

    def on_msg(self, msg):
        """ Raw callback called by subscription manager for all incoming
        messages.

        Incoming messages are passed to the message handler which may drop,
        buffer, or propagate the message

        """
        with self.handler_lock:
            self.handler.handle_message(msg)

    def update_params(self):
        """ Determine the 'lowest common denominator' params to satisfy all
        subscribed clients.

        With no clients the defaults are restored (throttle_rate 0,
        queue_length 0, no fragmentation, no compression) and the message
        handler is left untouched.  Otherwise the smallest throttle_rate,
        the smallest queue_length and the smallest non-None fragment_size
        are used, png compression is enabled if any client asked for it, and
        the handler is replaced by the one returned from applying the new
        throttle rate and queue length.

        """
        if not self.clients:
            self.throttle_rate = 0
            self.queue_length = 0
            self.fragment_size = None
            self.compression = "none"
            return

        requests = list(self.clients.values())

        self.throttle_rate = min(r["throttle_rate"] for r in requests)
        self.queue_length = min(r["queue_length"] for r in requests)

        # Clients that did not ask for fragmentation do not constrain the size
        frags = [r["fragment_size"] for r in requests
                 if r["fragment_size"] is not None]
        self.fragment_size = min(frags) if frags else None

        wants_png = any(r["compression"] == "png" for r in requests)
        self.compression = "png" if wants_png else "none"

        with self.handler_lock:
            # Each setter returns the handler to use from now on
            self.handler = self.handler.set_throttle_rate(self.throttle_rate)
            self.handler = self.handler.set_queue_length(self.queue_length)
00176
00177
class Subscribe(Capability):
    """ Capability that registers the "subscribe" and "unsubscribe"
    operations on a protocol and forwards topic messages to it.

    One Subscription object is kept per topic; repeated subscribe requests
    for the same topic are merged into that object. """

    # The Python 2 builtin `unicode` does not exist on Python 3; referencing
    # it unguarded would raise NameError while this class is being defined.
    try:
        _string_types = (str, unicode)
    except NameError:
        _string_types = (str,)

    # Field specifications handed to basic_type_check.  Presumably each entry
    # is (mandatory?, field name, accepted type(s)) -- confirm against
    # Capability.basic_type_check.
    subscribe_msg_fields = [(True, "topic", _string_types),
                            (False, "type", _string_types),
                            (False, "throttle_rate", int),
                            (False, "fragment_size", int),
                            (False, "queue_length", int),
                            (False, "compression", _string_types)]
    unsubscribe_msg_fields = [(True, "topic", _string_types)]

    def __init__(self, protocol):
        """ Register the subscribe/unsubscribe operations on the protocol

        Keyword arguments:
        protocol -- the protocol instance this capability is attached to

        """
        Capability.__init__(self, protocol)

        protocol.register_operation("subscribe", self.subscribe)
        protocol.register_operation("unsubscribe", self.unsubscribe)

        # Maps topic name -> Subscription
        self._subscriptions = {}

    def subscribe(self, msg):
        """ Handle a "subscribe" operation

        Keyword arguments:
        msg -- the incoming message dict.  "topic" is read unconditionally;
        "id", "type", "throttle_rate", "fragment_size", "queue_length" and
        "compression" are optional and fall back to defaults.

        """
        sid = msg.get("id", None)

        self.basic_type_check(msg, self.subscribe_msg_fields)

        topic = msg["topic"]
        if topic not in self._subscriptions:
            # First request for this topic: create its Subscription, with a
            # publish callback already bound to the topic name
            client_id = self.protocol.client_id
            cb = partial(self.publish, topic)
            self._subscriptions[topic] = Subscription(client_id, topic, cb)

        self._subscriptions[topic].subscribe(
            sid=sid,
            msg_type=msg.get("type", None),
            throttle_rate=msg.get("throttle_rate", 0),
            fragment_size=msg.get("fragment_size", None),
            queue_length=msg.get("queue_length", 0),
            compression=msg.get("compression", "none"))

        self.protocol.log("info", "Subscribed to %s" % topic)

    def unsubscribe(self, msg):
        """ Handle an "unsubscribe" operation

        Requests for a topic without a subscription are ignored.  When the
        last subscription id of a topic is removed, the topic's Subscription
        is unregistered and dropped.

        Keyword arguments:
        msg -- the incoming message dict.  "topic" is read unconditionally;
        "id" is optional and, when absent, removes every subscription id of
        the topic.

        """
        sid = msg.get("id", None)

        self.basic_type_check(msg, self.unsubscribe_msg_fields)

        topic = msg["topic"]
        if topic not in self._subscriptions:
            return

        subscription = self._subscriptions[topic]
        subscription.unsubscribe(sid)

        if subscription.is_empty():
            subscription.unregister()
            del self._subscriptions[topic]

        self.protocol.log("info", "Unsubscribed from %s" % topic)

    def publish(self, topic, message, fragment_size=None, compression="none"):
        """ Publish a message to the client

        Keyword arguments:
        topic   -- the topic to publish the message on
        message -- a dict of key-value pairs. Will be wrapped in a message with
        opcode publish
        fragment_size -- (optional) fragment the serialized message into msgs
        with payloads not greater than this value
        compression   -- (optional) compress the message. valid values are
        'png' and 'none'

        """
        # NOTE(review): fragment_size is accepted but not used in this method
        outgoing_msg = {"op": "publish", "topic": topic, "msg": message}
        if compression == "png":
            # The whole publish message is serialized, png-encoded and
            # wrapped in a message with opcode png
            outgoing_msg_dumped = dumps(outgoing_msg)
            outgoing_msg = {"op": "png", "data": encode(outgoing_msg_dumped)}
        self.protocol.send(outgoing_msg)

    def finish(self):
        """ Unregister every subscription and remove the subscribe and
        unsubscribe operations from the protocol """
        for subscription in self._subscriptions.values():
            subscription.unregister()
        self._subscriptions.clear()
        self.protocol.unregister_operation("subscribe")
        self.protocol.unregister_operation("unsubscribe")