test_callback_queue_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
32 #include <ros/callback_queue.h>
33 #include <ros/time.h>
34 #include <ros/console.h>
35 
36 #include <boost/thread.hpp>
37 
38 #include <gtest/gtest.h>
39 
40 using namespace nodelet;
41 using namespace nodelet::detail;
42 
44 {
45 public:
46  SingleThreadedCallback(boost::barrier* bar)
47  : success(true)
48  , calls(0)
49  , inited_(false)
50  , barrier_(bar)
51  {}
52 
54  {
55  ros::WallDuration(0.1).sleep();
56  barrier_->wait();
57 
58  {
59  boost::mutex::scoped_lock lock(mutex_);
60  if (!inited_)
61  {
62  initial_call_id = boost::this_thread::get_id();
63  inited_ = true;
64  }
65 
66  if (initial_call_id != boost::this_thread::get_id())
67  {
68  success = false;
69  }
70 
71  ++calls;
72  }
73 
74  return Success;
75  }
76 
77  bool success;
78  uint32_t calls;
79  boost::thread::id initial_call_id;
80 
81 private:
82  bool inited_;
83  boost::mutex mutex_;
84  boost::barrier* barrier_;
85 };
87 
88 TEST(CallbackQueueManager, singleThreaded)
89 {
91  CallbackQueuePtr queue(new CallbackQueue(&man));
92  man.addQueue(queue, false);
93 
94  boost::barrier bar(1);
96  for (uint32_t i = 0; i < 10; ++i)
97  {
98  queue->addCallback(cb, 0);
99  }
100 
101  while (cb->calls < 10)
102  {
103  ros::WallDuration(0.01).sleep();
104  }
105 
106  EXPECT_EQ(cb->calls, 10U);
107  ASSERT_TRUE(cb->success);
108 }
109 
110 TEST(CallbackQueueManager, multipleSingleThreaded)
111 {
112  uint32_t thread_count = boost::thread::hardware_concurrency();
113  // Need at least 2 threads here
114  if (thread_count == 1)
115  {
116  thread_count = 2;
117  }
118  CallbackQueueManager man(thread_count);
119  CallbackQueuePtr queue1(new CallbackQueue(&man));
120  CallbackQueuePtr queue2(new CallbackQueue(&man));
121  man.addQueue(queue1, false);
122  man.addQueue(queue2, false);
123 
124  boost::barrier bar(2);
127  for (uint32_t i = 0; i < 10; ++i)
128  {
129  queue1->addCallback(cb1, 1);
130  queue2->addCallback(cb2, 1);
131  }
132 
133  while (cb1->calls < 10 || cb2->calls < 10)
134  {
135  ros::WallDuration(0.01).sleep();
136  }
137 
138  EXPECT_EQ(cb1->calls, 10U);
139  EXPECT_EQ(cb2->calls, 10U);
140  EXPECT_TRUE(cb1->success);
141  EXPECT_TRUE(cb2->success);
142  EXPECT_NE(cb1->initial_call_id, cb2->initial_call_id);
143 }
144 
146 {
147 public:
148  MultiThreadedCallback(boost::barrier* bar)
149  : barrier_(bar)
150  {}
151 
153  {
154  barrier_->wait();
155 
156  return Success;
157  }
158 
159 private:
160  boost::barrier* barrier_;
161 };
163 
164 TEST(CallbackQueueManager, multiThreaded)
165 {
166  for (uint32_t j = 0; j < 1000; ++j)
167  {
168  //ROS_INFO_COND(j % 1000 == 0, "%d", j);
169  CallbackQueueManager man(1);
170  CallbackQueuePtr queue(new CallbackQueue(&man));
171  man.addQueue(queue, true);
172 
173  uint32_t num_threads = man.getNumWorkerThreads();
174  boost::barrier bar(num_threads + 1);
175 
177  for (uint32_t i = 0; i < num_threads; ++i)
178  {
179  queue->addCallback(cb, 0);
180  }
181 
182  bar.wait();
183  queue->removeByID(0);
184  }
185 }
186 
187 int main(int argc, char** argv)
188 {
189  ::testing::InitGoogleTest(&argc, argv);
190  return RUN_ALL_TESTS();
191 }
TEST(CallbackQueueManager, singleThreaded)
boost::shared_ptr< SingleThreadedCallback > SingleThreadedCallbackPtr
void addQueue(const CallbackQueuePtr &queue, bool threaded)
MultiThreadedCallback(boost::barrier *bar)
ros::CallbackInterface::CallResult call()
ros::CallbackInterface::CallResult call()
bool sleep() const
boost::shared_ptr< MultiThreadedCallback > MultiThreadedCallbackPtr
SingleThreadedCallback(boost::barrier *bar)
int main(int argc, char **argv)


test_nodelet
Author(s): Tully Foote
autogenerated on Mon Feb 28 2022 22:57:16