Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/spinner.h"
00029 #include "ros/ros.h"
00030 #include "ros/callback_queue.h"
00031
00032 #include <boost/thread/thread.hpp>
00033 #include <boost/thread/recursive_mutex.hpp>
00034
00035 namespace {
00036 boost::recursive_mutex spinmutex;
00037 }
00038
00039 namespace ros
00040 {
00041
00042
00043 void SingleThreadedSpinner::spin(CallbackQueue* queue)
00044 {
00045 boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00046 if(!spinlock.owns_lock()) {
00047 ROS_ERROR("SingleThreadedSpinner: You've attempted to call spin "
00048 "from multiple threads. Use a MultiThreadedSpinner instead.");
00049 return;
00050 }
00051
00052 ros::WallDuration timeout(0.1f);
00053
00054 if (!queue)
00055 {
00056 queue = getGlobalCallbackQueue();
00057 }
00058
00059 ros::NodeHandle n;
00060 while (n.ok())
00061 {
00062 queue->callAvailable(timeout);
00063 }
00064 }
00065
00066 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
00067 : thread_count_(thread_count)
00068 {
00069 }
00070
00071 void MultiThreadedSpinner::spin(CallbackQueue* queue)
00072 {
00073 boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00074 if (!spinlock.owns_lock()) {
00075 ROS_ERROR("MultiThreadeSpinner: You've attempted to call ros::spin "
00076 "from multiple threads... "
00077 "but this spinner is already multithreaded.");
00078 return;
00079 }
00080 AsyncSpinner s(thread_count_, queue);
00081 s.start();
00082
00083 ros::waitForShutdown();
00084 }
00085
00086 class AsyncSpinnerImpl
00087 {
00088 public:
00089 AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
00090 ~AsyncSpinnerImpl();
00091 void start();
00092 void stop();
00093
00094 private:
00095 void threadFunc();
00096
00097 boost::mutex mutex_;
00098 boost::recursive_mutex::scoped_try_lock member_spinlock;
00099 boost::thread_group threads_;
00100
00101 uint32_t thread_count_;
00102 CallbackQueue* callback_queue_;
00103
00104 volatile bool continue_;
00105
00106 ros::NodeHandle nh_;
00107 };
00108
00109 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
00110 : thread_count_(thread_count)
00111 , callback_queue_(queue)
00112 , continue_(false)
00113 {
00114 if (thread_count == 0)
00115 {
00116 thread_count_ = boost::thread::hardware_concurrency();
00117
00118 if (thread_count_ == 0)
00119 {
00120 thread_count_ = 1;
00121 }
00122 }
00123
00124 if (!queue)
00125 {
00126 callback_queue_ = getGlobalCallbackQueue();
00127 }
00128 }
00129
00130 AsyncSpinnerImpl::~AsyncSpinnerImpl()
00131 {
00132 stop();
00133 }
00134
00135 void AsyncSpinnerImpl::start()
00136 {
00137 boost::mutex::scoped_lock lock(mutex_);
00138
00139 if (continue_)
00140 return;
00141
00142 boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00143 if (!spinlock.owns_lock()) {
00144 ROS_ERROR("AsyncSpinnerImpl: Attempt to call spin from multiple "
00145 "threads. We already spin multithreaded.");
00146 return;
00147 }
00148 spinlock.swap(member_spinlock);
00149
00150 continue_ = true;
00151
00152 for (uint32_t i = 0; i < thread_count_; ++i)
00153 {
00154 threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
00155 }
00156 }
00157
00158 void AsyncSpinnerImpl::stop()
00159 {
00160 boost::mutex::scoped_lock lock(mutex_);
00161 if (!continue_)
00162 return;
00163
00164 ROS_ASSERT_MSG(member_spinlock.owns_lock(),
00165 "Async spinner's member lock doesn't own the global spinlock, hrm.");
00166 ROS_ASSERT_MSG(member_spinlock.mutex() == &spinmutex,
00167 "Async spinner's member lock owns a lock on the wrong mutex?!?!?");
00168 member_spinlock.unlock();
00169
00170 continue_ = false;
00171 threads_.join_all();
00172 }
00173
00174 void AsyncSpinnerImpl::threadFunc()
00175 {
00176 disableAllSignalsInThisThread();
00177
00178 CallbackQueue* queue = callback_queue_;
00179 bool use_call_available = thread_count_ == 1;
00180 WallDuration timeout(0.1);
00181
00182 while (continue_ && nh_.ok())
00183 {
00184 if (use_call_available)
00185 {
00186 queue->callAvailable(timeout);
00187 }
00188 else
00189 {
00190 queue->callOne(timeout);
00191 }
00192 }
00193 }
00194
00195 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
00196 : impl_(new AsyncSpinnerImpl(thread_count, 0))
00197 {
00198 }
00199
00200 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
00201 : impl_(new AsyncSpinnerImpl(thread_count, queue))
00202 {
00203 }
00204
00205 void AsyncSpinner::start()
00206 {
00207 impl_->start();
00208 }
00209
00210 void AsyncSpinner::stop()
00211 {
00212 impl_->stop();
00213 }
00214
00215 }
roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52