publishers.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # Copyright (c) 2014, Creativa 77 SRL
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 from time import time
00035 from copy import copy
00036 from threading import Lock, Timer
00037 from rospy import Publisher, SubscribeListener
00038 from rospy import logwarn
00039 from rostopic import get_topic_type
00040 from rosbridge_library.internal import ros_loader, message_conversion
00041 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
00042 
00043 UNREGISTER_TIMEOUT = 10.0
00044 
00045 class PublisherConsistencyListener(SubscribeListener):
00046     """ This class is used to solve the problem that sometimes we create a
00047     publisher and then immediately publish a message, before the subscribers
00048     have set up their connections.
00049 
00050     Call attach() to attach the listener to a publisher.  It sets up a buffer
00051     of outgoing messages, then when a new connection occurs, sends the messages
00052     in the buffer.
00053 
00054     Call detach() to detach the listener from the publisher and restore the
00055     original publish methods.
00056 
00057     After some particular timeout (default to 1 second), the listener stops
00058     buffering messages as it is assumed by this point all subscribers will have
00059     successfully set up their connections."""
00060 
00061     timeout = 1  # Timeout in seconds to wait for new subscribers
00062     attached = False
00063 
00064     def attach(self, publisher):
00065         """ Overrides the publisher's publish method, and attaches a subscribe
00066         listener to the publisher, effectively routing incoming connections
00067         and outgoing publish requests through this class instance """
00068         # Do the attaching
00069         self.publisher = publisher
00070         publisher.impl.add_subscriber_listener(self)
00071         self.publish = publisher.publish
00072         publisher.publish = self.publish_override
00073 
00074         # Set state variables
00075         self.lock = Lock()
00076         self.established_time = time()
00077         self.msg_buffer = []
00078         self.attached = True
00079 
00080     def detach(self):
00081         """ Restores the publisher's original publish method and unhooks the
00082         subscribe listeners, effectively finishing with this object """
00083         self.publisher.publish = self.publish
00084         if self in self.publisher.impl.subscriber_listeners:
00085             self.publisher.impl.subscriber_listeners.remove(self)
00086         self.attached = False
00087 
00088     def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00089         """ Called whenever there's a new subscription.
00090 
00091         If we're still inside the subscription setup window, then we publish
00092         any buffered messages to the peer.
00093 
00094         We also check if we're timed out, but if we are we don't detach (due
00095         to threading complications), we just don't propagate buffered messages
00096         """
00097         if not self.timed_out():
00098             self.lock.acquire()
00099             msgs = copy(self.msg_buffer)
00100             self.lock.release()
00101             for msg in msgs:
00102                 peer_publish(msg)
00103 
00104     def timed_out(self):
00105         """ Checks to see how much time has elapsed since the publisher was
00106         created """
00107         return time() - self.established_time > self.timeout
00108 
00109     def publish_override(self, message):
00110         """ The publisher's publish method is replaced with this publish method
00111         which checks for timeout and if we haven't timed out, buffers outgoing
00112         messages in preparation for new subscriptions """
00113         if not self.timed_out():
00114             self.lock.acquire()
00115             self.msg_buffer.append(message)
00116             self.lock.release()
00117         self.publish(message)
00118 
00119 
00120 class MultiPublisher():
00121     """ Keeps track of the clients that are using a particular publisher.
00122 
00123     Provides an API to publish messages and register clients that are using
00124     this publisher """
00125 
00126     def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
00127         """ Register a publisher on the specified topic.
00128 
00129         Keyword arguments:
00130         topic    -- the name of the topic to register the publisher to
00131         msg_type -- (optional) the type to register the publisher as.  If not
00132         provided, an attempt will be made to infer the topic type
00133         latch    -- (optional) if a client requested this publisher to be latched,
00134                     provide the client_id of that client here
00135 
00136         Throws:
00137         TopicNotEstablishedException -- if no msg_type was specified by the
00138         caller and the topic is not yet established, so a topic type cannot
00139         be inferred
00140         TypeConflictException        -- if the msg_type was specified by the
00141         caller and the topic is established, and the established type is
00142         different to the user-specified msg_type
00143 
00144         """
00145         # First check to see if the topic is already established
00146         topic_type = get_topic_type(topic)[0]
00147 
00148         # If it's not established and no type was specified, exception
00149         if msg_type is None and topic_type is None:
00150             raise TopicNotEstablishedException(topic)
00151 
00152         # Use the established topic type if none was specified
00153         if msg_type is None:
00154             msg_type = topic_type
00155 
00156         # Load the message class, propagating any exceptions from bad msg types
00157         msg_class = ros_loader.get_message_class(msg_type)
00158 
00159         # Make sure the specified msg type and established msg type are same
00160         if topic_type is not None and topic_type != msg_class._type:
00161             raise TypeConflictException(topic, topic_type, msg_class._type)
00162 
00163         # Create the publisher and associated member variables
00164         self.clients = {}
00165         self.latched_client_id = latched_client_id
00166         self.topic = topic
00167         self.msg_class = msg_class
00168         self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size)
00169         self.listener = PublisherConsistencyListener()
00170         self.listener.attach(self.publisher)
00171 
00172     def unregister(self):
00173         """ Unregisters the publisher and clears the clients """
00174         self.publisher.unregister()
00175         self.clients.clear()
00176 
00177     def verify_type(self, msg_type):
00178         """ Verify that the publisher publishes messages of the specified type.
00179 
00180         Keyword arguments:
00181         msg_type -- the type to check this publisher against
00182 
00183         Throws:
00184         Exception -- if ros_loader cannot load the specified msg type
00185         TypeConflictException -- if the msg_type is different than the type of
00186         this publisher
00187 
00188         """
00189         if not ros_loader.get_message_class(msg_type) is self.msg_class:
00190             raise TypeConflictException(self.topic,
00191                                         self.msg_class._type, msg_type)
00192         return
00193 
00194     def publish(self, msg):
00195         """ Publish a message using this publisher.
00196 
00197         Keyword arguments:
00198         msg -- the dict (json) message to publish
00199 
00200         Throws:
00201         Exception -- propagates exceptions from message conversion if the
00202         provided msg does not properly conform to the message type of this
00203         publisher
00204 
00205         """
00206         # First, check the publisher consistency listener to see if it's done
00207         if self.listener.attached and self.listener.timed_out():
00208             self.listener.detach()
00209 
00210         # Create a message instance
00211         inst = self.msg_class()
00212 
00213         # Populate the instance, propagating any exceptions that may be thrown
00214         message_conversion.populate_instance(msg, inst)
00215 
00216         # Publish the message
00217         self.publisher.publish(inst)
00218 
00219     def register_client(self, client_id):
00220         """ Register the specified client as a client of this publisher.
00221 
00222         Keyword arguments:
00223         client_id -- the ID of the client using the publisher
00224 
00225         """
00226         self.clients[client_id] = True
00227 
00228     def unregister_client(self, client_id):
00229         """ Unregister the specified client from this publisher.
00230 
00231         If the specified client_id is not a client of this publisher, nothing
00232         happens.
00233 
00234         Keyword arguments:
00235         client_id -- the ID of the client to remove
00236 
00237         """
00238         if client_id in self.clients:
00239             del self.clients[client_id]
00240 
00241     def has_clients(self):
00242         """ Return true if there are clients to this publisher. """
00243         return len(self.clients) != 0
00244 
00245 
00246 class PublisherManager():
00247     """ The PublisherManager keeps track of ROS publishers
00248 
00249     It maintains a MultiPublisher instance for each registered topic
00250 
00251     When unregistering a client, if there are no more clients for a publisher,
00252     then that publisher is unregistered from the ROS Master
00253     """
00254 
00255     def __init__(self):
00256         self._publishers = {}
00257         self.unregister_timers = {}
00258 
00259     def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
00260         """ Register a publisher on the specified topic.
00261 
00262         Publishers are shared between clients, so a single MultiPublisher
00263         instance is created per topic, even if multiple clients register.
00264 
00265         Keyword arguments:
00266         client_id  -- the ID of the client making this request
00267         topic      -- the name of the topic to publish on
00268         msg_type   -- (optional) the type to publish
00269         latch      -- (optional) whether to make this publisher latched
00270         queue_size -- (optional) rospy publisher queue_size to use
00271 
00272         Throws:
00273         Exception -- exceptions are propagated from the MultiPublisher if
00274         there is a problem loading the specified msg class or establishing
00275         the publisher
00276 
00277         """
00278         latched_client_id = client_id if latch else None
00279         if not topic in self._publishers:
00280             self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id,
00281              queue_size=queue_size)
00282         elif latch and self._publishers[topic].latched_client_id != client_id:
00283             logwarn("Client ID %s attempted to register topic [%s] as latched " +
00284                     "but this topic was previously registered." % (client_id, topic))
00285             logwarn("Only a single registered latched publisher is supported at the time")
00286         elif not latch and self._publishers[topic].latched_client_id:
00287             logwarn("New non-latched publisher registration for topic [%s] which is " +
00288                     "already registered as latched. but this topic was previously " +
00289                     "registered." % topic)
00290             logwarn("Only a single registered latched publisher is supported at the time")
00291 
00292         if msg_type is not None:
00293             self._publishers[topic].verify_type(msg_type)
00294 
00295         self._publishers[topic].register_client(client_id)
00296 
00297     def unregister(self, client_id, topic):
00298         """ Unregister a client from the publisher for the given topic.
00299             Will wait some time before actually unregistering, it is done in
00300             _unregister_impl
00301 
00302         If there are no clients remaining for that publisher, then the
00303         publisher is unregistered from the ROS Master
00304 
00305         Keyword arguments:
00306         client_id -- the ID of the client making this request
00307         topic     -- the topic to unregister the publisher for
00308 
00309         """
00310         if not topic in self._publishers:
00311             return
00312 
00313         self._publishers[topic].unregister_client(client_id)
00314         if topic in self.unregister_timers:
00315             self.unregister_timers[topic].cancel()
00316             del self.unregister_timers[topic]
00317         self.unregister_timers[topic] = Timer(UNREGISTER_TIMEOUT, self._unregister_impl,
00318                                               [topic])
00319         self.unregister_timers[topic].start()
00320 
00321     def _unregister_impl(self, topic):
00322         if not self._publishers[topic].has_clients():
00323             self._publishers[topic].unregister()
00324             del self._publishers[topic]
00325         del self.unregister_timers[topic]
00326 
00327     def unregister_all(self, client_id):
00328         """ Unregisters a client from all publishers that they are registered
00329         to.
00330 
00331         Keyword arguments:
00332         client_id -- the ID of the client making this request """
00333         for topic in self._publishers.keys():
00334             self.unregister(client_id, topic)
00335 
00336     def publish(self, client_id, topic, msg, latch=False, queue_size=100):
00337         """ Publish a message on the given topic.
00338 
00339         Tries to create a publisher on the topic if one does not already exist.
00340 
00341         Keyword arguments:
00342         client_id -- the ID of the client making this request
00343         topic     -- the topic to publish the message on
00344         msg       -- a JSON-like dict of fields and values
00345         latch     -- (optional) whether to make this publisher latched
00346         queue_size -- (optional) rospy publisher queue_size to use
00347 
00348         Throws:
00349         Exception -- a variety of exceptions are propagated.  They can be
00350         thrown if there is a problem setting up or getting the publisher,
00351         or if the provided msg does not map to the msg class of the publisher.
00352 
00353         """
00354         self.register(client_id, topic, latch=latch, queue_size=queue_size)
00355 
00356         self._publishers[topic].publish(msg)
00357 
00358 
00359 manager = PublisherManager()


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43