Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #include <rosrt/publisher.h>
00036 #include <rosrt/detail/publisher_manager.h>
00037 #include <rosrt/detail/managers.h>
00038 #include <rosrt/init.h>
00039 #include <ros/debug.h>
00040 
00041 #include <lockfree/object_pool.h>
00042 
00043 #include "boost/thread.hpp"
00044 
00045 namespace rosrt
00046 {
00047 namespace detail
00048 {
00049 
00050 bool publish(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
00051 {
00052   return detail::getPublisherManager()->publish(pub, msg, pub_func, clone_func);
00053 }
00054 
00055 PublishQueue::PublishQueue(uint32_t size)
00056 : queue_(size)
00057 {
00058 }
00059 
00060 bool PublishQueue::push(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
00061 {
00062   PubItem i;
00063   i.pub = pub;
00064   i.msg = msg;
00065   i.pub_func = pub_func;
00066   i.clone_func = clone_func;
00067   return queue_.push(i);
00068 }
00069 
00070 uint32_t PublishQueue::publishAll()
00071 {
00072   uint32_t count = 0;
00073 
00074   MWSRQueue<PubItem>::Node* it = queue_.popAll();
00075   while (it)
00076   {
00077     
00078     
00079     VoidConstPtr clone = it->val.clone_func(it->val.msg);
00080     it->val.pub_func(it->val.pub, clone);
00081     it->val.msg.reset();
00082     it->val.pub = ros::Publisher();
00083     MWSRQueue<PubItem>::Node* tmp = it;
00084     it = it->next;
00085 
00086     queue_.free(tmp);
00087 
00088     ++count;
00089   }
00090 
00091   return count;
00092 }
00093 
00094 PublisherManager::PublisherManager(const InitOptions& ops)
00095 : queue_(ops.pubmanager_queue_size)
00096 , pub_count_(0)
00097 , running_(true)
00098 {
00099   pub_thread_ = boost::thread(&PublisherManager::publishThread, this);
00100 }
00101 
00102 PublisherManager::~PublisherManager()
00103 {
00104   running_ = false;
00105   cond_.notify_one();
00106   pub_thread_.join();
00107 }
00108 
00109 void PublisherManager::publishThread()
00110 {
00111   while (running_)
00112   {
00113     {
00114       boost::mutex::scoped_lock lock(cond_mutex_);
00115       while (running_ && pub_count_.load() == 0)
00116       {
00117         cond_.wait(lock);
00118       }
00119 
00120       if (!running_)
00121       {
00122         return;
00123       }
00124     }
00125 
00126     uint32_t count = queue_.publishAll();
00127     pub_count_.fetch_sub(count);
00128   }
00129 }
00130 
00131 bool PublisherManager::publish(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
00132 {
00133   if (!queue_.push(pub, msg, pub_func, clone_func))
00134   {
00135     return false;
00136   }
00137 
00138   pub_count_.fetch_add(1);
00139   cond_.notify_one();
00140 
00141   return true;
00142 }
00143 
00144 } 
00145 }