Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef ROS_PUBQUEUE_H
00020 #define ROS_PUBQUEUE_H
00021
00022 #include <boost/thread.hpp>
00023 #include <boost/bind.hpp>
00024 #include <deque>
00025 #include <list>
00026 #include <vector>
00027
00028 #include <ros/ros.h>
00029
00030
00033 template<class T>
00034 class PubMessagePair
00035 {
00036 public:
00038 T msg_;
00040 ros::Publisher pub_;
00041 PubMessagePair(T& msg, ros::Publisher& pub) :
00042 msg_(msg), pub_(pub) {}
00043 };
00044
00048 template<class T>
00049 class PubQueue
00050 {
00051 public:
00052 typedef boost::shared_ptr<std::deque<boost::shared_ptr<
00053 PubMessagePair<T> > > > QueuePtr;
00054 typedef boost::shared_ptr<PubQueue<T> > Ptr;
00055
00056 private:
00058 QueuePtr queue_;
00060 boost::shared_ptr<boost::mutex> queue_lock_;
00062 boost::function<void()> notify_func_;
00063
00064 public:
00065 PubQueue(QueuePtr queue,
00066 boost::shared_ptr<boost::mutex> queue_lock,
00067 boost::function<void()> notify_func) :
00068 queue_(queue), queue_lock_(queue_lock), notify_func_(notify_func) {}
00069 ~PubQueue() {}
00070
00074 void push(T& msg, ros::Publisher& pub)
00075 {
00076 boost::shared_ptr<PubMessagePair<T> > el(new PubMessagePair<T>(msg, pub));
00077 boost::mutex::scoped_lock lock(*queue_lock_);
00078 queue_->push_back(el);
00079 notify_func_();
00080 }
00081
00084 void pop(std::vector<boost::shared_ptr<PubMessagePair<T> > >& els)
00085 {
00086 boost::mutex::scoped_lock lock(*queue_lock_);
00087 while(!queue_->empty())
00088 {
00089 els.push_back(queue_->front());
00090 queue_->pop_front();
00091 }
00092 }
00093 };
00094
00097 class PubMultiQueue
00098 {
00099 private:
00101 std::list<boost::function<void()> > service_funcs_;
00103 boost::mutex service_funcs_lock_;
00105 boost::thread service_thread_;
00107 bool service_thread_running_;
00109 boost::condition_variable service_cond_var_;
00111 boost::mutex service_cond_var_lock_;
00112
00115 template <class T>
00116 void serviceFunc(boost::shared_ptr<PubQueue<T> > pq)
00117 {
00118 std::vector<boost::shared_ptr<PubMessagePair<T> > > els;
00119 pq->pop(els);
00120 for(typename std::vector<boost::shared_ptr<PubMessagePair<T> > >::iterator it = els.begin();
00121 it != els.end();
00122 ++it)
00123 {
00124 (*it)->pub_.publish((*it)->msg_);
00125 }
00126 }
00127
00128 public:
00129 PubMultiQueue() {}
00130 ~PubMultiQueue()
00131 {
00132 if(service_thread_.joinable())
00133 {
00134 service_thread_running_ = false;
00135 notifyServiceThread();
00136 service_thread_.join();
00137 }
00138 }
00139
00143 template <class T>
00144 boost::shared_ptr<PubQueue<T> > addPub()
00145 {
00146 typename PubQueue<T>::QueuePtr queue(new std::deque<boost::shared_ptr<PubMessagePair<T> > >);
00147 boost::shared_ptr<boost::mutex> queue_lock(new boost::mutex);
00148 boost::shared_ptr<PubQueue<T> > pq(new PubQueue<T>(queue, queue_lock, boost::bind(&PubMultiQueue::notifyServiceThread, this)));
00149 boost::function<void()> f = boost::bind(&PubMultiQueue::serviceFunc<T>, this, pq);
00150 {
00151 boost::mutex::scoped_lock lock(service_funcs_lock_);
00152 service_funcs_.push_back(f);
00153 }
00154 return pq;
00155 }
00156
00158 void spinOnce()
00159 {
00160 boost::mutex::scoped_lock lock(service_funcs_lock_);
00161 for(std::list<boost::function<void()> >::iterator it = service_funcs_.begin();
00162 it != service_funcs_.end();
00163 ++it)
00164 {
00165 (*it)();
00166 }
00167 }
00168
00171 void spin()
00172 {
00173 while(ros::ok() && service_thread_running_)
00174 {
00175 boost::unique_lock<boost::mutex> lock(service_cond_var_lock_);
00176 service_cond_var_.wait(lock);
00177 spinOnce();
00178 }
00179 }
00180
00182 void startServiceThread()
00183 {
00184 service_thread_running_ = true;
00185 service_thread_ = boost::thread(boost::bind(&PubMultiQueue::spin, this));
00186 }
00187
00190 void notifyServiceThread()
00191 {
00192 service_cond_var_.notify_one();
00193 }
00194 };
00195
00196 #endif