Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef ROS_PUBQUEUE_H
00020 #define ROS_PUBQUEUE_H
00021
00022 #include <boost/thread.hpp>
00023 #include <boost/bind.hpp>
00024 #include <deque>
00025 #include <list>
00026 #include <vector>
00027
00028 #include <ros/ros.h>
00029
00030
00033 template<class T>
00034 class PubMessagePair
00035 {
00036 public:
00038 T msg_;
00040 ros::Publisher pub_;
00041 PubMessagePair(T& msg, ros::Publisher& pub) :
00042 msg_(msg), pub_(pub) {}
00043 };
00044
00048 template<class T>
00049 class PubQueue
00050 {
00051 public:
00052 typedef boost::shared_ptr<std::deque<boost::shared_ptr<
00053 PubMessagePair<T> > > > QueuePtr;
00054 typedef boost::shared_ptr<PubQueue<T> > Ptr;
00055
00056 private:
00058 QueuePtr queue_;
00060 boost::shared_ptr<boost::mutex> queue_lock_;
00062 boost::function<void()> notify_func_;
00063
00064 public:
00065 PubQueue(QueuePtr queue,
00066 boost::shared_ptr<boost::mutex> queue_lock,
00067 boost::function<void()> notify_func) :
00068 queue_(queue), queue_lock_(queue_lock), notify_func_(notify_func) {}
00069 ~PubQueue() {}
00070
00074 void push(T& msg, ros::Publisher& pub)
00075 {
00076 boost::shared_ptr<PubMessagePair<T> > el(new PubMessagePair<T>(msg, pub));
00077 boost::mutex::scoped_lock lock(*queue_lock_);
00078 queue_->push_back(el);
00079 notify_func_();
00080 }
00081
00084 void pop(std::vector<boost::shared_ptr<PubMessagePair<T> > >& els)
00085 {
00086 boost::mutex::scoped_lock lock(*queue_lock_);
00087 while(!queue_->empty())
00088 {
00089 els.push_back(queue_->front());
00090 queue_->pop_front();
00091 }
00092 }
00093 };
00094
00097 class PubMultiQueue
00098 {
00099 private:
00101 std::list<boost::function<void()> > service_funcs_;
00103 boost::mutex service_funcs_lock_;
00105 boost::thread service_thread_;
00107 boost::condition_variable service_cond_var_;
00109 boost::mutex service_cond_var_lock_;
00110
00113 template <class T>
00114 void serviceFunc(boost::shared_ptr<PubQueue<T> > pq)
00115 {
00116 std::vector<boost::shared_ptr<PubMessagePair<T> > > els;
00117 pq->pop(els);
00118 for(typename std::vector<boost::shared_ptr<PubMessagePair<T> > >::iterator it = els.begin();
00119 it != els.end();
00120 ++it)
00121 {
00122 (*it)->pub_.publish((*it)->msg_);
00123 }
00124 }
00125
00126 public:
00127 PubMultiQueue() {}
00128 ~PubMultiQueue()
00129 {
00130 if(service_thread_.joinable())
00131 {
00132 notifyServiceThread();
00133 service_thread_.join();
00134 }
00135 }
00136
00140 template <class T>
00141 boost::shared_ptr<PubQueue<T> > addPub()
00142 {
00143 typename PubQueue<T>::QueuePtr queue(new std::deque<boost::shared_ptr<PubMessagePair<T> > >);
00144 boost::shared_ptr<boost::mutex> queue_lock(new boost::mutex);
00145 boost::shared_ptr<PubQueue<T> > pq(new PubQueue<T>(queue, queue_lock, boost::bind(&PubMultiQueue::notifyServiceThread, this)));
00146 boost::function<void()> f = boost::bind(&PubMultiQueue::serviceFunc<T>, this, pq);
00147 {
00148 boost::mutex::scoped_lock lock(service_funcs_lock_);
00149 service_funcs_.push_back(f);
00150 }
00151 return pq;
00152 }
00153
00155 void spinOnce()
00156 {
00157 boost::mutex::scoped_lock lock(service_funcs_lock_);
00158 for(std::list<boost::function<void()> >::iterator it = service_funcs_.begin();
00159 it != service_funcs_.end();
00160 ++it)
00161 {
00162 (*it)();
00163 }
00164 }
00165
00168 void spin()
00169 {
00170 while(ros::ok())
00171 {
00172 boost::unique_lock<boost::mutex> lock(service_cond_var_lock_);
00173 service_cond_var_.wait(lock);
00174 spinOnce();
00175 }
00176 }
00177
00179 void startServiceThread()
00180 {
00181 service_thread_ = boost::thread(boost::bind(&PubMultiQueue::spin, this));
00182 }
00183
00186 void notifyServiceThread()
00187 {
00188 service_cond_var_.notify_one();
00189 }
00190 };
00191
00192 #endif