$search
00001 /* 00002 * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #ifndef ROSCPP_SUBSCRIPTION_H 00029 #define ROSCPP_SUBSCRIPTION_H 00030 00031 #include <queue> 00032 #include "ros/common.h" 00033 #include "ros/header.h" 00034 #include "ros/forwards.h" 00035 #include "ros/transport_hints.h" 00036 #include "ros/xmlrpc_manager.h" 00037 #include "XmlRpc.h" 00038 00039 #include <boost/thread.hpp> 00040 #include <boost/shared_ptr.hpp> 00041 #include <boost/enable_shared_from_this.hpp> 00042 00043 namespace ros 00044 { 00045 00046 class PublisherLink; 00047 typedef boost::shared_ptr<PublisherLink> PublisherLinkPtr; 00048 00049 class SubscriptionCallback; 00050 typedef boost::shared_ptr<SubscriptionCallback> SubscriptionCallbackPtr; 00051 00052 class SubscriptionQueue; 00053 typedef boost::shared_ptr<SubscriptionQueue> SubscriptionQueuePtr; 00054 00055 class MessageDeserializer; 00056 typedef boost::shared_ptr<MessageDeserializer> MessageDeserializerPtr; 00057 00058 class SubscriptionCallbackHelper; 00059 typedef boost::shared_ptr<SubscriptionCallbackHelper> SubscriptionCallbackHelperPtr; 00060 00064 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription> 00065 { 00066 public: 00067 Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints); 00068 virtual ~Subscription(); 00069 00073 void drop(); 00077 void shutdown(); 00082 bool pubUpdate(const std::vector<std::string> &pubs); 00087 bool negotiateConnection(const std::string& xmlrpc_uri); 00088 00089 void addLocalConnection(const PublicationPtr& pub); 00090 00094 bool isDropped() { return dropped_; } 00095 XmlRpc::XmlRpcValue getStats(); 00096 void getInfo(XmlRpc::XmlRpcValue& info); 00097 00098 bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks); 00099 void removeCallback(const SubscriptionCallbackHelperPtr& helper); 00100 00101 typedef std::map<std::string, std::string> M_string; 00102 00107 uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link); 00108 00109 const std::string datatype(); 00110 const std::string md5sum(); 00111 00115 void removePublisherLink(const PublisherLinkPtr& pub_link); 00116 00117 const std::string& getName() const { return name_; } 00118 uint32_t getNumCallbacks() const { return callbacks_.size(); } 00119 uint32_t getNumPublishers(); 00120 00121 // We'll keep a list of these objects, representing in-progress XMLRPC 00122 // connections to other nodes. 00123 class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection 00124 { 00125 public: 00126 PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri) 00127 : client_(client) 00128 , udp_transport_(udp_transport) 00129 , parent_(parent) 00130 , remote_uri_(remote_uri) 00131 {} 00132 00133 ~PendingConnection() 00134 { 00135 delete client_; 00136 } 00137 00138 XmlRpc::XmlRpcClient* getClient() const { return client_; } 00139 TransportUDPPtr getUDPTransport() const { return udp_transport_; } 00140 00141 virtual void addToDispatch(XmlRpc::XmlRpcDispatch* disp) 00142 { 00143 disp->addSource(client_, XmlRpc::XmlRpcDispatch::WritableEvent | XmlRpc::XmlRpcDispatch::Exception); 00144 } 00145 00146 virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch* disp) 00147 { 00148 disp->removeSource(client_); 00149 } 00150 00151 virtual bool check() 00152 { 00153 SubscriptionPtr parent = parent_.lock(); 00154 if (!parent) 00155 { 00156 return true; 00157 } 00158 00159 XmlRpc::XmlRpcValue result; 00160 if (client_->executeCheckDone(result)) 00161 { 00162 parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result); 00163 return true; 00164 } 00165 00166 return false; 00167 } 00168 00169 const std::string& getRemoteURI() { return remote_uri_; } 00170 00171 private: 00172 XmlRpc::XmlRpcClient* client_; 00173 TransportUDPPtr udp_transport_; 00174 SubscriptionWPtr parent_; 00175 std::string remote_uri_; 00176 }; 00177 typedef boost::shared_ptr<PendingConnection> PendingConnectionPtr; 00178 00179 void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result); 00180 00181 void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti); 00182 00183 void headerReceived(const PublisherLinkPtr& link, const Header& h); 00184 00185 private: 00186 Subscription(const Subscription &); // not copyable 00187 Subscription &operator =(const Subscription &); // nor assignable 00188 00189 void dropAllConnections(); 00190 00191 void addPublisherLink(const PublisherLinkPtr& link); 00192 00193 struct CallbackInfo 00194 { 00195 CallbackQueueInterface* callback_queue_; 00196 00197 // Only used if callback_queue_ is non-NULL (NodeHandle API) 00198 SubscriptionCallbackHelperPtr helper_; 00199 SubscriptionQueuePtr subscription_queue_; 00200 bool has_tracked_object_; 00201 VoidConstWPtr tracked_object_; 00202 }; 00203 typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr; 00204 typedef std::vector<CallbackInfoPtr> V_CallbackInfo; 00205 00206 std::string name_; 00207 boost::mutex md5sum_mutex_; 00208 std::string md5sum_; 00209 std::string datatype_; 00210 boost::mutex callbacks_mutex_; 00211 V_CallbackInfo callbacks_; 00212 uint32_t nonconst_callbacks_; 00213 00214 bool dropped_; 00215 bool shutting_down_; 00216 boost::mutex shutdown_mutex_; 00217 00218 typedef std::set<PendingConnectionPtr> S_PendingConnection; 00219 S_PendingConnection pending_connections_; 00220 boost::mutex pending_connections_mutex_; 00221 00222 typedef std::vector<PublisherLinkPtr> V_PublisherLink; 00223 V_PublisherLink publisher_links_; 00224 boost::mutex publisher_links_mutex_; 00225 00226 TransportHints transport_hints_; 00227 00228 struct LatchInfo 00229 { 00230 SerializedMessage message; 00231 PublisherLinkPtr link; 00232 boost::shared_ptr<std::map<std::string, std::string> > connection_header; 00233 ros::Time receipt_time; 00234 }; 00235 00236 typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo; 00237 M_PublisherLinkToLatchInfo latched_messages_; 00238 00239 typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer; 00240 V_TypeAndDeserializer cached_deserializers_; 00241 }; 00242 00243 } 00244 00245 #endif 00246