test_callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
32 /*
33  * Test callback queue
34  */
35 
36 #include <gtest/gtest.h>
37 #include <ros/callback_queue.h>
38 #include <ros/console.h>
40 #include <ros/subscription_queue.h>
42 #include <ros/timer.h>
43 
44 #include <boost/atomic.hpp>
45 #include <boost/shared_ptr.hpp>
46 #include <boost/bind.hpp>
47 #include <boost/thread.hpp>
48 #include <boost/function.hpp>
49 
50 #include "fake_message.h"
51 
52 using namespace ros;
53 
55 {
56 public:
58  : count(0)
59  {}
60 
61  virtual CallResult call()
62  {
63  boost::mutex::scoped_lock lock(mutex);
64  ++count;
65 
66  return Success;
67  }
68 
69  boost::mutex mutex;
70  size_t count;
71 };
73 
74 TEST(CallbackQueue, singleCallback)
75 {
76  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
77  CallbackQueue queue;
78  queue.addCallback(cb, 1);
79  queue.callOne();
80 
81  EXPECT_EQ(cb->count, 1U);
82 
83  queue.addCallback(cb, 1);
84  queue.callAvailable();
85 
86  EXPECT_EQ(cb->count, 2U);
87 
88  queue.callOne();
89  EXPECT_EQ(cb->count, 2U);
90 
91  queue.callAvailable();
92  EXPECT_EQ(cb->count, 2U);
93 }
94 
95 TEST(CallbackQueue, multipleCallbacksCallAvailable)
96 {
97  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
98  CallbackQueue queue;
99  for (uint32_t i = 0; i < 1000; ++i)
100  {
101  queue.addCallback(cb, 1);
102  }
103 
104  queue.callAvailable();
105 
106  EXPECT_EQ(cb->count, 1000U);
107 }
108 
109 TEST(CallbackQueue, multipleCallbacksCallOne)
110 {
111  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
112  CallbackQueue queue;
113  for (uint32_t i = 0; i < 1000; ++i)
114  {
115  queue.addCallback(cb, 1);
116  }
117 
118  for (uint32_t i = 0; i < 1000; ++i)
119  {
120  queue.callOne();
121  EXPECT_EQ(cb->count, i + 1);
122  }
123 }
124 
126 {
127  CountingCallbackPtr cb1(boost::make_shared<CountingCallback>());
128  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
129  CallbackQueue queue;
130  queue.addCallback(cb1, 1);
131  queue.addCallback(cb2, 2);
132  queue.removeByID(1);
133  queue.callAvailable();
134 
135  EXPECT_EQ(cb1->count, 0U);
136  EXPECT_EQ(cb2->count, 1U);
137 }
138 
140 {
141 public:
142  SelfRemovingCallback(CallbackQueue* queue, uint64_t id)
143  : count(0)
144  , queue(queue)
145  , id(id)
146  {}
147 
148  virtual CallResult call()
149  {
150  ++count;
151 
152  queue->removeByID(id);
153 
154  return Success;
155  }
156 
157  size_t count;
158 
160  uint64_t id;
161 };
163 
164 TEST(CallbackQueue, removeSelf)
165 {
166  CallbackQueue queue;
167  SelfRemovingCallbackPtr cb1(boost::make_shared<SelfRemovingCallback>(&queue, 1));
168  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
169  queue.addCallback(cb1, 1);
170  queue.addCallback(cb2, 1);
171  queue.addCallback(cb2, 1);
172 
173  queue.callOne();
174 
175  queue.addCallback(cb2, 1);
176 
177  queue.callAvailable();
178 
179  EXPECT_EQ(cb1->count, 1U);
180  EXPECT_EQ(cb2->count, 1U);
181 }
182 
184 {
185 public:
186  RecursiveCallback(CallbackQueue* queue, bool use_available)
187  : count(0)
188  , queue(queue)
189  , use_available(use_available)
190  {}
191 
192  virtual CallResult call()
193  {
194  ++count;
195 
196  if (count < 3)
197  {
198  if (use_available)
199  {
200  queue->callAvailable();
201  }
202  else
203  {
204  queue->callOne();
205  }
206  }
207 
208  return Success;
209  }
210 
211  size_t count;
212 
215 };
217 
218 TEST(CallbackQueue, recursive1)
219 {
220  CallbackQueue queue;
221  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
222  queue.addCallback(cb, 1);
223  queue.addCallback(cb, 1);
224  queue.addCallback(cb, 1);
225  queue.callAvailable();
226 
227  EXPECT_EQ(cb->count, 3U);
228 }
229 
230 TEST(CallbackQueue, recursive2)
231 {
232  CallbackQueue queue;
233  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
234  queue.addCallback(cb, 1);
235  queue.addCallback(cb, 1);
236  queue.addCallback(cb, 1);
237  queue.callOne();
238 
239  EXPECT_EQ(cb->count, 3U);
240 }
241 
242 TEST(CallbackQueue, recursive3)
243 {
244  CallbackQueue queue;
245  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
246  queue.addCallback(cb, 1);
247  queue.addCallback(cb, 1);
248  queue.addCallback(cb, 1);
249  queue.callAvailable();
250 
251  EXPECT_EQ(cb->count, 3U);
252 }
253 
254 TEST(CallbackQueue, recursive4)
255 {
256  CallbackQueue queue;
257  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
258  queue.addCallback(cb, 1);
259  queue.addCallback(cb, 1);
260  queue.addCallback(cb, 1);
261  queue.callOne();
262 
263  EXPECT_EQ(cb->count, 3U);
264 }
265 
267  CallbackQueue* queue, bool& done, boost::atomic<size_t>* num_calls,
268  ros::WallDuration call_timeout = ros::WallDuration(0.1))
269 {
270  size_t i = 0;
271  while (!done)
272  {
273  queue->callAvailable(call_timeout);
274  ++i;
275  }
276 
277  if (num_calls)
278  {
279  num_calls->fetch_add(i);
280  }
281 }
282 
284  const CallbackInterfacePtr& cb,
285  const boost::function<void(CallbackQueue*, bool&, boost::atomic<size_t>*, ros::WallDuration)>& threadFunc,
286  size_t* num_calls = NULL, size_t num_threads = 10,
287  ros::WallDuration duration = ros::WallDuration(5),
288  ros::WallDuration pause_between_callbacks = ros::WallDuration(0),
289  ros::WallDuration call_one_timeout = ros::WallDuration(0.1))
290 {
291  CallbackQueue queue;
292  boost::thread_group tg;
293  bool done = false;
294  boost::atomic<size_t> calls(0);
295 
296  for (uint32_t i = 0; i < num_threads; ++i)
297  {
298  tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done), &calls, call_one_timeout));
299  }
300 
302  size_t i = 0;
303  while (ros::WallTime::now() - start < duration)
304  {
305  queue.addCallback(cb);
306  ++i;
307  if (!pause_between_callbacks.isZero())
308  {
309  pause_between_callbacks.sleep();
310  }
311  }
312 
313  while (!queue.isEmpty())
314  {
315  ros::WallDuration(0.01).sleep();
316  }
317 
318  done = true;
319  tg.join_all();
320 
321  if (num_calls)
322  *num_calls = calls;
323 
324  return i;
325 }
326 
327 TEST(CallbackQueue, threadedCallAvailable)
328 {
329  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
330  size_t i = runThreadedTest(cb, callAvailableThread);
331  ROS_INFO_STREAM(i);
332  EXPECT_EQ(cb->count, i);
333 }
334 
336  CallbackQueue* queue, bool& done, boost::atomic<size_t>* num_calls,
337  ros::WallDuration timeout = ros::WallDuration(0.1))
338 {
339  size_t i = 0;
340  while (!done)
341  {
342  queue->callOne(timeout);
343  ++i;
344  }
345 
346  if (num_calls)
347  {
348  num_calls->fetch_add(i);
349  }
350 }
351 
352 TEST(CallbackQueue, threadedCallOne)
353 {
354  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
355  size_t i = runThreadedTest(cb, callOneThread);
356  ROS_INFO_STREAM(i);
357  EXPECT_EQ(cb->count, i);
358 }
359 
361 {
362 public:
364  const std::string& topic, int32_t queue_size,
365  bool allow_concurrent_callbacks)
366  : SubscriptionQueue(topic, queue_size, allow_concurrent_callbacks),
367  ready_count(0)
368  {}
369 
370  virtual bool ready()
371  {
372  ready_count.fetch_add(1);
373  return SubscriptionQueue::ready();
374  }
375 
376  boost::atomic<size_t> ready_count;
377 };
379 
381  ros::WallDuration callback_duration; // long-lasting callback
382  size_t num_threads;
386 };
387 
388 class CallbackQueueParamTest : public ::testing::TestWithParam<ThreadedCallOneSlowParams>
389 {};
390 
391 TEST_P(CallbackQueueParamTest, threadedCallOneSlow)
392 {
393  // test for https://github.com/ros/ros_comm/issues/1545
394  // "roscpp multithreaded spinners eat up CPU when callbacks take too long"
395 
396  const ThreadedCallOneSlowParams param = GetParam();
397  const WallDuration& callback_duration = param.callback_duration; // long-lasting callback
398  const size_t num_threads = param.num_threads;
399  const ros::WallDuration call_one_timeout = param.call_one_timeout;
400  const ros::WallDuration test_duration = param.test_duration;
401  const ros::WallDuration pause_between_callbacks = param.pause_between_callbacks;
402  // queue_size is chosen such that it is larger than the max number of callbacks we
403  // are really able to process in 5 secs (since allow_concurrent_callbacks is false,
404  // it is equal to the number of seconds the queue is running)
405  const size_t queue_size = static_cast<size_t>(test_duration.toSec()) + 1;
406 
407  // create a subscription queue counting the number of ready() calls
408  const CountingSubscriptionQueuePtr cb(
409  boost::make_shared<CountingSubscriptionQueue>("test", queue_size, false));
410 
411  // create a slow subscription callback (takes 1 second to complete)
412  const FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
413  helper->cb_ = boost::bind(&ros::WallDuration::sleep, callback_duration);
414  const MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(
416 
417  // fill the subscription queue to get max performance
418  for (size_t i = 0; i < queue_size; ++i)
419  {
420  cb->push(helper, des, false, VoidConstWPtr(), true);
421  }
422 
423  // keep filling the callback queue at maximum speed for 5 seconds and
424  // spin up 10 processing threads until the queue is empty
425  size_t num_call_one_calls = 0;
426  const size_t num_callbacks_to_call = runThreadedTest(
427  cb, callOneThread, &num_call_one_calls,
428  num_threads, test_duration, pause_between_callbacks, call_one_timeout);
429 
430  const uint32_t num_callbacks_called = helper->calls_;
431  const size_t num_ready_called = cb->ready_count;
432 
433  // what should happen: even though we have multiple processing threads,
434  // the subscription queue has a per-topic lock which prevents multiple threads from
435  // processing the topic's callbacks simultaneously; so even though there were
436  // tens of thousands of callbacks in the callback queue, we only got time to process
437  // less than 5 of them because queue_size is quite limited (3), so most callbacks
438  // get thrown away during processing of the slow callbacks
439 
440  // we test the number of SubscriptionQueue::ready() calls to see how often do the
441  // idle threads ask for more work; with bug 1545 unfixed, this gets to millions of
442  // calls which acts as a busy-wait; with the bug fixed, the number should not be
443  // higher than number of callbacks (~ 80k), since each newly added callback should
444  // wake one idle thread and let it ask for work
445 
446  ROS_INFO_STREAM("Callback queue processed " <<
447  num_callbacks_called << " callbacks out of " << num_callbacks_to_call);
448  ROS_INFO_STREAM("callOne() was called " << num_call_one_calls << " times.");
449  ROS_INFO_STREAM("ready() was called " << num_ready_called << " times.");
450 
451  EXPECT_EQ(num_callbacks_called, queue_size);
452  EXPECT_LE(num_call_one_calls, 2 * num_callbacks_to_call + num_threads * (1/call_one_timeout.toSec()) * queue_size);
453 }
454 
455 INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values(
456  //ThreadedCallOneSlowParams{callback_duration, num_threads, call_one_timeout, test_duration, pause_between_callbacks}
464  ));
465 
466 // this class is just an ugly hack
467 // to access the constructor Timer(TimerOptions)
468 namespace ros
469 {
470 class NodeHandle
471 {
472 public:
473  static Timer createTimer(const TimerOptions& ops)
474  {
475  return Timer(ops);
476  }
477 };
478 }
479 
481 {
482 }
483 
485 
487 {
488  // wait until the timer is TimerRecreationCallback is garbaged
489  WallDuration(2).sleep();
490 
491  TimerOptions ops(Duration(0.1), dummyTimer, recursiveTimerQueue, false, false);
493  t.start();
494 }
495 
497 {
498 public:
500  : queue(_queue)
501  {}
502 
503  virtual CallResult call()
504  {
505  TimerOptions ops(Duration(0.1), recursiveTimer, queue, false, false);
507  t.start();
508 
509  // wait until the recursiveTimer has been fired
510  WallDuration(1).sleep();
511 
512  return Success;
513  }
514 
516 };
518 
520 {
521  // ensure that the test does not dead-lock, see #3867
522  ros::Time::init();
523  CallbackQueue queue;
524  recursiveTimerQueue = &queue;
525  TimerRecursionCallbackPtr cb(boost::make_shared<TimerRecursionCallback>(&queue));
526  queue.addCallback(cb, 1);
527 
528  boost::thread_group tg;
529  bool done = false;
530  boost::atomic<size_t> calls(0);
531 
532  for (uint32_t i = 0; i < 2; ++i)
533  {
534  tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done), &calls, ros::WallDuration(0.1)));
535  }
536 
537  while (!queue.isEmpty())
538  {
539  ros::WallDuration(0.01).sleep();
540  }
541 
542  done = true;
543  tg.join_all();
544 }
545 
547 {
548 public:
550  : id(0), queue(_queue) {
551  condition_sync.store(true);
552  condition_one.store(false);
553  condition_stop.store(false);
554  }
555 
556  void add();
557 
558  unsigned long id;
560  boost::atomic<bool> condition_one;
561  boost::atomic<bool> condition_sync;
562  boost::atomic<bool> condition_stop;
563 };
564 
566 {
567 public:
568  RaceConditionCallback(ConditionObject * _condition_object, unsigned long * _id)
569  : condition_object(_condition_object), id(_id)
570  {}
571 
572  virtual CallResult call()
573  {
574  condition_object->condition_one.store(false);
575  return Success;
576  }
577 
579  unsigned long * id;
580 };
581 
583 {
584  while(!condition_stop.load())
585  {
586  if(condition_sync.load() && queue->isEmpty())
587  {
588  condition_one.store(true);
589  id++;
590  queue->addCallback(boost::make_shared<RaceConditionCallback>(this, &id), id);
591  }
592  boost::this_thread::sleep(boost::posix_time::microseconds(1));
593  }
594 }
595 
596 TEST(CallbackQueue, raceConditionCallback)
597 {
598  CallbackQueue queue;
599  ConditionObject condition_object(&queue);
600 
601  boost::thread t(boost::bind(&ConditionObject::add, &condition_object));
602  for(unsigned int i = 0; i < 1000000; ++i)
603  {
604  condition_object.condition_sync.store(false);
605  if (queue.callOne() == CallbackQueue::Called)
606  {
607  if(condition_object.condition_one.load())
608  {
609  condition_object.condition_stop.store(true);
610  ASSERT_FALSE(condition_object.condition_one.load());
611  }
612  }
613 
614  queue.clear();
615  condition_object.condition_sync.store(true);
616  }
617  condition_object.condition_stop.store(true);
618  t.join();
619 }
620 
621 int main(int argc, char** argv)
622 {
623  testing::InitGoogleTest(&argc, argv);
624  return RUN_ALL_TESTS();
625 }
626 
627 
628 
boost::shared_ptr< TimerRecursionCallback > TimerRecursionCallbackPtr
int main(int argc, char **argv)
virtual CallResult call()
bool param(const std::string &param_name, T &param_val, const T &default_val)
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
ROSCPP_DECL void start()
void dummyTimer(const ros::TimerEvent &)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
boost::shared_ptr< CountingCallback > CountingCallbackPtr
boost::atomic< size_t > ready_count
virtual CallResult call()
ConditionObject(CallbackQueue *_queue)
void start()
virtual CallResult call()
CallOneResult callOne()
boost::atomic< bool > condition_stop
virtual CallResult call()
RecursiveCallback(CallbackQueue *queue, bool use_available)
ros::WallDuration pause_between_callbacks
boost::shared_ptr< RecursiveCallback > RecursiveCallbackPtr
bool add(test_roscpp::TestStringString::Request &, test_roscpp::TestStringString::Response &)
boost::atomic< bool > condition_one
CallbackQueueInterface * recursiveTimerQueue
TEST_P(CallbackQueueParamTest, threadedCallOneSlow)
ConditionObject * condition_object
CallbackQueueInterface * queue
static void init()
boost::weak_ptr< void const > VoidConstWPtr
void threadFunc(boost::barrier *b)
bool sleep() const
virtual void removeByID(uint64_t removal_id)
INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values(ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.1)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.001)}, ThreadedCallOneSlowParams{ros::WallDuration(0.1), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(0.001), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 1, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 2, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}))
TEST(CallbackQueue, singleCallback)
void recursiveTimer(const ros::TimerEvent &)
virtual bool ready()
#define ROS_INFO_STREAM(args)
static WallTime now()
void callAvailableThread(CallbackQueue *queue, bool &done, boost::atomic< size_t > *num_calls, ros::WallDuration call_timeout=ros::WallDuration(0.1))
boost::shared_ptr< SelfRemovingCallback > SelfRemovingCallbackPtr
CountingSubscriptionQueue(const std::string &topic, int32_t queue_size, bool allow_concurrent_callbacks)
size_t runThreadedTest(const CallbackInterfacePtr &cb, const boost::function< void(CallbackQueue *, bool &, boost::atomic< size_t > *, ros::WallDuration)> &threadFunc, size_t *num_calls=NULL, size_t num_threads=10, ros::WallDuration duration=ros::WallDuration(5), ros::WallDuration pause_between_callbacks=ros::WallDuration(0), ros::WallDuration call_one_timeout=ros::WallDuration(0.1))
CallbackQueue * queue
SelfRemovingCallback(CallbackQueue *queue, uint64_t id)
boost::shared_ptr< CountingSubscriptionQueue > CountingSubscriptionQueuePtr
virtual CallResult call()
void callOneThread(CallbackQueue *queue, bool &done, boost::atomic< size_t > *num_calls, ros::WallDuration timeout=ros::WallDuration(0.1))
TimerRecursionCallback(CallbackQueueInterface *_queue)
ros::WallTime t
boost::atomic< bool > condition_sync
RaceConditionCallback(ConditionObject *_condition_object, unsigned long *_id)


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:01