00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from time import time
00035 from copy import copy
00036 from threading import Lock, Timer
00037 from rospy import Publisher, SubscribeListener
00038 from rospy import logwarn
00039 from rostopic import get_topic_type
00040 from rosbridge_library.internal import ros_loader, message_conversion
00041 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
00042
00043 UNREGISTER_TIMEOUT = 10.0
00044
00045 class PublisherConsistencyListener(SubscribeListener):
00046 """ This class is used to solve the problem that sometimes we create a
00047 publisher and then immediately publish a message, before the subscribers
00048 have set up their connections.
00049
00050 Call attach() to attach the listener to a publisher. It sets up a buffer
00051 of outgoing messages, then when a new connection occurs, sends the messages
00052 in the buffer.
00053
00054 Call detach() to detach the listener from the publisher and restore the
00055 original publish methods.
00056
00057 After some particular timeout (default to 1 second), the listener stops
00058 buffering messages as it is assumed by this point all subscribers will have
00059 successfully set up their connections."""
00060
00061 timeout = 1
00062 attached = False
00063
00064 def attach(self, publisher):
00065 """ Overrides the publisher's publish method, and attaches a subscribe
00066 listener to the publisher, effectively routing incoming connections
00067 and outgoing publish requests through this class instance """
00068
00069 self.publisher = publisher
00070 publisher.impl.add_subscriber_listener(self)
00071 self.publish = publisher.publish
00072 publisher.publish = self.publish_override
00073
00074
00075 self.lock = Lock()
00076 self.established_time = time()
00077 self.msg_buffer = []
00078 self.attached = True
00079
00080 def detach(self):
00081 """ Restores the publisher's original publish method and unhooks the
00082 subscribe listeners, effectively finishing with this object """
00083 self.publisher.publish = self.publish
00084 if self in self.publisher.impl.subscriber_listeners:
00085 self.publisher.impl.subscriber_listeners.remove(self)
00086 self.attached = False
00087
00088 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00089 """ Called whenever there's a new subscription.
00090
00091 If we're still inside the subscription setup window, then we publish
00092 any buffered messages to the peer.
00093
00094 We also check if we're timed out, but if we are we don't detach (due
00095 to threading complications), we just don't propagate buffered messages
00096 """
00097 if not self.timed_out():
00098 self.lock.acquire()
00099 msgs = copy(self.msg_buffer)
00100 self.lock.release()
00101 for msg in msgs:
00102 peer_publish(msg)
00103
00104 def timed_out(self):
00105 """ Checks to see how much time has elapsed since the publisher was
00106 created """
00107 return time() - self.established_time > self.timeout
00108
00109 def publish_override(self, message):
00110 """ The publisher's publish method is replaced with this publish method
00111 which checks for timeout and if we haven't timed out, buffers outgoing
00112 messages in preparation for new subscriptions """
00113 if not self.timed_out():
00114 self.lock.acquire()
00115 self.msg_buffer.append(message)
00116 self.lock.release()
00117 self.publish(message)
00118
00119
00120 class MultiPublisher():
00121 """ Keeps track of the clients that are using a particular publisher.
00122
00123 Provides an API to publish messages and register clients that are using
00124 this publisher """
00125
00126 def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
00127 """ Register a publisher on the specified topic.
00128
00129 Keyword arguments:
00130 topic -- the name of the topic to register the publisher to
00131 msg_type -- (optional) the type to register the publisher as. If not
00132 provided, an attempt will be made to infer the topic type
00133 latch -- (optional) if a client requested this publisher to be latched,
00134 provide the client_id of that client here
00135
00136 Throws:
00137 TopicNotEstablishedException -- if no msg_type was specified by the
00138 caller and the topic is not yet established, so a topic type cannot
00139 be inferred
00140 TypeConflictException -- if the msg_type was specified by the
00141 caller and the topic is established, and the established type is
00142 different to the user-specified msg_type
00143
00144 """
00145
00146 topic_type = get_topic_type(topic)[0]
00147
00148
00149 if msg_type is None and topic_type is None:
00150 raise TopicNotEstablishedException(topic)
00151
00152
00153 if msg_type is None:
00154 msg_type = topic_type
00155
00156
00157 msg_class = ros_loader.get_message_class(msg_type)
00158
00159
00160 if topic_type is not None and topic_type != msg_class._type:
00161 raise TypeConflictException(topic, topic_type, msg_class._type)
00162
00163
00164 self.clients = {}
00165 self.latched_client_id = latched_client_id
00166 self.topic = topic
00167 self.msg_class = msg_class
00168 self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size)
00169 self.listener = PublisherConsistencyListener()
00170 self.listener.attach(self.publisher)
00171
00172 def unregister(self):
00173 """ Unregisters the publisher and clears the clients """
00174 self.publisher.unregister()
00175 self.clients.clear()
00176
00177 def verify_type(self, msg_type):
00178 """ Verify that the publisher publishes messages of the specified type.
00179
00180 Keyword arguments:
00181 msg_type -- the type to check this publisher against
00182
00183 Throws:
00184 Exception -- if ros_loader cannot load the specified msg type
00185 TypeConflictException -- if the msg_type is different than the type of
00186 this publisher
00187
00188 """
00189 if not ros_loader.get_message_class(msg_type) is self.msg_class:
00190 raise TypeConflictException(self.topic,
00191 self.msg_class._type, msg_type)
00192 return
00193
00194 def publish(self, msg):
00195 """ Publish a message using this publisher.
00196
00197 Keyword arguments:
00198 msg -- the dict (json) message to publish
00199
00200 Throws:
00201 Exception -- propagates exceptions from message conversion if the
00202 provided msg does not properly conform to the message type of this
00203 publisher
00204
00205 """
00206
00207 if self.listener.attached and self.listener.timed_out():
00208 self.listener.detach()
00209
00210
00211 inst = self.msg_class()
00212
00213
00214 message_conversion.populate_instance(msg, inst)
00215
00216
00217 self.publisher.publish(inst)
00218
00219 def register_client(self, client_id):
00220 """ Register the specified client as a client of this publisher.
00221
00222 Keyword arguments:
00223 client_id -- the ID of the client using the publisher
00224
00225 """
00226 self.clients[client_id] = True
00227
00228 def unregister_client(self, client_id):
00229 """ Unregister the specified client from this publisher.
00230
00231 If the specified client_id is not a client of this publisher, nothing
00232 happens.
00233
00234 Keyword arguments:
00235 client_id -- the ID of the client to remove
00236
00237 """
00238 if client_id in self.clients:
00239 del self.clients[client_id]
00240
00241 def has_clients(self):
00242 """ Return true if there are clients to this publisher. """
00243 return len(self.clients) != 0
00244
00245
00246 class PublisherManager():
00247 """ The PublisherManager keeps track of ROS publishers
00248
00249 It maintains a MultiPublisher instance for each registered topic
00250
00251 When unregistering a client, if there are no more clients for a publisher,
00252 then that publisher is unregistered from the ROS Master
00253 """
00254
00255 def __init__(self):
00256 self._publishers = {}
00257 self.unregister_timers = {}
00258
00259 def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
00260 """ Register a publisher on the specified topic.
00261
00262 Publishers are shared between clients, so a single MultiPublisher
00263 instance is created per topic, even if multiple clients register.
00264
00265 Keyword arguments:
00266 client_id -- the ID of the client making this request
00267 topic -- the name of the topic to publish on
00268 msg_type -- (optional) the type to publish
00269 latch -- (optional) whether to make this publisher latched
00270 queue_size -- (optional) rospy publisher queue_size to use
00271
00272 Throws:
00273 Exception -- exceptions are propagated from the MultiPublisher if
00274 there is a problem loading the specified msg class or establishing
00275 the publisher
00276
00277 """
00278 latched_client_id = client_id if latch else None
00279 if not topic in self._publishers:
00280 self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id,
00281 queue_size=queue_size)
00282 elif latch and self._publishers[topic].latched_client_id != client_id:
00283 logwarn("Client ID %s attempted to register topic [%s] as latched " +
00284 "but this topic was previously registered." % (client_id, topic))
00285 logwarn("Only a single registered latched publisher is supported at the time")
00286 elif not latch and self._publishers[topic].latched_client_id:
00287 logwarn("New non-latched publisher registration for topic [%s] which is " +
00288 "already registered as latched. but this topic was previously " +
00289 "registered." % topic)
00290 logwarn("Only a single registered latched publisher is supported at the time")
00291
00292 if msg_type is not None:
00293 self._publishers[topic].verify_type(msg_type)
00294
00295 self._publishers[topic].register_client(client_id)
00296
00297 def unregister(self, client_id, topic):
00298 """ Unregister a client from the publisher for the given topic.
00299 Will wait some time before actually unregistering, it is done in
00300 _unregister_impl
00301
00302 If there are no clients remaining for that publisher, then the
00303 publisher is unregistered from the ROS Master
00304
00305 Keyword arguments:
00306 client_id -- the ID of the client making this request
00307 topic -- the topic to unregister the publisher for
00308
00309 """
00310 if not topic in self._publishers:
00311 return
00312
00313 self._publishers[topic].unregister_client(client_id)
00314 if self.unregister_timers.has_key(topic):
00315 self.unregister_timers[topic].cancel()
00316 del self.unregister_timers[topic]
00317 self.unregister_timers[topic] = Timer(UNREGISTER_TIMEOUT, self._unregister_impl,
00318 [topic])
00319 self.unregister_timers[topic].start()
00320
00321 def _unregister_impl(self, topic):
00322 if not self._publishers[topic].has_clients():
00323 self._publishers[topic].unregister()
00324 del self._publishers[topic]
00325 del self.unregister_timers[topic]
00326
00327 def unregister_all(self, client_id):
00328 """ Unregisters a client from all publishers that they are registered
00329 to.
00330
00331 Keyword arguments:
00332 client_id -- the ID of the client making this request """
00333 for topic in self._publishers.keys():
00334 self.unregister(client_id, topic)
00335
00336 def publish(self, client_id, topic, msg, latch=False, queue_size=100):
00337 """ Publish a message on the given topic.
00338
00339 Tries to create a publisher on the topic if one does not already exist.
00340
00341 Keyword arguments:
00342 client_id -- the ID of the client making this request
00343 topic -- the topic to publish the message on
00344 msg -- a JSON-like dict of fields and values
00345 latch -- (optional) whether to make this publisher latched
00346 queue_size -- (optional) rospy publisher queue_size to use
00347
00348 Throws:
00349 Exception -- a variety of exceptions are propagated. They can be
00350 thrown if there is a problem setting up or getting the publisher,
00351 or if the provided msg does not map to the msg class of the publisher.
00352
00353 """
00354 self.register(client_id, topic, latch=latch, queue_size=queue_size)
00355
00356 self._publishers[topic].publish(msg)
00357
00358
00359 manager = PublisherManager()