36 from threading
import Lock, Timer
37 from rospy
import Publisher, SubscribeListener
38 from rospy
import logwarn
39 from rostopic
import get_topic_type
45 """ This class is used to solve the problem that sometimes we create a
46 publisher and then immediately publish a message, before the subscribers
47 have set up their connections.
49 Call attach() to attach the listener to a publisher. It sets up a buffer
50 of outgoing messages, then when a new connection occurs, sends the messages
53 Call detach() to detach the listener from the publisher and restore the
54 original publish methods.
56 After some particular timeout (default to 1 second), the listener stops
57 buffering messages as it is assumed by this point all subscribers will have
58 successfully set up their connections."""
64 """ Overrides the publisher's publish method, and attaches a subscribe
65 listener to the publisher, effectively routing incoming connections
66 and outgoing publish requests through this class instance """
69 publisher.impl.add_subscriber_listener(self)
80 """ Restores the publisher's original publish method and unhooks the
81 subscribe listeners, effectively finishing with this object """
83 if self
in self.
publisher.impl.subscriber_listeners:
84 self.
publisher.impl.subscriber_listeners.remove(self)
88 """ Called whenever there's a new subscription.
90 If we're still inside the subscription setup window, then we publish
91 any buffered messages to the peer.
93 We also check if we're timed out, but if we are we don't detach (due
94 to threading complications), we just don't propagate buffered messages
103 """ Checks to see how much time has elapsed since the publisher was
108 """ The publisher's publish method is replaced with this publish method
109 which checks for timeout and if we haven't timed out, buffers outgoing
110 messages in preparation for new subscriptions """
118 """ Keeps track of the clients that are using a particular publisher.
120 Provides an API to publish messages and register clients that are using
123 def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
124 """ Register a publisher on the specified topic.
127 topic -- the name of the topic to register the publisher to
128 msg_type -- (optional) the type to register the publisher as. If not
129 provided, an attempt will be made to infer the topic type
130 latch -- (optional) if a client requested this publisher to be latched,
131 provide the client_id of that client here
134 TopicNotEstablishedException -- if no msg_type was specified by the
135 caller and the topic is not yet established, so a topic type cannot
137 TypeConflictException -- if the msg_type was specified by the
138 caller and the topic is established, and the established type is
139 different to the user-specified msg_type
143 topic_type = get_topic_type(topic)[0]
146 if msg_type
is None and topic_type
is None:
151 msg_type = topic_type
154 msg_class = ros_loader.get_message_class(msg_type)
157 if topic_type
is not None and topic_type != msg_class._type:
165 self.
publisher = Publisher(topic, msg_class, latch=(latched_client_id!=
None), queue_size=queue_size)
170 """ Unregisters the publisher and clears the clients """
175 """ Verify that the publisher publishes messages of the specified type.
178 msg_type -- the type to check this publisher against
181 Exception -- if ros_loader cannot load the specified msg type
182 TypeConflictException -- if the msg_type is different than the type of
186 if not ros_loader.get_message_class(msg_type)
is self.
msg_class:
192 """ Publish a message using this publisher.
195 msg -- the dict (json) message to publish
198 Exception -- propagates exceptions from message conversion if the
199 provided msg does not properly conform to the message type of this
211 message_conversion.populate_instance(msg, inst)
217 """ Register the specified client as a client of this publisher.
220 client_id -- the ID of the client using the publisher
226 """ Unregister the specified client from this publisher.
228 If the specified client_id is not a client of this publisher, nothing
232 client_id -- the ID of the client to remove
239 """ Return true if there are clients to this publisher. """
244 """ The PublisherManager keeps track of ROS publishers
246 It maintains a MultiPublisher instance for each registered topic
248 When unregistering a client, if there are no more clients for a publisher,
249 then that publisher is unregistered from the ROS Master
257 def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
258 """ Register a publisher on the specified topic.
260 Publishers are shared between clients, so a single MultiPublisher
261 instance is created per topic, even if multiple clients register.
264 client_id -- the ID of the client making this request
265 topic -- the name of the topic to publish on
266 msg_type -- (optional) the type to publish
267 latch -- (optional) whether to make this publisher latched
268 queue_size -- (optional) rospy publisher queue_size to use
271 Exception -- exceptions are propagated from the MultiPublisher if
272 there is a problem loading the specified msg class or establishing
276 latched_client_id = client_id
if latch
else None
279 queue_size=queue_size)
280 elif latch
and self.
_publishers[topic].latched_client_id != client_id:
281 logwarn(
"Client ID %s attempted to register topic [%s] as latched " +
282 "but this topic was previously registered.", client_id, topic)
283 logwarn(
"Only a single registered latched publisher is supported at the time")
284 elif not latch
and self.
_publishers[topic].latched_client_id:
285 logwarn(
"New non-latched publisher registration for topic [%s] which is " +
286 "already registered as latched. but this topic was previously " +
287 "registered.", topic)
288 logwarn(
"Only a single registered latched publisher is supported at the time")
290 if msg_type
is not None:
296 """ Unregister a client from the publisher for the given topic.
297 Will wait some time before actually unregistering, it is done in
300 If there are no clients remaining for that publisher, then the
301 publisher is unregistered from the ROS Master
304 client_id -- the ID of the client making this request
305 topic -- the topic to unregister the publisher for
311 self.
_publishers[topic].unregister_client(client_id)
326 """ Unregisters a client from all publishers that they are registered
330 client_id -- the ID of the client making this request """
334 def publish(self, client_id, topic, msg, latch=False, queue_size=100):
335 """ Publish a message on the given topic.
337 Tries to create a publisher on the topic if one does not already exist.
340 client_id -- the ID of the client making this request
341 topic -- the topic to publish the message on
342 msg -- a JSON-like dict of fields and values
343 latch -- (optional) whether to make this publisher latched
344 queue_size -- (optional) rospy publisher queue_size to use
347 Exception -- a variety of exceptions are propagated. They can be
348 thrown if there is a problem setting up or getting the publisher,
349 or if the provided msg does not map to the msg class of the publisher.
352 self.
register(client_id, topic, latch=latch, queue_size=queue_size)