callback_queue_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
32 #include <ros/callback_queue.h>
33 
34 #include <boost/thread/thread.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <boost/type_traits/alignment_of.hpp>
37 #include <boost/bind.hpp>
38 
39 #include <ros/assert.h>
40 
41 namespace nodelet
42 {
43 namespace detail
44 {
45 
46 CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads)
47 : running_(true),
48  num_worker_threads_(num_worker_threads)
49 {
50  if (num_worker_threads_ == 0)
51  num_worker_threads_ = boost::thread::hardware_concurrency();
52 
53  tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this));
54 
55  size_t num_threads = getNumWorkerThreads();
56  thread_info_.reset( new ThreadInfo[num_threads] );
57  for (size_t i = 0; i < num_threads; ++i)
58  {
59  tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i]));
60  }
61 }
62 
64 {
65  stop();
66 
67 #ifdef NODELET_QUEUE_DEBUG
68  // Write out task assignment histories for each thread
69  typedef ThreadInfo::Record Record;
70  for (size_t i = 0; i < num_threads; ++i)
71  {
72  char filename[128];
73  sprintf(filename, "thread_history_%d.txt", (int)i);
74  FILE* file = fopen(filename, "w");
75  fprintf(file, "# timestamps tasks threaded\n");
76  const std::vector<Record>& history = thread_info_[i].history;
77  for (int j = 0; j < (int)history.size(); ++j)
78  {
79  Record r = history[j];
80  fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded);
81  }
82  fclose(file);
83  }
84 #endif
85 }
86 
88 {
89  running_ = false;
90  {
91  boost::mutex::scoped_lock lock(waiting_mutex_);
92  waiting_cond_.notify_all();
93  }
94 
95  size_t num_threads = getNumWorkerThreads();
96  for (size_t i = 0; i < num_threads; ++i)
97  {
98  boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex);
99  thread_info_[i].queue_cond.notify_all();
100  }
101 
102  tg_.join_all();
103 }
104 
106 {
107  return num_worker_threads_;
108 }
109 
110 void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded)
111 {
112  boost::mutex::scoped_lock lock(queues_mutex_);
113 
114  QueueInfoPtr& info = queues_[queue.get()];
115  ROS_ASSERT(!info);
116  info.reset(new QueueInfo);
117  info->queue = queue;
118  info->threaded = threaded;
119 }
120 
122 {
123  boost::mutex::scoped_lock lock(queues_mutex_);
124  ROS_ASSERT(queues_.find(queue.get()) != queues_.end());
125 
126  queues_.erase(queue.get());
127 }
128 
130 {
131  {
132  boost::mutex::scoped_lock lock(waiting_mutex_);
133  waiting_.push_back(queue);
134  }
135 
136  waiting_cond_.notify_all();
137 }
138 
140 {
141  size_t smallest = std::numeric_limits<size_t>::max();
142  uint32_t smallest_index = 0xffffffff;
143  for (unsigned i = 0; i < num_worker_threads_; ++i)
144  {
145  ThreadInfo& ti = thread_info_[i];
146 
147  size_t size = ti.calling;
148  if (size == 0)
149  {
150  return &ti;
151  }
152 
153  if (size < smallest)
154  {
155  smallest = size;
156  smallest_index = i;
157  }
158  }
159 
160  return &thread_info_[smallest_index];
161 }
162 
164 {
165  V_Queue local_waiting;
166 
167  while (running_)
168  {
169  {
170  boost::mutex::scoped_lock lock(waiting_mutex_);
171 
172  while (waiting_.empty() && running_)
173  {
174  waiting_cond_.wait(lock);
175  }
176 
177  if (!running_)
178  {
179  return;
180  }
181 
182  local_waiting.swap(waiting_);
183  }
184 
185  {
186  boost::mutex::scoped_lock lock(queues_mutex_);
187 
188  V_Queue::iterator it = local_waiting.begin();
189  V_Queue::iterator end = local_waiting.end();
190  for (; it != end; ++it)
191  {
192  CallbackQueuePtr& queue = *it;
193 
194  M_Queue::iterator it = queues_.find(queue.get());
195  if (it != queues_.end())
196  {
197  QueueInfoPtr& info = it->second;
198  ThreadInfo* ti = 0;
199  if (info->threaded)
200  {
201  // If this queue is thread-safe we immediately add it to the thread with the least work queued
202  ti = getSmallestQueue();
203  }
204  else
205  {
206  // If this queue is non-thread-safe and has no in-progress calls happening, we add it to the
207  // thread with the least work queued. If the queue already has calls in-progress we add it
208  // to the thread it's already being called from
209  boost::mutex::scoped_lock lock(info->st_mutex);
210 
211  if (info->in_thread == 0)
212  {
213  ti = getSmallestQueue();
214  info->thread_index = ti - thread_info_.get();
215  }
216  else
217  {
218  ti = &thread_info_[info->thread_index];
219  }
220 
221  ++info->in_thread;
222  }
223 
224  {
225  boost::mutex::scoped_lock lock(ti->queue_mutex);
226  ti->queue.push_back(std::make_pair(queue, info));
227  ++ti->calling;
228 #ifdef NODELET_QUEUE_DEBUG
229  double stamp = ros::WallTime::now().toSec();
230  uint32_t tasks = ti->calling;
231  ti->history.push_back(ThreadInfo::Record(stamp, tasks, true));
232 #endif
233  }
234 
235  ti->queue_cond.notify_all();
236  }
237  }
238  }
239 
240  local_waiting.clear();
241  }
242 }
243 
245 {
246  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > local_queues;
247 
248  while (running_)
249  {
250  {
251  boost::mutex::scoped_lock lock(info->queue_mutex);
252 
253  while (info->queue.empty() && running_)
254  {
255  info->queue_cond.wait(lock);
256  }
257 
258  if (!running_)
259  {
260  return;
261  }
262 
263  info->queue.swap(local_queues);
264  }
265 
266  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator it = local_queues.begin();
267  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >::iterator end = local_queues.end();
268  for (; it != end; ++it)
269  {
270  CallbackQueuePtr& queue = it->first;
271  QueueInfoPtr& qi = it->second;
272  if (queue->callOne() == ros::CallbackQueue::TryAgain)
273  {
274  callbackAdded(queue);
275  }
276  --info->calling;
277 
278  if (!qi->threaded)
279  {
280  boost::mutex::scoped_lock lock(qi->st_mutex);
281  --qi->in_thread;
282  }
283  }
284 
285  local_queues.clear();
286 
287  }
288 }
289 
290 } // namespace detail
291 } // namespace nodelet
void removeQueue(const CallbackQueuePtr &queue)
CallbackQueueManager(uint32_t num_worker_threads=0)
Constructor.
void addQueue(const CallbackQueuePtr &queue, bool threaded)
void callbackAdded(const CallbackQueuePtr &queue)
std::vector< CallbackQueuePtr > V_Queue
std::vector< std::pair< CallbackQueuePtr, QueueInfoPtr > > queue
static WallTime now()
#define ROS_ASSERT(cond)


nodelet
Author(s): Tully Foote, Radu Bogdan Rusu
autogenerated on Mon Feb 28 2022 22:57:12