callback_queue_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2010, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 #include <nodelet/detail/callback_queue_manager.h>
00031 #include <nodelet/detail/callback_queue.h>
00032 #include <ros/callback_queue.h>
00033 
00034 #include <boost/thread/thread.hpp>
00035 #include <boost/thread/condition_variable.hpp>
00036 #include <boost/type_traits/alignment_of.hpp>
00037 #include <boost/bind.hpp>
00038 
00039 #include <ros/assert.h>
00040 
00041 namespace nodelet
00042 {
00043 namespace detail
00044 {
00045 
00046 CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads)
00047 : running_(true),
00048   num_worker_threads_(num_worker_threads)
00049 {
00050   if (num_worker_threads_ == 0)
00051     num_worker_threads_ = boost::thread::hardware_concurrency();
00052   
00053   tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this));
00054 
00055   size_t num_threads = getNumWorkerThreads();
00056   thread_info_.reset( new ThreadInfo[num_threads] );
00057   for (size_t i = 0; i < num_threads; ++i)
00058   {
00059     tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i]));
00060   }
00061 }
00062 
00063 CallbackQueueManager::~CallbackQueueManager()
00064 {
00065   stop();
00066   
00067 #ifdef NODELET_QUEUE_DEBUG
00068   // Write out task assignment histories for each thread
00069   typedef ThreadInfo::Record Record;
00070   for (size_t i = 0; i < num_threads; ++i)
00071   {
00072     char filename[128];
00073     sprintf(filename, "thread_history_%d.txt", (int)i);
00074     FILE* file = fopen(filename, "w");
00075     fprintf(file, "# timestamps tasks threaded\n");
00076     const std::vector<Record>& history = thread_info_[i].history;
00077     for (int j = 0; j < (int)history.size(); ++j)
00078     {
00079       Record r = history[j];
00080       fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded);
00081     }
00082     fclose(file);
00083   }
00084 #endif
00085 }
00086 
00087 void CallbackQueueManager::stop()
00088 {
00089   running_ = false;
00090   {
00091     boost::mutex::scoped_lock lock(waiting_mutex_);
00092     waiting_cond_.notify_all();
00093   }
00094 
00095   size_t num_threads = getNumWorkerThreads();
00096   for (size_t i = 0; i < num_threads; ++i)
00097   {
00098     boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex);
00099     thread_info_[i].queue_cond.notify_all();
00100   }
00101 
00102   tg_.join_all();
00103 }
00104 
00105 uint32_t CallbackQueueManager::getNumWorkerThreads()
00106 {
00107   return num_worker_threads_;
00108 }
00109 
00110 void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded)
00111 {
00112   boost::mutex::scoped_lock lock(queues_mutex_);
00113 
00114   QueueInfoPtr& info = queues_[queue.get()];
00115   ROS_ASSERT(!info);
00116   info.reset(new QueueInfo);
00117   info->queue = queue;
00118   info->threaded = threaded;
00119 }
00120 
00121 void CallbackQueueManager::removeQueue(const CallbackQueuePtr& queue)
00122 {
00123   boost::mutex::scoped_lock lock(queues_mutex_);
00124   ROS_ASSERT(queues_.find(queue.get()) != queues_.end());
00125 
00126   queues_.erase(queue.get());
00127 }
00128 
00129 void CallbackQueueManager::callbackAdded(const CallbackQueuePtr& queue)
00130 {
00131   {
00132     boost::mutex::scoped_lock lock(waiting_mutex_);
00133     waiting_.push_back(queue);
00134   }
00135 
00136   waiting_cond_.notify_all();
00137 }
00138 
00139 CallbackQueueManager::ThreadInfo* CallbackQueueManager::getSmallestQueue()
00140 {
00141   size_t smallest = std::numeric_limits<size_t>::max();
00142   uint32_t smallest_index = 0xffffffff;
00143   for (unsigned i = 0; i < num_worker_threads_; ++i)
00144   {
00145     ThreadInfo& ti = thread_info_[i];
00146 
00147     size_t size = ti.calling;
00148     if (size == 0)
00149     {
00150       return &ti;
00151     }
00152 
00153     if (size < smallest)
00154     {
00155       smallest = size;
00156       smallest_index = i;
00157     }
00158   }
00159 
00160   return &thread_info_[smallest_index];
00161 }
00162 
00163 void CallbackQueueManager::managerThread()
00164 {
00165   V_Queue local_waiting;
00166 
00167   while (running_)
00168   {
00169     {
00170       boost::mutex::scoped_lock lock(waiting_mutex_);
00171 
00172       while (waiting_.empty() && running_)
00173       {
00174         waiting_cond_.wait(lock);
00175       }
00176 
00177       if (!running_)
00178       {
00179         return;
00180       }
00181 
00182       local_waiting.swap(waiting_);
00183     }
00184 
00185     {
00186       boost::mutex::scoped_lock lock(queues_mutex_);
00187 
00188       V_Queue::iterator it = local_waiting.begin();
00189       V_Queue::iterator end = local_waiting.end();
00190       for (; it != end; ++it)
00191       {
00192         CallbackQueuePtr& queue = *it;
00193 
00194         M_Queue::iterator it = queues_.find(queue.get());
00195         if (it != queues_.end())
00196         {
00197           QueueInfoPtr& info = it->second;
00198           ThreadInfo* ti = 0;
00199           if (info->threaded)
00200           {
00201             // If this queue is thread-safe we immediately add it to the thread with the least work queued
00202             ti = getSmallestQueue();
00203           }
00204           else
00205           {
00206             // If this queue is non-thread-safe and has no in-progress calls happening, we add it to the
00207             // thread with the least work queued.  If the queue already has calls in-progress we add it
00208             // to the thread it's already being called from
00209             boost::mutex::scoped_lock lock(info->st_mutex);
00210 
00211             if (info->in_thread == 0)
00212             {
00213               ti = getSmallestQueue();
00214               info->thread_index = ti - thread_info_.get();
00215             }
00216             else
00217             {
00218               ti = &thread_info_[info->thread_index];
00219             }
00220 
00221             ++info->in_thread;
00222           }
00223 
00224           {
00225             boost::mutex::scoped_lock lock(ti->queue_mutex);
00226             ti->queue.push_back(std::make_pair(queue, info));
00227             ++ti->calling;
00228 #ifdef NODELET_QUEUE_DEBUG
00229             double stamp = ros::WallTime::now().toSec();
00230             uint32_t tasks = ti->calling;
00231             ti->history.push_back(ThreadInfo::Record(stamp, tasks, true));
00232 #endif
00233           }
00234 
00235           ti->queue_cond.notify_all();
00236         }
00237       }
00238     }
00239 
00240     local_waiting.clear();
00241   }
00242 }
00243 
00244 void CallbackQueueManager::workerThread(ThreadInfo* info)
00245 {
00246   std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
00247 
00248   while (running_)
00249   {
00250     {
00251       boost::mutex::scoped_lock lock(info->queue_mutex);
00252 
00253       while (info->queue.empty() && running_)
00254       {
00255         info->queue_cond.wait(lock);
00256       }
00257 
00258       if (!running_)
00259       {
00260         return;
00261       }
00262 
00263       info->queue.swap(local_queues);
00264     }
00265 
00266     std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
00267     std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
00268     for (; it != end; ++it)
00269     {
00270       CallbackQueuePtr& queue = it->first;
00271       QueueInfoPtr& qi = it->second;
00272       if (queue->callOne() == ros::CallbackQueue::TryAgain)
00273       {
00274         callbackAdded(queue);
00275       }
00276       --info->calling;
00277 
00278       if (!qi->threaded)
00279       {
00280         boost::mutex::scoped_lock lock(qi->st_mutex);
00281         --qi->in_thread;
00282       }
00283     }
00284 
00285     local_queues.clear();
00286 
00287   }
00288 }
00289 
00290 } // namespace detail
00291 } // namespace nodelet


nodelet
Author(s): Tully Foote, Radu Bogdan Rusu
autogenerated on Wed Aug 26 2015 14:56:45