00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <nodelet/detail/callback_queue_manager.h>
00031 #include <nodelet/detail/callback_queue.h>
00032 #include <ros/callback_queue.h>
00033
00034 #include <boost/thread/thread.hpp>
00035 #include <boost/thread/condition_variable.hpp>
00036 #include <boost/type_traits/alignment_of.hpp>
00037 #include <boost/bind.hpp>
00038
00039 #include <ros/assert.h>
00040
00041 namespace nodelet
00042 {
00043 namespace detail
00044 {
00045
00046 CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads)
00047 : running_(true),
00048 num_worker_threads_(num_worker_threads)
00049 {
00050 tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this));
00051
00052 size_t num_threads = getNumWorkerThreads();
00053 thread_info_.reset( new ThreadInfo[num_threads] );
00054 for (size_t i = 0; i < num_threads; ++i)
00055 {
00056 tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i]));
00057 }
00058 }
00059
00060 CallbackQueueManager::~CallbackQueueManager()
00061 {
00062 stop();
00063
00064 #ifdef NODELET_QUEUE_DEBUG
00065
00066 typedef ThreadInfo::Record Record;
00067 for (size_t i = 0; i < num_threads; ++i)
00068 {
00069 char filename[128];
00070 sprintf(filename, "thread_history_%d.txt", (int)i);
00071 FILE* file = fopen(filename, "w");
00072 fprintf(file, "# timestamps tasks threaded\n");
00073 const std::vector<Record>& history = thread_info_[i].history;
00074 for (int j = 0; j < (int)history.size(); ++j)
00075 {
00076 Record r = history[j];
00077 fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded);
00078 }
00079 fclose(file);
00080 }
00081 #endif
00082 }
00083
00084 void CallbackQueueManager::stop()
00085 {
00086 running_ = false;
00087 {
00088 boost::mutex::scoped_lock lock(waiting_mutex_);
00089 waiting_cond_.notify_all();
00090 }
00091
00092 size_t num_threads = getNumWorkerThreads();
00093 for (size_t i = 0; i < num_threads; ++i)
00094 {
00095 boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex);
00096 thread_info_[i].queue_cond.notify_all();
00097 }
00098
00099 tg_.join_all();
00100 }
00101
00102 uint32_t CallbackQueueManager::getNumWorkerThreads()
00103 {
00104 return num_worker_threads_;
00105 }
00106
00107 void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded)
00108 {
00109 boost::mutex::scoped_lock lock(queues_mutex_);
00110
00111 QueueInfoPtr& info = queues_[queue.get()];
00112 ROS_ASSERT(!info);
00113 info.reset(new QueueInfo);
00114 info->queue = queue;
00115 info->threaded = threaded;
00116 }
00117
00118 void CallbackQueueManager::removeQueue(const CallbackQueuePtr& queue)
00119 {
00120 boost::mutex::scoped_lock lock(queues_mutex_);
00121 ROS_ASSERT(queues_.find(queue.get()) != queues_.end());
00122
00123 queues_.erase(queue.get());
00124 }
00125
00126 void CallbackQueueManager::callbackAdded(const CallbackQueuePtr& queue)
00127 {
00128 {
00129 boost::mutex::scoped_lock lock(waiting_mutex_);
00130 waiting_.push_back(queue);
00131 }
00132
00133 waiting_cond_.notify_all();
00134 }
00135
00136 CallbackQueueManager::ThreadInfo* CallbackQueueManager::getSmallestQueue()
00137 {
00138 size_t smallest = std::numeric_limits<size_t>::max();
00139 uint32_t smallest_index = 0xffffffff;
00140 for (unsigned i = 0; i < num_worker_threads_; ++i)
00141 {
00142 ThreadInfo& ti = thread_info_[i];
00143
00144 size_t size = ti.calling;
00145 if (size == 0)
00146 {
00147 return &ti;
00148 }
00149
00150 if (size < smallest)
00151 {
00152 smallest = size;
00153 smallest_index = i;
00154 }
00155 }
00156
00157 return &thread_info_[smallest_index];
00158 }
00159
00160 void CallbackQueueManager::managerThread()
00161 {
00162 V_Queue local_waiting;
00163
00164 while (running_)
00165 {
00166 {
00167 boost::mutex::scoped_lock lock(waiting_mutex_);
00168
00169 while (waiting_.empty() && running_)
00170 {
00171 waiting_cond_.wait(lock);
00172 }
00173
00174 if (!running_)
00175 {
00176 return;
00177 }
00178
00179 local_waiting.swap(waiting_);
00180 }
00181
00182 {
00183 boost::mutex::scoped_lock lock(queues_mutex_);
00184
00185 V_Queue::iterator it = local_waiting.begin();
00186 V_Queue::iterator end = local_waiting.end();
00187 for (; it != end; ++it)
00188 {
00189 CallbackQueuePtr& queue = *it;
00190
00191 M_Queue::iterator it = queues_.find(queue.get());
00192 if (it != queues_.end())
00193 {
00194 QueueInfoPtr& info = it->second;
00195 ThreadInfo* ti = 0;
00196 if (info->threaded)
00197 {
00198
00199 ti = getSmallestQueue();
00200 }
00201 else
00202 {
00203
00204
00205
00206 boost::mutex::scoped_lock lock(info->st_mutex);
00207
00208 if (info->in_thread == 0)
00209 {
00210 ti = getSmallestQueue();
00211 info->thread_index = ti - thread_info_.get();
00212 }
00213 else
00214 {
00215 ti = &thread_info_[info->thread_index];
00216 }
00217
00218 ++info->in_thread;
00219 }
00220
00221 {
00222 boost::mutex::scoped_lock lock(ti->queue_mutex);
00223 ti->queue.push_back(std::make_pair(queue, info));
00224 ++ti->calling;
00225 #ifdef NODELET_QUEUE_DEBUG
00226 double stamp = ros::WallTime::now().toSec();
00227 uint32_t tasks = ti->calling;
00228 ti->history.push_back(ThreadInfo::Record(stamp, tasks, true));
00229 #endif
00230 }
00231
00232 ti->queue_cond.notify_all();
00233 }
00234 }
00235 }
00236
00237 local_waiting.clear();
00238 }
00239 }
00240
00241 void CallbackQueueManager::workerThread(ThreadInfo* info)
00242 {
00243 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
00244
00245 while (running_)
00246 {
00247 {
00248 boost::mutex::scoped_lock lock(info->queue_mutex);
00249
00250 while (info->queue.empty() && running_)
00251 {
00252 info->queue_cond.wait(lock);
00253 }
00254
00255 if (!running_)
00256 {
00257 return;
00258 }
00259
00260 info->queue.swap(local_queues);
00261 }
00262
00263 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
00264 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
00265 for (; it != end; ++it)
00266 {
00267 CallbackQueuePtr& queue = it->first;
00268 QueueInfoPtr& qi = it->second;
00269 if (queue->callOne() == ros::CallbackQueue::TryAgain)
00270 {
00271 callbackAdded(queue);
00272 }
00273 --info->calling;
00274
00275 if (!qi->threaded)
00276 {
00277 boost::mutex::scoped_lock lock(qi->st_mutex);
00278 --qi->in_thread;
00279 }
00280 }
00281
00282 local_queues.clear();
00283
00284 }
00285 }
00286
00287 }
00288 }