publishers.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # Copyright (c) 2014, Creativa 77 SRL
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 from time import time
00035 from copy import copy
00036 from threading import Lock
00037 from rospy import Publisher, SubscribeListener
00038 from rospy import logwarn
00039 from rostopic import get_topic_type
00040 from rosbridge_library.internal import ros_loader, message_conversion
00041 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
00042 
00043 
00044 class PublisherConsistencyListener(SubscribeListener):
00045     """ This class is used to solve the problem that sometimes we create a
00046     publisher and then immediately publish a message, before the subscribers
00047     have set up their connections.
00048 
00049     Call attach() to attach the listener to a publisher.  It sets up a buffer
00050     of outgoing messages, then when a new connection occurs, sends the messages
00051     in the buffer.
00052 
00053     Call detach() to detach the listener from the publisher and restore the
00054     original publish methods.
00055 
00056     After some particular timeout (default to 1 second), the listener stops
00057     buffering messages as it is assumed by this point all subscribers will have
00058     successfully set up their connections."""
00059 
00060     timeout = 1  # Timeout in seconds to wait for new subscribers
00061     attached = False
00062 
00063     def attach(self, publisher):
00064         """ Overrides the publisher's publish method, and attaches a subscribe
00065         listener to the publisher, effectively routing incoming connections
00066         and outgoing publish requests through this class instance """
00067         # Do the attaching
00068         self.publisher = publisher
00069         publisher.impl.add_subscriber_listener(self)
00070         self.publish = publisher.publish
00071         publisher.publish = self.publish_override
00072 
00073         # Set state variables
00074         self.lock = Lock()
00075         self.established_time = time()
00076         self.msg_buffer = []
00077         self.attached = True
00078 
00079     def detach(self):
00080         """ Restores the publisher's original publish method and unhooks the
00081         subscribe listeners, effectively finishing with this object """
00082         self.publisher.publish = self.publish
00083         if self in self.publisher.impl.subscriber_listeners:
00084             self.publisher.impl.subscriber_listeners.remove(self)
00085         self.attached = False
00086 
00087     def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00088         """ Called whenever there's a new subscription.
00089 
00090         If we're still inside the subscription setup window, then we publish
00091         any buffered messages to the peer.
00092 
00093         We also check if we're timed out, but if we are we don't detach (due
00094         to threading complications), we just don't propagate buffered messages
00095         """
00096         if not self.timed_out():
00097             self.lock.acquire()
00098             msgs = copy(self.msg_buffer)
00099             self.lock.release()
00100             for msg in msgs:
00101                 peer_publish(msg)
00102 
00103     def timed_out(self):
00104         """ Checks to see how much time has elapsed since the publisher was
00105         created """
00106         return time() - self.established_time > self.timeout
00107 
00108     def publish_override(self, message):
00109         """ The publisher's publish method is replaced with this publish method
00110         which checks for timeout and if we haven't timed out, buffers outgoing
00111         messages in preparation for new subscriptions """
00112         if not self.timed_out():
00113             self.lock.acquire()
00114             self.msg_buffer.append(message)
00115             self.lock.release()
00116         self.publish(message)
00117 
00118 
00119 class MultiPublisher():
00120     """ Keeps track of the clients that are using a particular publisher.
00121 
00122     Provides an API to publish messages and register clients that are using
00123     this publisher """
00124 
00125     def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
00126         """ Register a publisher on the specified topic.
00127 
00128         Keyword arguments:
00129         topic    -- the name of the topic to register the publisher to
00130         msg_type -- (optional) the type to register the publisher as.  If not
00131         provided, an attempt will be made to infer the topic type
00132         latch    -- (optional) if a client requested this publisher to be latched,
00133                     provide the client_id of that client here
00134 
00135         Throws:
00136         TopicNotEstablishedException -- if no msg_type was specified by the
00137         caller and the topic is not yet established, so a topic type cannot
00138         be inferred
00139         TypeConflictException        -- if the msg_type was specified by the
00140         caller and the topic is established, and the established type is
00141         different to the user-specified msg_type
00142 
00143         """
00144         # First check to see if the topic is already established
00145         topic_type = get_topic_type(topic)[0]
00146 
00147         # If it's not established and no type was specified, exception
00148         if msg_type is None and topic_type is None:
00149             raise TopicNotEstablishedException(topic)
00150 
00151         # Use the established topic type if none was specified
00152         if msg_type is None:
00153             msg_type = topic_type
00154 
00155         # Load the message class, propagating any exceptions from bad msg types
00156         msg_class = ros_loader.get_message_class(msg_type)
00157 
00158         # Make sure the specified msg type and established msg type are same
00159         if topic_type is not None and topic_type != msg_class._type:
00160             raise TypeConflictException(topic, topic_type, msg_class._type)
00161 
00162         # Create the publisher and associated member variables
00163         self.clients = {}
00164         self.latched_client_id = latched_client_id
00165         self.topic = topic
00166         self.msg_class = msg_class
00167         self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size)
00168         self.listener = PublisherConsistencyListener()
00169         self.listener.attach(self.publisher)
00170 
00171     def unregister(self):
00172         """ Unregisters the publisher and clears the clients """
00173         self.publisher.unregister()
00174         self.clients.clear()
00175 
00176     def verify_type(self, msg_type):
00177         """ Verify that the publisher publishes messages of the specified type.
00178 
00179         Keyword arguments:
00180         msg_type -- the type to check this publisher against
00181 
00182         Throws:
00183         Exception -- if ros_loader cannot load the specified msg type
00184         TypeConflictException -- if the msg_type is different than the type of
00185         this publisher
00186 
00187         """
00188         if not ros_loader.get_message_class(msg_type) is self.msg_class:
00189             raise TypeConflictException(self.topic,
00190                                         self.msg_class._type, msg_type)
00191         return
00192 
00193     def publish(self, msg):
00194         """ Publish a message using this publisher.
00195 
00196         Keyword arguments:
00197         msg -- the dict (json) message to publish
00198 
00199         Throws:
00200         Exception -- propagates exceptions from message conversion if the
00201         provided msg does not properly conform to the message type of this
00202         publisher
00203 
00204         """
00205         # First, check the publisher consistency listener to see if it's done
00206         if self.listener.attached and self.listener.timed_out():
00207             self.listener.detach()
00208 
00209         # Create a message instance
00210         inst = self.msg_class()
00211 
00212         # Populate the instance, propagating any exceptions that may be thrown
00213         message_conversion.populate_instance(msg, inst)
00214 
00215         # Publish the message
00216         self.publisher.publish(inst)
00217 
00218     def register_client(self, client_id):
00219         """ Register the specified client as a client of this publisher.
00220 
00221         Keyword arguments:
00222         client_id -- the ID of the client using the publisher
00223 
00224         """
00225         self.clients[client_id] = True
00226 
00227     def unregister_client(self, client_id):
00228         """ Unregister the specified client from this publisher.
00229 
00230         If the specified client_id is not a client of this publisher, nothing
00231         happens.
00232 
00233         Keyword arguments:
00234         client_id -- the ID of the client to remove
00235 
00236         """
00237         if client_id in self.clients:
00238             del self.clients[client_id]
00239 
00240     def has_clients(self):
00241         """ Return true if there are clients to this publisher. """
00242         return len(self.clients) != 0
00243 
00244 
00245 class PublisherManager():
00246     """ The PublisherManager keeps track of ROS publishers
00247 
00248     It maintains a MultiPublisher instance for each registered topic
00249 
00250     When unregistering a client, if there are no more clients for a publisher,
00251     then that publisher is unregistered from the ROS Master
00252     """
00253 
00254     def __init__(self):
00255         self._publishers = {}
00256 
00257     def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
00258         """ Register a publisher on the specified topic.
00259 
00260         Publishers are shared between clients, so a single MultiPublisher
00261         instance is created per topic, even if multiple clients register.
00262 
00263         Keyword arguments:
00264         client_id  -- the ID of the client making this request
00265         topic      -- the name of the topic to publish on
00266         msg_type   -- (optional) the type to publish
00267         latch      -- (optional) whether to make this publisher latched
00268         queue_size -- (optional) rospy publisher queue_size to use
00269 
00270         Throws:
00271         Exception -- exceptions are propagated from the MultiPublisher if
00272         there is a problem loading the specified msg class or establishing
00273         the publisher
00274 
00275         """
00276         latched_client_id = client_id if latch else None
00277         if not topic in self._publishers:
00278             self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id,
00279              queue_size=queue_size)
00280         elif latch and self._publishers[topic].latched_client_id != client_id:
00281             logwarn("Client ID %s attempted to register topic [%s] as latched " +
00282                     "but this topic was previously registered." % (client_id, topic))
00283             logwarn("Only a single registered latched publisher is supported at the time")
00284         elif not latch and self._publishers[topic].latched_client_id:
00285             logwarn("New non-latched publisher registration for topic [%s] which is " +
00286                     "already registered as latched. but this topic was previously " +
00287                     "registered." % topic)
00288             logwarn("Only a single registered latched publisher is supported at the time")
00289 
00290         if msg_type is not None:
00291             self._publishers[topic].verify_type(msg_type)
00292 
00293         self._publishers[topic].register_client(client_id)
00294 
00295     def unregister(self, client_id, topic):
00296         """ Unregister a client from the publisher for the given topic.
00297 
00298         If there are no clients remaining for that publisher, then the
00299         publisher is unregistered from the ROS Master
00300 
00301         Keyword arguments:
00302         client_id -- the ID of the client making this request
00303         topic     -- the topic to unregister the publisher for
00304 
00305         """
00306         if not topic in self._publishers:
00307             return
00308 
00309         self._publishers[topic].unregister_client(client_id)
00310 
00311         if not self._publishers[topic].has_clients():
00312             self._publishers[topic].unregister()
00313             del self._publishers[topic]
00314 
00315     def unregister_all(self, client_id):
00316         """ Unregisters a client from all publishers that they are registered
00317         to.
00318 
00319         Keyword arguments:
00320         client_id -- the ID of the client making this request """
00321         for topic in self._publishers.keys():
00322             self.unregister(client_id, topic)
00323 
00324     def publish(self, client_id, topic, msg, latch=False, queue_size=100):
00325         """ Publish a message on the given topic.
00326 
00327         Tries to create a publisher on the topic if one does not already exist.
00328 
00329         Keyword arguments:
00330         client_id -- the ID of the client making this request
00331         topic     -- the topic to publish the message on
00332         msg       -- a JSON-like dict of fields and values
00333         latch     -- (optional) whether to make this publisher latched
00334         queue_size -- (optional) rospy publisher queue_size to use
00335 
00336         Throws:
00337         Exception -- a variety of exceptions are propagated.  They can be
00338         thrown if there is a problem setting up or getting the publisher,
00339         or if the provided msg does not map to the msg class of the publisher.
00340 
00341         """
00342         self.register(client_id, topic, latch=latch, queue_size=queue_size)
00343 
00344         self._publishers[topic].publish(msg)
00345 
00346 
00347 manager = PublisherManager()


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:35