00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/spinner.h"
00029 #include "ros/ros.h"
00030 #include "ros/callback_queue.h"
00031
00032 #include <boost/thread/thread.hpp>
00033 #include <boost/thread/recursive_mutex.hpp>
00034
00035 namespace {
00036 boost::recursive_mutex spinmutex;
00037 }
00038
00039 namespace ros
00040 {
00041
00042
00043 void SingleThreadedSpinner::spin(CallbackQueue* queue)
00044 {
00045 boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00046 if(not spinlock.owns_lock()) {
00047 ROS_ERROR("SingleThreadedSpinner: You've attempted to call spin "
00048 "from multiple threads. Use a MultiThreadedSpinner instead.");
00049 return;
00050 }
00051
00052 ros::WallDuration timeout(0.1f);
00053
00054 if (!queue)
00055 {
00056 queue = getGlobalCallbackQueue();
00057 }
00058
00059 ros::NodeHandle n;
00060 while (n.ok())
00061 {
00062 queue->callAvailable(timeout);
00063 }
00064 }
00065
00066 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
00067 : thread_count_(thread_count)
00068 {
00069 }
00070
00071 void MultiThreadedSpinner::spin(CallbackQueue* queue)
00072 {
00073 boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00074 if (not spinlock.owns_lock()) {
00075 ROS_ERROR("MultiThreadeSpinner: You've attempted to call ros::spin "
00076 "from multiple threads... "
00077 "but this spinner is already multithreaded.");
00078 return;
00079 }
00080 AsyncSpinner s(thread_count_, queue);
00081 s.start();
00082
00083 ros::waitForShutdown();
00084 }
00085
00086 class AsyncSpinnerImpl
00087 {
00088 public:
00089 AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
00090 ~AsyncSpinnerImpl();
00091 void start();
00092 void stop();
00093
00094 private:
00095 void threadFunc();
00096
00097 boost::mutex mutex_;
00098 boost::recursive_mutex::scoped_try_lock member_spinlock;
00099 boost::thread_group threads_;
00100
00101 uint32_t thread_count_;
00102 CallbackQueue* callback_queue_;
00103
00104 volatile bool continue_;
00105
00106 ros::NodeHandle nh_;
00107 };
00108
00109 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
00110 : thread_count_(thread_count)
00111 , callback_queue_(queue)
00112 , continue_(false)
00113 {
00114 if (thread_count == 0)
00115 {
00116 thread_count_ = boost::thread::hardware_concurrency();
00117
00118 if (thread_count_ == 0)
00119 {
00120 thread_count_ = 1;
00121 }
00122 }
00123
00124 if (!queue)
00125 {
00126 callback_queue_ = getGlobalCallbackQueue();
00127 }
00128 }
00129
00130 AsyncSpinnerImpl::~AsyncSpinnerImpl()
00131 {
00132 stop();
00133 }
00134
00135 void AsyncSpinnerImpl::start()
00136 {
00137 boost::mutex::scoped_lock lock(mutex_);
00138
00139 if (continue_)
00140 return;
00141
00142 boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
00143 if (not spinlock.owns_lock()) {
00144 ROS_ERROR("AsyncSpinnerImpl: Attempt to call spin from multiple "
00145 "threads. We already spin multithreaded.");
00146 return;
00147 }
00148 spinlock.swap(member_spinlock);
00149
00150 continue_ = true;
00151
00152 for (uint32_t i = 0; i < thread_count_; ++i)
00153 {
00154 threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
00155 }
00156 }
00157
00158 void AsyncSpinnerImpl::stop()
00159 {
00160 boost::mutex::scoped_lock lock(mutex_);
00161 if (!continue_)
00162 return;
00163
00164 ROS_ASSERT_MSG(member_spinlock.owns_lock(),
00165 "Async spinner's member lock doesn't own the global spinlock, hrm.");
00166 ROS_ASSERT_MSG(member_spinlock.mutex() == &spinmutex,
00167 "Async spinner's member lock owns a lock on the wrong mutex?!?!?");
00168 member_spinlock.unlock();
00169
00170 continue_ = false;
00171 threads_.join_all();
00172 }
00173
00174 void AsyncSpinnerImpl::threadFunc()
00175 {
00176 disableAllSignalsInThisThread();
00177
00178 CallbackQueue* queue = callback_queue_;
00179 bool use_call_available = thread_count_ == 1;
00180 WallDuration timeout(0.1);
00181
00182 while (continue_ && nh_.ok())
00183 {
00184 if (use_call_available)
00185 {
00186 queue->callAvailable(timeout);
00187 }
00188 else
00189 {
00190 queue->callOne(timeout);
00191 }
00192 }
00193 }
00194
00195 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
00196 : impl_(new AsyncSpinnerImpl(thread_count, 0))
00197 {
00198 }
00199
00200 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
00201 : impl_(new AsyncSpinnerImpl(thread_count, queue))
00202 {
00203 }
00204
00205 void AsyncSpinner::start()
00206 {
00207 impl_->start();
00208 }
00209
00210 void AsyncSpinner::stop()
00211 {
00212 impl_->stop();
00213 }
00214
00215 }