00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef ROSCPP_SUBSCRIPTION_H
00029 #define ROSCPP_SUBSCRIPTION_H
00030
00031 #include <queue>
00032 #include "ros/common.h"
00033 #include "ros/header.h"
00034 #include "ros/forwards.h"
00035 #include "ros/transport_hints.h"
00036 #include "ros/xmlrpc_manager.h"
00037 #include "XmlRpc.h"
00038
00039 #include <boost/thread.hpp>
00040 #include <boost/shared_ptr.hpp>
00041 #include <boost/enable_shared_from_this.hpp>
00042
00043 namespace ros
00044 {
00045
00046 class PublisherLink;
00047 typedef boost::shared_ptr<PublisherLink> PublisherLinkPtr;
00048
00049 class SubscriptionCallback;
00050 typedef boost::shared_ptr<SubscriptionCallback> SubscriptionCallbackPtr;
00051
00052 class SubscriptionQueue;
00053 typedef boost::shared_ptr<SubscriptionQueue> SubscriptionQueuePtr;
00054
00055 class MessageDeserializer;
00056 typedef boost::shared_ptr<MessageDeserializer> MessageDeserializerPtr;
00057
00058 class SubscriptionCallbackHelper;
00059 typedef boost::shared_ptr<SubscriptionCallbackHelper> SubscriptionCallbackHelperPtr;
00060
00064 class Subscription : public boost::enable_shared_from_this<Subscription>
00065 {
00066 public:
00067 Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
00068 virtual ~Subscription();
00069
00073 void drop();
00077 void shutdown();
00082 bool pubUpdate(const std::vector<std::string> &pubs);
00087 bool negotiateConnection(const std::string& xmlrpc_uri);
00088
00089 void addLocalConnection(const PublicationPtr& pub);
00090
00094 bool isDropped() { return dropped_; }
00095 XmlRpc::XmlRpcValue getStats();
00096 void getInfo(XmlRpc::XmlRpcValue& info);
00097
00098 bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
00099 void removeCallback(const SubscriptionCallbackHelperPtr& helper);
00100
00101 typedef std::map<std::string, std::string> M_string;
00102
00107 uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
00108
00109 const std::string datatype();
00110 const std::string md5sum();
00111
00115 void removePublisherLink(const PublisherLinkPtr& pub_link);
00116
00117 const std::string& getName() const { return name_; }
00118 uint32_t getNumCallbacks() const { return callbacks_.size(); }
00119 uint32_t getNumPublishers();
00120
00121
00122
00123 class PendingConnection : public ASyncXMLRPCConnection
00124 {
00125 public:
00126 PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
00127 : client_(client)
00128 , udp_transport_(udp_transport)
00129 , parent_(parent)
00130 , remote_uri_(remote_uri)
00131 {}
00132
00133 ~PendingConnection()
00134 {
00135 delete client_;
00136 }
00137
00138 XmlRpc::XmlRpcClient* getClient() const { return client_; }
00139 TransportUDPPtr getUDPTransport() const { return udp_transport_; }
00140
00141 virtual void addToDispatch(XmlRpc::XmlRpcDispatch* disp)
00142 {
00143 disp->addSource(client_, XmlRpc::XmlRpcDispatch::WritableEvent | XmlRpc::XmlRpcDispatch::Exception);
00144 }
00145
00146 virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch* disp)
00147 {
00148 disp->removeSource(client_);
00149 }
00150
00151 virtual bool check()
00152 {
00153 SubscriptionPtr parent = parent_.lock();
00154 if (!parent)
00155 {
00156 return true;
00157 }
00158
00159 XmlRpc::XmlRpcValue result;
00160 if (client_->executeCheckDone(result))
00161 {
00162 parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
00163 return true;
00164 }
00165
00166 return false;
00167 }
00168
00169 const std::string& getRemoteURI() { return remote_uri_; }
00170
00171 private:
00172 XmlRpc::XmlRpcClient* client_;
00173 TransportUDPPtr udp_transport_;
00174 SubscriptionWPtr parent_;
00175 std::string remote_uri_;
00176 };
00177 typedef boost::shared_ptr<PendingConnection> PendingConnectionPtr;
00178
00179 void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
00180
00181 void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
00182
00183 void headerReceived(const PublisherLinkPtr& link, const Header& h);
00184
00185 private:
00186 Subscription(const Subscription &);
00187 Subscription &operator =(const Subscription &);
00188
00189 void dropAllConnections();
00190
00191 void addPublisherLink(const PublisherLinkPtr& link);
00192
00193 struct CallbackInfo
00194 {
00195 CallbackQueueInterface* callback_queue_;
00196
00197
00198 SubscriptionCallbackHelperPtr helper_;
00199 SubscriptionQueuePtr subscription_queue_;
00200 bool has_tracked_object_;
00201 VoidConstWPtr tracked_object_;
00202 };
00203 typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
00204 typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
00205
00206 std::string name_;
00207 boost::mutex md5sum_mutex_;
00208 std::string md5sum_;
00209 std::string datatype_;
00210 boost::mutex callbacks_mutex_;
00211 V_CallbackInfo callbacks_;
00212 uint32_t nonconst_callbacks_;
00213
00214 bool dropped_;
00215 bool shutting_down_;
00216 boost::mutex shutdown_mutex_;
00217
00218 typedef std::set<PendingConnectionPtr> S_PendingConnection;
00219 S_PendingConnection pending_connections_;
00220 boost::mutex pending_connections_mutex_;
00221
00222 typedef std::vector<PublisherLinkPtr> V_PublisherLink;
00223 V_PublisherLink publisher_links_;
00224 boost::mutex publisher_links_mutex_;
00225
00226 TransportHints transport_hints_;
00227
00228 struct LatchInfo
00229 {
00230 SerializedMessage message;
00231 PublisherLinkPtr link;
00232 boost::shared_ptr<std::map<std::string, std::string> > connection_header;
00233 ros::Time receipt_time;
00234 };
00235
00236 typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
00237 M_PublisherLinkToLatchInfo latched_messages_;
00238
00239 typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
00240 V_TypeAndDeserializer cached_deserializers_;
00241 };
00242
00243 }
00244
00245 #endif
00246