spinner.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/spinner.h"
00029 #include "ros/ros.h"
00030 #include "ros/callback_queue.h"
00031 
00032 #include <boost/thread/thread.hpp>
00033 #include <boost/thread/recursive_mutex.hpp>
00034 
00035 namespace {
00036   boost::recursive_mutex spinmutex;
00037 }
00038 
00039 namespace ros
00040 {
00041 
00042 
00043 void SingleThreadedSpinner::spin(CallbackQueue* queue)
00044 {
00045   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00046   if(!spinlock.owns_lock()) {
00047     ROS_ERROR("SingleThreadedSpinner: You've attempted to call spin "
00048               "from multiple threads.  Use a MultiThreadedSpinner instead.");
00049     return;
00050   }
00051 
00052   ros::WallDuration timeout(0.1f);
00053 
00054   if (!queue)
00055   {
00056     queue = getGlobalCallbackQueue();
00057   }
00058 
00059   ros::NodeHandle n;
00060   while (n.ok())
00061   {
00062     queue->callAvailable(timeout);
00063   }
00064 }
00065 
00066 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
00067 : thread_count_(thread_count)
00068 {
00069 }
00070 
00071 void MultiThreadedSpinner::spin(CallbackQueue* queue)
00072 {
00073   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00074   if (!spinlock.owns_lock()) {
00075     ROS_ERROR("MultiThreadeSpinner: You've attempted to call ros::spin "
00076               "from multiple threads... "
00077               "but this spinner is already multithreaded.");
00078     return;
00079   }
00080   AsyncSpinner s(thread_count_, queue);
00081   s.start();
00082 
00083   ros::waitForShutdown();
00084 }
00085 
00086 class AsyncSpinnerImpl
00087 {
00088 public:
00089   AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
00090   ~AsyncSpinnerImpl();
00091   void start();
00092   void stop();
00093 
00094 private:
00095   void threadFunc();
00096 
00097   boost::mutex mutex_;
00098   boost::recursive_mutex::scoped_try_lock member_spinlock;
00099   boost::thread_group threads_;
00100 
00101   uint32_t thread_count_;
00102   CallbackQueue* callback_queue_;
00103 
00104   volatile bool continue_;
00105 
00106   ros::NodeHandle nh_;
00107 };
00108 
00109 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
00110 : thread_count_(thread_count)
00111 , callback_queue_(queue)
00112 , continue_(false)
00113 {
00114   if (thread_count == 0)
00115   {
00116     thread_count_ = boost::thread::hardware_concurrency();
00117 
00118     if (thread_count_ == 0)
00119     {
00120       thread_count_ = 1;
00121     }
00122   }
00123 
00124   if (!queue)
00125   {
00126     callback_queue_ = getGlobalCallbackQueue();
00127   }
00128 }
00129 
00130 AsyncSpinnerImpl::~AsyncSpinnerImpl()
00131 {
00132   stop();
00133 }
00134 
00135 void AsyncSpinnerImpl::start()
00136 {
00137   boost::mutex::scoped_lock lock(mutex_);
00138 
00139   if (continue_)
00140     return;
00141 
00142   boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00143   if (!spinlock.owns_lock()) {
00144     ROS_ERROR("AsyncSpinnerImpl: Attempt to call spin from multiple "
00145               "threads.  We already spin multithreaded.");
00146     return;
00147   }
00148   spinlock.swap(member_spinlock);
00149 
00150   continue_ = true;
00151 
00152   for (uint32_t i = 0; i < thread_count_; ++i)
00153   {
00154     threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
00155   }
00156 }
00157 
00158 void AsyncSpinnerImpl::stop()
00159 {
00160   boost::mutex::scoped_lock lock(mutex_);
00161   if (!continue_)
00162     return;
00163 
00164   ROS_ASSERT_MSG(member_spinlock.owns_lock(), 
00165                  "Async spinner's member lock doesn't own the global spinlock, hrm.");
00166   ROS_ASSERT_MSG(member_spinlock.mutex() == &spinmutex, 
00167                  "Async spinner's member lock owns a lock on the wrong mutex?!?!?");
00168   member_spinlock.unlock();
00169 
00170   continue_ = false;
00171   threads_.join_all();
00172 }
00173 
00174 void AsyncSpinnerImpl::threadFunc()
00175 {
00176   disableAllSignalsInThisThread();
00177 
00178   CallbackQueue* queue = callback_queue_;
00179   bool use_call_available = thread_count_ == 1;
00180   WallDuration timeout(0.1);
00181 
00182   while (continue_ && nh_.ok())
00183   {
00184     if (use_call_available)
00185     {
00186       queue->callAvailable(timeout);
00187     }
00188     else
00189     {
00190       queue->callOne(timeout);
00191     }
00192   }
00193 }
00194 
00195 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
00196 : impl_(new AsyncSpinnerImpl(thread_count, 0))
00197 {
00198 }
00199 
00200 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
00201 : impl_(new AsyncSpinnerImpl(thread_count, queue))
00202 {
00203 }
00204 
00205 void AsyncSpinner::start()
00206 {
00207   impl_->start();
00208 }
00209 
00210 void AsyncSpinner::stop()
00211 {
00212   impl_->stop();
00213 }
00214 
00215 }


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52