00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from time import time
00034 from copy import copy
00035 from threading import Lock
00036 from rospy import Publisher, SubscribeListener
00037 from rostopic import get_topic_type
00038 from rosbridge_library.internal import ros_loader, message_conversion
00039 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
00040
00041
00042 class PublisherConsistencyListener(SubscribeListener):
00043 """ This class is used to solve the problem that sometimes we create a
00044 publisher and then immediately publish a message, before the subscribers
00045 have set up their connections.
00046
00047 Call attach() to attach the listener to a publisher. It sets up a buffer
00048 of outgoing messages, then when a new connection occurs, sends the messages
00049 in the buffer.
00050
00051 Call detach() to detach the listener from the publisher and restore the
00052 original publish methods.
00053
00054 After some particular timeout (default to 1 second), the listener stops
00055 buffering messages as it is assumed by this point all subscribers will have
00056 successfully set up their connections."""
00057
00058 timeout = 1
00059 attached = False
00060
00061 def attach(self, publisher):
00062 """ Overrides the publisher's publish method, and attaches a subscribe
00063 listener to the publisher, effectively routing incoming connections
00064 and outgoing publish requests through this class instance """
00065
00066 self.publisher = publisher
00067 publisher.impl.add_subscriber_listener(self)
00068 self.publish = publisher.publish
00069 publisher.publish = self.publish_override
00070
00071
00072 self.lock = Lock()
00073 self.established_time = time()
00074 self.msg_buffer = []
00075 self.attached = True
00076
00077 def detach(self):
00078 """ Restores the publisher's original publish method and unhooks the
00079 subscribe listeners, effectively finishing with this object """
00080 self.publisher.publish = self.publish
00081 if self in self.publisher.impl.subscriber_listeners:
00082 self.publisher.impl.subscriber_listeners.remove(self)
00083 self.attached = False
00084
00085 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00086 """ Called whenever there's a new subscription.
00087
00088 If we're still inside the subscription setup window, then we publish
00089 any buffered messages to the peer.
00090
00091 We also check if we're timed out, but if we are we don't detach (due
00092 to threading complications), we just don't propagate buffered messages
00093 """
00094 if not self.timed_out():
00095 self.lock.acquire()
00096 msgs = copy(self.msg_buffer)
00097 self.lock.release()
00098 for msg in msgs:
00099 peer_publish(msg)
00100
00101 def timed_out(self):
00102 """ Checks to see how much time has elapsed since the publisher was
00103 created """
00104 return time() - self.established_time > self.timeout
00105
00106 def publish_override(self, message):
00107 """ The publisher's publish method is replaced with this publish method
00108 which checks for timeout and if we haven't timed out, buffers outgoing
00109 messages in preparation for new subscriptions """
00110 if not self.timed_out():
00111 self.lock.acquire()
00112 self.msg_buffer.append(message)
00113 self.lock.release()
00114 self.publish(message)
00115
00116
00117 class MultiPublisher():
00118 """ Keeps track of the clients that are using a particular publisher.
00119
00120 Provides an API to publish messages and register clients that are using
00121 this publisher """
00122
00123 def __init__(self, topic, msg_type=None):
00124 """ Register a publisher on the specified topic.
00125
00126 Keyword arguments:
00127 topic -- the name of the topic to register the publisher to
00128 msg_type -- (optional) the type to register the publisher as. If not
00129 provided, an attempt will be made to infer the topic type
00130
00131 Throws:
00132 TopicNotEstablishedException -- if no msg_type was specified by the
00133 caller and the topic is not yet established, so a topic type cannot
00134 be inferred
00135 TypeConflictException -- if the msg_type was specified by the
00136 caller and the topic is established, and the established type is
00137 different to the user-specified msg_type
00138
00139 """
00140
00141 topic_type = get_topic_type(topic)[0]
00142
00143
00144 if msg_type is None and topic_type is None:
00145 raise TopicNotEstablishedException(topic)
00146
00147
00148 if msg_type is None:
00149 msg_type = topic_type
00150
00151
00152 msg_class = ros_loader.get_message_class(msg_type)
00153
00154
00155 if topic_type is not None and topic_type != msg_class._type:
00156 raise TypeConflictException(topic, topic_type, msg_class.type)
00157
00158
00159 self.clients = {}
00160 self.topic = topic
00161 self.msg_class = msg_class
00162 self.publisher = Publisher(topic, msg_class)
00163 self.listener = PublisherConsistencyListener()
00164 self.listener.attach(self.publisher)
00165
00166 def unregister(self):
00167 """ Unregisters the publisher and clears the clients """
00168
00169
00170 self.clients.clear()
00171
00172 def verify_type(self, msg_type):
00173 """ Verify that the publisher publishes messages of the specified type.
00174
00175 Keyword arguments:
00176 msg_type -- the type to check this publisher against
00177
00178 Throws:
00179 Exception -- if ros_loader cannot load the specified msg type
00180 TypeConflictException -- if the msg_type is different than the type of
00181 this publisher
00182
00183 """
00184 if not ros_loader.get_message_class(msg_type) is self.msg_class:
00185 raise TypeConflictException(self.topic,
00186 self.msg_class._type, msg_type)
00187 return
00188
00189 def publish(self, msg):
00190 """ Publish a message using this publisher.
00191
00192 Keyword arguments:
00193 msg -- the dict (json) message to publish
00194
00195 Throws:
00196 Exception -- propagates exceptions from message conversion if the
00197 provided msg does not properly conform to the message type of this
00198 publisher
00199
00200 """
00201
00202 if self.listener.attached and self.listener.timed_out():
00203 self.listener.detach()
00204
00205
00206 inst = self.msg_class()
00207
00208
00209 message_conversion.populate_instance(msg, inst)
00210
00211
00212 self.publisher.publish(inst)
00213
00214 def register_client(self, client_id):
00215 """ Register the specified client as a client of this publisher.
00216
00217 Keyword arguments:
00218 client_id -- the ID of the client using the publisher
00219
00220 """
00221 self.clients[client_id] = True
00222
00223 def unregister_client(self, client_id):
00224 """ Unregister the specified client from this publisher.
00225
00226 If the specified client_id is not a client of this publisher, nothing
00227 happens.
00228
00229 Keyword arguments:
00230 client_id -- the ID of the client to remove
00231
00232 """
00233 if client_id in self.clients:
00234 del self.clients[client_id]
00235
00236 def has_clients(self):
00237 """ Return true if there are clients to this publisher. """
00238 return len(self.clients) != 0
00239
00240
00241 class PublisherManager():
00242 """ The PublisherManager keeps track of ROS publishers
00243
00244 It maintains a MultiPublisher instance for each registered topic
00245
00246 When unregistering a client, if there are no more clients for a publisher,
00247 then that publisher is unregistered from the ROS Master
00248 """
00249
00250 def __init__(self):
00251 self._publishers = {}
00252
00253 def register(self, client_id, topic, msg_type=None):
00254 """ Register a publisher on the specified topic.
00255
00256 Publishers are shared between clients, so a single MultiPublisher
00257 instance is created per topic, even if multiple clients register.
00258
00259 Keyword arguments:
00260 client_id -- the ID of the client making this request
00261 topic -- the name of the topic to publish on
00262 msg_type -- (optional) the type to publish
00263
00264 Throws:
00265 Exception -- exceptions are propagated from the MultiPublisher if
00266 there is a problem loading the specified msg class or establishing
00267 the publisher
00268
00269 """
00270 if not topic in self._publishers:
00271 self._publishers[topic] = MultiPublisher(topic, msg_type)
00272
00273 if msg_type is not None:
00274 self._publishers[topic].verify_type(msg_type)
00275
00276 self._publishers[topic].register_client(client_id)
00277
00278 def unregister(self, client_id, topic):
00279 """ Unregister a client from the publisher for the given topic.
00280
00281 If there are no clients remaining for that publisher, then the
00282 publisher is unregistered from the ROS Master
00283
00284 Keyword arguments:
00285 client_id -- the ID of the client making this request
00286 topic -- the topic to unregister the publisher for
00287
00288 """
00289 if not topic in self._publishers:
00290 return
00291
00292 self._publishers[topic].unregister_client(client_id)
00293
00294 if not self._publishers[topic].has_clients():
00295 self._publishers[topic].unregister()
00296 del self._publishers[topic]
00297
00298 def unregister_all(self, client_id):
00299 """ Unregisters a client from all publishers that they are registered
00300 to.
00301
00302 Keyword arguments:
00303 client_id -- the ID of the client making this request """
00304 for topic in self._publishers.keys():
00305 self.unregister(client_id, topic)
00306
00307 def publish(self, client_id, topic, msg):
00308 """ Publish a message on the given topic.
00309
00310 Tries to create a publisher on the topic if one does not already exist.
00311
00312 Keyword arguments:
00313 client_id -- the ID of the client making this request
00314 topic -- the topic to publish the message on
00315 msg -- a JSON-like dict of fields and values
00316
00317 Throws:
00318 Exception -- a variety of exceptions are propagated. They can be
00319 thrown if there is a problem setting up or getting the publisher,
00320 or if the provided msg does not map to the msg class of the publisher.
00321
00322 """
00323 self.register(client_id, topic)
00324
00325 self._publishers[topic].publish(msg)
00326
00327
00328 manager = PublisherManager()