00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from time import time
00035 from copy import copy
00036 from threading import Lock
00037 from rospy import Publisher, SubscribeListener
00038 from rospy import logwarn
00039 from rostopic import get_topic_type
00040 from rosbridge_library.internal import ros_loader, message_conversion
00041 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
00042
00043
class PublisherConsistencyListener(SubscribeListener):
    """ Replays recently published messages to subscribers that connect late.

    This class is used to solve the problem that sometimes we create a
    publisher and then immediately publish a message, before the subscribers
    have set up their connections.

    Call attach() to attach the listener to a publisher. It sets up a buffer
    of outgoing messages, then when a new connection occurs, sends the
    messages in the buffer.

    Call detach() to detach the listener from the publisher and restore the
    original publish methods.

    After some particular timeout (default to 1 second), the listener stops
    buffering messages as it is assumed by this point all subscribers will
    have successfully set up their connections."""

    # Length of the buffering window in seconds, measured from attach()
    timeout = 1
    # True between a call to attach() and the matching call to detach()
    attached = False

    def attach(self, publisher):
        """ Overrides the publisher's publish method, and attaches a subscribe
        listener to the publisher, effectively routing incoming connections
        and outgoing publish requests through this class instance

        Keyword arguments:
        publisher -- the publisher to hook into; its original publish method
        is kept so that detach() can restore it

        """
        # Initialise every piece of state *before* hooking into the
        # publisher. As soon as the listener is registered or publish is
        # overridden, peer_subscribe() / publish_override() may be invoked
        # (possibly from another thread) and they read these attributes.
        self.lock = Lock()
        self.established_time = time()
        self.msg_buffer = []

        self.publisher = publisher
        self.publish = publisher.publish
        publisher.impl.add_subscriber_listener(self)
        publisher.publish = self.publish_override

        self.attached = True

    def detach(self):
        """ Restores the publisher's original publish method and unhooks the
        subscribe listeners, effectively finishing with this object """
        self.publisher.publish = self.publish
        listeners = self.publisher.impl.subscriber_listeners
        if self in listeners:
            listeners.remove(self)
        self.attached = False

    def peer_subscribe(self, topic_name, topic_publish, peer_publish):
        """ Called whenever there's a new subscription.

        If we're still inside the subscription setup window, then we publish
        any buffered messages to the peer.

        We also check if we're timed out, but if we are we don't detach (due
        to threading complications), we just don't propagate buffered messages

        Keyword arguments:
        topic_name    -- unused here
        topic_publish -- unused here
        peer_publish  -- callable invoked once per buffered message

        """
        if self.timed_out():
            return

        # Snapshot the buffer under the lock, then publish outside of it so
        # that publish_override() is never blocked by a slow peer.
        with self.lock:
            pending = copy(self.msg_buffer)
        for msg in pending:
            peer_publish(msg)

    def timed_out(self):
        """ Checks to see how much time has elapsed since the publisher was
        created; returns True once more than `timeout` seconds have passed
        since attach() """
        return time() - self.established_time > self.timeout

    def publish_override(self, message):
        """ The publisher's publish method is replaced with this publish method
        which checks for timeout and if we haven't timed out, buffers outgoing
        messages in preparation for new subscriptions

        The message is always forwarded to the original publish method,
        whether or not it was buffered. """
        if not self.timed_out():
            with self.lock:
                self.msg_buffer.append(message)
        self.publish(message)
00117
00118
class MultiPublisher():
    """ Keeps track of the clients that are using a particular publisher.

    Provides an API to publish messages and register clients that are using
    this publisher """

    def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
        """ Register a publisher on the specified topic.

        Keyword arguments:
        topic    -- the name of the topic to register the publisher to
        msg_type -- (optional) the type to register the publisher as.  If not
        provided, an attempt will be made to infer the topic type
        latched_client_id -- (optional) if a client requested this publisher
        to be latched, provide the client_id of that client here.  The
        publisher is latched whenever this is not None
        queue_size -- (optional) queue_size passed through to the publisher

        Throws:
        TopicNotEstablishedException -- if no msg_type was specified by the
        caller and the topic is not yet established, so a topic type cannot
        be inferred
        TypeConflictException        -- if the msg_type was specified by the
        caller and the topic is established, and the established type is
        different to the user-specified msg_type

        """
        # First check to see if the topic is already established
        topic_type = get_topic_type(topic)[0]

        # If it's not established and no type was specified, exception
        if msg_type is None and topic_type is None:
            raise TopicNotEstablishedException(topic)

        # Use the established topic type if none was specified
        if msg_type is None:
            msg_type = topic_type

        # Load the message class, propagating any exceptions from bad msg types
        msg_class = ros_loader.get_message_class(msg_type)

        # Make sure the specified msg type and established msg type are same
        if topic_type is not None and topic_type != msg_class._type:
            raise TypeConflictException(topic, topic_type, msg_class._type)

        # Create the publisher and associated member variables
        self.clients = {}
        self.latched_client_id = latched_client_id
        self.topic = topic
        self.msg_class = msg_class
        self.publisher = Publisher(topic, msg_class,
                                   latch=(latched_client_id is not None),
                                   queue_size=queue_size)
        self.listener = PublisherConsistencyListener()
        self.listener.attach(self.publisher)

    def unregister(self):
        """ Unregisters the publisher and clears the clients """
        self.publisher.unregister()
        self.clients.clear()

    def verify_type(self, msg_type):
        """ Verify that the publisher publishes messages of the specified type.

        Keyword arguments:
        msg_type -- the type to check this publisher against

        Throws:
        Exception -- if ros_loader cannot load the specified msg type
        TypeConflictException -- if the msg_type is different than the type of
        this publisher

        """
        if ros_loader.get_message_class(msg_type) is not self.msg_class:
            raise TypeConflictException(self.topic,
                                        self.msg_class._type, msg_type)

    def publish(self, msg):
        """ Publish a message using this publisher.

        Keyword arguments:
        msg -- the dict (json) message to publish

        Throws:
        Exception -- propagates exceptions from message conversion if the
        provided msg does not properly conform to the message type of this
        publisher

        """
        # Once the buffering window has elapsed, drop the consistency
        # listener so later publishes go straight to the original method
        if self.listener.attached and self.listener.timed_out():
            self.listener.detach()

        # Create a message instance
        inst = self.msg_class()

        # Populate the instance, propagating any exceptions that may be thrown
        message_conversion.populate_instance(msg, inst)

        # Publish the message
        self.publisher.publish(inst)

    def register_client(self, client_id):
        """ Register the specified client as a client of this publisher.

        Registering the same client_id more than once has no further effect.

        Keyword arguments:
        client_id -- the ID of the client using the publisher

        """
        self.clients[client_id] = True

    def unregister_client(self, client_id):
        """ Unregister the specified client from this publisher.

        If the specified client_id is not a client of this publisher, nothing
        happens.

        Keyword arguments:
        client_id -- the ID of the client to remove

        """
        self.clients.pop(client_id, None)

    def has_clients(self):
        """ Return true if there are clients to this publisher. """
        return bool(self.clients)
00243
00244
class PublisherManager():
    """ The PublisherManager keeps track of ROS publishers

    It maintains a MultiPublisher instance for each registered topic

    When unregistering a client, if there are no more clients for a publisher,
    then that publisher is unregistered from the ROS Master
    """

    def __init__(self):
        # Maps topic name -> MultiPublisher for that topic
        self._publishers = {}

    def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
        """ Register a publisher on the specified topic.

        Publishers are shared between clients, so a single MultiPublisher
        instance is created per topic, even if multiple clients register.

        Keyword arguments:
        client_id  -- the ID of the client making this request
        topic      -- the name of the topic to publish on
        msg_type   -- (optional) the type to publish
        latch      -- (optional) whether to make this publisher latched
        queue_size -- (optional) rospy publisher queue_size to use

        Throws:
        Exception -- exceptions are propagated from the MultiPublisher if
        there is a problem loading the specified msg class or establishing
        the publisher

        """
        latched_client_id = client_id if latch else None
        if topic not in self._publishers:
            self._publishers[topic] = MultiPublisher(
                topic, msg_type, latched_client_id, queue_size=queue_size)
        elif latch and self._publishers[topic].latched_client_id != client_id:
            # The whole concatenated message must be parenthesised before
            # applying %, otherwise only the last fragment is formatted and
            # a TypeError is raised instead of a warning being logged.
            logwarn(("Client ID %s attempted to register topic [%s] as latched " +
                     "but this topic was previously registered.") % (client_id, topic))
            logwarn("Only a single registered latched publisher is supported at the time")
        elif not latch and self._publishers[topic].latched_client_id:
            logwarn(("New non-latched publisher registration for topic [%s] which is " +
                     "already registered as latched. but this topic was previously " +
                     "registered.") % (topic,))
            logwarn("Only a single registered latched publisher is supported at the time")

        if msg_type is not None:
            self._publishers[topic].verify_type(msg_type)

        self._publishers[topic].register_client(client_id)

    def unregister(self, client_id, topic):
        """ Unregister a client from the publisher for the given topic.

        If there are no clients remaining for that publisher, then the
        publisher is unregistered from the ROS Master.  Unknown topics are
        ignored.

        Keyword arguments:
        client_id -- the ID of the client making this request
        topic     -- the topic to unregister the publisher for

        """
        if topic not in self._publishers:
            return

        multipub = self._publishers[topic]
        multipub.unregister_client(client_id)

        if not multipub.has_clients():
            multipub.unregister()
            del self._publishers[topic]

    def unregister_all(self, client_id):
        """ Unregisters a client from all publishers that they are registered
        to.

        Keyword arguments:
        client_id -- the ID of the client making this request """
        # Iterate over a snapshot of the topic names: unregister() may delete
        # entries from self._publishers, which must not happen while the
        # dict itself is being iterated.
        for topic in list(self._publishers):
            self.unregister(client_id, topic)

    def publish(self, client_id, topic, msg, latch=False, queue_size=100):
        """ Publish a message on the given topic.

        Tries to create a publisher on the topic if one does not already exist.

        Keyword arguments:
        client_id  -- the ID of the client making this request
        topic      -- the topic to publish the message on
        msg        -- a JSON-like dict of fields and values
        latch      -- (optional) whether to make this publisher latched
        queue_size -- (optional) rospy publisher queue_size to use

        Throws:
        Exception -- a variety of exceptions are propagated.  They can be
        thrown if there is a problem setting up or getting the publisher,
        or if the provided msg does not map to the msg class of the publisher.

        """
        self.register(client_id, topic, latch=latch, queue_size=queue_size)

        self._publishers[topic].publish(msg)
00345
00346
# Module-level PublisherManager instance, created when this module is imported
manager = PublisherManager()