subscription.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #ifndef ROSCPP_SUBSCRIPTION_H
00029 #define ROSCPP_SUBSCRIPTION_H
00030 
00031 #include <queue>
00032 #include "ros/common.h"
00033 #include "ros/header.h"
00034 #include "ros/forwards.h"
00035 #include "ros/transport_hints.h"
00036 #include "ros/xmlrpc_manager.h"
00037 #include "XmlRpc.h"
00038 
00039 #include <boost/thread.hpp>
00040 #include <boost/shared_ptr.hpp>
00041 #include <boost/enable_shared_from_this.hpp>
00042 
00043 namespace ros
00044 {
00045 
00046 class PublisherLink;
00047 typedef boost::shared_ptr<PublisherLink> PublisherLinkPtr;
00048 
00049 class SubscriptionCallback;
00050 typedef boost::shared_ptr<SubscriptionCallback> SubscriptionCallbackPtr;
00051 
00052 class SubscriptionQueue;
00053 typedef boost::shared_ptr<SubscriptionQueue> SubscriptionQueuePtr;
00054 
00055 class MessageDeserializer;
00056 typedef boost::shared_ptr<MessageDeserializer> MessageDeserializerPtr;
00057 
00058 class SubscriptionCallbackHelper;
00059 typedef boost::shared_ptr<SubscriptionCallbackHelper> SubscriptionCallbackHelperPtr;
00060 
00064 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
00065 {
00066 public:
00067   Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
00068   virtual ~Subscription();
00069 
00073   void drop();
00077   void shutdown();
00082   bool pubUpdate(const std::vector<std::string> &pubs);
00087   bool negotiateConnection(const std::string& xmlrpc_uri);
00088 
00089   void addLocalConnection(const PublicationPtr& pub);
00090 
00094   bool isDropped() { return dropped_; }
00095   XmlRpc::XmlRpcValue getStats();
00096   void getInfo(XmlRpc::XmlRpcValue& info);
00097 
00098   bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
00099   void removeCallback(const SubscriptionCallbackHelperPtr& helper);
00100 
00101   typedef std::map<std::string, std::string> M_string;
00102 
00107   uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
00108 
00109   const std::string datatype();
00110   const std::string md5sum();
00111 
00115   void removePublisherLink(const PublisherLinkPtr& pub_link);
00116 
00117   const std::string& getName() const { return name_; }
00118   uint32_t getNumCallbacks() const { return callbacks_.size(); }
00119   uint32_t getNumPublishers();
00120 
00121   // We'll keep a list of these objects, representing in-progress XMLRPC 
00122   // connections to other nodes.
00123   class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection
00124   {
00125     public:
00126       PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
00127       : client_(client)
00128       , udp_transport_(udp_transport)
00129       , parent_(parent)
00130       , remote_uri_(remote_uri)
00131       {}
00132 
00133       ~PendingConnection()
00134       {
00135         delete client_;
00136       }
00137 
00138       XmlRpc::XmlRpcClient* getClient() const { return client_; }
00139       TransportUDPPtr getUDPTransport() const { return udp_transport_; }
00140 
00141       virtual void addToDispatch(XmlRpc::XmlRpcDispatch* disp)
00142       {
00143         disp->addSource(client_, XmlRpc::XmlRpcDispatch::WritableEvent | XmlRpc::XmlRpcDispatch::Exception);
00144       }
00145 
00146       virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch* disp)
00147       {
00148         disp->removeSource(client_);
00149       }
00150 
00151       virtual bool check()
00152       {
00153         SubscriptionPtr parent = parent_.lock();
00154         if (!parent)
00155         {
00156           return true;
00157         }
00158 
00159         XmlRpc::XmlRpcValue result;
00160         if (client_->executeCheckDone(result))
00161         {
00162           parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
00163           return true;
00164         }
00165 
00166         return false;
00167       }
00168 
00169       const std::string& getRemoteURI() { return remote_uri_; }
00170 
00171     private:
00172       XmlRpc::XmlRpcClient* client_;
00173       TransportUDPPtr udp_transport_;
00174       SubscriptionWPtr parent_;
00175       std::string remote_uri_;
00176   };
00177   typedef boost::shared_ptr<PendingConnection> PendingConnectionPtr;
00178 
00179   void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
00180 
00181   void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
00182 
00183   void headerReceived(const PublisherLinkPtr& link, const Header& h);
00184 
00185 private:
00186   Subscription(const Subscription &); // not copyable
00187   Subscription &operator =(const Subscription &); // nor assignable
00188 
00189   void dropAllConnections();
00190 
00191   void addPublisherLink(const PublisherLinkPtr& link);
00192 
00193   struct CallbackInfo
00194   {
00195     CallbackQueueInterface* callback_queue_;
00196 
00197     // Only used if callback_queue_ is non-NULL (NodeHandle API)
00198     SubscriptionCallbackHelperPtr helper_;
00199     SubscriptionQueuePtr subscription_queue_;
00200     bool has_tracked_object_;
00201     VoidConstWPtr tracked_object_;
00202   };
00203   typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
00204   typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
00205 
00206   std::string name_;
00207   boost::mutex md5sum_mutex_;
00208   std::string md5sum_;
00209   std::string datatype_;
00210   boost::mutex callbacks_mutex_;
00211   V_CallbackInfo callbacks_;
00212   uint32_t nonconst_callbacks_;
00213 
00214   bool dropped_;
00215   bool shutting_down_;
00216   boost::mutex shutdown_mutex_;
00217 
00218   typedef std::set<PendingConnectionPtr> S_PendingConnection;
00219   S_PendingConnection pending_connections_;
00220   boost::mutex pending_connections_mutex_;
00221 
00222   typedef std::vector<PublisherLinkPtr> V_PublisherLink;
00223   V_PublisherLink publisher_links_;
00224   boost::mutex publisher_links_mutex_;
00225 
00226   TransportHints transport_hints_;
00227 
00228   struct LatchInfo
00229   {
00230     SerializedMessage message;
00231     PublisherLinkPtr link;
00232     boost::shared_ptr<std::map<std::string, std::string> > connection_header;
00233     ros::Time receipt_time;
00234   };
00235 
00236   typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
00237   M_PublisherLinkToLatchInfo latched_messages_;
00238 
00239   typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
00240   V_TypeAndDeserializer cached_deserializers_;
00241 };
00242 
00243 }
00244 
00245 #endif
00246 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:46:44