PubQueue.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2012 Open Source Robotics Foundation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  *
00016  */
00017 /* copied from https://github.com/ros-simulation/gazebo_ros_pkgs/blob/hydro-devel/gazebo_plugins/include/gazebo_plugins/PubQueue.h */
00018 
00019 #ifndef ROS_PUBQUEUE_H
00020 #define ROS_PUBQUEUE_H
00021 
00022 #include <boost/thread.hpp>
00023 #include <boost/bind.hpp>
00024 #include <deque>
00025 #include <list>
00026 #include <vector>
00027 
00028 #include <ros/ros.h>
00029 
00030 
00033 template<class T>
00034 class PubMessagePair
00035 {
00036   public:
00038     T msg_;
00040     ros::Publisher pub_;
00041     PubMessagePair(T& msg, ros::Publisher& pub) :
00042       msg_(msg), pub_(pub) {}
00043 };
00044 
00048 template<class T>
00049 class PubQueue
00050 {
00051   public:
00052     typedef boost::shared_ptr<std::deque<boost::shared_ptr<
00053       PubMessagePair<T> > > > QueuePtr;
00054     typedef boost::shared_ptr<PubQueue<T> > Ptr;
00055 
00056   private:
00058     QueuePtr queue_;
00060     boost::shared_ptr<boost::mutex> queue_lock_;
00062     boost::function<void()> notify_func_;
00063 
00064   public:
00065     PubQueue(QueuePtr queue, 
00066              boost::shared_ptr<boost::mutex> queue_lock,
00067              boost::function<void()> notify_func) :
00068       queue_(queue), queue_lock_(queue_lock), notify_func_(notify_func) {}
00069     ~PubQueue() {}
00070 
00074     void push(T& msg, ros::Publisher& pub)
00075     {
00076       boost::shared_ptr<PubMessagePair<T> > el(new PubMessagePair<T>(msg, pub));
00077       boost::mutex::scoped_lock lock(*queue_lock_);
00078       queue_->push_back(el);
00079       notify_func_();
00080     }
00081 
00084     void pop(std::vector<boost::shared_ptr<PubMessagePair<T> > >& els)
00085     {
00086       boost::mutex::scoped_lock lock(*queue_lock_);
00087       while(!queue_->empty())
00088       {
00089         els.push_back(queue_->front());
00090         queue_->pop_front();
00091       }
00092     }
00093 };
00094 
00097 class PubMultiQueue
00098 {
00099   private:
00101     std::list<boost::function<void()> > service_funcs_;
00103     boost::mutex service_funcs_lock_;
00105     boost::thread service_thread_;
00107     bool service_thread_running_;
00109     boost::condition_variable service_cond_var_;
00111     boost::mutex service_cond_var_lock_;
00112 
00115     template <class T> 
00116     void serviceFunc(boost::shared_ptr<PubQueue<T> > pq)
00117     {
00118       std::vector<boost::shared_ptr<PubMessagePair<T> > > els;
00119       pq->pop(els);
00120       for(typename std::vector<boost::shared_ptr<PubMessagePair<T> > >::iterator it = els.begin();
00121           it != els.end();
00122           ++it)
00123       {
00124         (*it)->pub_.publish((*it)->msg_);
00125       }
00126     }
00127   
00128   public:
00129     PubMultiQueue() {}
00130     ~PubMultiQueue() 
00131     {
00132       if(service_thread_.joinable())
00133       {
00134         service_thread_running_ = false;
00135         notifyServiceThread();
00136         service_thread_.join();
00137       }
00138     }
00139 
00143     template <class T>
00144     boost::shared_ptr<PubQueue<T> > addPub()
00145     {
00146       typename PubQueue<T>::QueuePtr queue(new std::deque<boost::shared_ptr<PubMessagePair<T> > >);
00147       boost::shared_ptr<boost::mutex> queue_lock(new boost::mutex);
00148       boost::shared_ptr<PubQueue<T> > pq(new PubQueue<T>(queue, queue_lock, boost::bind(&PubMultiQueue::notifyServiceThread, this)));
00149       boost::function<void()> f = boost::bind(&PubMultiQueue::serviceFunc<T>, this, pq);
00150       {
00151         boost::mutex::scoped_lock lock(service_funcs_lock_);
00152         service_funcs_.push_back(f);
00153       }
00154       return pq;
00155     }
00156 
00158     void spinOnce()
00159     {
00160       boost::mutex::scoped_lock lock(service_funcs_lock_);
00161       for(std::list<boost::function<void()> >::iterator it = service_funcs_.begin();
00162           it != service_funcs_.end();
00163           ++it)
00164       {
00165         (*it)();
00166       }
00167     }
00168 
00171     void spin()
00172     {
00173       while(ros::ok() && service_thread_running_)
00174       {
00175         boost::unique_lock<boost::mutex> lock(service_cond_var_lock_);
00176         service_cond_var_.wait(lock);
00177         spinOnce();
00178       }
00179     }
00180 
00182     void startServiceThread()
00183     {
00184       service_thread_running_ = true;
00185       service_thread_ = boost::thread(boost::bind(&PubMultiQueue::spin, this));
00186     }
00187 
00190     void notifyServiceThread()
00191     {
00192       service_cond_var_.notify_one();
00193     }
00194 };
00195 
00196 #endif


hrpsys_gazebo_general
Author(s): Yohei Kakiuchi , Kei Okada
autogenerated on Wed Sep 16 2015 10:52:48