34 #include <boost/thread/thread.hpp> 35 #include <boost/thread/condition_variable.hpp> 36 #include <boost/type_traits/alignment_of.hpp> 37 #include <boost/bind.hpp> 48 num_worker_threads_(num_worker_threads)
57 for (
size_t i = 0; i < num_threads; ++i)
67 #ifdef NODELET_QUEUE_DEBUG 69 typedef ThreadInfo::Record Record;
70 for (
size_t i = 0; i < num_threads; ++i)
73 sprintf(filename,
"thread_history_%d.txt", (
int)i);
74 FILE* file = fopen(filename,
"w");
75 fprintf(file,
"# timestamps tasks threaded\n");
76 const std::vector<Record>& history =
thread_info_[i].history;
77 for (
int j = 0; j < (int)history.size(); ++j)
79 Record r = history[j];
80 fprintf(file,
"%.6f %u %d\n", r.stamp, r.tasks, (
int)r.threaded);
96 for (
size_t i = 0; i < num_threads; ++i)
98 boost::mutex::scoped_lock lock(
thread_info_[i].queue_mutex);
118 info->threaded = threaded;
141 size_t smallest = std::numeric_limits<size_t>::max();
142 uint32_t smallest_index = 0xffffffff;
188 V_Queue::iterator it = local_waiting.begin();
189 V_Queue::iterator end = local_waiting.end();
190 for (; it != end; ++it)
194 M_Queue::iterator it =
queues_.find(queue.get());
209 boost::mutex::scoped_lock lock(info->st_mutex);
211 if (info->in_thread == 0)
226 ti->
queue.push_back(std::make_pair(queue, info));
228 #ifdef NODELET_QUEUE_DEBUG 231 ti->history.push_back(ThreadInfo::Record(stamp, tasks,
true));
240 local_waiting.clear();
246 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
263 info->
queue.swap(local_queues);
266 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
267 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
268 for (; it != end; ++it)
280 boost::mutex::scoped_lock lock(qi->st_mutex);
285 local_queues.clear();
boost::detail::atomic_count calling
void removeQueue(const CallbackQueuePtr &queue)
CallbackQueueManager(uint32_t num_worker_threads=0)
Constructor.
void addQueue(const CallbackQueuePtr &queue, bool threaded)
boost::condition_variable waiting_cond_
boost::condition_variable queue_cond
void callbackAdded(const CallbackQueuePtr &queue)
uint32_t num_worker_threads_
void workerThread(ThreadInfo *)
V_ThreadInfo thread_info_
std::vector< CallbackQueuePtr > V_Queue
std::vector< std::pair< CallbackQueuePtr, QueueInfoPtr > > queue
boost::mutex waiting_mutex_
ThreadInfo * getSmallestQueue()
boost::mutex queues_mutex_
uint32_t getNumWorkerThreads()