callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 // Make sure we use CLOCK_MONOTONIC for the condition variable wait_for if not Apple.
36 #ifndef __APPLE__
37 #define BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
38 #endif
39 
40 #include "ros/callback_queue.h"
41 #include "ros/assert.h"
42 #include <boost/scope_exit.hpp>
43 
44 // check if we have really included the backported boost condition variable
45 // just in case someone messes with the include order...
46 #if BOOST_VERSION < 106100
47 #ifndef USING_BACKPORTED_BOOST_CONDITION_VARIABLE
48 #error "needs boost version >= 1.61 or the backported headers!"
49 #endif
50 #endif
51 
52 namespace ros
53 {
54 
// Constructor initializer list and (empty) body. The signature line is not
// present in this listing; the member index of this page shows it as
// `CallbackQueue(bool enabled=true)`.
// The in-flight counter calling_ starts at 0 and enabled_ takes the value of
// the `enabled` argument.
56 : calling_(0)
57 , enabled_(enabled)
58 {
59 }
60 
// NOTE(review): the signature line for this body is missing from the listing;
// from its position right after the constructor this is presumably the
// destructor -- confirm against the original file.
// The body only calls disable().
62 {
63  disable();
64 }
65 
// Body of enable() (signature line missing from this listing; the member
// index shows `void enable()`). Sets enabled_ to true while holding mutex_.
67 {
68  boost::mutex::scoped_lock lock(mutex_);
69  enabled_ = true;
70 
// NOTE(review): listing line 71 is absent here -- check the original file for
// a statement between the assignment and the closing brace.
72 }
73 
// Body of disable() (signature line missing from this listing; the member
// index shows `void disable()`). Sets enabled_ to false while holding mutex_.
75 {
76  boost::mutex::scoped_lock lock(mutex_);
77  enabled_ = false;
78 
// NOTE(review): listing line 79 is absent here -- check the original file for
// a statement between the assignment and the closing brace.
80 }
81 
// Body of clear() (signature line missing from this listing; the member index
// shows `void clear()`). Empties callbacks_ while holding mutex_. Nothing here
// touches the thread-local callback lists or the calling_ counter.
83 {
84  boost::mutex::scoped_lock lock(mutex_);
85 
86  callbacks_.clear();
87 }
88 
// Body of isEmpty() (signature line missing from this listing; the member
// index shows `bool isEmpty()`). Evaluated while holding mutex_.
90 {
91  boost::mutex::scoped_lock lock(mutex_);
92 
// True only when callbacks_ holds no entries AND the calling_ counter is zero.
93  return callbacks_.empty() && calling_ == 0;
94 }
95 
// Body of isEnabled() (signature line missing from this listing; the member
// index shows `bool isEnabled()`). Returns the current value of enabled_,
// read while holding mutex_.
97 {
98  boost::mutex::scoped_lock lock(mutex_);
99 
100  return enabled_;
101 }
102 
// Body of the helper invoked as setupTLS() elsewhere in this file (signature
// line missing from this listing). If tls_ currently holds no TLS object, a
// new one is allocated and handed to tls_; otherwise nothing happens.
// tls_ is listed in the member index as boost::thread_specific_ptr< TLS >.
104 {
105  if (!tls_.get())
106  {
107  tls_.reset(new TLS);
108  }
109 }
110 
111 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
112 {
113  CallbackInfo info;
114  info.callback = callback;
115  info.removal_id = removal_id;
116 
117  {
118  boost::mutex::scoped_lock lock(id_info_mutex_);
119 
120  M_IDInfo::iterator it = id_info_.find(removal_id);
121  if (it == id_info_.end())
122  {
123  IDInfoPtr id_info(boost::make_shared<IDInfo>());
124  id_info->id = removal_id;
125  id_info_.insert(std::make_pair(removal_id, id_info));
126  }
127  }
128 
129  {
130  boost::mutex::scoped_lock lock(mutex_);
131 
132  if (!enabled_)
133  {
134  return;
135  }
136 
137  callbacks_.push_back(info);
138  }
139 
141 }
142 
// Body of getIDInfo() (signature line missing from this listing; the member
// index shows `IDInfoPtr getIDInfo(uint64_t id)`).
// Looks `id` up in id_info_ while holding id_info_mutex_ and returns the
// stored pointer, or a default-constructed IDInfoPtr when there is no entry.
144 {
145  boost::mutex::scoped_lock lock(id_info_mutex_);
146  M_IDInfo::iterator it = id_info_.find(id);
147  if (it != id_info_.end())
148  {
149  return it->second;
150  }
151 
152  return IDInfoPtr();
153 }
154 
155 void CallbackQueue::removeByID(uint64_t removal_id)
156 {
157  setupTLS();
158 
159  {
160  IDInfoPtr id_info;
161  {
162  boost::mutex::scoped_lock lock(id_info_mutex_);
163  M_IDInfo::iterator it = id_info_.find(removal_id);
164  if (it != id_info_.end())
165  {
166  id_info = it->second;
167  }
168  else
169  {
170  return;
171  }
172  }
173 
174  // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
175  // here so that we can take a unique lock. We'll re-lock it later.
176  if (tls_->calling_in_this_thread == id_info->id)
177  {
178  id_info->calling_rw_mutex.unlock_shared();
179  }
180 
181  {
182  boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
183  boost::mutex::scoped_lock lock(mutex_);
184  D_CallbackInfo::iterator it = callbacks_.begin();
185  for (; it != callbacks_.end();)
186  {
187  CallbackInfo& info = *it;
188  if (info.removal_id == removal_id)
189  {
190  it = callbacks_.erase(it);
191  }
192  else
193  {
194  ++it;
195  }
196  }
197  }
198 
199  if (tls_->calling_in_this_thread == id_info->id)
200  {
201  id_info->calling_rw_mutex.lock_shared();
202  }
203  }
204 
205  // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
206  // popped off the queue
207  {
208  D_CallbackInfo::iterator it = tls_->callbacks.begin();
209  D_CallbackInfo::iterator end = tls_->callbacks.end();
210  for (; it != end; ++it)
211  {
212  CallbackInfo& info = *it;
213  if (info.removal_id == removal_id)
214  {
215  info.marked_for_removal = true;
216  }
217  }
218  }
219 
220  {
221  boost::mutex::scoped_lock lock(id_info_mutex_);
222  id_info_.erase(removal_id);
223  }
224 }
225 
// Body of the callOne overload that takes a `timeout` (signature line missing
// from this listing, so the parameter's type is not visible here -- TODO
// confirm against the header).
//
// Pops at most one ready callback off the shared queue and runs it through
// callOneCB(). Return values visible in this body:
//   Disabled - the queue is disabled (checked before and after waiting)
//   Empty    - the queue is still empty after the optional wait
//   TryAgain - entries exist but none reported ready()
//   otherwise whatever callOneCB() returns
227 {
228  setupTLS();
229  TLS* tls = tls_.get();
230 
231  CallbackInfo cb_info;
232 
233  {
234  boost::mutex::scoped_lock lock(mutex_);
235 
236  if (!enabled_)
237  {
238  return Disabled;
239  }
240 
241  if (callbacks_.empty())
242  {
// Only block when a non-zero timeout was given. The wait is a single
// wait_for(), so state is re-checked below rather than assumed.
243  if (!timeout.isZero())
244  {
245  condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
246  }
247 
248  if (callbacks_.empty())
249  {
250  return Empty;
251  }
252 
253  if (!enabled_)
254  {
255  return Disabled;
256  }
257  }
258 
// Walk the queue front to back: drop entries flagged for removal and take the
// first entry whose callback reports ready().
259  D_CallbackInfo::iterator it = callbacks_.begin();
260  for (; it != callbacks_.end();)
261  {
262  CallbackInfo& info = *it;
263 
264  if (info.marked_for_removal)
265  {
266  it = callbacks_.erase(it);
267  continue;
268  }
269 
270  if (info.callback->ready())
271  {
272  cb_info = info;
273  it = callbacks_.erase(it);
274  break;
275  }
276 
277  ++it;
278  }
279 
// cb_info.callback is only assigned inside the loop above; this presumably
// relies on a default-constructed CallbackInfo holding a null callback.
280  if (!cb_info.callback)
281  {
282  return TryAgain;
283  }
284 
// Count the callback as in flight while mutex_ is still held.
285  ++calling_;
286  }
287 
// Hand the callback over to this thread's local list; cb_it is (re)pointed at
// the beginning only when that list was empty before the push.
288  bool was_empty = tls->callbacks.empty();
289  tls->callbacks.push_back(cb_info);
290  if (was_empty)
291  {
292  tls->cb_it = tls->callbacks.begin();
293  }
294 
295  CallOneResult res = callOneCB(tls);
// calling_ is decremented for every result except Empty.
296  if (res != Empty)
297  {
298  boost::mutex::scoped_lock lock(mutex_);
299  --calling_;
300  }
301  return res;
302 }
303 
// Body of the callAvailable overload that takes a `timeout` (signature line
// missing from this listing, so the parameter's type is not visible here --
// TODO confirm against the header).
//
// Moves every entry of the shared queue into this thread's local list and then
// calls callOneCB() repeatedly until that list is empty. Returns early, doing
// nothing, when the queue is disabled or still empty after the optional wait.
305 {
306  setupTLS();
307  TLS* tls = tls_.get();
308 
309  {
310  boost::mutex::scoped_lock lock(mutex_);
311 
312  if (!enabled_)
313  {
314  return;
315  }
316 
317  if (callbacks_.empty())
318  {
// Only block when a non-zero timeout was given; state is re-checked afterwards.
319  if (!timeout.isZero())
320  {
321  condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
322  }
323 
324  if (callbacks_.empty() || !enabled_)
325  {
326  return;
327  }
328  }
329 
330  bool was_empty = tls->callbacks.empty();
331 
// Append the whole shared queue to the thread-local list, then empty it.
332  tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
333  callbacks_.clear();
334 
// NOTE(review): this adds the size of the whole thread-local list, which
// includes any entries that were already in it before this call (i.e. when
// was_empty is false) -- confirm that is the intended accounting.
335  calling_ += tls->callbacks.size();
336 
337  if (was_empty)
338  {
339  tls->cb_it = tls->callbacks.begin();
340  }
341  }
342 
343  size_t called = 0;
344 
// Drain the thread-local list; only results other than Empty are counted.
345  while (!tls->callbacks.empty())
346  {
347  if (callOneCB(tls) != Empty)
348  {
349  ++called;
350  }
351  }
352 
// Take the counted callbacks back out of the in-flight total.
353  {
354  boost::mutex::scoped_lock lock(mutex_);
355  calling_ -= called;
356  }
357 }
358 
// Body of callOneCB() (signature line missing from this listing; the member
// index shows `CallOneResult callOneCB(TLS *tls)`).
//
// Processes the thread-local entry that tls->cb_it points at: the entry is
// removed from tls->callbacks and, unless it is flagged for removal or its id
// has no IDInfo entry any more, its callback is invoked.
// Return values visible in this body:
//   Empty    - cb_it is at the end of the thread-local list
//   TryAgain - the callback returned CallbackInterface::TryAgain and was pushed
//              back onto the shared queue
//   Called   - every other path, including entries that were skipped
360 {
361  // Check for a recursive call. If recursive, increment the current iterator. Otherwise
362  // set the iterator to the beginning of the thread-local callbacks
// 0xffffffffffffffffULL is compared against calling_in_this_thread here;
// presumably it is the "not inside any callback" sentinel -- TODO confirm
// against the TLS definition.
363  if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
364  {
365  tls->cb_it = tls->callbacks.begin();
366  }
367 
368  if (tls->cb_it == tls->callbacks.end())
369  {
370  return Empty;
371  }
372 
373  ROS_ASSERT(!tls->callbacks.empty());
374  ROS_ASSERT(tls->cb_it != tls->callbacks.end());
375 
// `info` is a copy of the list entry, so it stays usable after the entry is
// erased below; `cb` refers to the callback held by that copy.
376  CallbackInfo info = *tls->cb_it;
377  CallbackInterfacePtr& cb = info.callback;
378 
379  IDInfoPtr id_info = getIDInfo(info.removal_id);
380  if (id_info)
381  {
// Hold the shared side of the per-id mutex for the rest of this branch;
// removeByID() takes the same mutex exclusively.
382  boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
383 
// Remember the previous id so it can be restored when the scope below exits.
384  uint64_t last_calling = tls->calling_in_this_thread;
385  tls->calling_in_this_thread = id_info->id;
386 
// NOTE(review): listing line 387 is absent here. `result` is used below but
// not declared in the visible text, so its declaration presumably sits on
// that line -- confirm against the original file.
388 
389  {
390  // Ensure that thread id gets restored, even if callback throws.
391  // This is done with RAII rather than try-catch so that the source
392  // of the original exception is not masked in a crash report.
393  BOOST_SCOPE_EXIT(&tls, &last_calling)
394  {
395  tls->calling_in_this_thread = last_calling;
396  }
397  BOOST_SCOPE_EXIT_END
398 
// The flag is read from the local copy `info`, i.e. as it was when the copy
// was taken above.
399  if (info.marked_for_removal)
400  {
401  tls->cb_it = tls->callbacks.erase(tls->cb_it);
402  }
403  else
404  {
// The entry is erased from the thread-local list before the callback runs.
405  tls->cb_it = tls->callbacks.erase(tls->cb_it);
406  result = cb->call();
407  }
408  }
409 
410  // Push TryAgain callbacks to the back of the shared queue
411  if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
412  {
413  boost::mutex::scoped_lock lock(mutex_);
414  callbacks_.push_back(info);
415 
416  return TryAgain;
417  }
418 
419  return Called;
420  }
421  else
422  {
// No IDInfo entry exists for this id: drop the entry without invoking its
// callback.
423  tls->cb_it = tls->callbacks.erase(tls->cb_it);
424  }
425 
426  return Called;
427 }
428 
429 }
boost::mutex mutex_
CallResult
Possible results for the call() method.
boost::thread_specific_ptr< TLS > tls_
D_CallbackInfo::iterator cb_it
void disable()
Disable the queue, meaning any calls to addCallback() will have no effect.
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
Add a callback, with an optional owner id. The owner id can be used to remove a set of callbacks from...
void enable()
Enable the queue (queue is enabled by default)
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
D_CallbackInfo callbacks_
boost::shared_ptr< IDInfo > IDInfoPtr
CallbackQueue(bool enabled=true)
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
CallOneResult callOneCB(TLS *tls)
bool isEnabled()
Returns whether or not this queue is enabled.
D_CallbackInfo callbacks
boost::mutex id_info_mutex_
virtual void removeByID(uint64_t removal_id)
Remove all callbacks associated with an owner id.
Call not ready, try again later.
IDInfoPtr getIDInfo(uint64_t id)
boost::condition_variable condition_
bool isEmpty()
returns whether or not the queue is empty
#define ROS_ASSERT(cond)
void clear()
Removes all callbacks from the queue. Does not wait for calls currently in progress to finish...


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Feb 3 2019 03:29:54