spinner.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/spinner.h"
00029 #include "ros/ros.h"
00030 #include "ros/callback_queue.h"
00031 
00032 #include <boost/thread/thread.hpp>
00033 #include <boost/thread/recursive_mutex.hpp>
00034 
00035 namespace {
00036   boost::recursive_mutex spinmutex;
00037 }
00038 
00039 namespace ros
00040 {
00041 
00042 
00043 void SingleThreadedSpinner::spin(CallbackQueue* queue)
00044 {
00045   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00046   if(!spinlock.owns_lock()) {
00047     ROS_ERROR("SingleThreadedSpinner: You've attempted to call spin "
00048               "from multiple threads.  Use a MultiThreadedSpinner instead.");
00049     return;
00050   }
00051 
00052   ros::WallDuration timeout(0.1f);
00053 
00054   if (!queue)
00055   {
00056     queue = getGlobalCallbackQueue();
00057   }
00058 
00059   ros::NodeHandle n;
00060   while (n.ok())
00061   {
00062     queue->callAvailable(timeout);
00063   }
00064 }
00065 
00066 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
00067 : thread_count_(thread_count)
00068 {
00069 }
00070 
00071 void MultiThreadedSpinner::spin(CallbackQueue* queue)
00072 {
00073   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00074   if (!spinlock.owns_lock()) {
00075     ROS_ERROR("MultiThreadeSpinner: You've attempted to call ros::spin "
00076               "from multiple threads... "
00077               "but this spinner is already multithreaded.");
00078     return;
00079   }
00080   AsyncSpinner s(thread_count_, queue);
00081   s.start();
00082 
00083   ros::waitForShutdown();
00084 }
00085 
00086 class AsyncSpinnerImpl
00087 {
00088 public:
00089   AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
00090   ~AsyncSpinnerImpl();
00091 
00092   bool canStart();
00093   void start();
00094   void stop();
00095 
00096 private:
00097   void threadFunc();
00098 
00099   boost::mutex mutex_;
00100   boost::recursive_mutex::scoped_try_lock member_spinlock;
00101   boost::thread_group threads_;
00102 
00103   uint32_t thread_count_;
00104   CallbackQueue* callback_queue_;
00105 
00106   volatile bool continue_;
00107 
00108   ros::NodeHandle nh_;
00109 };
00110 
00111 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
00112 : thread_count_(thread_count)
00113 , callback_queue_(queue)
00114 , continue_(false)
00115 {
00116   if (thread_count == 0)
00117   {
00118     thread_count_ = boost::thread::hardware_concurrency();
00119 
00120     if (thread_count_ == 0)
00121     {
00122       thread_count_ = 1;
00123     }
00124   }
00125 
00126   if (!queue)
00127   {
00128     callback_queue_ = getGlobalCallbackQueue();
00129   }
00130 }
00131 
00132 AsyncSpinnerImpl::~AsyncSpinnerImpl()
00133 {
00134   stop();
00135 }
00136 
00137 bool AsyncSpinnerImpl::canStart()
00138 {
00139   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00140   return spinlock.owns_lock();
00141 }
00142 
00143 void AsyncSpinnerImpl::start()
00144 {
00145   boost::mutex::scoped_lock lock(mutex_);
00146 
00147   if (continue_)
00148     return;
00149 
00150   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00151   if (!spinlock.owns_lock()) {
00152     ROS_WARN("AsyncSpinnerImpl: Attempt to start() an AsyncSpinner failed "
00153              "because another AsyncSpinner is already running. Note that the "
00154              "other AsyncSpinner might not be using the same callback queue "
00155              "as this AsyncSpinner, in which case no callbacks in your "
00156              "callback queue will be serviced.");
00157     return;
00158   }
00159   spinlock.swap(member_spinlock);
00160 
00161   continue_ = true;
00162 
00163   for (uint32_t i = 0; i < thread_count_; ++i)
00164   {
00165     threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
00166   }
00167 }
00168 
00169 void AsyncSpinnerImpl::stop()
00170 {
00171   boost::mutex::scoped_lock lock(mutex_);
00172   if (!continue_)
00173     return;
00174 
00175   ROS_ASSERT_MSG(member_spinlock.owns_lock(), 
00176                  "Async spinner's member lock doesn't own the global spinlock, hrm.");
00177   ROS_ASSERT_MSG(member_spinlock.mutex() == &spinmutex, 
00178                  "Async spinner's member lock owns a lock on the wrong mutex?!?!?");
00179   member_spinlock.unlock();
00180 
00181   continue_ = false;
00182   threads_.join_all();
00183 }
00184 
00185 void AsyncSpinnerImpl::threadFunc()
00186 {
00187   disableAllSignalsInThisThread();
00188 
00189   CallbackQueue* queue = callback_queue_;
00190   bool use_call_available = thread_count_ == 1;
00191   WallDuration timeout(0.1);
00192 
00193   while (continue_ && nh_.ok())
00194   {
00195     if (use_call_available)
00196     {
00197       queue->callAvailable(timeout);
00198     }
00199     else
00200     {
00201       queue->callOne(timeout);
00202     }
00203   }
00204 }
00205 
00206 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
00207 : impl_(new AsyncSpinnerImpl(thread_count, 0))
00208 {
00209 }
00210 
00211 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
00212 : impl_(new AsyncSpinnerImpl(thread_count, queue))
00213 {
00214 }
00215 
00216 bool AsyncSpinner::canStart()
00217 {
00218   return impl_->canStart();
00219 }
00220 
00221 void AsyncSpinner::start()
00222 {
00223   impl_->start();
00224 }
00225 
00226 void AsyncSpinner::stop()
00227 {
00228   impl_->stop();
00229 }
00230 
00231 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47