publisher.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2010, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of the Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 #include <rosrt/publisher.h>
00036 #include <rosrt/detail/publisher_manager.h>
00037 #include <rosrt/detail/managers.h>
00038 #include <rosrt/init.h>
00039 #include <ros/debug.h>
00040 
00041 #include <lockfree/object_pool.h>
00042 
00043 #include <boost/thread.hpp>
00044 
00045 namespace rosrt
00046 {
00047 namespace detail
00048 {
00049 
00050 bool publish(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
00051 {
00052   return detail::getPublisherManager()->publish(pub, msg, pub_func, clone_func);
00053 }
00054 
00055 PublishQueue::PublishQueue(uint32_t size)
00056 : queue_(size)
00057 {
00058 }
00059 
00060 bool PublishQueue::push(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
00061 {
00062   PubItem i;
00063   i.pub = pub;
00064   i.msg = msg;
00065   i.pub_func = pub_func;
00066   i.clone_func = clone_func;
00067   return queue_.push(i);
00068 }
00069 
00070 uint32_t PublishQueue::publishAll()
00071 {
00072   uint32_t count = 0;
00073 
00074   MWSRQueue<PubItem>::Node* it = queue_.popAll();
00075   while (it)
00076   {
00077     // Always clone the message before publishing.  Otherwise, if there's an intraprocess non-realtime subscriber that stores off the messages
00078     // it could starve the realtime publisher for messages.
00079     VoidConstPtr clone = it->val.clone_func(it->val.msg);
00080     it->val.pub_func(it->val.pub, clone);
00081     it->val.msg.reset();
00082     it->val.pub = ros::Publisher();
00083     MWSRQueue<PubItem>::Node* tmp = it;
00084     it = it->next;
00085 
00086     queue_.free(tmp);
00087 
00088     ++count;
00089   }
00090 
00091   return count;
00092 }
00093 
00094 PublisherManager::PublisherManager(const InitOptions& ops)
00095 : queue_(ops.pubmanager_queue_size)
00096 , pub_count_(0)
00097 , running_(true)
00098 {
00099   pub_thread_ = boost::thread(&PublisherManager::publishThread, this);
00100 }
00101 
00102 PublisherManager::~PublisherManager()
00103 {
00104   running_ = false;
00105   cond_.notify_one();
00106   pub_thread_.join();
00107 }
00108 
00109 void PublisherManager::publishThread()
00110 {
00111   while (running_)
00112   {
00113     {
00114       boost::mutex::scoped_lock lock(cond_mutex_);
00115       while (running_ && pub_count_.load() == 0)
00116       {
00117         cond_.wait(lock);
00118       }
00119 
00120       if (!running_)
00121       {
00122         return;
00123       }
00124     }
00125 
00126     uint32_t count = queue_.publishAll();
00127     pub_count_.fetch_sub(count);
00128   }
00129 }
00130 
00131 bool PublisherManager::publish(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
00132 {
00133   if (!queue_.push(pub, msg, pub_func, clone_func))
00134   {
00135     return false;
00136   }
00137 
00138   pub_count_.fetch_add(1);
00139   cond_.notify_one();
00140 
00141   return true;
00142 }
00143 
00144 } // namespace detail
00145 } // namespace rosrt


rosrt
Author(s): Josh Faust
autogenerated on Mon Oct 6 2014 06:54:58