PubQueue.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2012 Open Source Robotics Foundation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  *
00016  */
00017 
00018 #ifndef ROS_PUBQUEUE_H
00019 #define ROS_PUBQUEUE_H
00020 
00021 #include <boost/thread.hpp>
00022 #include <boost/bind.hpp>
00023 #include <deque>
00024 #include <list>
00025 #include <vector>
00026 
00027 #include <ros/ros.h>
00028 
00029 
00032 template<class T>
00033 class PubMessagePair
00034 {
00035   public:
00037     T msg_;
00039     ros::Publisher pub_;
00040     PubMessagePair(T& msg, ros::Publisher& pub) :
00041       msg_(msg), pub_(pub) {}
00042 };
00043 
00047 template<class T>
00048 class PubQueue
00049 {
00050   public:
00051     typedef boost::shared_ptr<std::deque<boost::shared_ptr<
00052       PubMessagePair<T> > > > QueuePtr;
00053     typedef boost::shared_ptr<PubQueue<T> > Ptr;
00054 
00055   private:
00057     QueuePtr queue_;
00059     boost::shared_ptr<boost::mutex> queue_lock_;
00061     boost::function<void()> notify_func_;
00062 
00063   public:
00064     PubQueue(QueuePtr queue, 
00065              boost::shared_ptr<boost::mutex> queue_lock,
00066              boost::function<void()> notify_func) :
00067       queue_(queue), queue_lock_(queue_lock), notify_func_(notify_func) {}
00068     ~PubQueue() {}
00069 
00073     void push(T& msg, ros::Publisher& pub)
00074     {
00075       boost::shared_ptr<PubMessagePair<T> > el(new PubMessagePair<T>(msg, pub));
00076       boost::mutex::scoped_lock lock(*queue_lock_);
00077       queue_->push_back(el);
00078       notify_func_();
00079     }
00080 
00083     void pop(std::vector<boost::shared_ptr<PubMessagePair<T> > >& els)
00084     {
00085       boost::mutex::scoped_lock lock(*queue_lock_);
00086       while(!queue_->empty())
00087       {
00088         els.push_back(queue_->front());
00089         queue_->pop_front();
00090       }
00091     }
00092 };
00093 
00096 class PubMultiQueue
00097 {
00098   private:
00100     std::list<boost::function<void()> > service_funcs_;
00102     boost::mutex service_funcs_lock_;
00104     boost::thread service_thread_;
00106     bool service_thread_running_;
00108     boost::condition_variable service_cond_var_;
00110     boost::mutex service_cond_var_lock_;
00111 
00114     template <class T> 
00115     void serviceFunc(boost::shared_ptr<PubQueue<T> > pq)
00116     {
00117       std::vector<boost::shared_ptr<PubMessagePair<T> > > els;
00118       pq->pop(els);
00119       for(typename std::vector<boost::shared_ptr<PubMessagePair<T> > >::iterator it = els.begin();
00120           it != els.end();
00121           ++it)
00122       {
00123         (*it)->pub_.publish((*it)->msg_);
00124       }
00125     }
00126   
00127   public:
00128     PubMultiQueue() {}
00129     ~PubMultiQueue() 
00130     {
00131       if(service_thread_.joinable())
00132       {
00133         service_thread_running_ = false;
00134         notifyServiceThread();
00135         service_thread_.join();
00136       }
00137     }
00138 
00142     template <class T>
00143     boost::shared_ptr<PubQueue<T> > addPub()
00144     {
00145       typename PubQueue<T>::QueuePtr queue(new std::deque<boost::shared_ptr<PubMessagePair<T> > >);
00146       boost::shared_ptr<boost::mutex> queue_lock(new boost::mutex);
00147       boost::shared_ptr<PubQueue<T> > pq(new PubQueue<T>(queue, queue_lock, boost::bind(&PubMultiQueue::notifyServiceThread, this)));
00148       boost::function<void()> f = boost::bind(&PubMultiQueue::serviceFunc<T>, this, pq);
00149       {
00150         boost::mutex::scoped_lock lock(service_funcs_lock_);
00151         service_funcs_.push_back(f);
00152       }
00153       return pq;
00154     }
00155 
00157     void spinOnce()
00158     {
00159       boost::mutex::scoped_lock lock(service_funcs_lock_);
00160       for(std::list<boost::function<void()> >::iterator it = service_funcs_.begin();
00161           it != service_funcs_.end();
00162           ++it)
00163       {
00164         (*it)();
00165       }
00166     }
00167 
00170     void spin()
00171     {
00172       while(ros::ok() && service_thread_running_)
00173       {
00174         boost::unique_lock<boost::mutex> lock(service_cond_var_lock_);
00175         service_cond_var_.wait(lock);
00176         spinOnce();
00177       }
00178     }
00179 
00181     void startServiceThread()
00182     {
00183       service_thread_running_ = true;
00184       service_thread_ = boost::thread(boost::bind(&PubMultiQueue::spin, this));
00185     }
00186 
00189     void notifyServiceThread()
00190     {
00191       service_cond_var_.notify_one();
00192     }
00193 };
00194 
00195 #endif


gazebo_plugins
Author(s): John Hsu
autogenerated on Thu Feb 23 2017 03:43:23