36 from threading
import Lock, Timer
37 from rospy
import Publisher, SubscribeListener
38 from rospy
import logwarn
39 from rostopic
import get_topic_type
45 """ This class is used to solve the problem that sometimes we create a 46 publisher and then immediately publish a message, before the subscribers 47 have set up their connections. 49 Call attach() to attach the listener to a publisher. It sets up a buffer 50 of outgoing messages, then when a new connection occurs, sends the messages 53 Call detach() to detach the listener from the publisher and restore the 54 original publish methods. 56 After some particular timeout (default to 1 second), the listener stops 57 buffering messages as it is assumed by this point all subscribers will have 58 successfully set up their connections.""" 64 """ Overrides the publisher's publish method, and attaches a subscribe 65 listener to the publisher, effectively routing incoming connections 66 and outgoing publish requests through this class instance """ 69 publisher.impl.add_subscriber_listener(self)
80 """ Restores the publisher's original publish method and unhooks the 81 subscribe listeners, effectively finishing with this object """ 82 self.publisher.publish = self.
publish 83 if self
in self.publisher.impl.subscriber_listeners:
84 self.publisher.impl.subscriber_listeners.remove(self)
88 """ Called whenever there's a new subscription. 90 If we're still inside the subscription setup window, then we publish 91 any buffered messages to the peer. 93 We also check if we're timed out, but if we are we don't detach (due 94 to threading complications), we just don't propagate buffered messages 104 """ Checks to see how much time has elapsed since the publisher was 109 """ The publisher's publish method is replaced with this publish method 110 which checks for timeout and if we haven't timed out, buffers outgoing 111 messages in preparation for new subscriptions """ 114 self.msg_buffer.append(message)
120 """ Keeps track of the clients that are using a particular publisher. 122 Provides an API to publish messages and register clients that are using 125 def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
126 """ Register a publisher on the specified topic. 129 topic -- the name of the topic to register the publisher to 130 msg_type -- (optional) the type to register the publisher as. If not 131 provided, an attempt will be made to infer the topic type 132 latch -- (optional) if a client requested this publisher to be latched, 133 provide the client_id of that client here 136 TopicNotEstablishedException -- if no msg_type was specified by the 137 caller and the topic is not yet established, so a topic type cannot 139 TypeConflictException -- if the msg_type was specified by the 140 caller and the topic is established, and the established type is 141 different to the user-specified msg_type 145 topic_type = get_topic_type(topic)[0]
148 if msg_type
is None and topic_type
is None:
153 msg_type = topic_type
156 msg_class = ros_loader.get_message_class(msg_type)
159 if topic_type
is not None and topic_type != msg_class._type:
167 self.
publisher = Publisher(topic, msg_class, latch=(latched_client_id!=
None), queue_size=queue_size)
172 """ Unregisters the publisher and clears the clients """ 173 self.publisher.unregister()
177 """ Verify that the publisher publishes messages of the specified type. 180 msg_type -- the type to check this publisher against 183 Exception -- if ros_loader cannot load the specified msg type 184 TypeConflictException -- if the msg_type is different than the type of 188 if not ros_loader.get_message_class(msg_type)
is self.
msg_class:
190 self.msg_class._type, msg_type)
194 """ Publish a message using this publisher. 197 msg -- the dict (json) message to publish 200 Exception -- propagates exceptions from message conversion if the 201 provided msg does not properly conform to the message type of this 206 if self.listener.attached
and self.listener.timed_out():
207 self.listener.detach()
213 message_conversion.populate_instance(msg, inst)
216 self.publisher.publish(inst)
219 """ Register the specified client as a client of this publisher. 222 client_id -- the ID of the client using the publisher 228 """ Unregister the specified client from this publisher. 230 If the specified client_id is not a client of this publisher, nothing 234 client_id -- the ID of the client to remove 241 """ Return true if there are clients to this publisher. """ 246 """ The PublisherManager keeps track of ROS publishers 248 It maintains a MultiPublisher instance for each registered topic 250 When unregistering a client, if there are no more clients for a publisher, 251 then that publisher is unregistered from the ROS Master 259 def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
260 """ Register a publisher on the specified topic. 262 Publishers are shared between clients, so a single MultiPublisher 263 instance is created per topic, even if multiple clients register. 266 client_id -- the ID of the client making this request 267 topic -- the name of the topic to publish on 268 msg_type -- (optional) the type to publish 269 latch -- (optional) whether to make this publisher latched 270 queue_size -- (optional) rospy publisher queue_size to use 273 Exception -- exceptions are propagated from the MultiPublisher if 274 there is a problem loading the specified msg class or establishing 278 latched_client_id = client_id
if latch
else None 281 queue_size=queue_size)
282 elif latch
and self.
_publishers[topic].latched_client_id != client_id:
283 logwarn(
"Client ID %s attempted to register topic [%s] as latched " +
284 "but this topic was previously registered.", client_id, topic)
285 logwarn(
"Only a single registered latched publisher is supported at the time")
286 elif not latch
and self.
_publishers[topic].latched_client_id:
287 logwarn(
"New non-latched publisher registration for topic [%s] which is " +
288 "already registered as latched. but this topic was previously " +
289 "registered.", topic)
290 logwarn(
"Only a single registered latched publisher is supported at the time")
292 if msg_type
is not None:
298 """ Unregister a client from the publisher for the given topic. 299 Will wait some time before actually unregistering, it is done in 302 If there are no clients remaining for that publisher, then the 303 publisher is unregistered from the ROS Master 306 client_id -- the ID of the client making this request 307 topic -- the topic to unregister the publisher for 313 self.
_publishers[topic].unregister_client(client_id)
328 """ Unregisters a client from all publishers that they are registered 332 client_id -- the ID of the client making this request """ 333 for topic
in self._publishers.keys():
336 def publish(self, client_id, topic, msg, latch=False, queue_size=100):
337 """ Publish a message on the given topic. 339 Tries to create a publisher on the topic if one does not already exist. 342 client_id -- the ID of the client making this request 343 topic -- the topic to publish the message on 344 msg -- a JSON-like dict of fields and values 345 latch -- (optional) whether to make this publisher latched 346 queue_size -- (optional) rospy publisher queue_size to use 349 Exception -- a variety of exceptions are propagated. They can be 350 thrown if there is a problem setting up or getting the publisher, 351 or if the provided msg does not map to the msg class of the publisher. 354 self.
register(client_id, topic, latch=latch, queue_size=queue_size)
def attach(self, publisher)
def _unregister_impl(self, topic)
def unregister_client(self, client_id)
def register_client(self, client_id)
def peer_subscribe(self, topic_name, topic_publish, peer_publish)
def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100)
def unregister_all(self, client_id)
def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100)
def verify_type(self, msg_type)
def publish(self, client_id, topic, msg, latch=False, queue_size=100)
def publish_override(self, message)
def unregister(self, client_id, topic)