publishers.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 from time import time
00034 from copy import copy
00035 from threading import Lock
00036 from rospy import Publisher, SubscribeListener
00037 from rostopic import get_topic_type
00038 from rosbridge_library.internal import ros_loader, message_conversion
00039 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
00040 
00041 
00042 class PublisherConsistencyListener(SubscribeListener):
00043     """ This class is used to solve the problem that sometimes we create a
00044     publisher and then immediately publish a message, before the subscribers
00045     have set up their connections.
00046 
00047     Call attach() to attach the listener to a publisher.  It sets up a buffer
00048     of outgoing messages, then when a new connection occurs, sends the messages
00049     in the buffer.
00050 
00051     Call detach() to detach the listener from the publisher and restore the
00052     original publish methods.
00053 
00054     After some particular timeout (default to 1 second), the listener stops
00055     buffering messages as it is assumed by this point all subscribers will have
00056     successfully set up their connections."""
00057 
00058     timeout = 1  # Timeout in seconds to wait for new subscribers
00059     attached = False
00060 
00061     def attach(self, publisher):
00062         """ Overrides the publisher's publish method, and attaches a subscribe
00063         listener to the publisher, effectively routing incoming connections
00064         and outgoing publish requests through this class instance """
00065         # Do the attaching
00066         self.publisher = publisher
00067         publisher.impl.add_subscriber_listener(self)
00068         self.publish = publisher.publish
00069         publisher.publish = self.publish_override
00070 
00071         # Set state variables
00072         self.lock = Lock()
00073         self.established_time = time()
00074         self.msg_buffer = []
00075         self.attached = True
00076 
00077     def detach(self):
00078         """ Restores the publisher's original publish method and unhooks the
00079         subscribe listeners, effectively finishing with this object """
00080         self.publisher.publish = self.publish
00081         if self in self.publisher.impl.subscriber_listeners:
00082             self.publisher.impl.subscriber_listeners.remove(self)
00083         self.attached = False
00084 
00085     def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00086         """ Called whenever there's a new subscription.
00087 
00088         If we're still inside the subscription setup window, then we publish
00089         any buffered messages to the peer.
00090 
00091         We also check if we're timed out, but if we are we don't detach (due
00092         to threading complications), we just don't propagate buffered messages
00093         """
00094         if not self.timed_out():
00095             self.lock.acquire()
00096             msgs = copy(self.msg_buffer)
00097             self.lock.release()
00098             for msg in msgs:
00099                 peer_publish(msg)
00100 
00101     def timed_out(self):
00102         """ Checks to see how much time has elapsed since the publisher was
00103         created """
00104         return time() - self.established_time > self.timeout
00105 
00106     def publish_override(self, message):
00107         """ The publisher's publish method is replaced with this publish method
00108         which checks for timeout and if we haven't timed out, buffers outgoing
00109         messages in preparation for new subscriptions """
00110         if not self.timed_out():
00111             self.lock.acquire()
00112             self.msg_buffer.append(message)
00113             self.lock.release()
00114         self.publish(message)
00115 
00116 
00117 class MultiPublisher():
00118     """ Keeps track of the clients that are using a particular publisher.
00119 
00120     Provides an API to publish messages and register clients that are using
00121     this publisher """
00122 
00123     def __init__(self, topic, msg_type=None):
00124         """ Register a publisher on the specified topic.
00125 
00126         Keyword arguments:
00127         topic    -- the name of the topic to register the publisher to
00128         msg_type -- (optional) the type to register the publisher as.  If not
00129         provided, an attempt will be made to infer the topic type
00130 
00131         Throws:
00132         TopicNotEstablishedException -- if no msg_type was specified by the
00133         caller and the topic is not yet established, so a topic type cannot
00134         be inferred
00135         TypeConflictException        -- if the msg_type was specified by the
00136         caller and the topic is established, and the established type is
00137         different to the user-specified msg_type
00138 
00139         """
00140         # First check to see if the topic is already established
00141         topic_type = get_topic_type(topic)[0]
00142 
00143         # If it's not established and no type was specified, exception
00144         if msg_type is None and topic_type is None:
00145             raise TopicNotEstablishedException(topic)
00146 
00147         # Use the established topic type if none was specified
00148         if msg_type is None:
00149             msg_type = topic_type
00150 
00151         # Load the message class, propagating any exceptions from bad msg types
00152         msg_class = ros_loader.get_message_class(msg_type)
00153 
00154         # Make sure the specified msg type and established msg type are same
00155         if topic_type is not None and topic_type != msg_class._type:
00156             raise TypeConflictException(topic, topic_type, msg_class.type)
00157 
00158         # Create the publisher and associated member variables
00159         self.clients = {}
00160         self.topic = topic
00161         self.msg_class = msg_class
00162         self.publisher = Publisher(topic, msg_class)
00163         self.listener = PublisherConsistencyListener()
00164         self.listener.attach(self.publisher)
00165 
00166     def unregister(self):
00167         """ Unregisters the publisher and clears the clients """
00168         # Rospy's unregister is unreliable.
00169         # self.publisher.unregister()
00170         self.clients.clear()
00171 
00172     def verify_type(self, msg_type):
00173         """ Verify that the publisher publishes messages of the specified type.
00174 
00175         Keyword arguments:
00176         msg_type -- the type to check this publisher against
00177 
00178         Throws:
00179         Exception -- if ros_loader cannot load the specified msg type
00180         TypeConflictException -- if the msg_type is different than the type of
00181         this publisher
00182 
00183         """
00184         if not ros_loader.get_message_class(msg_type) is self.msg_class:
00185             raise TypeConflictException(self.topic,
00186                                         self.msg_class._type, msg_type)
00187         return
00188 
00189     def publish(self, msg):
00190         """ Publish a message using this publisher.
00191 
00192         Keyword arguments:
00193         msg -- the dict (json) message to publish
00194 
00195         Throws:
00196         Exception -- propagates exceptions from message conversion if the
00197         provided msg does not properly conform to the message type of this
00198         publisher
00199 
00200         """
00201         # First, check the publisher consistency listener to see if it's done
00202         if self.listener.attached and self.listener.timed_out():
00203             self.listener.detach()
00204 
00205         # Create a message instance
00206         inst = self.msg_class()
00207 
00208         # Populate the instance, propagating any exceptions that may be thrown
00209         message_conversion.populate_instance(msg, inst)
00210 
00211         # Publish the message
00212         self.publisher.publish(inst)
00213 
00214     def register_client(self, client_id):
00215         """ Register the specified client as a client of this publisher.
00216 
00217         Keyword arguments:
00218         client_id -- the ID of the client using the publisher
00219 
00220         """
00221         self.clients[client_id] = True
00222 
00223     def unregister_client(self, client_id):
00224         """ Unregister the specified client from this publisher.
00225 
00226         If the specified client_id is not a client of this publisher, nothing
00227         happens.
00228 
00229         Keyword arguments:
00230         client_id -- the ID of the client to remove
00231 
00232         """
00233         if client_id in self.clients:
00234             del self.clients[client_id]
00235 
00236     def has_clients(self):
00237         """ Return true if there are clients to this publisher. """
00238         return len(self.clients) != 0
00239 
00240 
00241 class PublisherManager():
00242     """ The PublisherManager keeps track of ROS publishers
00243 
00244     It maintains a MultiPublisher instance for each registered topic
00245 
00246     When unregistering a client, if there are no more clients for a publisher,
00247     then that publisher is unregistered from the ROS Master
00248     """
00249 
00250     def __init__(self):
00251         self._publishers = {}
00252 
00253     def register(self, client_id, topic, msg_type=None):
00254         """ Register a publisher on the specified topic.
00255 
00256         Publishers are shared between clients, so a single MultiPublisher
00257         instance is created per topic, even if multiple clients register.
00258 
00259         Keyword arguments:
00260         client_id -- the ID of the client making this request
00261         topic     -- the name of the topic to publish on
00262         msg_type  -- (optional) the type to publish
00263 
00264         Throws:
00265         Exception -- exceptions are propagated from the MultiPublisher if
00266         there is a problem loading the specified msg class or establishing
00267         the publisher
00268 
00269         """
00270         if not topic in self._publishers:
00271             self._publishers[topic] = MultiPublisher(topic, msg_type)
00272 
00273         if msg_type is not None:
00274             self._publishers[topic].verify_type(msg_type)
00275 
00276         self._publishers[topic].register_client(client_id)
00277 
00278     def unregister(self, client_id, topic):
00279         """ Unregister a client from the publisher for the given topic.
00280 
00281         If there are no clients remaining for that publisher, then the
00282         publisher is unregistered from the ROS Master
00283 
00284         Keyword arguments:
00285         client_id -- the ID of the client making this request
00286         topic     -- the topic to unregister the publisher for
00287 
00288         """
00289         if not topic in self._publishers:
00290             return
00291 
00292         self._publishers[topic].unregister_client(client_id)
00293 
00294         if not self._publishers[topic].has_clients():
00295             self._publishers[topic].unregister()
00296             del self._publishers[topic]
00297 
00298     def unregister_all(self, client_id):
00299         """ Unregisters a client from all publishers that they are registered
00300         to.
00301 
00302         Keyword arguments:
00303         client_id -- the ID of the client making this request """
00304         for topic in self._publishers.keys():
00305             self.unregister(client_id, topic)
00306 
00307     def publish(self, client_id, topic, msg):
00308         """ Publish a message on the given topic.
00309 
00310         Tries to create a publisher on the topic if one does not already exist.
00311 
00312         Keyword arguments:
00313         client_id -- the ID of the client making this request
00314         topic     -- the topic to publish the message on
00315         msg       -- a JSON-like dict of fields and values
00316 
00317         Throws:
00318         Exception -- a variety of exceptions are propagated.  They can be
00319         thrown if there is a problem setting up or getting the publisher,
00320         or if the provided msg does not map to the msg class of the publisher.
00321 
00322         """
00323         self.register(client_id, topic)
00324 
00325         self._publishers[topic].publish(msg)
00326 
00327 
00328 manager = PublisherManager()


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:35