test_callback_queue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
32 /*
33  * Test callback queue
34  */
35 
36 #include <gtest/gtest.h>
37 #include <ros/callback_queue.h>
38 #include <ros/console.h>
39 #include <ros/timer.h>
40 
41 #include <boost/atomic.hpp>
42 #include <boost/shared_ptr.hpp>
43 #include <boost/bind.hpp>
44 #include <boost/thread.hpp>
45 #include <boost/function.hpp>
46 
47 using namespace ros;
48 
50 {
51 public:
53  : count(0)
54  {}
55 
56  virtual CallResult call()
57  {
58  boost::mutex::scoped_lock lock(mutex);
59  ++count;
60 
61  return Success;
62  }
63 
64  boost::mutex mutex;
65  size_t count;
66 };
68 
69 TEST(CallbackQueue, singleCallback)
70 {
71  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
72  CallbackQueue queue;
73  queue.addCallback(cb, 1);
74  queue.callOne();
75 
76  EXPECT_EQ(cb->count, 1U);
77 
78  queue.addCallback(cb, 1);
79  queue.callAvailable();
80 
81  EXPECT_EQ(cb->count, 2U);
82 
83  queue.callOne();
84  EXPECT_EQ(cb->count, 2U);
85 
86  queue.callAvailable();
87  EXPECT_EQ(cb->count, 2U);
88 }
89 
90 TEST(CallbackQueue, multipleCallbacksCallAvailable)
91 {
92  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
93  CallbackQueue queue;
94  for (uint32_t i = 0; i < 1000; ++i)
95  {
96  queue.addCallback(cb, 1);
97  }
98 
99  queue.callAvailable();
100 
101  EXPECT_EQ(cb->count, 1000U);
102 }
103 
104 TEST(CallbackQueue, multipleCallbacksCallOne)
105 {
106  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
107  CallbackQueue queue;
108  for (uint32_t i = 0; i < 1000; ++i)
109  {
110  queue.addCallback(cb, 1);
111  }
112 
113  for (uint32_t i = 0; i < 1000; ++i)
114  {
115  queue.callOne();
116  EXPECT_EQ(cb->count, i + 1);
117  }
118 }
119 
121 {
122  CountingCallbackPtr cb1(boost::make_shared<CountingCallback>());
123  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
124  CallbackQueue queue;
125  queue.addCallback(cb1, 1);
126  queue.addCallback(cb2, 2);
127  queue.removeByID(1);
128  queue.callAvailable();
129 
130  EXPECT_EQ(cb1->count, 0U);
131  EXPECT_EQ(cb2->count, 1U);
132 }
133 
135 {
136 public:
137  SelfRemovingCallback(CallbackQueue* queue, uint64_t id)
138  : count(0)
139  , queue(queue)
140  , id(id)
141  {}
142 
143  virtual CallResult call()
144  {
145  ++count;
146 
147  queue->removeByID(id);
148 
149  return Success;
150  }
151 
152  size_t count;
153 
155  uint64_t id;
156 };
158 
159 TEST(CallbackQueue, removeSelf)
160 {
161  CallbackQueue queue;
162  SelfRemovingCallbackPtr cb1(boost::make_shared<SelfRemovingCallback>(&queue, 1));
163  CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
164  queue.addCallback(cb1, 1);
165  queue.addCallback(cb2, 1);
166  queue.addCallback(cb2, 1);
167 
168  queue.callOne();
169 
170  queue.addCallback(cb2, 1);
171 
172  queue.callAvailable();
173 
174  EXPECT_EQ(cb1->count, 1U);
175  EXPECT_EQ(cb2->count, 1U);
176 }
177 
179 {
180 public:
181  RecursiveCallback(CallbackQueue* queue, bool use_available)
182  : count(0)
183  , queue(queue)
184  , use_available(use_available)
185  {}
186 
187  virtual CallResult call()
188  {
189  ++count;
190 
191  if (count < 3)
192  {
193  if (use_available)
194  {
195  queue->callAvailable();
196  }
197  else
198  {
199  queue->callOne();
200  }
201  }
202 
203  return Success;
204  }
205 
206  size_t count;
207 
210 };
212 
213 TEST(CallbackQueue, recursive1)
214 {
215  CallbackQueue queue;
216  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
217  queue.addCallback(cb, 1);
218  queue.addCallback(cb, 1);
219  queue.addCallback(cb, 1);
220  queue.callAvailable();
221 
222  EXPECT_EQ(cb->count, 3U);
223 }
224 
225 TEST(CallbackQueue, recursive2)
226 {
227  CallbackQueue queue;
228  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
229  queue.addCallback(cb, 1);
230  queue.addCallback(cb, 1);
231  queue.addCallback(cb, 1);
232  queue.callOne();
233 
234  EXPECT_EQ(cb->count, 3U);
235 }
236 
237 TEST(CallbackQueue, recursive3)
238 {
239  CallbackQueue queue;
240  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, false));
241  queue.addCallback(cb, 1);
242  queue.addCallback(cb, 1);
243  queue.addCallback(cb, 1);
244  queue.callAvailable();
245 
246  EXPECT_EQ(cb->count, 3U);
247 }
248 
249 TEST(CallbackQueue, recursive4)
250 {
251  CallbackQueue queue;
252  RecursiveCallbackPtr cb(boost::make_shared<RecursiveCallback>(&queue, true));
253  queue.addCallback(cb, 1);
254  queue.addCallback(cb, 1);
255  queue.addCallback(cb, 1);
256  queue.callOne();
257 
258  EXPECT_EQ(cb->count, 3U);
259 }
260 
261 void callAvailableThread(CallbackQueue* queue, bool& done)
262 {
263  while (!done)
264  {
265  queue->callAvailable(ros::WallDuration(0.1));
266  }
267 }
268 
269 size_t runThreadedTest(const CountingCallbackPtr& cb, const boost::function<void(CallbackQueue*, bool&)>& threadFunc)
270 {
271  CallbackQueue queue;
272  boost::thread_group tg;
273  bool done = false;
274 
275  for (uint32_t i = 0; i < 10; ++i)
276  {
277  tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done)));
278  }
279 
281  size_t i = 0;
282  while (ros::WallTime::now() - start < ros::WallDuration(5))
283  {
284  queue.addCallback(cb);
285  ++i;
286  }
287 
288  while (!queue.isEmpty())
289  {
290  ros::WallDuration(0.01).sleep();
291  }
292 
293  done = true;
294  tg.join_all();
295 
296  return i;
297 }
298 
299 TEST(CallbackQueue, threadedCallAvailable)
300 {
301  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
302  size_t i = runThreadedTest(cb, callAvailableThread);
303  ROS_INFO_STREAM(i);
304  EXPECT_EQ(cb->count, i);
305 }
306 
307 void callOneThread(CallbackQueue* queue, bool& done)
308 {
309  while (!done)
310  {
311  queue->callOne(ros::WallDuration(0.1));
312  }
313 }
314 
315 TEST(CallbackQueue, threadedCallOne)
316 {
317  CountingCallbackPtr cb(boost::make_shared<CountingCallback>());
318  size_t i = runThreadedTest(cb, callOneThread);
319  ROS_INFO_STREAM(i);
320  EXPECT_EQ(cb->count, i);
321 }
322 
323 // this class is just an ugly hack
324 // to access the constructor Timer(TimerOptions)
325 namespace ros
326 {
327 class NodeHandle
328 {
329 public:
330  static Timer createTimer(const TimerOptions& ops)
331  {
332  return Timer(ops);
333  }
334 };
335 }
336 
338 {
339 }
340 
342 
344 {
345  // wait until the timer is TimerRecreationCallback is garbaged
346  WallDuration(2).sleep();
347 
348  TimerOptions ops(Duration(0.1), dummyTimer, recursiveTimerQueue, false, false);
350  t.start();
351 }
352 
354 {
355 public:
357  : queue(_queue)
358  {}
359 
360  virtual CallResult call()
361  {
362  TimerOptions ops(Duration(0.1), recursiveTimer, queue, false, false);
364  t.start();
365 
366  // wait until the recursiveTimer has been fired
367  WallDuration(1).sleep();
368 
369  return Success;
370  }
371 
373 };
375 
377 {
378  // ensure that the test does not dead-lock, see #3867
379  ros::Time::init();
380  CallbackQueue queue;
381  recursiveTimerQueue = &queue;
382  TimerRecursionCallbackPtr cb(boost::make_shared<TimerRecursionCallback>(&queue));
383  queue.addCallback(cb, 1);
384 
385  boost::thread_group tg;
386  bool done = false;
387 
388  for (uint32_t i = 0; i < 2; ++i)
389  {
390  tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done)));
391  }
392 
393  while (!queue.isEmpty())
394  {
395  ros::WallDuration(0.01).sleep();
396  }
397 
398  done = true;
399  tg.join_all();
400 }
401 
403 {
404 public:
406  : id(0), queue(_queue) {
407  condition_sync.store(true);
408  condition_one.store(false);
409  condition_stop.store(false);
410  }
411 
412  void add();
413 
414  unsigned long id;
416  boost::atomic<bool> condition_one;
417  boost::atomic<bool> condition_sync;
418  boost::atomic<bool> condition_stop;
419 };
420 
422 {
423 public:
424  RaceConditionCallback(ConditionObject * _condition_object, unsigned long * _id)
425  : condition_object(_condition_object), id(_id)
426  {}
427 
428  virtual CallResult call()
429  {
430  condition_object->condition_one.store(false);
431  return Success;
432  }
433 
435  unsigned long * id;
436 };
437 
439 {
440  while(!condition_stop.load())
441  {
442  if(condition_sync.load() && queue->isEmpty())
443  {
444  condition_one.store(true);
445  id++;
446  queue->addCallback(boost::make_shared<RaceConditionCallback>(this, &id), id);
447  }
448  boost::this_thread::sleep(boost::posix_time::microseconds(1));
449  }
450 }
451 
452 TEST(CallbackQueue, raceConditionCallback)
453 {
454  CallbackQueue queue;
455  ConditionObject condition_object(&queue);
456 
457  boost::thread t(boost::bind(&ConditionObject::add, &condition_object));
458  for(unsigned int i = 0; i < 1000000; ++i)
459  {
460  condition_object.condition_sync.store(false);
461  if (queue.callOne() == CallbackQueue::Called)
462  {
463  if(condition_object.condition_one.load())
464  {
465  condition_object.condition_stop.store(true);
466  ASSERT_FALSE(condition_object.condition_one.load());
467  }
468  }
469 
470  queue.clear();
471  condition_object.condition_sync.store(true);
472  }
473  condition_object.condition_stop.store(true);
474  t.join();
475 }
476 
477 int main(int argc, char** argv)
478 {
479  testing::InitGoogleTest(&argc, argv);
480  return RUN_ALL_TESTS();
481 }
482 
483 
484 
boost::shared_ptr< TimerRecursionCallback > TimerRecursionCallbackPtr
int main(int argc, char **argv)
virtual CallResult call()
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
ROSCPP_DECL void start()
void dummyTimer(const ros::TimerEvent &)
boost::shared_ptr< CountingCallback > CountingCallbackPtr
size_t runThreadedTest(const CountingCallbackPtr &cb, const boost::function< void(CallbackQueue *, bool &)> &threadFunc)
virtual CallResult call()
ConditionObject(CallbackQueue *_queue)
void start()
virtual CallResult call()
CallOneResult callOne()
boost::atomic< bool > condition_stop
virtual CallResult call()
RecursiveCallback(CallbackQueue *queue, bool use_available)
boost::shared_ptr< RecursiveCallback > RecursiveCallbackPtr
bool add(test_roscpp::TestStringString::Request &, test_roscpp::TestStringString::Response &)
boost::atomic< bool > condition_one
CallbackQueueInterface * recursiveTimerQueue
bool sleep() const
ConditionObject * condition_object
CallbackQueueInterface * queue
static void init()
void threadFunc(boost::barrier *b)
void callAvailableThread(CallbackQueue *queue, bool &done)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
virtual void removeByID(uint64_t removal_id)
TEST(CallbackQueue, singleCallback)
void recursiveTimer(const ros::TimerEvent &)
void callOneThread(CallbackQueue *queue, bool &done)
#define ROS_INFO_STREAM(args)
static WallTime now()
boost::shared_ptr< SelfRemovingCallback > SelfRemovingCallbackPtr
CallbackQueue * queue
SelfRemovingCallback(CallbackQueue *queue, uint64_t id)
virtual CallResult call()
TimerRecursionCallback(CallbackQueueInterface *_queue)
ros::WallTime t
boost::atomic< bool > condition_sync
RaceConditionCallback(ConditionObject *_condition_object, unsigned long *_id)


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:46