36 from threading
import Lock, Timer
37 from rospy
import Publisher, SubscribeListener
38 from rospy
import logwarn
39 from rostopic
import get_topic_type
45 """ This class is used to solve the problem that sometimes we create a 46 publisher and then immediately publish a message, before the subscribers 47 have set up their connections. 49 Call attach() to attach the listener to a publisher. It sets up a buffer 50 of outgoing messages, then when a new connection occurs, sends the messages 53 Call detach() to detach the listener from the publisher and restore the 54 original publish methods. 56 After some particular timeout (default to 1 second), the listener stops 57 buffering messages as it is assumed by this point all subscribers will have 58 successfully set up their connections.""" 64 """ Overrides the publisher's publish method, and attaches a subscribe 65 listener to the publisher, effectively routing incoming connections 66 and outgoing publish requests through this class instance """ 69 publisher.impl.add_subscriber_listener(self)
80 """ Restores the publisher's original publish method and unhooks the 81 subscribe listeners, effectively finishing with this object """ 82 self.publisher.publish = self.
publish 83 if self
in self.publisher.impl.subscriber_listeners:
84 self.publisher.impl.subscriber_listeners.remove(self)
88 """ Called whenever there's a new subscription. 90 If we're still inside the subscription setup window, then we publish 91 any buffered messages to the peer. 93 We also check if we're timed out, but if we are we don't detach (due 94 to threading complications), we just don't propagate buffered messages 103 """ Checks to see how much time has elapsed since the publisher was 108 """ The publisher's publish method is replaced with this publish method 109 which checks for timeout and if we haven't timed out, buffers outgoing 110 messages in preparation for new subscriptions """ 113 self.msg_buffer.append(message)
118 """ Keeps track of the clients that are using a particular publisher. 120 Provides an API to publish messages and register clients that are using 123 def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
124 """ Register a publisher on the specified topic. 127 topic -- the name of the topic to register the publisher to 128 msg_type -- (optional) the type to register the publisher as. If not 129 provided, an attempt will be made to infer the topic type 130 latch -- (optional) if a client requested this publisher to be latched, 131 provide the client_id of that client here 134 TopicNotEstablishedException -- if no msg_type was specified by the 135 caller and the topic is not yet established, so a topic type cannot 137 TypeConflictException -- if the msg_type was specified by the 138 caller and the topic is established, and the established type is 139 different to the user-specified msg_type 143 topic_type = get_topic_type(topic)[0]
146 if msg_type
is None and topic_type
is None:
151 msg_type = topic_type
154 msg_class = ros_loader.get_message_class(msg_type)
157 if topic_type
is not None and topic_type != msg_class._type:
165 self.
publisher = Publisher(topic, msg_class, latch=(latched_client_id!=
None), queue_size=queue_size)
170 """ Unregisters the publisher and clears the clients """ 171 self.publisher.unregister()
175 """ Verify that the publisher publishes messages of the specified type. 178 msg_type -- the type to check this publisher against 181 Exception -- if ros_loader cannot load the specified msg type 182 TypeConflictException -- if the msg_type is different than the type of 186 if not ros_loader.get_message_class(msg_type)
is self.
msg_class:
188 self.msg_class._type, msg_type)
192 """ Publish a message using this publisher. 195 msg -- the dict (json) message to publish 198 Exception -- propagates exceptions from message conversion if the 199 provided msg does not properly conform to the message type of this 204 if self.listener.attached
and self.listener.timed_out():
205 self.listener.detach()
211 message_conversion.populate_instance(msg, inst)
214 self.publisher.publish(inst)
217 """ Register the specified client as a client of this publisher. 220 client_id -- the ID of the client using the publisher 226 """ Unregister the specified client from this publisher. 228 If the specified client_id is not a client of this publisher, nothing 232 client_id -- the ID of the client to remove 239 """ Return true if there are clients to this publisher. """ 244 """ The PublisherManager keeps track of ROS publishers 246 It maintains a MultiPublisher instance for each registered topic 248 When unregistering a client, if there are no more clients for a publisher, 249 then that publisher is unregistered from the ROS Master 257 def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
258 """ Register a publisher on the specified topic. 260 Publishers are shared between clients, so a single MultiPublisher 261 instance is created per topic, even if multiple clients register. 264 client_id -- the ID of the client making this request 265 topic -- the name of the topic to publish on 266 msg_type -- (optional) the type to publish 267 latch -- (optional) whether to make this publisher latched 268 queue_size -- (optional) rospy publisher queue_size to use 271 Exception -- exceptions are propagated from the MultiPublisher if 272 there is a problem loading the specified msg class or establishing 276 latched_client_id = client_id
if latch
else None 279 queue_size=queue_size)
280 elif latch
and self.
_publishers[topic].latched_client_id != client_id:
281 logwarn(
"Client ID %s attempted to register topic [%s] as latched " +
282 "but this topic was previously registered.", client_id, topic)
283 logwarn(
"Only a single registered latched publisher is supported at the time")
284 elif not latch
and self.
_publishers[topic].latched_client_id:
285 logwarn(
"New non-latched publisher registration for topic [%s] which is " +
286 "already registered as latched. but this topic was previously " +
287 "registered.", topic)
288 logwarn(
"Only a single registered latched publisher is supported at the time")
290 if msg_type
is not None:
296 """ Unregister a client from the publisher for the given topic. 297 Will wait some time before actually unregistering, it is done in 300 If there are no clients remaining for that publisher, then the 301 publisher is unregistered from the ROS Master 304 client_id -- the ID of the client making this request 305 topic -- the topic to unregister the publisher for 311 self.
_publishers[topic].unregister_client(client_id)
326 """ Unregisters a client from all publishers that they are registered 330 client_id -- the ID of the client making this request """ 331 for topic
in self._publishers.keys():
334 def publish(self, client_id, topic, msg, latch=False, queue_size=100):
335 """ Publish a message on the given topic. 337 Tries to create a publisher on the topic if one does not already exist. 340 client_id -- the ID of the client making this request 341 topic -- the topic to publish the message on 342 msg -- a JSON-like dict of fields and values 343 latch -- (optional) whether to make this publisher latched 344 queue_size -- (optional) rospy publisher queue_size to use 347 Exception -- a variety of exceptions are propagated. They can be 348 thrown if there is a problem setting up or getting the publisher, 349 or if the provided msg does not map to the msg class of the publisher. 352 self.
register(client_id, topic, latch=latch, queue_size=queue_size)
def attach(self, publisher)
def _unregister_impl(self, topic)
def unregister_client(self, client_id)
def register_client(self, client_id)
def peer_subscribe(self, topic_name, topic_publish, peer_publish)
def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100)
def unregister_all(self, client_id)
def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100)
def verify_type(self, msg_type)
def publish(self, client_id, topic, msg, latch=False, queue_size=100)
def publish_override(self, message)
def unregister(self, client_id, topic)