callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/callback_queue.h"
36 #include "ros/assert.h"
37 #include <boost/scope_exit.hpp>
38 
39 namespace ros
40 {
41 
43 : calling_(0)
44 , enabled_(enabled)
45 {
46 }
47 
49 {
50  disable();
51 }
52 
54 {
55  boost::mutex::scoped_lock lock(mutex_);
56  enabled_ = true;
57 
58  condition_.notify_all();
59 }
60 
62 {
63  boost::mutex::scoped_lock lock(mutex_);
64  enabled_ = false;
65 
66  condition_.notify_all();
67 }
68 
70 {
71  boost::mutex::scoped_lock lock(mutex_);
72 
73  callbacks_.clear();
74 }
75 
77 {
78  boost::mutex::scoped_lock lock(mutex_);
79 
80  return callbacks_.empty() && calling_ == 0;
81 }
82 
84 {
85  boost::mutex::scoped_lock lock(mutex_);
86 
87  return enabled_;
88 }
89 
91 {
92  if (!tls_.get())
93  {
94  tls_.reset(new TLS);
95  }
96 }
97 
98 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
99 {
100  CallbackInfo info;
101  info.callback = callback;
102  info.removal_id = removal_id;
103 
104  {
105  boost::mutex::scoped_lock lock(id_info_mutex_);
106 
107  M_IDInfo::iterator it = id_info_.find(removal_id);
108  if (it == id_info_.end())
109  {
110  IDInfoPtr id_info(boost::make_shared<IDInfo>());
111  id_info->id = removal_id;
112  id_info_.insert(std::make_pair(removal_id, id_info));
113  }
114  }
115 
116  {
117  boost::mutex::scoped_lock lock(mutex_);
118 
119  if (!enabled_)
120  {
121  return;
122  }
123 
124  callbacks_.push_back(info);
125  }
126 
127  if (callback->ready())
128  {
129  condition_.notify_one();
130  }
131 }
132 
134 {
135  boost::mutex::scoped_lock lock(id_info_mutex_);
136  M_IDInfo::iterator it = id_info_.find(id);
137  if (it != id_info_.end())
138  {
139  return it->second;
140  }
141 
142  return IDInfoPtr();
143 }
144 
145 void CallbackQueue::removeByID(uint64_t removal_id)
146 {
147  setupTLS();
148 
149  {
150  IDInfoPtr id_info;
151  {
152  boost::mutex::scoped_lock lock(id_info_mutex_);
153  M_IDInfo::iterator it = id_info_.find(removal_id);
154  if (it != id_info_.end())
155  {
156  id_info = it->second;
157  }
158  else
159  {
160  return;
161  }
162  }
163 
164  // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
165  // here so that we can take a unique lock. We'll re-lock it later.
166  if (tls_->calling_in_this_thread == id_info->id)
167  {
168  id_info->calling_rw_mutex.unlock_shared();
169  }
170 
171  {
172  boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
173  boost::mutex::scoped_lock lock(mutex_);
174  D_CallbackInfo::iterator it = callbacks_.begin();
175  for (; it != callbacks_.end();)
176  {
177  CallbackInfo& info = *it;
178  if (info.removal_id == removal_id)
179  {
180  it = callbacks_.erase(it);
181  }
182  else
183  {
184  ++it;
185  }
186  }
187  }
188 
189  if (tls_->calling_in_this_thread == id_info->id)
190  {
191  id_info->calling_rw_mutex.lock_shared();
192  }
193  }
194 
195  // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
196  // popped off the queue
197  {
198  D_CallbackInfo::iterator it = tls_->callbacks.begin();
199  D_CallbackInfo::iterator end = tls_->callbacks.end();
200  for (; it != end; ++it)
201  {
202  CallbackInfo& info = *it;
203  if (info.removal_id == removal_id)
204  {
205  info.marked_for_removal = true;
206  }
207  }
208  }
209 
210  {
211  boost::mutex::scoped_lock lock(id_info_mutex_);
212  id_info_.erase(removal_id);
213  }
214 }
215 
217 {
218  setupTLS();
219  TLS* tls = tls_.get();
220 
221  CallbackInfo cb_info;
222 
223  {
224  boost::mutex::scoped_lock lock(mutex_);
225 
226  if (!enabled_)
227  {
228  return Disabled;
229  }
230 
231  boost::chrono::steady_clock::time_point wait_until =
232  boost::chrono::steady_clock::now() + boost::chrono::nanoseconds(timeout.toNSec());
233  while (!cb_info.callback) {
234  D_CallbackInfo::iterator it = callbacks_.begin();
235  for (; it != callbacks_.end();)
236  {
237  CallbackInfo& info = *it;
238 
239  if (info.marked_for_removal)
240  {
241  it = callbacks_.erase(it);
242  continue;
243  }
244 
245  if (info.callback->ready())
246  {
247  cb_info = info;
248  it = callbacks_.erase(it);
249  break;
250  }
251 
252  ++it;
253  }
254 
255  // Found a ready callback?
256  if (cb_info.callback) {
257  break;
258  }
259 
260  boost::cv_status wait_status = boost::cv_status::timeout;
261  if (!timeout.isZero())
262  {
263  wait_status = condition_.wait_until(lock, wait_until);
264  }
265 
266  if (callbacks_.empty())
267  {
268  return Empty;
269  }
270 
271  if (!enabled_)
272  {
273  return Disabled;
274  }
275 
276  if (wait_status == boost::cv_status::timeout)
277  {
278  return TryAgain;
279  }
280  }
281 
282  ++calling_;
283  }
284 
285  bool was_empty = tls->callbacks.empty();
286  tls->callbacks.push_back(cb_info);
287  if (was_empty)
288  {
289  tls->cb_it = tls->callbacks.begin();
290  }
291 
292  CallOneResult res = callOneCB(tls);
293  if (res != Empty)
294  {
295  boost::mutex::scoped_lock lock(mutex_);
296  --calling_;
297  }
298  return res;
299 }
300 
302 {
303  setupTLS();
304  TLS* tls = tls_.get();
305 
306  {
307  boost::mutex::scoped_lock lock(mutex_);
308 
309  if (!enabled_)
310  {
311  return;
312  }
313 
314  if (callbacks_.empty())
315  {
316  if (!timeout.isZero())
317  {
318  condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
319  }
320 
321  if (callbacks_.empty() || !enabled_)
322  {
323  return;
324  }
325  }
326 
327  bool was_empty = tls->callbacks.empty();
328 
329  tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
330  callbacks_.clear();
331 
332  calling_ += tls->callbacks.size();
333 
334  if (was_empty)
335  {
336  tls->cb_it = tls->callbacks.begin();
337  }
338  }
339 
340  size_t called = 0;
341 
342  while (!tls->callbacks.empty())
343  {
344  if (callOneCB(tls) != Empty)
345  {
346  ++called;
347  }
348  }
349 
350  {
351  boost::mutex::scoped_lock lock(mutex_);
352  calling_ -= called;
353  }
354 }
355 
357 {
358  // Check for a recursive call. If recursive, increment the current iterator. Otherwise
359  // set the iterator it the beginning of the thread-local callbacks
360  if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
361  {
362  tls->cb_it = tls->callbacks.begin();
363  }
364 
365  if (tls->cb_it == tls->callbacks.end())
366  {
367  return Empty;
368  }
369 
370  ROS_ASSERT(!tls->callbacks.empty());
371  ROS_ASSERT(tls->cb_it != tls->callbacks.end());
372 
373  CallbackInfo info = *tls->cb_it;
374  CallbackInterfacePtr& cb = info.callback;
375 
376  IDInfoPtr id_info = getIDInfo(info.removal_id);
377  if (id_info)
378  {
379  boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
380 
381  uint64_t last_calling = tls->calling_in_this_thread;
382  tls->calling_in_this_thread = id_info->id;
383 
385 
386  {
387  // Ensure that thread id gets restored, even if callback throws.
388  // This is done with RAII rather than try-catch so that the source
389  // of the original exception is not masked in a crash report.
390  BOOST_SCOPE_EXIT(&tls, &last_calling)
391  {
392  tls->calling_in_this_thread = last_calling;
393  }
394  BOOST_SCOPE_EXIT_END
395 
396  if (info.marked_for_removal)
397  {
398  tls->cb_it = tls->callbacks.erase(tls->cb_it);
399  }
400  else
401  {
402  tls->cb_it = tls->callbacks.erase(tls->cb_it);
403  result = cb->call();
404  if (result == CallbackInterface::Success)
405  {
406  condition_.notify_one();
407  }
408  }
409  }
410 
411  // Push TryAgain callbacks to the back of the shared queue
412  if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
413  {
414  boost::mutex::scoped_lock lock(mutex_);
415  callbacks_.push_back(info);
416 
417  return TryAgain;
418  }
419 
420  return Called;
421  }
422  else
423  {
424  tls->cb_it = tls->callbacks.erase(tls->cb_it);
425  }
426 
427  return Called;
428 }
429 
430 }
boost::mutex mutex_
CallResult
Possible results for the call() method.
boost::thread_specific_ptr< TLS > tls_
D_CallbackInfo::iterator cb_it
void disable()
Disable the queue, meaning any calls to addCallback() will have no effect.
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
Add a callback, with an optional owner id. The owner id can be used to remove a set of callbacks from...
void enable()
Enable the queue (queue is enabled by default)
ros::internal::condition_variable_monotonic condition_
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
D_CallbackInfo callbacks_
boost::shared_ptr< IDInfo > IDInfoPtr
CallbackQueue(bool enabled=true)
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
CallOneResult callOneCB(TLS *tls)
bool isEnabled()
Returns whether or not this queue is enabled.
D_CallbackInfo callbacks
boost::mutex id_info_mutex_
virtual void removeByID(uint64_t removal_id)
Remove all callbacks associated with an owner id.
Call not ready, try again later.
IDInfoPtr getIDInfo(uint64_t id)
bool isEmpty()
returns whether or not the queue is empty
#define ROS_ASSERT(cond)
void clear()
Removes all callbacks from the queue. Does not wait for calls currently in progress to finish...


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sat Aug 1 2020 05:36:29