subscription.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #ifndef ROSCPP_SUBSCRIPTION_H
00029 #define ROSCPP_SUBSCRIPTION_H
00030 
00031 #include <queue>
00032 #include "ros/common.h"
00033 #include "ros/header.h"
00034 #include "ros/forwards.h"
00035 #include "ros/transport_hints.h"
00036 #include "ros/xmlrpc_manager.h"
00037 #include "ros/statistics.h"
00038 #include "XmlRpc.h"
00039 
00040 #include <boost/thread.hpp>
00041 #include <boost/shared_ptr.hpp>
00042 #include <boost/enable_shared_from_this.hpp>
00043 
00044 namespace ros
00045 {
00046 
00047 class PublisherLink;
00048 typedef boost::shared_ptr<PublisherLink> PublisherLinkPtr;
00049 
00050 class SubscriptionCallback;
00051 typedef boost::shared_ptr<SubscriptionCallback> SubscriptionCallbackPtr;
00052 
00053 class SubscriptionQueue;
00054 typedef boost::shared_ptr<SubscriptionQueue> SubscriptionQueuePtr;
00055 
00056 class MessageDeserializer;
00057 typedef boost::shared_ptr<MessageDeserializer> MessageDeserializerPtr;
00058 
00059 class SubscriptionCallbackHelper;
00060 typedef boost::shared_ptr<SubscriptionCallbackHelper> SubscriptionCallbackHelperPtr;
00061 
00065 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
00066 {
00067 public:
00068   Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
00069   virtual ~Subscription();
00070 
00074   void drop();
00078   void shutdown();
00083   bool pubUpdate(const std::vector<std::string> &pubs);
00088   bool negotiateConnection(const std::string& xmlrpc_uri);
00089 
00090   void addLocalConnection(const PublicationPtr& pub);
00091 
00095   bool isDropped() { return dropped_; }
00096   XmlRpc::XmlRpcValue getStats();
00097   void getInfo(XmlRpc::XmlRpcValue& info);
00098 
00099   bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
00100   void removeCallback(const SubscriptionCallbackHelperPtr& helper);
00101 
00102   typedef std::map<std::string, std::string> M_string;
00103 
00108   uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
00109 
00110   const std::string datatype();
00111   const std::string md5sum();
00112 
00116   void removePublisherLink(const PublisherLinkPtr& pub_link);
00117 
00118   const std::string& getName() const { return name_; }
00119   uint32_t getNumCallbacks() const { return callbacks_.size(); }
00120   uint32_t getNumPublishers();
00121 
00122   // We'll keep a list of these objects, representing in-progress XMLRPC 
00123   // connections to other nodes.
00124   class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection
00125   {
00126     public:
00127       PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
00128       : client_(client)
00129       , udp_transport_(udp_transport)
00130       , parent_(parent)
00131       , remote_uri_(remote_uri)
00132       {}
00133 
00134       ~PendingConnection()
00135       {
00136         delete client_;
00137       }
00138 
00139       XmlRpc::XmlRpcClient* getClient() const { return client_; }
00140       TransportUDPPtr getUDPTransport() const { return udp_transport_; }
00141 
00142       virtual void addToDispatch(XmlRpc::XmlRpcDispatch* disp)
00143       {
00144         disp->addSource(client_, XmlRpc::XmlRpcDispatch::WritableEvent | XmlRpc::XmlRpcDispatch::Exception);
00145       }
00146 
00147       virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch* disp)
00148       {
00149         disp->removeSource(client_);
00150       }
00151 
00152       virtual bool check()
00153       {
00154         SubscriptionPtr parent = parent_.lock();
00155         if (!parent)
00156         {
00157           return true;
00158         }
00159 
00160         XmlRpc::XmlRpcValue result;
00161         if (client_->executeCheckDone(result))
00162         {
00163           parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
00164           return true;
00165         }
00166 
00167         return false;
00168       }
00169 
00170       const std::string& getRemoteURI() { return remote_uri_; }
00171 
00172     private:
00173       XmlRpc::XmlRpcClient* client_;
00174       TransportUDPPtr udp_transport_;
00175       SubscriptionWPtr parent_;
00176       std::string remote_uri_;
00177   };
00178   typedef boost::shared_ptr<PendingConnection> PendingConnectionPtr;
00179 
00180   void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
00181 
00182   void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
00183 
00184   void headerReceived(const PublisherLinkPtr& link, const Header& h);
00185 
00186 private:
00187   Subscription(const Subscription &); // not copyable
00188   Subscription &operator =(const Subscription &); // nor assignable
00189 
00190   void dropAllConnections();
00191 
00192   void addPublisherLink(const PublisherLinkPtr& link);
00193 
00194   struct CallbackInfo
00195   {
00196     CallbackQueueInterface* callback_queue_;
00197 
00198     // Only used if callback_queue_ is non-NULL (NodeHandle API)
00199     SubscriptionCallbackHelperPtr helper_;
00200     SubscriptionQueuePtr subscription_queue_;
00201     bool has_tracked_object_;
00202     VoidConstWPtr tracked_object_;
00203   };
00204   typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
00205   typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
00206 
00207   std::string name_;
00208   boost::mutex md5sum_mutex_;
00209   std::string md5sum_;
00210   std::string datatype_;
00211   boost::mutex callbacks_mutex_;
00212   V_CallbackInfo callbacks_;
00213   uint32_t nonconst_callbacks_;
00214 
00215   bool dropped_;
00216   bool shutting_down_;
00217   boost::mutex shutdown_mutex_;
00218 
00219   typedef std::set<PendingConnectionPtr> S_PendingConnection;
00220   S_PendingConnection pending_connections_;
00221   boost::mutex pending_connections_mutex_;
00222 
00223   typedef std::vector<PublisherLinkPtr> V_PublisherLink;
00224   V_PublisherLink publisher_links_;
00225   boost::mutex publisher_links_mutex_;
00226 
00227   TransportHints transport_hints_;
00228 
00229   StatisticsLogger statistics_;
00230 
00231   struct LatchInfo
00232   {
00233     SerializedMessage message;
00234     PublisherLinkPtr link;
00235     boost::shared_ptr<std::map<std::string, std::string> > connection_header;
00236     ros::Time receipt_time;
00237   };
00238 
00239   typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
00240   M_PublisherLinkToLatchInfo latched_messages_;
00241 
00242   typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
00243   V_TypeAndDeserializer cached_deserializers_;
00244 };
00245 
00246 }
00247 
00248 #endif
00249 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 04:01:04