Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <nodelet/detail/callback_queue_manager.h>
00031 #include <nodelet/detail/callback_queue.h>
00032 #include <ros/callback_queue.h>
00033
00034 #include <boost/thread/thread.hpp>
00035 #include <boost/thread/condition_variable.hpp>
00036 #include <boost/type_traits/alignment_of.hpp>
00037 #include <boost/bind.hpp>
00038
00039 #include <ros/assert.h>
00040
00041 namespace nodelet
00042 {
00043 namespace detail
00044 {
00045
00046 CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads)
00047 : running_(true),
00048 num_worker_threads_(num_worker_threads)
00049 {
00050 if (num_worker_threads_ == 0)
00051 num_worker_threads_ = boost::thread::hardware_concurrency();
00052
00053 tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this));
00054
00055 size_t num_threads = getNumWorkerThreads();
00056 thread_info_.reset( new ThreadInfo[num_threads] );
00057 for (size_t i = 0; i < num_threads; ++i)
00058 {
00059 tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i]));
00060 }
00061 }
00062
00063 CallbackQueueManager::~CallbackQueueManager()
00064 {
00065 stop();
00066
00067 #ifdef NODELET_QUEUE_DEBUG
00068
00069 typedef ThreadInfo::Record Record;
00070 for (size_t i = 0; i < num_threads; ++i)
00071 {
00072 char filename[128];
00073 sprintf(filename, "thread_history_%d.txt", (int)i);
00074 FILE* file = fopen(filename, "w");
00075 fprintf(file, "# timestamps tasks threaded\n");
00076 const std::vector<Record>& history = thread_info_[i].history;
00077 for (int j = 0; j < (int)history.size(); ++j)
00078 {
00079 Record r = history[j];
00080 fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded);
00081 }
00082 fclose(file);
00083 }
00084 #endif
00085 }
00086
00087 void CallbackQueueManager::stop()
00088 {
00089 running_ = false;
00090 {
00091 boost::mutex::scoped_lock lock(waiting_mutex_);
00092 waiting_cond_.notify_all();
00093 }
00094
00095 size_t num_threads = getNumWorkerThreads();
00096 for (size_t i = 0; i < num_threads; ++i)
00097 {
00098 boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex);
00099 thread_info_[i].queue_cond.notify_all();
00100 }
00101
00102 tg_.join_all();
00103 }
00104
00105 uint32_t CallbackQueueManager::getNumWorkerThreads()
00106 {
00107 return num_worker_threads_;
00108 }
00109
00110 void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded)
00111 {
00112 boost::mutex::scoped_lock lock(queues_mutex_);
00113
00114 QueueInfoPtr& info = queues_[queue.get()];
00115 ROS_ASSERT(!info);
00116 info.reset(new QueueInfo);
00117 info->queue = queue;
00118 info->threaded = threaded;
00119 }
00120
00121 void CallbackQueueManager::removeQueue(const CallbackQueuePtr& queue)
00122 {
00123 boost::mutex::scoped_lock lock(queues_mutex_);
00124 ROS_ASSERT(queues_.find(queue.get()) != queues_.end());
00125
00126 queues_.erase(queue.get());
00127 }
00128
00129 void CallbackQueueManager::callbackAdded(const CallbackQueuePtr& queue)
00130 {
00131 {
00132 boost::mutex::scoped_lock lock(waiting_mutex_);
00133 waiting_.push_back(queue);
00134 }
00135
00136 waiting_cond_.notify_all();
00137 }
00138
00139 CallbackQueueManager::ThreadInfo* CallbackQueueManager::getSmallestQueue()
00140 {
00141 size_t smallest = std::numeric_limits<size_t>::max();
00142 uint32_t smallest_index = 0xffffffff;
00143 for (unsigned i = 0; i < num_worker_threads_; ++i)
00144 {
00145 ThreadInfo& ti = thread_info_[i];
00146
00147 size_t size = ti.calling;
00148 if (size == 0)
00149 {
00150 return &ti;
00151 }
00152
00153 if (size < smallest)
00154 {
00155 smallest = size;
00156 smallest_index = i;
00157 }
00158 }
00159
00160 return &thread_info_[smallest_index];
00161 }
00162
00163 void CallbackQueueManager::managerThread()
00164 {
00165 V_Queue local_waiting;
00166
00167 while (running_)
00168 {
00169 {
00170 boost::mutex::scoped_lock lock(waiting_mutex_);
00171
00172 while (waiting_.empty() && running_)
00173 {
00174 waiting_cond_.wait(lock);
00175 }
00176
00177 if (!running_)
00178 {
00179 return;
00180 }
00181
00182 local_waiting.swap(waiting_);
00183 }
00184
00185 {
00186 boost::mutex::scoped_lock lock(queues_mutex_);
00187
00188 V_Queue::iterator it = local_waiting.begin();
00189 V_Queue::iterator end = local_waiting.end();
00190 for (; it != end; ++it)
00191 {
00192 CallbackQueuePtr& queue = *it;
00193
00194 M_Queue::iterator it = queues_.find(queue.get());
00195 if (it != queues_.end())
00196 {
00197 QueueInfoPtr& info = it->second;
00198 ThreadInfo* ti = 0;
00199 if (info->threaded)
00200 {
00201
00202 ti = getSmallestQueue();
00203 }
00204 else
00205 {
00206
00207
00208
00209 boost::mutex::scoped_lock lock(info->st_mutex);
00210
00211 if (info->in_thread == 0)
00212 {
00213 ti = getSmallestQueue();
00214 info->thread_index = ti - thread_info_.get();
00215 }
00216 else
00217 {
00218 ti = &thread_info_[info->thread_index];
00219 }
00220
00221 ++info->in_thread;
00222 }
00223
00224 {
00225 boost::mutex::scoped_lock lock(ti->queue_mutex);
00226 ti->queue.push_back(std::make_pair(queue, info));
00227 ++ti->calling;
00228 #ifdef NODELET_QUEUE_DEBUG
00229 double stamp = ros::WallTime::now().toSec();
00230 uint32_t tasks = ti->calling;
00231 ti->history.push_back(ThreadInfo::Record(stamp, tasks, true));
00232 #endif
00233 }
00234
00235 ti->queue_cond.notify_all();
00236 }
00237 }
00238 }
00239
00240 local_waiting.clear();
00241 }
00242 }
00243
00244 void CallbackQueueManager::workerThread(ThreadInfo* info)
00245 {
00246 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
00247
00248 while (running_)
00249 {
00250 {
00251 boost::mutex::scoped_lock lock(info->queue_mutex);
00252
00253 while (info->queue.empty() && running_)
00254 {
00255 info->queue_cond.wait(lock);
00256 }
00257
00258 if (!running_)
00259 {
00260 return;
00261 }
00262
00263 info->queue.swap(local_queues);
00264 }
00265
00266 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
00267 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
00268 for (; it != end; ++it)
00269 {
00270 CallbackQueuePtr& queue = it->first;
00271 QueueInfoPtr& qi = it->second;
00272 if (queue->callOne() == ros::CallbackQueue::TryAgain)
00273 {
00274 callbackAdded(queue);
00275 }
00276 --info->calling;
00277
00278 if (!qi->threaded)
00279 {
00280 boost::mutex::scoped_lock lock(qi->st_mutex);
00281 --qi->in_thread;
00282 }
00283 }
00284
00285 local_queues.clear();
00286
00287 }
00288 }
00289
00290 }
00291 }