Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef ROSCPP_SUBSCRIPTION_H
00029 #define ROSCPP_SUBSCRIPTION_H
00030
00031 #include <queue>
00032 #include "ros/common.h"
00033 #include "ros/header.h"
00034 #include "ros/forwards.h"
00035 #include "ros/transport_hints.h"
00036 #include "ros/xmlrpc_manager.h"
00037 #include "ros/statistics.h"
00038 #include "XmlRpc.h"
00039
00040 #include <boost/thread.hpp>
00041 #include <boost/shared_ptr.hpp>
00042 #include <boost/enable_shared_from_this.hpp>
00043
00044 namespace ros
00045 {
00046
00047 class PublisherLink;
00048 typedef boost::shared_ptr<PublisherLink> PublisherLinkPtr;
00049
00050 class SubscriptionCallback;
00051 typedef boost::shared_ptr<SubscriptionCallback> SubscriptionCallbackPtr;
00052
00053 class SubscriptionQueue;
00054 typedef boost::shared_ptr<SubscriptionQueue> SubscriptionQueuePtr;
00055
00056 class MessageDeserializer;
00057 typedef boost::shared_ptr<MessageDeserializer> MessageDeserializerPtr;
00058
00059 class SubscriptionCallbackHelper;
00060 typedef boost::shared_ptr<SubscriptionCallbackHelper> SubscriptionCallbackHelperPtr;
00061
00065 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
00066 {
00067 public:
00068 Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
00069 virtual ~Subscription();
00070
00074 void drop();
00078 void shutdown();
00083 bool pubUpdate(const std::vector<std::string> &pubs);
00088 bool negotiateConnection(const std::string& xmlrpc_uri);
00089
00090 void addLocalConnection(const PublicationPtr& pub);
00091
00095 bool isDropped() { return dropped_; }
00096 XmlRpc::XmlRpcValue getStats();
00097 void getInfo(XmlRpc::XmlRpcValue& info);
00098
00099 bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
00100 void removeCallback(const SubscriptionCallbackHelperPtr& helper);
00101
00102 typedef std::map<std::string, std::string> M_string;
00103
00108 uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
00109
00110 const std::string datatype();
00111 const std::string md5sum();
00112
00116 void removePublisherLink(const PublisherLinkPtr& pub_link);
00117
00118 const std::string& getName() const { return name_; }
00119 uint32_t getNumCallbacks() const { return callbacks_.size(); }
00120 uint32_t getNumPublishers();
00121
00122
00123
00124 class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection
00125 {
00126 public:
00127 PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
00128 : client_(client)
00129 , udp_transport_(udp_transport)
00130 , parent_(parent)
00131 , remote_uri_(remote_uri)
00132 {}
00133
00134 ~PendingConnection()
00135 {
00136 delete client_;
00137 }
00138
00139 XmlRpc::XmlRpcClient* getClient() const { return client_; }
00140 TransportUDPPtr getUDPTransport() const { return udp_transport_; }
00141
00142 virtual void addToDispatch(XmlRpc::XmlRpcDispatch* disp)
00143 {
00144 disp->addSource(client_, XmlRpc::XmlRpcDispatch::WritableEvent | XmlRpc::XmlRpcDispatch::Exception);
00145 }
00146
00147 virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch* disp)
00148 {
00149 disp->removeSource(client_);
00150 }
00151
00152 virtual bool check()
00153 {
00154 SubscriptionPtr parent = parent_.lock();
00155 if (!parent)
00156 {
00157 return true;
00158 }
00159
00160 XmlRpc::XmlRpcValue result;
00161 if (client_->executeCheckDone(result))
00162 {
00163 parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
00164 return true;
00165 }
00166
00167 return false;
00168 }
00169
00170 const std::string& getRemoteURI() { return remote_uri_; }
00171
00172 private:
00173 XmlRpc::XmlRpcClient* client_;
00174 TransportUDPPtr udp_transport_;
00175 SubscriptionWPtr parent_;
00176 std::string remote_uri_;
00177 };
00178 typedef boost::shared_ptr<PendingConnection> PendingConnectionPtr;
00179
00180 void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
00181
00182 void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
00183
00184 void headerReceived(const PublisherLinkPtr& link, const Header& h);
00185
00186 private:
00187 Subscription(const Subscription &);
00188 Subscription &operator =(const Subscription &);
00189
00190 void dropAllConnections();
00191
00192 void addPublisherLink(const PublisherLinkPtr& link);
00193
00194 struct CallbackInfo
00195 {
00196 CallbackQueueInterface* callback_queue_;
00197
00198
00199 SubscriptionCallbackHelperPtr helper_;
00200 SubscriptionQueuePtr subscription_queue_;
00201 bool has_tracked_object_;
00202 VoidConstWPtr tracked_object_;
00203 };
00204 typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
00205 typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
00206
00207 std::string name_;
00208 boost::mutex md5sum_mutex_;
00209 std::string md5sum_;
00210 std::string datatype_;
00211 boost::mutex callbacks_mutex_;
00212 V_CallbackInfo callbacks_;
00213 uint32_t nonconst_callbacks_;
00214
00215 bool dropped_;
00216 bool shutting_down_;
00217 boost::mutex shutdown_mutex_;
00218
00219 typedef std::set<PendingConnectionPtr> S_PendingConnection;
00220 S_PendingConnection pending_connections_;
00221 boost::mutex pending_connections_mutex_;
00222
00223 typedef std::vector<PublisherLinkPtr> V_PublisherLink;
00224 V_PublisherLink publisher_links_;
00225 boost::mutex publisher_links_mutex_;
00226
00227 TransportHints transport_hints_;
00228
00229 StatisticsLogger statistics_;
00230
00231 struct LatchInfo
00232 {
00233 SerializedMessage message;
00234 PublisherLinkPtr link;
00235 boost::shared_ptr<std::map<std::string, std::string> > connection_header;
00236 ros::Time receipt_time;
00237 };
00238
00239 typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
00240 M_PublisherLinkToLatchInfo latched_messages_;
00241
00242 typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
00243 V_TypeAndDeserializer cached_deserializers_;
00244 };
00245
00246 }
00247
00248 #endif
00249
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05