callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/callback_queue.h"
36 #include "ros/assert.h"
37 #include <boost/scope_exit.hpp>
38 
39 namespace ros
40 {
41 
43 : calling_(0)
44 , enabled_(enabled)
45 {
46 }
47 
49 {
50  disable();
51 }
52 
54 {
55  boost::mutex::scoped_lock lock(mutex_);
56  enabled_ = true;
57 
58  condition_.notify_all();
59 }
60 
62 {
63  boost::mutex::scoped_lock lock(mutex_);
64  enabled_ = false;
65 
66  condition_.notify_all();
67 }
68 
70 {
71  boost::mutex::scoped_lock lock(mutex_);
72 
73  callbacks_.clear();
74 }
75 
77 {
78  boost::mutex::scoped_lock lock(mutex_);
79 
80  return callbacks_.empty() && calling_ == 0;
81 }
82 
84 {
85  boost::mutex::scoped_lock lock(mutex_);
86 
87  return enabled_;
88 }
89 
91 {
92  if (!tls_.get())
93  {
94  tls_.reset(new TLS);
95  }
96 }
97 
98 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
99 {
100  CallbackInfo info;
101  info.callback = callback;
102  info.removal_id = removal_id;
103 
104  {
105  boost::mutex::scoped_lock lock(id_info_mutex_);
106 
107  M_IDInfo::iterator it = id_info_.find(removal_id);
108  if (it == id_info_.end())
109  {
110  IDInfoPtr id_info(boost::make_shared<IDInfo>());
111  id_info->id = removal_id;
112  id_info_.insert(std::make_pair(removal_id, id_info));
113  }
114  }
115 
116  {
117  boost::mutex::scoped_lock lock(mutex_);
118 
119  if (!enabled_)
120  {
121  return;
122  }
123 
124  callbacks_.push_back(info);
125  }
126 
127  condition_.notify_one();
128 }
129 
131 {
132  boost::mutex::scoped_lock lock(id_info_mutex_);
133  M_IDInfo::iterator it = id_info_.find(id);
134  if (it != id_info_.end())
135  {
136  return it->second;
137  }
138 
139  return IDInfoPtr();
140 }
141 
142 void CallbackQueue::removeByID(uint64_t removal_id)
143 {
144  setupTLS();
145 
146  {
147  IDInfoPtr id_info;
148  {
149  boost::mutex::scoped_lock lock(id_info_mutex_);
150  M_IDInfo::iterator it = id_info_.find(removal_id);
151  if (it != id_info_.end())
152  {
153  id_info = it->second;
154  }
155  else
156  {
157  return;
158  }
159  }
160 
161  // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
162  // here so that we can take a unique lock. We'll re-lock it later.
163  if (tls_->calling_in_this_thread == id_info->id)
164  {
165  id_info->calling_rw_mutex.unlock_shared();
166  }
167 
168  {
169  boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex, boost::defer_lock);
170  if (rw_lock.try_lock())
171  {
172  boost::mutex::scoped_lock lock(mutex_);
173  D_CallbackInfo::iterator it = callbacks_.begin();
174  for (; it != callbacks_.end();)
175  {
176  CallbackInfo& info = *it;
177  if (info.removal_id == removal_id)
178  {
179  it = callbacks_.erase(it);
180  }
181  else
182  {
183  ++it;
184  }
185  }
186  }
187  else
188  {
189  // We failed to acquire the lock, it can be that we are trying to remove something from the callback queue
190  // while it is being executed. Mark it for removal and let it be cleaned up later.
191  boost::mutex::scoped_lock lock(mutex_);
192  for (D_CallbackInfo::iterator it = callbacks_.begin(); it != callbacks_.end(); it++)
193  {
194  CallbackInfo& info = *it;
195  if (info.removal_id == removal_id)
196  {
197  info.marked_for_removal = true;
198  }
199  }
200  }
201  }
202 
203  if (tls_->calling_in_this_thread == id_info->id)
204  {
205  id_info->calling_rw_mutex.lock_shared();
206  }
207  }
208 
209  // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
210  // popped off the queue
211  {
212  D_CallbackInfo::iterator it = tls_->callbacks.begin();
213  D_CallbackInfo::iterator end = tls_->callbacks.end();
214  for (; it != end; ++it)
215  {
216  CallbackInfo& info = *it;
217  if (info.removal_id == removal_id)
218  {
219  info.marked_for_removal = true;
220  }
221  }
222  }
223 
224  {
225  boost::mutex::scoped_lock lock(id_info_mutex_);
226  id_info_.erase(removal_id);
227  }
228 }
229 
231 {
232  setupTLS();
233  TLS* tls = tls_.get();
234 
235  CallbackInfo cb_info;
236 
237  {
238  boost::mutex::scoped_lock lock(mutex_);
239 
240  if (!enabled_)
241  {
242  return Disabled;
243  }
244 
245  if (callbacks_.empty())
246  {
247  if (!timeout.isZero())
248  {
249  condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
250  }
251 
252  if (callbacks_.empty())
253  {
254  return Empty;
255  }
256 
257  if (!enabled_)
258  {
259  return Disabled;
260  }
261  }
262 
263  D_CallbackInfo::iterator it = callbacks_.begin();
264  for (; it != callbacks_.end();)
265  {
266  CallbackInfo& info = *it;
267 
268  if (info.marked_for_removal)
269  {
270  it = callbacks_.erase(it);
271  continue;
272  }
273 
274  if (info.callback->ready())
275  {
276  cb_info = info;
277  it = callbacks_.erase(it);
278  break;
279  }
280 
281  ++it;
282  }
283 
284  if (!cb_info.callback)
285  {
286  return TryAgain;
287  }
288 
289  ++calling_;
290  }
291 
292  bool was_empty = tls->callbacks.empty();
293  tls->callbacks.push_back(cb_info);
294  if (was_empty)
295  {
296  tls->cb_it = tls->callbacks.begin();
297  }
298 
299  CallOneResult res = callOneCB(tls);
300  if (res != Empty)
301  {
302  boost::mutex::scoped_lock lock(mutex_);
303  --calling_;
304  }
305  return res;
306 }
307 
309 {
310  setupTLS();
311  TLS* tls = tls_.get();
312 
313  {
314  boost::mutex::scoped_lock lock(mutex_);
315 
316  if (!enabled_)
317  {
318  return;
319  }
320 
321  if (callbacks_.empty())
322  {
323  if (!timeout.isZero())
324  {
325  condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
326  }
327 
328  if (callbacks_.empty() || !enabled_)
329  {
330  return;
331  }
332  }
333 
334  bool was_empty = tls->callbacks.empty();
335 
336  tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
337  callbacks_.clear();
338 
339  calling_ += tls->callbacks.size();
340 
341  if (was_empty)
342  {
343  tls->cb_it = tls->callbacks.begin();
344  }
345  }
346 
347  size_t called = 0;
348 
349  while (!tls->callbacks.empty())
350  {
351  if (callOneCB(tls) != Empty)
352  {
353  ++called;
354  }
355  }
356 
357  {
358  boost::mutex::scoped_lock lock(mutex_);
359  calling_ -= called;
360  }
361 }
362 
364 {
365  // Check for a recursive call. If recursive, increment the current iterator. Otherwise
366  // set the iterator it the beginning of the thread-local callbacks
367  if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
368  {
369  tls->cb_it = tls->callbacks.begin();
370  }
371 
372  if (tls->cb_it == tls->callbacks.end())
373  {
374  return Empty;
375  }
376 
377  ROS_ASSERT(!tls->callbacks.empty());
378  ROS_ASSERT(tls->cb_it != tls->callbacks.end());
379 
380  CallbackInfo info = *tls->cb_it;
381  CallbackInterfacePtr& cb = info.callback;
382 
383  IDInfoPtr id_info = getIDInfo(info.removal_id);
384  if (id_info)
385  {
386  boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
387 
388  uint64_t last_calling = tls->calling_in_this_thread;
389  tls->calling_in_this_thread = id_info->id;
390 
392 
393  {
394  // Ensure that thread id gets restored, even if callback throws.
395  // This is done with RAII rather than try-catch so that the source
396  // of the original exception is not masked in a crash report.
397  BOOST_SCOPE_EXIT(&tls, &last_calling)
398  {
399  tls->calling_in_this_thread = last_calling;
400  }
401  BOOST_SCOPE_EXIT_END
402 
403  if (info.marked_for_removal)
404  {
405  tls->cb_it = tls->callbacks.erase(tls->cb_it);
406  }
407  else
408  {
409  tls->cb_it = tls->callbacks.erase(tls->cb_it);
410  result = cb->call();
411  }
412  }
413 
414  // Push TryAgain callbacks to the back of the shared queue
415  if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
416  {
417  boost::mutex::scoped_lock lock(mutex_);
418  callbacks_.push_back(info);
419 
420  return TryAgain;
421  }
422 
423  return Called;
424  }
425  else
426  {
427  tls->cb_it = tls->callbacks.erase(tls->cb_it);
428  }
429 
430  return Called;
431 }
432 
433 }
ros::CallbackInterface::Invalid
@ Invalid
Call no longer valid.
Definition: callback_queue_interface.h:58
ros::CallbackQueue::callOneCB
CallOneResult callOneCB(TLS *tls)
Definition: callback_queue.cpp:363
ros::CallbackQueue::Disabled
@ Disabled
Definition: callback_queue.h:70
ros::CallbackQueue::TLS::cb_it
D_CallbackInfo::iterator cb_it
Definition: callback_queue.h:179
ros::CallbackQueue::~CallbackQueue
virtual ~CallbackQueue()
Definition: callback_queue.cpp:48
ros::CallbackQueue::calling_
size_t calling_
Definition: callback_queue.h:164
ros::CallbackQueue::TLS
Definition: callback_queue.h:171
boost::shared_ptr< CallbackInterface >
ros::CallbackQueue::enabled_
bool enabled_
Definition: callback_queue.h:183
ros::CallbackQueue::CallbackInfo::removal_id
uint64_t removal_id
Definition: callback_queue.h:158
ros::CallbackQueue::TryAgain
@ TryAgain
Definition: callback_queue.h:69
ros
ros::CallbackQueue::clear
void clear()
Removes all callbacks from the queue. Does not wait for calls currently in progress to finish.
Definition: callback_queue.cpp:69
ros::CallbackQueue::isEnabled
bool isEnabled()
Returns whether or not this queue is enabled.
Definition: callback_queue.cpp:83
ros::CallbackQueue::isEmpty
bool isEmpty()
returns whether or not the queue is empty
Definition: callback_queue.cpp:76
ros::CallbackQueue::callbacks_
D_CallbackInfo callbacks_
Definition: callback_queue.h:163
ros::CallbackQueue::CallbackQueue
CallbackQueue(bool enabled=true)
Definition: callback_queue.cpp:42
ros::CallbackQueue::addCallback
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
Add a callback, with an optional owner id. The owner id can be used to remove a set of callbacks from...
Definition: callback_queue.cpp:98
ros::CallbackQueue::CallOneResult
CallOneResult
Definition: callback_queue.h:66
ros::CallbackInterface::CallResult
CallResult
Possible results for the call() method.
Definition: callback_queue_interface.h:54
ros::CallbackQueue::CallbackInfo::callback
CallbackInterfacePtr callback
Definition: callback_queue.h:157
ros::CallbackQueue::CallbackInfo
Definition: callback_queue.h:151
ros::CallbackQueue::TLS::callbacks
D_CallbackInfo callbacks
Definition: callback_queue.h:178
ros::CallbackQueue::removeByID
virtual void removeByID(uint64_t removal_id)
Remove all callbacks associated with an owner id.
Definition: callback_queue.cpp:142
ros::CallbackQueue::setupTLS
void setupTLS()
Definition: callback_queue.cpp:90
ros::CallbackInterface::TryAgain
@ TryAgain
Call not ready, try again later.
Definition: callback_queue_interface.h:57
ros::CallbackQueue::condition_
boost::condition_variable condition_
Definition: callback_queue.h:166
ros::CallbackQueue::id_info_
M_IDInfo id_info_
Definition: callback_queue.h:169
DurationBase< WallDuration >::toNSec
int64_t toNSec() const
ros::CallbackQueue::enable
void enable()
Enable the queue (queue is enabled by default)
Definition: callback_queue.cpp:53
ros::CallbackQueue::id_info_mutex_
boost::mutex id_info_mutex_
Definition: callback_queue.h:168
ros::CallbackQueue::callOne
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
Definition: callback_queue.h:78
callback_queue.h
ros::WallDuration
ros::CallbackQueue::tls_
boost::thread_specific_ptr< TLS > tls_
Definition: callback_queue.h:181
ros::CallbackQueue::mutex_
boost::mutex mutex_
Definition: callback_queue.h:165
assert.h
ROS_ASSERT
#define ROS_ASSERT(cond)
ros::CallbackQueue::Empty
@ Empty
Definition: callback_queue.h:71
ros::CallbackQueue::callAvailable
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called,...
Definition: callback_queue.h:96
ros::CallbackQueue::Called
@ Called
Definition: callback_queue.h:68
ros::CallbackQueue::getIDInfo
IDInfoPtr getIDInfo(uint64_t id)
Definition: callback_queue.cpp:130
ros::CallbackQueue::disable
void disable()
Disable the queue, meaning any calls to addCallback() will have no effect.
Definition: callback_queue.cpp:61
DurationBase< WallDuration >::isZero
bool isZero() const
ros::CallbackQueue::TLS::calling_in_this_thread
uint64_t calling_in_this_thread
Definition: callback_queue.h:177
ros::CallbackQueue::CallbackInfo::marked_for_removal
bool marked_for_removal
Definition: callback_queue.h:159
ros::CallbackQueue::IDInfoPtr
boost::shared_ptr< IDInfo > IDInfoPtr
Definition: callback_queue.h:146


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Wed Mar 2 2022 00:57:15