Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef ROS_PUBQUEUE_H
00019 #define ROS_PUBQUEUE_H
00020
00021 #include <boost/thread.hpp>
00022 #include <boost/bind.hpp>
00023 #include <deque>
00024 #include <list>
00025 #include <vector>
00026
00027 #include <ros/ros.h>
00028
00029
00032 template<class T>
00033 class PubMessagePair
00034 {
00035 public:
00037 T msg_;
00039 ros::Publisher pub_;
00040 PubMessagePair(T& msg, ros::Publisher& pub) :
00041 msg_(msg), pub_(pub) {}
00042 };
00043
00047 template<class T>
00048 class PubQueue
00049 {
00050 public:
00051 typedef boost::shared_ptr<std::deque<boost::shared_ptr<
00052 PubMessagePair<T> > > > QueuePtr;
00053 typedef boost::shared_ptr<PubQueue<T> > Ptr;
00054
00055 private:
00057 QueuePtr queue_;
00059 boost::shared_ptr<boost::mutex> queue_lock_;
00061 boost::function<void()> notify_func_;
00062
00063 public:
00064 PubQueue(QueuePtr queue,
00065 boost::shared_ptr<boost::mutex> queue_lock,
00066 boost::function<void()> notify_func) :
00067 queue_(queue), queue_lock_(queue_lock), notify_func_(notify_func) {}
00068 ~PubQueue() {}
00069
00073 void push(T& msg, ros::Publisher& pub)
00074 {
00075 boost::shared_ptr<PubMessagePair<T> > el(new PubMessagePair<T>(msg, pub));
00076 boost::mutex::scoped_lock lock(*queue_lock_);
00077 queue_->push_back(el);
00078 notify_func_();
00079 }
00080
00083 void pop(std::vector<boost::shared_ptr<PubMessagePair<T> > >& els)
00084 {
00085 boost::mutex::scoped_lock lock(*queue_lock_);
00086 while(!queue_->empty())
00087 {
00088 els.push_back(queue_->front());
00089 queue_->pop_front();
00090 }
00091 }
00092 };
00093
00096 class PubMultiQueue
00097 {
00098 private:
00100 std::list<boost::function<void()> > service_funcs_;
00102 boost::mutex service_funcs_lock_;
00104 boost::thread service_thread_;
00106 bool service_thread_running_;
00108 boost::condition_variable service_cond_var_;
00110 boost::mutex service_cond_var_lock_;
00111
00114 template <class T>
00115 void serviceFunc(boost::shared_ptr<PubQueue<T> > pq)
00116 {
00117 std::vector<boost::shared_ptr<PubMessagePair<T> > > els;
00118 pq->pop(els);
00119 for(typename std::vector<boost::shared_ptr<PubMessagePair<T> > >::iterator it = els.begin();
00120 it != els.end();
00121 ++it)
00122 {
00123 (*it)->pub_.publish((*it)->msg_);
00124 }
00125 }
00126
00127 public:
00128 PubMultiQueue() {}
00129 ~PubMultiQueue()
00130 {
00131 if(service_thread_.joinable())
00132 {
00133 service_thread_running_ = false;
00134 notifyServiceThread();
00135 service_thread_.join();
00136 }
00137 }
00138
00142 template <class T>
00143 boost::shared_ptr<PubQueue<T> > addPub()
00144 {
00145 typename PubQueue<T>::QueuePtr queue(new std::deque<boost::shared_ptr<PubMessagePair<T> > >);
00146 boost::shared_ptr<boost::mutex> queue_lock(new boost::mutex);
00147 boost::shared_ptr<PubQueue<T> > pq(new PubQueue<T>(queue, queue_lock, boost::bind(&PubMultiQueue::notifyServiceThread, this)));
00148 boost::function<void()> f = boost::bind(&PubMultiQueue::serviceFunc<T>, this, pq);
00149 {
00150 boost::mutex::scoped_lock lock(service_funcs_lock_);
00151 service_funcs_.push_back(f);
00152 }
00153 return pq;
00154 }
00155
00157 void spinOnce()
00158 {
00159 boost::mutex::scoped_lock lock(service_funcs_lock_);
00160 for(std::list<boost::function<void()> >::iterator it = service_funcs_.begin();
00161 it != service_funcs_.end();
00162 ++it)
00163 {
00164 (*it)();
00165 }
00166 }
00167
00170 void spin()
00171 {
00172 while(ros::ok() && service_thread_running_)
00173 {
00174 boost::unique_lock<boost::mutex> lock(service_cond_var_lock_);
00175 service_cond_var_.wait(lock);
00176 spinOnce();
00177 }
00178 }
00179
00181 void startServiceThread()
00182 {
00183 service_thread_running_ = true;
00184 service_thread_ = boost::thread(boost::bind(&PubMultiQueue::spin, this));
00185 }
00186
00189 void notifyServiceThread()
00190 {
00191 service_cond_var_.notify_one();
00192 }
00193 };
00194
00195 #endif