test_callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
32 /*
33  * Test callback queue
34  */
35 
36 #include <gtest/gtest.h>
37 #include <ros/callback_queue.h>
38 #include <ros/console.h>
40 #include <ros/subscription_queue.h>
42 #include <ros/timer.h>
43 
44 #include <boost/atomic.hpp>
45 #include <boost/shared_ptr.hpp>
46 #include <boost/bind/bind.hpp>
47 #include <boost/thread.hpp>
48 #include <boost/function.hpp>
49 
50 #include "fake_message.h"
51 
52 using namespace ros;
53 
55 {
56 public:
58  : count(0)
59  {}
60 
61  virtual CallResult call()
62  {
63  boost::mutex::scoped_lock lock(mutex);
64  ++count;
65 
66  return Success;
67  }
68 
69  boost::mutex mutex;
70  size_t count;
71 };
73 
75  CallbackQueue* queue, bool& done, boost::atomic<size_t>* num_calls,
76  ros::WallDuration call_timeout = ros::WallDuration(0.1))
77 {
78  size_t i = 0;
79  while (!done)
80  {
81  queue->callAvailable(call_timeout);
82  ++i;
83  }
84 
85  if (num_calls)
86  {
87  num_calls->fetch_add(i);
88  }
89 }
90 
91 
92 TEST(CallbackQueue, singleCallback)
93 {
94  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
95  CallbackQueue queue;
96  queue.addCallback(cb, 1);
97  queue.callOne();
98 
99  EXPECT_EQ(cb->count, 1U);
100 
101  queue.addCallback(cb, 1);
102  queue.callAvailable();
103 
104  EXPECT_EQ(cb->count, 2U);
105 
106  queue.callOne();
107  EXPECT_EQ(cb->count, 2U);
108 
109  queue.callAvailable();
110  EXPECT_EQ(cb->count, 2U);
111 }
112 
113 TEST(CallbackQueue, multipleCallbacksCallAvailable)
114 {
115  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
116  CallbackQueue queue;
117  for (uint32_t i = 0; i < 1000; ++i)
118  {
119  queue.addCallback(cb, 1);
120  }
121 
122  queue.callAvailable();
123 
124  EXPECT_EQ(cb->count, 1000U);
125 }
126 
127 TEST(CallbackQueue, multipleCallbacksCallOne)
128 {
129  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
130  CallbackQueue queue;
131  for (uint32_t i = 0; i < 1000; ++i)
132  {
133  queue.addCallback(cb, 1);
134  }
135 
136  for (uint32_t i = 0; i < 1000; ++i)
137  {
138  queue.callOne();
139  EXPECT_EQ(cb->count, i + 1);
140  }
141 }
142 
144 {
145  CountingCallbackPtr cb1(boost::make_shared<CountingCallback>());
146  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
147  CallbackQueue queue;
148  queue.addCallback(cb1, 1);
149  queue.addCallback(cb2, 2);
150  queue.removeByID(1);
151  queue.callAvailable();
152 
153  EXPECT_EQ(cb1->count, 0U);
154  EXPECT_EQ(cb2->count, 1U);
155 }
156 
158 {
159 public:
160  SelfRemovingCallback(CallbackQueue* queue, uint64_t id)
161  : count(0)
162  , queue(queue)
163  , id(id)
164  {}
165 
166  virtual CallResult call()
167  {
168  ++count;
169 
170  queue->removeByID(id);
171 
172  return Success;
173  }
174 
175  size_t count;
176 
178  uint64_t id;
179 };
181 
182 TEST(CallbackQueue, removeSelf)
183 {
184  CallbackQueue queue;
185  SelfRemovingCallbackPtr cb1(boost::make_shared<SelfRemovingCallback>(&queue, 1));
186  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
187  queue.addCallback(cb1, 1);
188  queue.addCallback(cb2, 1);
189  queue.addCallback(cb2, 1);
190 
191  queue.callOne();
192 
193  queue.addCallback(cb2, 1);
194 
195  queue.callAvailable();
196 
197  EXPECT_EQ(cb1->count, 1U);
198  EXPECT_EQ(cb2->count, 1U);
199 }
200 
202 {
203 public:
205  : count(0)
206  {}
207 
208  virtual CallResult call()
209  {
210  mutex_.lock();
211  ++count;
212 
213  return Success;
214  }
215 
216  boost::mutex mutex_;
217  size_t count;
218 };
220 
221 
222 // This test checks whether removing callbacks by an id doesn't block if one of those callback is being executed.
223 TEST(CallbackQueue, removeCallbackWhileExecuting)
224 {
225  const uint64_t owner_id = 1;
226  const uint64_t unrelated_id = 2;
227 
228  CallbackQueue queue;
229  BlockingCallbackPtr cb1(boost::make_shared<BlockingCallback>());
230  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
231  CountingCallbackPtr cb3(boost::make_shared<CountingCallback>());
232 
233  cb1->mutex_.lock(); // lock the mutex to ensure the blocking callback will stall processing of callback queue.
234 
235  queue.addCallback(cb1, owner_id); // Add the blocking callback.
236 
237  // Now, we need to serve the callback queue from another thread.
238  bool done = false;
239  boost::atomic<size_t> calls(0);
240  ros::WallDuration call_one_timeout(0.1);
241  boost::thread t = boost::thread(boost::bind(&callAvailableThread, &queue, boost::ref(done), &calls, call_one_timeout));
242 
243  ros::WallDuration(1.0).sleep(); // Callback 1 should be being served now.
244 
245  queue.addCallback(cb2, unrelated_id); // Add a second callback with different owner.
246  queue.addCallback(cb3, owner_id); // Add a third with same owner, this one should never get executed.
247 
248  // Now try to remove the callback that's being executed.
249  queue.removeByID(owner_id); // This should not block because cb1 is being served, it should prevent cb3 from running.
250 
251  ros::WallDuration(1.0).sleep();
252 
253  // The removeByID should not block, so now we can unblock the blocking callback.
254  cb1->mutex_.unlock(); // This allows processing of cb1 to continue.
255 
256  while (!queue.isEmpty()) // Wait until the queue is empty.
257  {
258  ros::WallDuration(0.01).sleep();
259  }
260 
261  // Properly shut down our callback serving thread.
262  done = true;
263  t.join();
264 
265  EXPECT_EQ(cb1->count, 1U);
266  EXPECT_EQ(cb2->count, 1U);
267  EXPECT_EQ(cb3->count, 0U);
268 
269  cb1->mutex_.unlock(); // Ensure the mutex is unlocked before destruction.
270 }
271 
273 {
274 public:
275  RecursiveCallback(CallbackQueue* queue, bool use_available)
276  : count(0)
277  , queue(queue)
278  , use_available(use_available)
279  {}
280 
281  virtual CallResult call()
282  {
283  ++count;
284 
285  if (count < 3)
286  {
287  if (use_available)
288  {
289  queue->callAvailable();
290  }
291  else
292  {
293  queue->callOne();
294  }
295  }
296 
297  return Success;
298  }
299 
300  size_t count;
301 
304 };
306 
307 TEST(CallbackQueue, recursive1)
308 {
309  CallbackQueue queue;
310  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
311  queue.addCallback(cb, 1);
312  queue.addCallback(cb, 1);
313  queue.addCallback(cb, 1);
314  queue.callAvailable();
315 
316  EXPECT_EQ(cb->count, 3U);
317 }
318 
319 TEST(CallbackQueue, recursive2)
320 {
321  CallbackQueue queue;
322  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
323  queue.addCallback(cb, 1);
324  queue.addCallback(cb, 1);
325  queue.addCallback(cb, 1);
326  queue.callOne();
327 
328  EXPECT_EQ(cb->count, 3U);
329 }
330 
331 TEST(CallbackQueue, recursive3)
332 {
333  CallbackQueue queue;
334  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
335  queue.addCallback(cb, 1);
336  queue.addCallback(cb, 1);
337  queue.addCallback(cb, 1);
338  queue.callAvailable();
339 
340  EXPECT_EQ(cb->count, 3U);
341 }
342 
343 TEST(CallbackQueue, recursive4)
344 {
345  CallbackQueue queue;
346  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
347  queue.addCallback(cb, 1);
348  queue.addCallback(cb, 1);
349  queue.addCallback(cb, 1);
350  queue.callOne();
351 
352  EXPECT_EQ(cb->count, 3U);
353 }
354 
356  const CallbackInterfacePtr& cb,
357  const boost::function<void(CallbackQueue*, bool&, boost::atomic<size_t>*, ros::WallDuration)>& threadFunc,
358  size_t* num_calls = NULL, size_t num_threads = 10,
359  ros::WallDuration duration = ros::WallDuration(5),
360  ros::WallDuration pause_between_callbacks = ros::WallDuration(0),
361  ros::WallDuration call_one_timeout = ros::WallDuration(0.1))
362 {
363  CallbackQueue queue;
364  boost::thread_group tg;
365  bool done = false;
366  boost::atomic<size_t> calls(0);
367 
368  for (uint32_t i = 0; i < num_threads; ++i)
369  {
370  tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done), &calls, call_one_timeout));
371  }
372 
374  size_t i = 0;
375  while (ros::WallTime::now() - start < duration)
376  {
377  queue.addCallback(cb);
378  ++i;
379  if (!pause_between_callbacks.isZero())
380  {
381  pause_between_callbacks.sleep();
382  }
383  }
384 
385  while (!queue.isEmpty())
386  {
387  ros::WallDuration(0.01).sleep();
388  }
389 
390  done = true;
391  tg.join_all();
392 
393  if (num_calls)
394  *num_calls = calls;
395 
396  return i;
397 }
398 
399 TEST(CallbackQueue, threadedCallAvailable)
400 {
401  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
402  size_t i = runThreadedTest(cb, callAvailableThread);
403  ROS_INFO_STREAM(i);
404  EXPECT_EQ(cb->count, i);
405 }
406 
408  CallbackQueue* queue, bool& done, boost::atomic<size_t>* num_calls,
409  ros::WallDuration timeout = ros::WallDuration(0.1))
410 {
411  size_t i = 0;
412  while (!done)
413  {
414  queue->callOne(timeout);
415  ++i;
416  }
417 
418  if (num_calls)
419  {
420  num_calls->fetch_add(i);
421  }
422 }
423 
424 TEST(CallbackQueue, threadedCallOne)
425 {
426  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
427  size_t i = runThreadedTest(cb, callOneThread);
428  ROS_INFO_STREAM(i);
429  EXPECT_EQ(cb->count, i);
430 }
431 
433 {
434 public:
436  const std::string& topic, int32_t queue_size,
437  bool allow_concurrent_callbacks)
438  : SubscriptionQueue(topic, queue_size, allow_concurrent_callbacks),
439  ready_count(0)
440  {}
441 
442  virtual bool ready()
443  {
444  ready_count.fetch_add(1);
445  return SubscriptionQueue::ready();
446  }
447 
448  boost::atomic<size_t> ready_count;
449 };
451 
453  ros::WallDuration callback_duration; // long-lasting callback
454  size_t num_threads;
458 };
459 
460 class CallbackQueueParamTest : public ::testing::TestWithParam<ThreadedCallOneSlowParams>
461 {};
462 
463 TEST_P(CallbackQueueParamTest, threadedCallOneSlow)
464 {
465  // test for https://github.com/ros/ros_comm/issues/1545
466  // "roscpp multithreaded spinners eat up CPU when callbacks take too long"
467 
468  const ThreadedCallOneSlowParams param = GetParam();
469  const WallDuration& callback_duration = param.callback_duration; // long-lasting callback
470  const size_t num_threads = param.num_threads;
471  const ros::WallDuration call_one_timeout = param.call_one_timeout;
472  const ros::WallDuration test_duration = param.test_duration;
473  const ros::WallDuration pause_between_callbacks = param.pause_between_callbacks;
474  // queue_size is chosen such that it is larger than the max number of callbacks we
475  // are really able to process in 5 secs (since allow_concurrent_callbacks is false,
476  // it is equal to the number of seconds the queue is running)
477  const size_t queue_size = static_cast<size_t>(test_duration.toSec()) + 1;
478 
479  // create a subscription queue counting the number of ready() calls
481  boost::make_shared<CountingSubscriptionQueue>("test", queue_size, false));
482 
483  // create a slow subscription callback (takes 1 second to complete)
484  const FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
485  helper->cb_ = boost::bind(&ros::WallDuration::sleep, callback_duration);
486  const MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(
488 
489  // fill the subscription queue to get max performance
490  for (size_t i = 0; i < queue_size; ++i)
491  {
492  cb->push(helper, des, false, VoidConstWPtr(), true);
493  }
494 
495  // keep filling the callback queue at maximum speed for 5 seconds and
496  // spin up 10 processing threads until the queue is empty
497  size_t num_call_one_calls = 0;
498  const size_t num_callbacks_to_call = runThreadedTest(
499  cb, callOneThread, &num_call_one_calls,
500  num_threads, test_duration, pause_between_callbacks, call_one_timeout);
501 
502  const uint32_t num_callbacks_called = helper->calls_;
503  const size_t num_ready_called = cb->ready_count;
504 
505  // what should happen: even though we have multiple processing threads,
506  // the subscription queue has a per-topic lock which prevents multiple threads from
507  // processing the topic's callbacks simultaneously; so even though there were
508  // tens of thousands of callbacks in the callback queue, we only got time to process
509  // less than 5 of them because queue_size is quite limited (3), so most callbacks
510  // get thrown away during processing of the slow callbacks
511 
512  // we test the number of SubscriptionQueue::ready() calls to see how often do the
513  // idle threads ask for more work; with bug 1545 unfixed, this gets to millions of
514  // calls which acts as a busy-wait; with the bug fixed, the number should not be
515  // higher than number of callbacks (~ 80k), since each newly added callback should
516  // wake one idle thread and let it ask for work
517 
518  ROS_INFO_STREAM("Callback queue processed " <<
519  num_callbacks_called << " callbacks out of " << num_callbacks_to_call);
520  ROS_INFO_STREAM("callOne() was called " << num_call_one_calls << " times.");
521  ROS_INFO_STREAM("ready() was called " << num_ready_called << " times.");
522 
523  EXPECT_EQ(num_callbacks_called, queue_size);
524  EXPECT_LE(num_call_one_calls, 2 * num_callbacks_to_call + num_threads * (1/call_one_timeout.toSec()) * queue_size);
525 }
526 
527 INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values(
528  //ThreadedCallOneSlowParams{callback_duration, num_threads, call_one_timeout, test_duration, pause_between_callbacks}
536  ));
537 
538 // this class is just an ugly hack
539 // to access the constructor Timer(TimerOptions)
540 namespace ros
541 {
542 class NodeHandle
543 {
544 public:
545  static Timer createTimer(const TimerOptions& ops)
546  {
547  return Timer(ops);
548  }
549 };
550 }
551 
553 {
554 }
555 
557 
559 {
560  // wait until the timer is TimerRecreationCallback is garbaged
561  WallDuration(2).sleep();
562 
563  TimerOptions ops(Duration(0.1), dummyTimer, recursiveTimerQueue, false, false);
565  t.start();
566 }
567 
569 {
570 public:
572  : queue(_queue)
573  {}
574 
575  virtual CallResult call()
576  {
577  TimerOptions ops(Duration(0.1), recursiveTimer, queue, false, false);
579  t.start();
580 
581  // wait until the recursiveTimer has been fired
582  WallDuration(1).sleep();
583 
584  return Success;
585  }
586 
588 };
590 
592 {
593  // ensure that the test does not dead-lock, see #3867
594  ros::Time::init();
595  CallbackQueue queue;
596  recursiveTimerQueue = &queue;
597  TimerRecursionCallbackPtr cb(boost::make_shared<TimerRecursionCallback>(&queue));
598  queue.addCallback(cb, 1);
599 
600  boost::thread_group tg;
601  bool done = false;
602  boost::atomic<size_t> calls(0);
603 
604  for (uint32_t i = 0; i < 2; ++i)
605  {
606  tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done), &calls, ros::WallDuration(0.1)));
607  }
608 
609  while (!queue.isEmpty())
610  {
611  ros::WallDuration(0.01).sleep();
612  }
613 
614  done = true;
615  tg.join_all();
616 }
617 
619 {
620 public:
622  : id(0), queue(_queue) {
623  condition_sync.store(true);
624  condition_one.store(false);
625  condition_stop.store(false);
626  }
627 
628  void add();
629 
630  unsigned long id;
632  boost::atomic<bool> condition_one;
633  boost::atomic<bool> condition_sync;
634  boost::atomic<bool> condition_stop;
635 };
636 
638 {
639 public:
640  RaceConditionCallback(ConditionObject * _condition_object, unsigned long * _id)
641  : condition_object(_condition_object), id(_id)
642  {}
643 
644  virtual CallResult call()
645  {
646  condition_object->condition_one.store(false);
647  return Success;
648  }
649 
651  unsigned long * id;
652 };
653 
655 {
656  while(!condition_stop.load())
657  {
658  if(condition_sync.load() && queue->isEmpty())
659  {
660  condition_one.store(true);
661  id++;
662  queue->addCallback(boost::make_shared<RaceConditionCallback>(this, &id), id);
663  }
664  boost::this_thread::sleep(boost::posix_time::microseconds(1));
665  }
666 }
667 
668 TEST(CallbackQueue, raceConditionCallback)
669 {
670  CallbackQueue queue;
671  ConditionObject condition_object(&queue);
672 
673  boost::thread t(boost::bind(&ConditionObject::add, &condition_object));
674  for(unsigned int i = 0; i < 1000000; ++i)
675  {
676  condition_object.condition_sync.store(false);
677  if (queue.callOne() == CallbackQueue::Called)
678  {
679  if(condition_object.condition_one.load())
680  {
681  condition_object.condition_stop.store(true);
682  ASSERT_FALSE(condition_object.condition_one.load());
683  }
684  }
685 
686  queue.clear();
687  condition_object.condition_sync.store(true);
688  }
689  condition_object.condition_stop.store(true);
690  t.join();
691 }
692 
693 int main(int argc, char** argv)
694 {
695  testing::InitGoogleTest(&argc, argv);
696  return RUN_ALL_TESTS();
697 }
RecursiveCallbackPtr
boost::shared_ptr< RecursiveCallback > RecursiveCallbackPtr
Definition: test_callback_queue.cpp:305
ConditionObject::queue
CallbackQueue * queue
Definition: test_callback_queue.cpp:631
t
ros::WallTime t
Definition: pointcloud_serdes.cpp:41
ros::WallDuration::sleep
bool sleep() const
ros::SerializedMessage
ThreadedCallOneSlowParams::call_one_timeout
ros::WallDuration call_one_timeout
Definition: test_callback_queue.cpp:455
TimerRecursionCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:575
runThreadedTest
size_t runThreadedTest(const CallbackInterfacePtr &cb, const boost::function< void(CallbackQueue *, bool &, boost::atomic< size_t > *, ros::WallDuration)> &threadFunc, size_t *num_calls=NULL, size_t num_threads=10, ros::WallDuration duration=ros::WallDuration(5), ros::WallDuration pause_between_callbacks=ros::WallDuration(0), ros::WallDuration call_one_timeout=ros::WallDuration(0.1))
Definition: test_callback_queue.cpp:355
TimerRecursionCallback::queue
CallbackQueueInterface * queue
Definition: test_callback_queue.cpp:587
boost::shared_ptr
BlockingCallback::mutex_
boost::mutex mutex_
Definition: test_callback_queue.cpp:216
ros::TimerOptions
CountingSubscriptionQueuePtr
boost::shared_ptr< CountingSubscriptionQueue > CountingSubscriptionQueuePtr
Definition: test_callback_queue.cpp:450
CallbackQueueParamTest
Definition: test_callback_queue.cpp:460
recursiveTimerQueue
CallbackQueueInterface * recursiveTimerQueue
Definition: test_callback_queue.cpp:556
INSTANTIATE_TEST_CASE_P
INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values(ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.1)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.001)}, ThreadedCallOneSlowParams{ros::WallDuration(0.1), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(0.001), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 1, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 2, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}))
ros
ConditionObject::condition_one
boost::atomic< bool > condition_one
Definition: test_callback_queue.cpp:632
ConditionObject::id
unsigned long id
Definition: test_callback_queue.cpp:630
ThreadedCallOneSlowParams::test_duration
ros::WallDuration test_duration
Definition: test_callback_queue.cpp:456
subscription_callback_helper.h
ros::CallbackQueue::clear
void clear()
SelfRemovingCallback::queue
CallbackQueue * queue
Definition: test_callback_queue.cpp:177
RecursiveCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:281
ros::CallbackQueue::isEmpty
bool isEmpty()
ThreadedCallOneSlowParams
Definition: test_callback_queue.cpp:452
CountingCallbackPtr
boost::shared_ptr< CountingCallback > CountingCallbackPtr
Definition: test_callback_queue.cpp:72
CountingSubscriptionQueue
Definition: test_callback_queue.cpp:432
TimerRecursionCallback
Definition: test_callback_queue.cpp:568
RaceConditionCallback::RaceConditionCallback
RaceConditionCallback(ConditionObject *_condition_object, unsigned long *_id)
Definition: test_callback_queue.cpp:640
subscription_queue.h
CountingSubscriptionQueue::ready
virtual bool ready()
Definition: test_callback_queue.cpp:442
ros::CallbackQueue::addCallback
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
ThreadedCallOneSlowParams::pause_between_callbacks
ros::WallDuration pause_between_callbacks
Definition: test_callback_queue.cpp:457
ros::CallbackInterface::CallResult
CallResult
ros::Timer
RecursiveCallback::use_available
bool use_available
Definition: test_callback_queue.cpp:303
RecursiveCallback
Definition: test_callback_queue.cpp:272
add
bool add(test_roscpp::TestStringString::Request &, test_roscpp::TestStringString::Response &)
Definition: service_callback_types.cpp:43
dummyTimer
void dummyTimer(const ros::TimerEvent &)
Definition: test_callback_queue.cpp:552
console.h
BlockingCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:208
BlockingCallback::BlockingCallback
BlockingCallback()
Definition: test_callback_queue.cpp:204
callOneThread
void callOneThread(CallbackQueue *queue, bool &done, boost::atomic< size_t > *num_calls, ros::WallDuration timeout=ros::WallDuration(0.1))
Definition: test_callback_queue.cpp:407
CountingCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:61
CountingCallback::CountingCallback
CountingCallback()
Definition: test_callback_queue.cpp:57
BlockingCallback::count
size_t count
Definition: test_callback_queue.cpp:217
ConditionObject::condition_stop
boost::atomic< bool > condition_stop
Definition: test_callback_queue.cpp:634
CountingSubscriptionQueue::ready_count
boost::atomic< size_t > ready_count
Definition: test_callback_queue.cpp:448
RecursiveCallback::count
size_t count
Definition: test_callback_queue.cpp:300
CountingSubscriptionQueue::CountingSubscriptionQueue
CountingSubscriptionQueue(const std::string &topic, int32_t queue_size, bool allow_concurrent_callbacks)
Definition: test_callback_queue.cpp:435
SelfRemovingCallbackPtr
boost::shared_ptr< SelfRemovingCallback > SelfRemovingCallbackPtr
Definition: test_callback_queue.cpp:180
RecursiveCallback::RecursiveCallback
RecursiveCallback(CallbackQueue *queue, bool use_available)
Definition: test_callback_queue.cpp:275
ConditionObject
Definition: test_callback_queue.cpp:618
ThreadedCallOneSlowParams::num_threads
size_t num_threads
Definition: test_callback_queue.cpp:454
TimerRecursionCallback::TimerRecursionCallback
TimerRecursionCallback(CallbackQueueInterface *_queue)
Definition: test_callback_queue.cpp:571
ros::WallTime::now
static WallTime now()
BlockingCallback
Definition: test_callback_queue.cpp:201
CountingCallback::count
size_t count
Definition: test_callback_queue.cpp:70
ros::CallbackQueue::removeByID
virtual void removeByID(uint64_t removal_id)
RecursiveCallback::queue
CallbackQueue * queue
Definition: test_callback_queue.cpp:302
fake_message.h
ros::NodeHandle
Definition: test_callback_queue.cpp:542
ros::CallbackInterface
ros::TimerEvent
ROS_INFO_STREAM
#define ROS_INFO_STREAM(args)
ros::WallTime
RaceConditionCallback::id
unsigned long * id
Definition: test_callback_queue.cpp:651
message_deserializer.h
timer.h
callback_queue.h
SelfRemovingCallback::count
size_t count
Definition: test_callback_queue.cpp:175
ConditionObject::add
void add()
Definition: test_callback_queue.cpp:654
ros::VoidConstWPtr
boost::weak_ptr< void const > VoidConstWPtr
callAvailableThread
void callAvailableThread(CallbackQueue *queue, bool &done, boost::atomic< size_t > *num_calls, ros::WallDuration call_timeout=ros::WallDuration(0.1))
Definition: test_callback_queue.cpp:74
ros::start
ROSCPP_DECL void start()
ros::Time::init
static void init()
ros::SubscriptionQueue::ready
virtual bool ready()
TEST
TEST(CallbackQueue, singleCallback)
Definition: test_callback_queue.cpp:92
recursiveTimer
void recursiveTimer(const ros::TimerEvent &)
Definition: test_callback_queue.cpp:558
ros::CallbackQueue
RaceConditionCallback::condition_object
ConditionObject * condition_object
Definition: test_callback_queue.cpp:650
TimerRecursionCallbackPtr
boost::shared_ptr< TimerRecursionCallback > TimerRecursionCallbackPtr
Definition: test_callback_queue.cpp:589
ThreadedCallOneSlowParams::callback_duration
ros::WallDuration callback_duration
Definition: test_callback_queue.cpp:453
main
int main(int argc, char **argv)
Definition: test_callback_queue.cpp:693
SelfRemovingCallback
Definition: test_callback_queue.cpp:157
ConditionObject::ConditionObject
ConditionObject(CallbackQueue *_queue)
Definition: test_callback_queue.cpp:621
ros::SubscriptionQueue
BlockingCallbackPtr
boost::shared_ptr< BlockingCallback > BlockingCallbackPtr
Definition: test_callback_queue.cpp:219
RaceConditionCallback
Definition: test_callback_queue.cpp:637
SelfRemovingCallback::id
uint64_t id
Definition: test_callback_queue.cpp:178
CountingCallback
Definition: test_callback_queue.cpp:54
SelfRemovingCallback::SelfRemovingCallback
SelfRemovingCallback(CallbackQueue *queue, uint64_t id)
Definition: test_callback_queue.cpp:160
param
T param(const std::string &param_name, const T &default_val)
ConditionObject::condition_sync
boost::atomic< bool > condition_sync
Definition: test_callback_queue.cpp:633
ros::CallbackQueue::callOne
CallOneResult callOne()
DurationBase< WallDuration >::toSec
double toSec() const
ros::WallDuration
ros::NodeHandle::createTimer
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
ros::Duration
ros::CallbackQueue::callAvailable
void callAvailable()
ros::CallbackQueue::Called
Called
CountingCallback::mutex
boost::mutex mutex
Definition: test_callback_queue.cpp:69
RaceConditionCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:644
TEST_P
TEST_P(CallbackQueueParamTest, threadedCallOneSlow)
Definition: test_callback_queue.cpp:463
threadFunc
void threadFunc(boost::barrier *b)
ros::CallbackQueueInterface
SelfRemovingCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:166


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:30