callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/callback_queue.h"
36 #include "ros/assert.h"
37 #include <boost/scope_exit.hpp>
38 
39 namespace ros
40 {
41 
43 : calling_(0)
44 , enabled_(enabled)
45 {
46 }
47 
49 {
50  disable();
51 }
52 
54 {
55  boost::mutex::scoped_lock lock(mutex_);
56  enabled_ = true;
57 
58  condition_.notify_all();
59 }
60 
62 {
63  boost::mutex::scoped_lock lock(mutex_);
64  enabled_ = false;
65 
66  condition_.notify_all();
67 }
68 
70 {
71  boost::mutex::scoped_lock lock(mutex_);
72 
73  callbacks_.clear();
74 }
75 
77 {
78  boost::mutex::scoped_lock lock(mutex_);
79 
80  return callbacks_.empty() && calling_ == 0;
81 }
82 
84 {
85  boost::mutex::scoped_lock lock(mutex_);
86 
87  return enabled_;
88 }
89 
91 {
92  if (!tls_.get())
93  {
94  tls_.reset(new TLS);
95  }
96 }
97 
98 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
99 {
100  CallbackInfo info;
101  info.callback = callback;
102  info.removal_id = removal_id;
103 
104  {
105  boost::mutex::scoped_lock lock(id_info_mutex_);
106 
107  M_IDInfo::iterator it = id_info_.find(removal_id);
108  if (it == id_info_.end())
109  {
110  IDInfoPtr id_info(boost::make_shared<IDInfo>());
111  id_info->id = removal_id;
112  id_info_.insert(std::make_pair(removal_id, id_info));
113  }
114  }
115 
116  {
117  boost::mutex::scoped_lock lock(mutex_);
118 
119  if (!enabled_)
120  {
121  return;
122  }
123 
124  callbacks_.push_back(info);
125  }
126 
127  if (callback->ready())
128  {
129  condition_.notify_one();
130  }
131 }
132 
134 {
135  boost::mutex::scoped_lock lock(id_info_mutex_);
136  M_IDInfo::iterator it = id_info_.find(id);
137  if (it != id_info_.end())
138  {
139  return it->second;
140  }
141 
142  return IDInfoPtr();
143 }
144 
145 void CallbackQueue::removeByID(uint64_t removal_id)
146 {
147  setupTLS();
148 
149  {
150  IDInfoPtr id_info;
151  {
152  boost::mutex::scoped_lock lock(id_info_mutex_);
153  M_IDInfo::iterator it = id_info_.find(removal_id);
154  if (it != id_info_.end())
155  {
156  id_info = it->second;
157  }
158  else
159  {
160  return;
161  }
162  }
163 
164  // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
165  // here so that we can take a unique lock. We'll re-lock it later.
166  if (tls_->calling_in_this_thread == id_info->id)
167  {
168  id_info->calling_rw_mutex.unlock_shared();
169  }
170 
171  {
172  boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex, boost::defer_lock);
173  if (rw_lock.try_lock())
174  {
175  boost::mutex::scoped_lock lock(mutex_);
176  D_CallbackInfo::iterator it = callbacks_.begin();
177  for (; it != callbacks_.end();)
178  {
179  CallbackInfo& info = *it;
180  if (info.removal_id == removal_id)
181  {
182  it = callbacks_.erase(it);
183  }
184  else
185  {
186  ++it;
187  }
188  }
189  }
190  else
191  {
192  // We failed to acquire the lock, it can be that we are trying to remove something from the callback queue
193  // while it is being executed. Mark it for removal and let it be cleaned up later.
194  boost::mutex::scoped_lock lock(mutex_);
195  for (D_CallbackInfo::iterator it = callbacks_.begin(); it != callbacks_.end(); it++)
196  {
197  CallbackInfo& info = *it;
198  if (info.removal_id == removal_id)
199  {
200  info.marked_for_removal = true;
201  }
202  }
203  }
204  }
205 
206  if (tls_->calling_in_this_thread == id_info->id)
207  {
208  id_info->calling_rw_mutex.lock_shared();
209  }
210  }
211 
212  // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
213  // popped off the queue
214  {
215  D_CallbackInfo::iterator it = tls_->callbacks.begin();
216  D_CallbackInfo::iterator end = tls_->callbacks.end();
217  for (; it != end; ++it)
218  {
219  CallbackInfo& info = *it;
220  if (info.removal_id == removal_id)
221  {
222  info.marked_for_removal = true;
223  }
224  }
225  }
226 
227  {
228  boost::mutex::scoped_lock lock(id_info_mutex_);
229  id_info_.erase(removal_id);
230  }
231 }
232 
234 {
235  setupTLS();
236  TLS* tls = tls_.get();
237 
238  CallbackInfo cb_info;
239 
240  {
241  boost::mutex::scoped_lock lock(mutex_);
242 
243  if (!enabled_)
244  {
245  return Disabled;
246  }
247 
248  boost::chrono::steady_clock::time_point wait_until =
249  boost::chrono::steady_clock::now() + boost::chrono::nanoseconds(timeout.toNSec());
250  while (!cb_info.callback) {
251  D_CallbackInfo::iterator it = callbacks_.begin();
252  for (; it != callbacks_.end();)
253  {
254  CallbackInfo& info = *it;
255 
256  if (info.marked_for_removal)
257  {
258  it = callbacks_.erase(it);
259  continue;
260  }
261 
262  if (info.callback->ready())
263  {
264  cb_info = info;
265  it = callbacks_.erase(it);
266  break;
267  }
268 
269  ++it;
270  }
271 
272  // Found a ready callback?
273  if (cb_info.callback) {
274  break;
275  }
276 
277  boost::cv_status wait_status = boost::cv_status::timeout;
278  if (!timeout.isZero())
279  {
280  wait_status = condition_.wait_until(lock, wait_until);
281  }
282 
283  if (callbacks_.empty())
284  {
285  return Empty;
286  }
287 
288  if (!enabled_)
289  {
290  return Disabled;
291  }
292 
293  if (wait_status == boost::cv_status::timeout)
294  {
295  return TryAgain;
296  }
297  }
298 
299  ++calling_;
300  }
301 
302  bool was_empty = tls->callbacks.empty();
303  tls->callbacks.push_back(cb_info);
304  if (was_empty)
305  {
306  tls->cb_it = tls->callbacks.begin();
307  }
308 
309  CallOneResult res = callOneCB(tls);
310  if (res != Empty)
311  {
312  boost::mutex::scoped_lock lock(mutex_);
313  --calling_;
314  }
315  return res;
316 }
317 
319 {
320  setupTLS();
321  TLS* tls = tls_.get();
322 
323  {
324  boost::mutex::scoped_lock lock(mutex_);
325 
326  if (!enabled_)
327  {
328  return;
329  }
330 
331  if (callbacks_.empty())
332  {
333  if (!timeout.isZero())
334  {
335  condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
336  }
337 
338  if (callbacks_.empty() || !enabled_)
339  {
340  return;
341  }
342  }
343 
344  bool was_empty = tls->callbacks.empty();
345 
346  tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
347  callbacks_.clear();
348 
349  calling_ += tls->callbacks.size();
350 
351  if (was_empty)
352  {
353  tls->cb_it = tls->callbacks.begin();
354  }
355  }
356 
357  size_t called = 0;
358 
359  while (!tls->callbacks.empty())
360  {
361  if (callOneCB(tls) != Empty)
362  {
363  ++called;
364  }
365  }
366 
367  {
368  boost::mutex::scoped_lock lock(mutex_);
369  calling_ -= called;
370  }
371 }
372 
374 {
375  // Check for a recursive call. If recursive, increment the current iterator. Otherwise
376  // set the iterator it the beginning of the thread-local callbacks
377  if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
378  {
379  tls->cb_it = tls->callbacks.begin();
380  }
381 
382  if (tls->cb_it == tls->callbacks.end())
383  {
384  return Empty;
385  }
386 
387  ROS_ASSERT(!tls->callbacks.empty());
388  ROS_ASSERT(tls->cb_it != tls->callbacks.end());
389 
390  CallbackInfo info = *tls->cb_it;
391  CallbackInterfacePtr& cb = info.callback;
392 
393  IDInfoPtr id_info = getIDInfo(info.removal_id);
394  if (id_info)
395  {
396  boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
397 
398  uint64_t last_calling = tls->calling_in_this_thread;
399  tls->calling_in_this_thread = id_info->id;
400 
402 
403  {
404  // Ensure that thread id gets restored, even if callback throws.
405  // This is done with RAII rather than try-catch so that the source
406  // of the original exception is not masked in a crash report.
407  BOOST_SCOPE_EXIT(&tls, &last_calling)
408  {
409  tls->calling_in_this_thread = last_calling;
410  }
411  BOOST_SCOPE_EXIT_END
412 
413  if (info.marked_for_removal)
414  {
415  tls->cb_it = tls->callbacks.erase(tls->cb_it);
416  }
417  else
418  {
419  tls->cb_it = tls->callbacks.erase(tls->cb_it);
420  result = cb->call();
421  if (result == CallbackInterface::Success)
422  {
423  condition_.notify_one();
424  }
425  }
426  }
427 
428  // Push TryAgain callbacks to the back of the shared queue
429  if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
430  {
431  boost::mutex::scoped_lock lock(mutex_);
432  callbacks_.push_back(info);
433 
434  return TryAgain;
435  }
436 
437  return Called;
438  }
439  else
440  {
441  tls->cb_it = tls->callbacks.erase(tls->cb_it);
442  }
443 
444  return Called;
445 }
446 
447 }
boost::mutex mutex_
CallResult
Possible results for the call() method.
boost::thread_specific_ptr< TLS > tls_
D_CallbackInfo::iterator cb_it
void disable()
Disable the queue, meaning any calls to addCallback() will have no effect.
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
Add a callback, with an optional owner id. The owner id can be used to remove a set of callbacks from...
void enable()
Enable the queue (queue is enabled by default)
ros::internal::condition_variable_monotonic condition_
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
D_CallbackInfo callbacks_
boost::shared_ptr< IDInfo > IDInfoPtr
CallbackQueue(bool enabled=true)
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
CallOneResult callOneCB(TLS *tls)
bool isEnabled()
Returns whether or not this queue is enabled.
D_CallbackInfo callbacks
boost::mutex id_info_mutex_
virtual void removeByID(uint64_t removal_id)
Remove all callbacks associated with an owner id.
Call not ready, try again later.
IDInfoPtr getIDInfo(uint64_t id)
bool isEmpty()
returns whether or not the queue is empty
#define ROS_ASSERT(cond)
void clear()
Removes all callbacks from the queue. Does not wait for calls currently in progress to finish...


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27