Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef ROSCPP_SUBSCRIPTION_H
00029 #define ROSCPP_SUBSCRIPTION_H
00030
00031 #include <queue>
00032 #include "ros/common.h"
00033 #include "ros/header.h"
00034 #include "ros/forwards.h"
00035 #include "ros/transport_hints.h"
00036 #include "ros/xmlrpc_manager.h"
00037 #include "XmlRpc.h"
00038
00039 #include <boost/thread.hpp>
00040 #include <boost/shared_ptr.hpp>
00041 #include <boost/enable_shared_from_this.hpp>
00042
00043 namespace ros
00044 {
00045
00046 class PublisherLink;
00047 typedef boost::shared_ptr<PublisherLink> PublisherLinkPtr;
00048
00049 class SubscriptionCallback;
00050 typedef boost::shared_ptr<SubscriptionCallback> SubscriptionCallbackPtr;
00051
00052 class SubscriptionQueue;
00053 typedef boost::shared_ptr<SubscriptionQueue> SubscriptionQueuePtr;
00054
00055 class MessageDeserializer;
00056 typedef boost::shared_ptr<MessageDeserializer> MessageDeserializerPtr;
00057
00058 class SubscriptionCallbackHelper;
00059 typedef boost::shared_ptr<SubscriptionCallbackHelper> SubscriptionCallbackHelperPtr;
00060
00064 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
00065 {
00066 public:
00067 Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
00068 virtual ~Subscription();
00069
00073 void drop();
00077 void shutdown();
00082 bool pubUpdate(const std::vector<std::string> &pubs);
00087 bool negotiateConnection(const std::string& xmlrpc_uri);
00088
00089 void addLocalConnection(const PublicationPtr& pub);
00090
00094 bool isDropped() { return dropped_; }
00095 XmlRpc::XmlRpcValue getStats();
00096 void getInfo(XmlRpc::XmlRpcValue& info);
00097
00098 bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
00099 void removeCallback(const SubscriptionCallbackHelperPtr& helper);
00100
00101 typedef std::map<std::string, std::string> M_string;
00102
00107 uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
00108
00109 const std::string datatype();
00110 const std::string md5sum();
00111
00115 void removePublisherLink(const PublisherLinkPtr& pub_link);
00116
00117 const std::string& getName() const { return name_; }
00118 uint32_t getNumCallbacks() const { return callbacks_.size(); }
00119 uint32_t getNumPublishers();
00120
00121
00122
00123 class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection
00124 {
00125 public:
00126 PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
00127 : client_(client)
00128 , udp_transport_(udp_transport)
00129 , parent_(parent)
00130 , remote_uri_(remote_uri)
00131 {}
00132
00133 ~PendingConnection()
00134 {
00135 delete client_;
00136 }
00137
00138 XmlRpc::XmlRpcClient* getClient() const { return client_; }
00139 TransportUDPPtr getUDPTransport() const { return udp_transport_; }
00140
00141 virtual void addToDispatch(XmlRpc::XmlRpcDispatch* disp)
00142 {
00143 disp->addSource(client_, XmlRpc::XmlRpcDispatch::WritableEvent | XmlRpc::XmlRpcDispatch::Exception);
00144 }
00145
00146 virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch* disp)
00147 {
00148 disp->removeSource(client_);
00149 }
00150
00151 virtual bool check()
00152 {
00153 SubscriptionPtr parent = parent_.lock();
00154 if (!parent)
00155 {
00156 return true;
00157 }
00158
00159 XmlRpc::XmlRpcValue result;
00160 if (client_->executeCheckDone(result))
00161 {
00162 parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
00163 return true;
00164 }
00165
00166 return false;
00167 }
00168
00169 const std::string& getRemoteURI() { return remote_uri_; }
00170
00171 private:
00172 XmlRpc::XmlRpcClient* client_;
00173 TransportUDPPtr udp_transport_;
00174 SubscriptionWPtr parent_;
00175 std::string remote_uri_;
00176 };
00177 typedef boost::shared_ptr<PendingConnection> PendingConnectionPtr;
00178
00179 void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
00180
00181 void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
00182
00183 void headerReceived(const PublisherLinkPtr& link, const Header& h);
00184
00185 private:
00186 Subscription(const Subscription &);
00187 Subscription &operator =(const Subscription &);
00188
00189 void dropAllConnections();
00190
00191 void addPublisherLink(const PublisherLinkPtr& link);
00192
00193 struct CallbackInfo
00194 {
00195 CallbackQueueInterface* callback_queue_;
00196
00197
00198 SubscriptionCallbackHelperPtr helper_;
00199 SubscriptionQueuePtr subscription_queue_;
00200 bool has_tracked_object_;
00201 VoidConstWPtr tracked_object_;
00202 };
00203 typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
00204 typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
00205
00206 std::string name_;
00207 boost::mutex md5sum_mutex_;
00208 std::string md5sum_;
00209 std::string datatype_;
00210 boost::mutex callbacks_mutex_;
00211 V_CallbackInfo callbacks_;
00212 uint32_t nonconst_callbacks_;
00213
00214 bool dropped_;
00215 bool shutting_down_;
00216 boost::mutex shutdown_mutex_;
00217
00218 typedef std::set<PendingConnectionPtr> S_PendingConnection;
00219 S_PendingConnection pending_connections_;
00220 boost::mutex pending_connections_mutex_;
00221
00222 typedef std::vector<PublisherLinkPtr> V_PublisherLink;
00223 V_PublisherLink publisher_links_;
00224 boost::mutex publisher_links_mutex_;
00225
00226 TransportHints transport_hints_;
00227
00228 struct LatchInfo
00229 {
00230 SerializedMessage message;
00231 PublisherLinkPtr link;
00232 boost::shared_ptr<std::map<std::string, std::string> > connection_header;
00233 ros::Time receipt_time;
00234 };
00235
00236 typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
00237 M_PublisherLinkToLatchInfo latched_messages_;
00238
00239 typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
00240 V_TypeAndDeserializer cached_deserializers_;
00241 };
00242
00243 }
00244
00245 #endif
00246
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11