00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <nodelet/detail/callback_queue_manager.h>
00031 #include <nodelet/detail/callback_queue.h>
00032 #include <ros/callback_queue.h>
00033
00034 #include <boost/thread/thread.hpp>
00035 #include <boost/thread/condition_variable.hpp>
00036
00037 #include <boost/bind.hpp>
00038
00039 #include <ros/assert.h>
00040
00041 namespace nodelet
00042 {
00043 namespace detail
00044 {
00045
00046 CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads)
00047 : running_(true),
00048 num_worker_threads_(num_worker_threads)
00049 {
00050 tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this));
00051
00052 size_t num_threads = getNumWorkerThreads();
00053 thread_info_.resize(num_threads);
00054 for (size_t i = 0; i < num_threads; ++i)
00055 {
00056 tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i]));
00057 }
00058 }
00059
00060 CallbackQueueManager::~CallbackQueueManager()
00061 {
00062 running_ = false;
00063 {
00064 boost::mutex::scoped_lock lock(waiting_mutex_);
00065 waiting_cond_.notify_all();
00066 }
00067
00068 size_t num_threads = getNumWorkerThreads();
00069 for (size_t i = 0; i < num_threads; ++i)
00070 {
00071 boost::mutex::scoped_lock lock(*thread_info_[i].queue_mutex);
00072 thread_info_[i].queue_cond->notify_all();
00073 }
00074
00075 tg_.join_all();
00076 }
00077
00078 uint32_t CallbackQueueManager::getNumWorkerThreads()
00079 {
00080 return num_worker_threads_;
00081 }
00082
00083 void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded)
00084 {
00085 boost::mutex::scoped_lock lock(queues_mutex_);
00086
00087 QueueInfoPtr& info = queues_[queue.get()];
00088 ROS_ASSERT(!info);
00089 info.reset(new QueueInfo);
00090 info->queue = queue;
00091 info->threaded = threaded;
00092 }
00093
00094 void CallbackQueueManager::removeQueue(const CallbackQueuePtr& queue)
00095 {
00096 boost::mutex::scoped_lock lock(queues_mutex_);
00097 ROS_ASSERT(queues_.find(queue.get()) != queues_.end());
00098
00099 queues_.erase(queue.get());
00100 }
00101
00102 void CallbackQueueManager::callbackAdded(const CallbackQueuePtr& queue)
00103 {
00104 {
00105 boost::mutex::scoped_lock lock(waiting_mutex_);
00106 waiting_.push_back(queue);
00107 }
00108
00109 waiting_cond_.notify_all();
00110 }
00111
00112 CallbackQueueManager::ThreadInfo* CallbackQueueManager::getSmallestQueue()
00113 {
00114 size_t smallest = std::numeric_limits<size_t>::max();
00115 uint32_t smallest_index = 0xffffffff;
00116 V_ThreadInfo::iterator it = thread_info_.begin();
00117 V_ThreadInfo::iterator end = thread_info_.end();
00118 for (; it != end; ++it)
00119 {
00120 ThreadInfo& ti = *it;
00121 boost::mutex::scoped_lock lock(*ti.queue_mutex);
00122
00123 size_t size = ti.queue.size() + ti.calling;
00124 if (size == 0)
00125 {
00126 return &ti;
00127 }
00128
00129 if (size < smallest)
00130 {
00131 smallest = size;
00132 smallest_index = it - thread_info_.begin();
00133 }
00134 }
00135
00136 return &thread_info_[smallest_index];
00137 }
00138
00139 void CallbackQueueManager::managerThread()
00140 {
00141 V_Queue local_waiting;
00142
00143 while (running_)
00144 {
00145 {
00146 boost::mutex::scoped_lock lock(waiting_mutex_);
00147
00148 while (waiting_.empty() && running_)
00149 {
00150 waiting_cond_.wait(lock);
00151 }
00152
00153 if (!running_)
00154 {
00155 return;
00156 }
00157
00158 local_waiting.swap(waiting_);
00159 }
00160
00161 {
00162 boost::mutex::scoped_lock lock(queues_mutex_);
00163
00164 V_Queue::iterator it = local_waiting.begin();
00165 V_Queue::iterator end = local_waiting.end();
00166 for (; it != end; ++it)
00167 {
00168 CallbackQueuePtr& queue = *it;
00169
00170 M_Queue::iterator it = queues_.find(queue.get());
00171 if (it != queues_.end())
00172 {
00173 QueueInfoPtr& info = it->second;
00174 ThreadInfo* ti = 0;
00175 if (info->threaded)
00176 {
00177
00178 ti = getSmallestQueue();
00179 boost::mutex::scoped_lock lock(*ti->queue_mutex);
00180 ti->queue.push_back(std::make_pair(queue, info));
00181 }
00182 else
00183 {
00184
00185
00186
00187 boost::mutex::scoped_lock lock(info->st_mutex);
00188
00189 ++info->in_thread;
00190
00191 if (info->in_thread > 1)
00192 {
00193 ti = &thread_info_[info->thread_index];
00194 boost::mutex::scoped_lock lock(*ti->queue_mutex);
00195 ti->queue.push_back(std::make_pair(queue, info));
00196 }
00197 else
00198 {
00199 ti = getSmallestQueue();
00200 info->thread_index = ti - &thread_info_.front();
00201 boost::mutex::scoped_lock lock(*ti->queue_mutex);
00202 ti->queue.push_back(std::make_pair(queue, info));
00203 }
00204 }
00205
00206 ti->queue_cond->notify_all();
00207 }
00208 }
00209 }
00210
00211 local_waiting.clear();
00212 }
00213 }
00214
00215 void CallbackQueueManager::workerThread(ThreadInfo* info)
00216 {
00217 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
00218
00219 while (running_)
00220 {
00221 {
00222 boost::mutex::scoped_lock lock(*info->queue_mutex);
00223
00224 info->calling = 0;
00225
00226 while (info->queue.empty() && running_)
00227 {
00228 info->queue_cond->wait(lock);
00229 }
00230
00231 if (!running_)
00232 {
00233 return;
00234 }
00235
00236 info->calling += info->queue.size();
00237 info->queue.swap(local_queues);
00238 }
00239
00240 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
00241 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
00242 for (; it != end; ++it)
00243 {
00244 CallbackQueuePtr& queue = it->first;
00245 QueueInfoPtr& qi = it->second;
00246 if (queue->callOne() == ros::CallbackQueue::TryAgain)
00247 {
00248 callbackAdded(queue);
00249 }
00250
00251 if (!qi->threaded)
00252 {
00253 boost::mutex::scoped_lock lock(qi->st_mutex);
00254 --qi->in_thread;
00255 }
00256 }
00257
00258 local_queues.clear();
00259
00260 }
00261 }
00262
00263 }
00264 }