00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef ROSCPP_PUBLICATION_H
00029 #define ROSCPP_PUBLICATION_H
00030
00031 #include "ros/forwards.h"
00032 #include "ros/advertise_options.h"
00033 #include "common.h"
00034 #include "XmlRpc.h"
00035
00036 #include <boost/thread/mutex.hpp>
00037
00038 #include <boost/shared_ptr.hpp>
00039 #include <boost/shared_array.hpp>
00040
00041 #include <vector>
00042
00043 namespace ros
00044 {
00045
00046 class SubscriberLink;
00047 typedef boost::shared_ptr<SubscriberLink> SubscriberLinkPtr;
00048 typedef std::vector<SubscriberLinkPtr> V_SubscriberLink;
00049
00053 class ROSCPP_DECL Publication
00054 {
00055 public:
00056 Publication(const std::string &name,
00057 const std::string& datatype,
00058 const std::string& _md5sum,
00059 const std::string& message_definition,
00060 size_t max_queue,
00061 bool latch,
00062 bool has_header);
00063
00064 ~Publication();
00065
00066 void addCallbacks(const SubscriberCallbacksPtr& callbacks);
00067 void removeCallbacks(const SubscriberCallbacksPtr& callbacks);
00068
00072 bool enqueueMessage(const SerializedMessage& m);
00076 inline size_t getMaxQueue() { return max_queue_; }
00080 XmlRpc::XmlRpcValue getStats();
00084 void getInfo(XmlRpc::XmlRpcValue& info);
00085
00089 bool hasSubscribers();
00093 uint32_t getNumSubscribers();
00094
00095 void getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti);
00096
00100 const std::string& getName() const { return name_; }
00104 const std::string& getDataType() const { return datatype_; }
00108 const std::string& getMD5Sum() const { return md5sum_; }
00112 const std::string& getMessageDefinition() const { return message_definition_; }
00116 uint32_t getSequence() { return seq_; }
00117
00118 bool isLatched() { return latch_; }
00119
00123 void addSubscriberLink(const SubscriberLinkPtr& sub_link);
00127 void removeSubscriberLink(const SubscriberLinkPtr& sub_link);
00128
00132 void drop();
00136 bool isDropped() { return dropped_; }
00137
00138 uint32_t incrementSequence();
00139
00140 size_t getNumCallbacks();
00141
00142 bool isLatching() { return latch_; }
00143
00144 void publish(SerializedMessage& m);
00145 void processPublishQueue();
00146
00147 bool validateHeader(const Header& h, std::string& error_msg);
00148
00149 private:
00150 void dropAllConnections();
00151
00155 void peerConnect(const SubscriberLinkPtr& sub_link);
00159 void peerDisconnect(const SubscriberLinkPtr& sub_link);
00160
00161 std::string name_;
00162 std::string datatype_;
00163 std::string md5sum_;
00164 std::string message_definition_;
00165 size_t max_queue_;
00166 uint32_t seq_;
00167 boost::mutex seq_mutex_;
00168
00169 typedef std::vector<SubscriberCallbacksPtr> V_Callback;
00170 V_Callback callbacks_;
00171 boost::mutex callbacks_mutex_;
00172
00173 V_SubscriberLink subscriber_links_;
00174
00175 boost::mutex subscriber_links_mutex_;
00176
00177 bool dropped_;
00178
00179 bool latch_;
00180 bool has_header_;
00181 SerializedMessage last_message_;
00182
00183 uint32_t intraprocess_subscriber_count_;
00184
00185 typedef std::vector<SerializedMessage> V_SerializedMessage;
00186 V_SerializedMessage publish_queue_;
00187 boost::mutex publish_queue_mutex_;
00188 };
00189
00190 }
00191
00192 #endif // ROSCPP_PUBLICATION_H