callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/callback_queue.h"
36 #include "ros/assert.h"
37 #include <boost/scope_exit.hpp>
38 
39 namespace ros
40 {
41 
43 : calling_(0)
44 , enabled_(enabled)
45 {
46 }
47 
49 {
50  disable();
51 }
52 
54 {
55  boost::mutex::scoped_lock lock(mutex_);
56  enabled_ = true;
57 
59 }
60 
62 {
63  boost::mutex::scoped_lock lock(mutex_);
64  enabled_ = false;
65 
67 }
68 
70 {
71  boost::mutex::scoped_lock lock(mutex_);
72 
73  callbacks_.clear();
74 }
75 
77 {
78  boost::mutex::scoped_lock lock(mutex_);
79 
80  return callbacks_.empty() && calling_ == 0;
81 }
82 
84 {
85  boost::mutex::scoped_lock lock(mutex_);
86 
87  return enabled_;
88 }
89 
91 {
92  if (!tls_.get())
93  {
94  tls_.reset(new TLS);
95  }
96 }
97 
98 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
99 {
100  CallbackInfo info;
101  info.callback = callback;
102  info.removal_id = removal_id;
103 
104  {
105  boost::mutex::scoped_lock lock(id_info_mutex_);
106 
107  M_IDInfo::iterator it = id_info_.find(removal_id);
108  if (it == id_info_.end())
109  {
110  IDInfoPtr id_info(boost::make_shared<IDInfo>());
111  id_info->id = removal_id;
112  id_info_.insert(std::make_pair(removal_id, id_info));
113  }
114  }
115 
116  {
117  boost::mutex::scoped_lock lock(mutex_);
118 
119  if (!enabled_)
120  {
121  return;
122  }
123 
124  callbacks_.push_back(info);
125  }
126 
128 }
129 
131 {
132  boost::mutex::scoped_lock lock(id_info_mutex_);
133  M_IDInfo::iterator it = id_info_.find(id);
134  if (it != id_info_.end())
135  {
136  return it->second;
137  }
138 
139  return IDInfoPtr();
140 }
141 
142 void CallbackQueue::removeByID(uint64_t removal_id)
143 {
144  setupTLS();
145 
146  {
147  IDInfoPtr id_info;
148  {
149  boost::mutex::scoped_lock lock(id_info_mutex_);
150  M_IDInfo::iterator it = id_info_.find(removal_id);
151  if (it != id_info_.end())
152  {
153  id_info = it->second;
154  }
155  else
156  {
157  return;
158  }
159  }
160 
161  // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
162  // here so that we can take a unique lock. We'll re-lock it later.
163  if (tls_->calling_in_this_thread == id_info->id)
164  {
165  id_info->calling_rw_mutex.unlock_shared();
166  }
167 
168  {
169  boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
170  boost::mutex::scoped_lock lock(mutex_);
171  D_CallbackInfo::iterator it = callbacks_.begin();
172  for (; it != callbacks_.end();)
173  {
174  CallbackInfo& info = *it;
175  if (info.removal_id == removal_id)
176  {
177  it = callbacks_.erase(it);
178  }
179  else
180  {
181  ++it;
182  }
183  }
184  }
185 
186  if (tls_->calling_in_this_thread == id_info->id)
187  {
188  id_info->calling_rw_mutex.lock_shared();
189  }
190  }
191 
192  // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
193  // popped off the queue
194  {
195  D_CallbackInfo::iterator it = tls_->callbacks.begin();
196  D_CallbackInfo::iterator end = tls_->callbacks.end();
197  for (; it != end; ++it)
198  {
199  CallbackInfo& info = *it;
200  if (info.removal_id == removal_id)
201  {
202  info.marked_for_removal = true;
203  }
204  }
205  }
206 
207  {
208  boost::mutex::scoped_lock lock(id_info_mutex_);
209  id_info_.erase(removal_id);
210  }
211 }
212 
214 {
215  setupTLS();
216  TLS* tls = tls_.get();
217 
218  CallbackInfo cb_info;
219 
220  {
221  boost::mutex::scoped_lock lock(mutex_);
222 
223  if (!enabled_)
224  {
225  return Disabled;
226  }
227 
228  if (callbacks_.empty())
229  {
230  if (!timeout.isZero())
231  {
232  condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
233  }
234 
235  if (callbacks_.empty())
236  {
237  return Empty;
238  }
239 
240  if (!enabled_)
241  {
242  return Disabled;
243  }
244  }
245 
246  D_CallbackInfo::iterator it = callbacks_.begin();
247  for (; it != callbacks_.end();)
248  {
249  CallbackInfo& info = *it;
250 
251  if (info.marked_for_removal)
252  {
253  it = callbacks_.erase(it);
254  continue;
255  }
256 
257  if (info.callback->ready())
258  {
259  cb_info = info;
260  it = callbacks_.erase(it);
261  break;
262  }
263 
264  ++it;
265  }
266 
267  if (!cb_info.callback)
268  {
269  return TryAgain;
270  }
271 
272  ++calling_;
273  }
274 
275  bool was_empty = tls->callbacks.empty();
276  tls->callbacks.push_back(cb_info);
277  if (was_empty)
278  {
279  tls->cb_it = tls->callbacks.begin();
280  }
281 
282  CallOneResult res = callOneCB(tls);
283  if (res != Empty)
284  {
285  boost::mutex::scoped_lock lock(mutex_);
286  --calling_;
287  }
288  return res;
289 }
290 
292 {
293  setupTLS();
294  TLS* tls = tls_.get();
295 
296  {
297  boost::mutex::scoped_lock lock(mutex_);
298 
299  if (!enabled_)
300  {
301  return;
302  }
303 
304  if (callbacks_.empty())
305  {
306  if (!timeout.isZero())
307  {
308  condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
309  }
310 
311  if (callbacks_.empty() || !enabled_)
312  {
313  return;
314  }
315  }
316 
317  bool was_empty = tls->callbacks.empty();
318 
319  tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
320  callbacks_.clear();
321 
322  calling_ += tls->callbacks.size();
323 
324  if (was_empty)
325  {
326  tls->cb_it = tls->callbacks.begin();
327  }
328  }
329 
330  size_t called = 0;
331 
332  while (!tls->callbacks.empty())
333  {
334  if (callOneCB(tls) != Empty)
335  {
336  ++called;
337  }
338  }
339 
340  {
341  boost::mutex::scoped_lock lock(mutex_);
342  calling_ -= called;
343  }
344 }
345 
347 {
348  // Check for a recursive call. If recursive, increment the current iterator. Otherwise
349  // set the iterator it the beginning of the thread-local callbacks
350  if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
351  {
352  tls->cb_it = tls->callbacks.begin();
353  }
354 
355  if (tls->cb_it == tls->callbacks.end())
356  {
357  return Empty;
358  }
359 
360  ROS_ASSERT(!tls->callbacks.empty());
361  ROS_ASSERT(tls->cb_it != tls->callbacks.end());
362 
363  CallbackInfo info = *tls->cb_it;
364  CallbackInterfacePtr& cb = info.callback;
365 
366  IDInfoPtr id_info = getIDInfo(info.removal_id);
367  if (id_info)
368  {
369  boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
370 
371  uint64_t last_calling = tls->calling_in_this_thread;
372  tls->calling_in_this_thread = id_info->id;
373 
375 
376  {
377  // Ensure that thread id gets restored, even if callback throws.
378  // This is done with RAII rather than try-catch so that the source
379  // of the original exception is not masked in a crash report.
380  BOOST_SCOPE_EXIT(&tls, &last_calling)
381  {
382  tls->calling_in_this_thread = last_calling;
383  }
384  BOOST_SCOPE_EXIT_END
385 
386  if (info.marked_for_removal)
387  {
388  tls->cb_it = tls->callbacks.erase(tls->cb_it);
389  }
390  else
391  {
392  tls->cb_it = tls->callbacks.erase(tls->cb_it);
393  result = cb->call();
394  }
395  }
396 
397  // Push TryAgain callbacks to the back of the shared queue
398  if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
399  {
400  boost::mutex::scoped_lock lock(mutex_);
401  callbacks_.push_back(info);
402 
403  return TryAgain;
404  }
405 
406  return Called;
407  }
408  else
409  {
410  tls->cb_it = tls->callbacks.erase(tls->cb_it);
411  }
412 
413  return Called;
414 }
415 
416 }
boost::mutex mutex_
CallResult
Possible results for the call() method.
boost::thread_specific_ptr< TLS > tls_
D_CallbackInfo::iterator cb_it
void disable()
Disable the queue, meaning any calls to addCallback() will have no effect.
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
Add a callback, with an optional owner id. The owner id can be used to remove a set of callbacks from...
void enable()
Enable the queue (queue is enabled by default)
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
D_CallbackInfo callbacks_
boost::shared_ptr< IDInfo > IDInfoPtr
CallbackQueue(bool enabled=true)
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
CallOneResult callOneCB(TLS *tls)
bool isEnabled()
Returns whether or not this queue is enabled.
D_CallbackInfo callbacks
boost::mutex id_info_mutex_
virtual void removeByID(uint64_t removal_id)
Remove all callbacks associated with an owner id.
Call not ready, try again later.
IDInfoPtr getIDInfo(uint64_t id)
boost::condition_variable condition_
bool isEmpty()
returns whether or not the queue is empty
#define ROS_ASSERT(cond)
void clear()
Removes all callbacks from the queue. Does not wait for calls currently in progress to finish...


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Dec 20 2017 03:58:41