34 #include <boost/thread/thread.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <boost/type_traits/alignment_of.hpp>
37 #include <boost/bind/bind.hpp>
48 num_worker_threads_(num_worker_threads)
57 for (
size_t i = 0; i < num_threads; ++i)
67 #ifdef NODELET_QUEUE_DEBUG
69 typedef ThreadInfo::Record Record;
71 for (
size_t i = 0; i < num_threads; ++i)
74 sprintf(filename,
"thread_history_%d.txt", (
int)i);
75 FILE* file = fopen(filename,
"w");
76 fprintf(file,
"# timestamps tasks threaded\n");
77 const std::vector<Record>& history =
thread_info_[i].history;
78 for (
int j = 0; j < (int)history.size(); ++j)
80 Record r = history[j];
81 fprintf(file,
"%.6f %u %d\n", r.stamp, r.tasks, (
int)r.threaded);
97 for (
size_t i = 0; i < num_threads; ++i)
99 boost::mutex::scoped_lock lock(
thread_info_[i].queue_mutex);
119 info->threaded = threaded;
142 size_t smallest = std::numeric_limits<size_t>::max();
143 uint32_t smallest_index = 0xffffffff;
189 V_Queue::iterator it = local_waiting.begin();
190 V_Queue::iterator end = local_waiting.end();
191 for (; it != end; ++it)
195 M_Queue::iterator it =
queues_.find(queue.get());
210 boost::mutex::scoped_lock lock(info->st_mutex);
212 if (info->in_thread == 0)
227 ti->
queue.push_back(std::make_pair(queue, info));
229 #ifdef NODELET_QUEUE_DEBUG
232 ti->history.push_back(ThreadInfo::Record(stamp, tasks,
true));
241 local_waiting.clear();
247 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
264 info->
queue.swap(local_queues);
267 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
268 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
269 for (; it != end; ++it)
281 boost::mutex::scoped_lock lock(qi->st_mutex);
286 local_queues.clear();