PubQueue.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2012 Open Source Robotics Foundation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  *
00016  */
00017 /* copied from drcsim/ros/atlas_msgs/PubQueue.h at https://bitbucket.org/osrf/drcsim */
00018 
00019 #ifndef ROS_PUBQUEUE_H
00020 #define ROS_PUBQUEUE_H
00021 
00022 #include <boost/thread.hpp>
00023 #include <boost/bind.hpp>
00024 #include <deque>
00025 #include <list>
00026 #include <vector>
00027 
00028 #include <ros/ros.h>
00029 
00030 
00033 template<class T>
00034 class PubMessagePair
00035 {
00036   public:
00038     T msg_;
00040     ros::Publisher pub_;
00041     PubMessagePair(T& msg, ros::Publisher& pub) :
00042       msg_(msg), pub_(pub) {}
00043 };
00044 
00048 template<class T>
00049 class PubQueue
00050 {
00051   public:
00052     typedef boost::shared_ptr<std::deque<boost::shared_ptr<
00053       PubMessagePair<T> > > > QueuePtr;
00054     typedef boost::shared_ptr<PubQueue<T> > Ptr;
00055 
00056   private:
00058     QueuePtr queue_;
00060     boost::shared_ptr<boost::mutex> queue_lock_;
00062     boost::function<void()> notify_func_;
00063 
00064   public:
00065     PubQueue(QueuePtr queue, 
00066              boost::shared_ptr<boost::mutex> queue_lock,
00067              boost::function<void()> notify_func) :
00068       queue_(queue), queue_lock_(queue_lock), notify_func_(notify_func) {}
00069     ~PubQueue() {}
00070 
00074     void push(T& msg, ros::Publisher& pub)
00075     {
00076       boost::shared_ptr<PubMessagePair<T> > el(new PubMessagePair<T>(msg, pub));
00077       boost::mutex::scoped_lock lock(*queue_lock_);
00078       queue_->push_back(el);
00079       notify_func_();
00080     }
00081 
00084     void pop(std::vector<boost::shared_ptr<PubMessagePair<T> > >& els)
00085     {
00086       boost::mutex::scoped_lock lock(*queue_lock_);
00087       while(!queue_->empty())
00088       {
00089         els.push_back(queue_->front());
00090         queue_->pop_front();
00091       }
00092     }
00093 };
00094 
00097 class PubMultiQueue
00098 {
00099   private:
00101     std::list<boost::function<void()> > service_funcs_;
00103     boost::mutex service_funcs_lock_;
00105     boost::thread service_thread_;
00107     boost::condition_variable service_cond_var_;
00109     boost::mutex service_cond_var_lock_;
00110 
00113     template <class T> 
00114     void serviceFunc(boost::shared_ptr<PubQueue<T> > pq)
00115     {
00116       std::vector<boost::shared_ptr<PubMessagePair<T> > > els;
00117       pq->pop(els);
00118       for(typename std::vector<boost::shared_ptr<PubMessagePair<T> > >::iterator it = els.begin();
00119           it != els.end();
00120           ++it)
00121       {
00122         (*it)->pub_.publish((*it)->msg_);
00123       }
00124     }
00125   
00126   public:
00127     PubMultiQueue() {}
00128     ~PubMultiQueue() 
00129     {
00130       if(service_thread_.joinable())
00131       {
00132         notifyServiceThread();
00133         service_thread_.join();
00134       }
00135     }
00136 
00140     template <class T>
00141     boost::shared_ptr<PubQueue<T> > addPub()
00142     {
00143       typename PubQueue<T>::QueuePtr queue(new std::deque<boost::shared_ptr<PubMessagePair<T> > >);
00144       boost::shared_ptr<boost::mutex> queue_lock(new boost::mutex);
00145       boost::shared_ptr<PubQueue<T> > pq(new PubQueue<T>(queue, queue_lock, boost::bind(&PubMultiQueue::notifyServiceThread, this)));
00146       boost::function<void()> f = boost::bind(&PubMultiQueue::serviceFunc<T>, this, pq);
00147       {
00148         boost::mutex::scoped_lock lock(service_funcs_lock_);
00149         service_funcs_.push_back(f);
00150       }
00151       return pq;
00152     }
00153 
00155     void spinOnce()
00156     {
00157       boost::mutex::scoped_lock lock(service_funcs_lock_);
00158       for(std::list<boost::function<void()> >::iterator it = service_funcs_.begin();
00159           it != service_funcs_.end();
00160           ++it)
00161       {
00162         (*it)();
00163       }
00164     }
00165 
00168     void spin()
00169     {
00170       while(ros::ok())
00171       {
00172         boost::unique_lock<boost::mutex> lock(service_cond_var_lock_);
00173         service_cond_var_.wait(lock);
00174         spinOnce();
00175       }
00176     }
00177 
00179     void startServiceThread()
00180     {
00181       service_thread_ = boost::thread(boost::bind(&PubMultiQueue::spin, this));
00182     }
00183 
00186     void notifyServiceThread()
00187     {
00188       service_cond_var_.notify_one();
00189     }
00190 };
00191 
00192 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends Defines


hrpsys_gazebo_general
Author(s): Yohei Kakiuchi
autogenerated on Tue Jul 23 2013 11:49:13