callback_queue_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
32 #include <ros/callback_queue.h>
33 
34 #include <boost/thread/thread.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <boost/type_traits/alignment_of.hpp>
37 #include <boost/bind/bind.hpp>
38 
39 #include <ros/assert.h>
40 
41 namespace nodelet
42 {
43 namespace detail
44 {
45 
46 CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads)
47 : running_(true),
48  num_worker_threads_(num_worker_threads)
49 {
50  if (num_worker_threads_ == 0)
51  num_worker_threads_ = boost::thread::hardware_concurrency();
52 
53  tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this));
54 
55  size_t num_threads = getNumWorkerThreads();
56  thread_info_.reset( new ThreadInfo[num_threads] );
57  for (size_t i = 0; i < num_threads; ++i)
58  {
59  tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i]));
60  }
61 }
62 
64 {
65  stop();
66 
67 #ifdef NODELET_QUEUE_DEBUG
68  // Write out task assignment histories for each thread
69  typedef ThreadInfo::Record Record;
70  size_t num_threads = getNumWorkerThreads();
71  for (size_t i = 0; i < num_threads; ++i)
72  {
73  char filename[128];
74  sprintf(filename, "thread_history_%d.txt", (int)i);
75  FILE* file = fopen(filename, "w");
76  fprintf(file, "# timestamps tasks threaded\n");
77  const std::vector<Record>& history = thread_info_[i].history;
78  for (int j = 0; j < (int)history.size(); ++j)
79  {
80  Record r = history[j];
81  fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded);
82  }
83  fclose(file);
84  }
85 #endif
86 }
87 
89 {
90  running_ = false;
91  {
92  boost::mutex::scoped_lock lock(waiting_mutex_);
93  waiting_cond_.notify_all();
94  }
95 
96  size_t num_threads = getNumWorkerThreads();
97  for (size_t i = 0; i < num_threads; ++i)
98  {
99  boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex);
100  thread_info_[i].queue_cond.notify_all();
101  }
102 
103  tg_.join_all();
104 }
105 
107 {
108  return num_worker_threads_;
109 }
110 
111 void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded)
112 {
113  boost::mutex::scoped_lock lock(queues_mutex_);
114 
115  QueueInfoPtr& info = queues_[queue.get()];
116  ROS_ASSERT(!info);
117  info.reset(new QueueInfo);
118  info->queue = queue;
119  info->threaded = threaded;
120 }
121 
123 {
124  boost::mutex::scoped_lock lock(queues_mutex_);
125  ROS_ASSERT(queues_.find(queue.get()) != queues_.end());
126 
127  queues_.erase(queue.get());
128 }
129 
131 {
132  {
133  boost::mutex::scoped_lock lock(waiting_mutex_);
134  waiting_.push_back(queue);
135  }
136 
137  waiting_cond_.notify_all();
138 }
139 
141 {
142  size_t smallest = std::numeric_limits<size_t>::max();
143  uint32_t smallest_index = 0xffffffff;
144  for (unsigned i = 0; i < num_worker_threads_; ++i)
145  {
146  ThreadInfo& ti = thread_info_[i];
147 
148  size_t size = ti.calling;
149  if (size == 0)
150  {
151  return &ti;
152  }
153 
154  if (size < smallest)
155  {
156  smallest = size;
157  smallest_index = i;
158  }
159  }
160 
161  return &thread_info_[smallest_index];
162 }
163 
165 {
166  V_Queue local_waiting;
167 
168  while (running_)
169  {
170  {
171  boost::mutex::scoped_lock lock(waiting_mutex_);
172 
173  while (waiting_.empty() && running_)
174  {
175  waiting_cond_.wait(lock);
176  }
177 
178  if (!running_)
179  {
180  return;
181  }
182 
183  local_waiting.swap(waiting_);
184  }
185 
186  {
187  boost::mutex::scoped_lock lock(queues_mutex_);
188 
189  V_Queue::iterator it = local_waiting.begin();
190  V_Queue::iterator end = local_waiting.end();
191  for (; it != end; ++it)
192  {
193  CallbackQueuePtr& queue = *it;
194 
195  M_Queue::iterator it = queues_.find(queue.get());
196  if (it != queues_.end())
197  {
198  QueueInfoPtr& info = it->second;
199  ThreadInfo* ti = 0;
200  if (info->threaded)
201  {
202  // If this queue is thread-safe we immediately add it to the thread with the least work queued
203  ti = getSmallestQueue();
204  }
205  else
206  {
207  // If this queue is non-thread-safe and has no in-progress calls happening, we add it to the
208  // thread with the least work queued. If the queue already has calls in-progress we add it
209  // to the thread it's already being called from
210  boost::mutex::scoped_lock lock(info->st_mutex);
211 
212  if (info->in_thread == 0)
213  {
214  ti = getSmallestQueue();
215  info->thread_index = ti - thread_info_.get();
216  }
217  else
218  {
219  ti = &thread_info_[info->thread_index];
220  }
221 
222  ++info->in_thread;
223  }
224 
225  {
226  boost::mutex::scoped_lock lock(ti->queue_mutex);
227  ti->queue.push_back(std::make_pair(queue, info));
228  ++ti->calling;
229 #ifdef NODELET_QUEUE_DEBUG
230  double stamp = ros::WallTime::now().toSec();
231  uint32_t tasks = ti->calling;
232  ti->history.push_back(ThreadInfo::Record(stamp, tasks, true));
233 #endif
234  }
235 
236  ti->queue_cond.notify_all();
237  }
238  }
239  }
240 
241  local_waiting.clear();
242  }
243 }
244 
246 {
247  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
248 
249  while (running_)
250  {
251  {
252  boost::mutex::scoped_lock lock(info->queue_mutex);
253 
254  while (info->queue.empty() && running_)
255  {
256  info->queue_cond.wait(lock);
257  }
258 
259  if (!running_)
260  {
261  return;
262  }
263 
264  info->queue.swap(local_queues);
265  }
266 
267  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
268  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
269  for (; it != end; ++it)
270  {
271  CallbackQueuePtr& queue = it->first;
272  QueueInfoPtr& qi = it->second;
273  if (queue->callOne() == ros::CallbackQueue::TryAgain)
274  {
275  callbackAdded(queue);
276  }
277  --info->calling;
278 
279  if (!qi->threaded)
280  {
281  boost::mutex::scoped_lock lock(qi->st_mutex);
282  --qi->in_thread;
283  }
284  }
285 
286  local_queues.clear();
287 
288  }
289 }
290 
291 } // namespace detail
292 } // namespace nodelet
nodelet::detail::CallbackQueueManager::waiting_
V_Queue waiting_
Definition: callback_queue_manager.h:116
nodelet::detail::CallbackQueueManager::ThreadInfo::queue_cond
boost::condition_variable queue_cond
Definition: callback_queue_manager.h:129
nodelet::detail::CallbackQueueManager::running_
bool running_
Definition: callback_queue_manager.h:161
boost::shared_ptr< CallbackQueue >
ros::CallbackQueue::TryAgain
TryAgain
nodelet::detail::CallbackQueueManager::removeQueue
void removeQueue(const CallbackQueuePtr &queue)
Definition: callback_queue_manager.cpp:122
nodelet::detail::CallbackQueueManager::QueueInfo
Definition: callback_queue_manager.h:91
TimeBase< WallTime, WallDuration >::toSec
double toSec() const
nodelet::detail::CallbackQueueManager::callbackAdded
void callbackAdded(const CallbackQueuePtr &queue)
Definition: callback_queue_manager.cpp:130
nodelet::detail::CallbackQueueManager::queues_mutex_
boost::mutex queues_mutex_
Definition: callback_queue_manager.h:112
nodelet::detail::CallbackQueueManager::CallbackQueueManager
CallbackQueueManager(uint32_t num_worker_threads=0)
Constructor.
Definition: callback_queue_manager.cpp:46
nodelet::detail::CallbackQueueManager::queues_
M_Queue queues_
Definition: callback_queue_manager.h:111
nodelet::detail::CallbackQueueManager::getNumWorkerThreads
uint32_t getNumWorkerThreads()
Definition: callback_queue_manager.cpp:106
nodelet::detail::CallbackQueueManager::waiting_mutex_
boost::mutex waiting_mutex_
Definition: callback_queue_manager.h:117
nodelet::detail::CallbackQueueManager::ThreadInfo::queue
std::vector< std::pair< CallbackQueuePtr, QueueInfoPtr > > queue
Definition: callback_queue_manager.h:130
nodelet::detail::CallbackQueueManager::stop
void stop()
Definition: callback_queue_manager.cpp:88
ros::WallTime::now
static WallTime now()
nodelet::detail::CallbackQueueManager::~CallbackQueueManager
~CallbackQueueManager()
Definition: callback_queue_manager.cpp:63
nodelet::detail::CallbackQueueManager::ThreadInfo::calling
boost::detail::atomic_count calling
Definition: callback_queue_manager.h:131
nodelet::detail::CallbackQueueManager::num_worker_threads_
uint32_t num_worker_threads_
Definition: callback_queue_manager.h:162
nodelet::detail::CallbackQueueManager::tg_
boost::thread_group tg_
Definition: callback_queue_manager.h:119
callback_queue.h
nodelet::detail::CallbackQueueManager::getSmallestQueue
ThreadInfo * getSmallestQueue()
Definition: callback_queue_manager.cpp:140
nodelet::detail::CallbackQueueManager::waiting_cond_
boost::condition_variable waiting_cond_
Definition: callback_queue_manager.h:118
nodelet::detail::CallbackQueueManager::managerThread
void managerThread()
Definition: callback_queue_manager.cpp:164
nodelet::detail::CallbackQueueManager::ThreadInfo
Definition: callback_queue_manager.h:121
nodelet::detail::CallbackQueueManager::thread_info_
V_ThreadInfo thread_info_
Definition: callback_queue_manager.h:159
nodelet
Definition: callback_queue.h:44
nodelet::detail::CallbackQueueManager::workerThread
void workerThread(ThreadInfo *)
Definition: callback_queue_manager.cpp:245
nodelet::detail::CallbackQueueManager::V_Queue
std::vector< CallbackQueuePtr > V_Queue
Definition: callback_queue_manager.h:115
callback_queue.h
nodelet::detail::CallbackQueueManager::ThreadInfo::queue_mutex
boost::mutex queue_mutex
Definition: callback_queue_manager.h:128
assert.h
ROS_ASSERT
#define ROS_ASSERT(cond)
callback_queue_manager.h
nodelet::detail::CallbackQueueManager::addQueue
void addQueue(const CallbackQueuePtr &queue, bool threaded)
Definition: callback_queue_manager.cpp:111


nodelet
Author(s): Tully Foote, Radu Bogdan Rusu, Michael Carroll
autogenerated on Fri Jan 12 2024 03:40:39