test_callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
32 /*
33  * Test callback queue
34  */
35 
36 #include <gtest/gtest.h>
37 #include <ros/callback_queue.h>
38 #include <ros/console.h>
39 #include <ros/timer.h>
40 
41 #include <boost/atomic.hpp>
42 #include <boost/shared_ptr.hpp>
43 #include <boost/bind.hpp>
44 #include <boost/thread.hpp>
45 #include <boost/function.hpp>
46 
47 using namespace ros;
48 
50 {
51 public:
53  : count(0)
54  {}
55 
56  virtual CallResult call()
57  {
58  boost::mutex::scoped_lock lock(mutex);
59  ++count;
60 
61  return Success;
62  }
63 
64  boost::mutex mutex;
65  size_t count;
66 };
68 
69 void callAvailableThread(CallbackQueue* queue, bool& done)
70 {
71  while (!done)
72  {
73  queue->callAvailable(ros::WallDuration(0.1));
74  }
75 }
76 
77 
78 TEST(CallbackQueue, singleCallback)
79 {
80  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
81  CallbackQueue queue;
82  queue.addCallback(cb, 1);
83  queue.callOne();
84 
85  EXPECT_EQ(cb->count, 1U);
86 
87  queue.addCallback(cb, 1);
88  queue.callAvailable();
89 
90  EXPECT_EQ(cb->count, 2U);
91 
92  queue.callOne();
93  EXPECT_EQ(cb->count, 2U);
94 
95  queue.callAvailable();
96  EXPECT_EQ(cb->count, 2U);
97 }
98 
99 TEST(CallbackQueue, multipleCallbacksCallAvailable)
100 {
101  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
102  CallbackQueue queue;
103  for (uint32_t i = 0; i < 1000; ++i)
104  {
105  queue.addCallback(cb, 1);
106  }
107 
108  queue.callAvailable();
109 
110  EXPECT_EQ(cb->count, 1000U);
111 }
112 
113 TEST(CallbackQueue, multipleCallbacksCallOne)
114 {
115  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
116  CallbackQueue queue;
117  for (uint32_t i = 0; i < 1000; ++i)
118  {
119  queue.addCallback(cb, 1);
120  }
121 
122  for (uint32_t i = 0; i < 1000; ++i)
123  {
124  queue.callOne();
125  EXPECT_EQ(cb->count, i + 1);
126  }
127 }
128 
130 {
131  CountingCallbackPtr cb1(boost::make_shared<CountingCallback>());
132  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
133  CallbackQueue queue;
134  queue.addCallback(cb1, 1);
135  queue.addCallback(cb2, 2);
136  queue.removeByID(1);
137  queue.callAvailable();
138 
139  EXPECT_EQ(cb1->count, 0U);
140  EXPECT_EQ(cb2->count, 1U);
141 }
142 
144 {
145 public:
146  SelfRemovingCallback(CallbackQueue* queue, uint64_t id)
147  : count(0)
148  , queue(queue)
149  , id(id)
150  {}
151 
152  virtual CallResult call()
153  {
154  ++count;
155 
156  queue->removeByID(id);
157 
158  return Success;
159  }
160 
161  size_t count;
162 
164  uint64_t id;
165 };
167 
168 TEST(CallbackQueue, removeSelf)
169 {
170  CallbackQueue queue;
171  SelfRemovingCallbackPtr cb1(boost::make_shared<SelfRemovingCallback>(&queue, 1));
172  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
173  queue.addCallback(cb1, 1);
174  queue.addCallback(cb2, 1);
175  queue.addCallback(cb2, 1);
176 
177  queue.callOne();
178 
179  queue.addCallback(cb2, 1);
180 
181  queue.callAvailable();
182 
183  EXPECT_EQ(cb1->count, 1U);
184  EXPECT_EQ(cb2->count, 1U);
185 }
186 
188 {
189 public:
191  : count(0)
192  {}
193 
194  virtual CallResult call()
195  {
196  mutex_.lock();
197  ++count;
198 
199  return Success;
200  }
201 
202  boost::mutex mutex_;
203  size_t count;
204 };
206 
207 
208 // This test checks whether removing callbacks by an id doesn't block if one of those callback is being executed.
209 TEST(CallbackQueue, removeCallbackWhileExecuting)
210 {
211  const uint64_t owner_id = 1;
212  const uint64_t unrelated_id = 2;
213 
214  CallbackQueue queue;
215  BlockingCallbackPtr cb1(boost::make_shared<BlockingCallback>());
216  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
217  CountingCallbackPtr cb3(boost::make_shared<CountingCallback>());
218 
219  cb1->mutex_.lock(); // lock the mutex to ensure the blocking callback will stall processing of callback queue.
220 
221  queue.addCallback(cb1, owner_id); // Add the blocking callback.
222 
223  // Now, we need to serve the callback queue from another thread.
224  bool done = false;
225  boost::thread t = boost::thread(boost::bind(&callAvailableThread, &queue, boost::ref(done)));
226 
227  ros::WallDuration(1.0).sleep(); // Callback 1 should be being served now.
228 
229  queue.addCallback(cb2, unrelated_id); // Add a second callback with different owner.
230  queue.addCallback(cb3, owner_id); // Add a third with same owner, this one should never get executed.
231 
232  // Now try to remove the callback that's being executed.
233  queue.removeByID(owner_id); // This should not block because cb1 is being served, it should prevent cb3 from running.
234 
235  ros::WallDuration(1.0).sleep();
236 
237  // The removeByID should not block, so now we can unblock the blocking callback.
238  cb1->mutex_.unlock(); // This allows processing of cb1 to continue.
239 
240  while (!queue.isEmpty()) // Wait until the queue is empty.
241  {
242  ros::WallDuration(0.01).sleep();
243  }
244 
245  // Properly shut down our callback serving thread.
246  done = true;
247  t.join();
248 
249  EXPECT_EQ(cb1->count, 1U);
250  EXPECT_EQ(cb2->count, 1U);
251  EXPECT_EQ(cb3->count, 0U);
252 
253  cb1->mutex_.unlock(); // Ensure the mutex is unlocked before destruction.
254 }
255 
257 {
258 public:
259  RecursiveCallback(CallbackQueue* queue, bool use_available)
260  : count(0)
261  , queue(queue)
262  , use_available(use_available)
263  {}
264 
265  virtual CallResult call()
266  {
267  ++count;
268 
269  if (count < 3)
270  {
271  if (use_available)
272  {
273  queue->callAvailable();
274  }
275  else
276  {
277  queue->callOne();
278  }
279  }
280 
281  return Success;
282  }
283 
284  size_t count;
285 
288 };
290 
291 TEST(CallbackQueue, recursive1)
292 {
293  CallbackQueue queue;
294  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
295  queue.addCallback(cb, 1);
296  queue.addCallback(cb, 1);
297  queue.addCallback(cb, 1);
298  queue.callAvailable();
299 
300  EXPECT_EQ(cb->count, 3U);
301 }
302 
303 TEST(CallbackQueue, recursive2)
304 {
305  CallbackQueue queue;
306  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
307  queue.addCallback(cb, 1);
308  queue.addCallback(cb, 1);
309  queue.addCallback(cb, 1);
310  queue.callOne();
311 
312  EXPECT_EQ(cb->count, 3U);
313 }
314 
315 TEST(CallbackQueue, recursive3)
316 {
317  CallbackQueue queue;
318  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
319  queue.addCallback(cb, 1);
320  queue.addCallback(cb, 1);
321  queue.addCallback(cb, 1);
322  queue.callAvailable();
323 
324  EXPECT_EQ(cb->count, 3U);
325 }
326 
327 TEST(CallbackQueue, recursive4)
328 {
329  CallbackQueue queue;
330  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
331  queue.addCallback(cb, 1);
332  queue.addCallback(cb, 1);
333  queue.addCallback(cb, 1);
334  queue.callOne();
335 
336  EXPECT_EQ(cb->count, 3U);
337 }
338 
339 size_t runThreadedTest(const CountingCallbackPtr& cb, const boost::function<void(CallbackQueue*, bool&)>& threadFunc)
340 {
341  CallbackQueue queue;
342  boost::thread_group tg;
343  bool done = false;
344 
345  for (uint32_t i = 0; i < 10; ++i)
346  {
347  tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done)));
348  }
349 
351  size_t i = 0;
353  {
354  queue.addCallback(cb);
355  ++i;
356  }
357 
358  while (!queue.isEmpty())
359  {
360  ros::WallDuration(0.01).sleep();
361  }
362 
363  done = true;
364  tg.join_all();
365 
366  return i;
367 }
368 
369 TEST(CallbackQueue, threadedCallAvailable)
370 {
371  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
372  size_t i = runThreadedTest(cb, callAvailableThread);
373  ROS_INFO_STREAM(i);
374  EXPECT_EQ(cb->count, i);
375 }
376 
377 void callOneThread(CallbackQueue* queue, bool& done)
378 {
379  while (!done)
380  {
381  queue->callOne(ros::WallDuration(0.1));
382  }
383 }
384 
385 TEST(CallbackQueue, threadedCallOne)
386 {
387  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
388  size_t i = runThreadedTest(cb, callOneThread);
389  ROS_INFO_STREAM(i);
390  EXPECT_EQ(cb->count, i);
391 }
392 
393 // this class is just an ugly hack
394 // to access the constructor Timer(TimerOptions)
395 namespace ros
396 {
397 class NodeHandle
398 {
399 public:
400  static Timer createTimer(const TimerOptions& ops)
401  {
402  return Timer(ops);
403  }
404 };
405 }
406 
408 {
409 }
410 
412 
414 {
415  // wait until the timer is TimerRecreationCallback is garbaged
416  WallDuration(2).sleep();
417 
418  TimerOptions ops(Duration(0.1), dummyTimer, recursiveTimerQueue, false, false);
420  t.start();
421 }
422 
424 {
425 public:
427  : queue(_queue)
428  {}
429 
430  virtual CallResult call()
431  {
432  TimerOptions ops(Duration(0.1), recursiveTimer, queue, false, false);
434  t.start();
435 
436  // wait until the recursiveTimer has been fired
437  WallDuration(1).sleep();
438 
439  return Success;
440  }
441 
443 };
445 
447 {
448  // ensure that the test does not dead-lock, see #3867
449  ros::Time::init();
450  CallbackQueue queue;
451  recursiveTimerQueue = &queue;
452  TimerRecursionCallbackPtr cb(boost::make_shared<TimerRecursionCallback>(&queue));
453  queue.addCallback(cb, 1);
454 
455  boost::thread_group tg;
456  bool done = false;
457 
458  for (uint32_t i = 0; i < 2; ++i)
459  {
460  tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done)));
461  }
462 
463  while (!queue.isEmpty())
464  {
465  ros::WallDuration(0.01).sleep();
466  }
467 
468  done = true;
469  tg.join_all();
470 }
471 
473 {
474 public:
476  : id(0), queue(_queue) {
477  condition_sync.store(true);
478  condition_one.store(false);
479  condition_stop.store(false);
480  }
481 
482  void add();
483 
484  unsigned long id;
486  boost::atomic<bool> condition_one;
487  boost::atomic<bool> condition_sync;
488  boost::atomic<bool> condition_stop;
489 };
490 
492 {
493 public:
494  RaceConditionCallback(ConditionObject * _condition_object, unsigned long * _id)
495  : condition_object(_condition_object), id(_id)
496  {}
497 
498  virtual CallResult call()
499  {
500  condition_object->condition_one.store(false);
501  return Success;
502  }
503 
505  unsigned long * id;
506 };
507 
509 {
510  while(!condition_stop.load())
511  {
512  if(condition_sync.load() && queue->isEmpty())
513  {
514  condition_one.store(true);
515  id++;
516  queue->addCallback(boost::make_shared<RaceConditionCallback>(this, &id), id);
517  }
518  boost::this_thread::sleep(boost::posix_time::microseconds(1));
519  }
520 }
521 
522 TEST(CallbackQueue, raceConditionCallback)
523 {
524  CallbackQueue queue;
525  ConditionObject condition_object(&queue);
526 
527  boost::thread t(boost::bind(&ConditionObject::add, &condition_object));
528  for(unsigned int i = 0; i < 1000000; ++i)
529  {
530  condition_object.condition_sync.store(false);
531  if (queue.callOne() == CallbackQueue::Called)
532  {
533  if(condition_object.condition_one.load())
534  {
535  condition_object.condition_stop.store(true);
536  ASSERT_FALSE(condition_object.condition_one.load());
537  }
538  }
539 
540  queue.clear();
541  condition_object.condition_sync.store(true);
542  }
543  condition_object.condition_stop.store(true);
544  t.join();
545 }
546 
547 int main(int argc, char** argv)
548 {
549  testing::InitGoogleTest(&argc, argv);
550  return RUN_ALL_TESTS();
551 }
552 
553 
554 
RecursiveCallbackPtr
boost::shared_ptr< RecursiveCallback > RecursiveCallbackPtr
Definition: test_callback_queue.cpp:289
ConditionObject::queue
CallbackQueue * queue
Definition: test_callback_queue.cpp:485
t
ros::WallTime t
Definition: pointcloud_serdes.cpp:41
ros::WallDuration::sleep
bool sleep() const
TimerRecursionCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:430
TimerRecursionCallback::queue
CallbackQueueInterface * queue
Definition: test_callback_queue.cpp:442
boost::shared_ptr
BlockingCallback::mutex_
boost::mutex mutex_
Definition: test_callback_queue.cpp:202
ros::TimerOptions
recursiveTimerQueue
CallbackQueueInterface * recursiveTimerQueue
Definition: test_callback_queue.cpp:411
ros
callOneThread
void callOneThread(CallbackQueue *queue, bool &done)
Definition: test_callback_queue.cpp:377
ConditionObject::condition_one
boost::atomic< bool > condition_one
Definition: test_callback_queue.cpp:486
ConditionObject::id
unsigned long id
Definition: test_callback_queue.cpp:484
ros::CallbackQueue::clear
void clear()
SelfRemovingCallback::queue
CallbackQueue * queue
Definition: test_callback_queue.cpp:163
RecursiveCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:265
ros::CallbackQueue::isEmpty
bool isEmpty()
CountingCallbackPtr
boost::shared_ptr< CountingCallback > CountingCallbackPtr
Definition: test_callback_queue.cpp:67
TimerRecursionCallback
Definition: test_callback_queue.cpp:423
RaceConditionCallback::RaceConditionCallback
RaceConditionCallback(ConditionObject *_condition_object, unsigned long *_id)
Definition: test_callback_queue.cpp:494
ros::CallbackQueue::addCallback
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
ros::CallbackInterface::CallResult
CallResult
ros::Timer
RecursiveCallback::use_available
bool use_available
Definition: test_callback_queue.cpp:287
RecursiveCallback
Definition: test_callback_queue.cpp:256
add
bool add(test_roscpp::TestStringString::Request &, test_roscpp::TestStringString::Response &)
Definition: service_callback_types.cpp:43
dummyTimer
void dummyTimer(const ros::TimerEvent &)
Definition: test_callback_queue.cpp:407
console.h
BlockingCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:194
BlockingCallback::BlockingCallback
BlockingCallback()
Definition: test_callback_queue.cpp:190
CountingCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:56
CountingCallback::CountingCallback
CountingCallback()
Definition: test_callback_queue.cpp:52
BlockingCallback::count
size_t count
Definition: test_callback_queue.cpp:203
ConditionObject::condition_stop
boost::atomic< bool > condition_stop
Definition: test_callback_queue.cpp:488
RecursiveCallback::count
size_t count
Definition: test_callback_queue.cpp:284
SelfRemovingCallbackPtr
boost::shared_ptr< SelfRemovingCallback > SelfRemovingCallbackPtr
Definition: test_callback_queue.cpp:166
RecursiveCallback::RecursiveCallback
RecursiveCallback(CallbackQueue *queue, bool use_available)
Definition: test_callback_queue.cpp:259
ConditionObject
Definition: test_callback_queue.cpp:472
TimerRecursionCallback::TimerRecursionCallback
TimerRecursionCallback(CallbackQueueInterface *_queue)
Definition: test_callback_queue.cpp:426
ros::WallTime::now
static WallTime now()
BlockingCallback
Definition: test_callback_queue.cpp:187
CountingCallback::count
size_t count
Definition: test_callback_queue.cpp:65
ros::CallbackQueue::removeByID
virtual void removeByID(uint64_t removal_id)
RecursiveCallback::queue
CallbackQueue * queue
Definition: test_callback_queue.cpp:286
ros::NodeHandle
Definition: test_callback_queue.cpp:397
ros::CallbackInterface
ros::TimerEvent
ROS_INFO_STREAM
#define ROS_INFO_STREAM(args)
ros::WallTime
RaceConditionCallback::id
unsigned long * id
Definition: test_callback_queue.cpp:505
timer.h
callback_queue.h
SelfRemovingCallback::count
size_t count
Definition: test_callback_queue.cpp:161
ConditionObject::add
void add()
Definition: test_callback_queue.cpp:508
runThreadedTest
size_t runThreadedTest(const CountingCallbackPtr &cb, const boost::function< void(CallbackQueue *, bool &)> &threadFunc)
Definition: test_callback_queue.cpp:339
ros::start
ROSCPP_DECL void start()
ros::Time::init
static void init()
TEST
TEST(CallbackQueue, singleCallback)
Definition: test_callback_queue.cpp:78
recursiveTimer
void recursiveTimer(const ros::TimerEvent &)
Definition: test_callback_queue.cpp:413
ros::CallbackQueue
RaceConditionCallback::condition_object
ConditionObject * condition_object
Definition: test_callback_queue.cpp:504
TimerRecursionCallbackPtr
boost::shared_ptr< TimerRecursionCallback > TimerRecursionCallbackPtr
Definition: test_callback_queue.cpp:444
main
int main(int argc, char **argv)
Definition: test_callback_queue.cpp:547
SelfRemovingCallback
Definition: test_callback_queue.cpp:143
ConditionObject::ConditionObject
ConditionObject(CallbackQueue *_queue)
Definition: test_callback_queue.cpp:475
BlockingCallbackPtr
boost::shared_ptr< BlockingCallback > BlockingCallbackPtr
Definition: test_callback_queue.cpp:205
RaceConditionCallback
Definition: test_callback_queue.cpp:491
SelfRemovingCallback::id
uint64_t id
Definition: test_callback_queue.cpp:164
CountingCallback
Definition: test_callback_queue.cpp:49
SelfRemovingCallback::SelfRemovingCallback
SelfRemovingCallback(CallbackQueue *queue, uint64_t id)
Definition: test_callback_queue.cpp:146
callAvailableThread
void callAvailableThread(CallbackQueue *queue, bool &done)
Definition: test_callback_queue.cpp:69
ConditionObject::condition_sync
boost::atomic< bool > condition_sync
Definition: test_callback_queue.cpp:487
ros::CallbackQueue::callOne
CallOneResult callOne()
ros::WallDuration
ros::NodeHandle::createTimer
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
ros::Duration
ros::CallbackQueue::callAvailable
void callAvailable()
ros::CallbackQueue::Called
Called
CountingCallback::mutex
boost::mutex mutex
Definition: test_callback_queue.cpp:64
RaceConditionCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:498
threadFunc
void threadFunc(boost::barrier *b)
ros::CallbackQueueInterface
SelfRemovingCallback::call
virtual CallResult call()
Definition: test_callback_queue.cpp:152


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:02:02