Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <nodelet/detail/callback_queue_manager.h>
00031 #include <nodelet/detail/callback_queue.h>
00032 #include <ros/callback_queue.h>
00033 #include <ros/time.h>
00034 #include <ros/console.h>
00035
00036 #include <boost/thread.hpp>
00037
00038 #include <gtest/gtest.h>
00039
00040 using namespace nodelet;
00041 using namespace nodelet::detail;
00042
00043 class SingleThreadedCallback : public ros::CallbackInterface
00044 {
00045 public:
00046 SingleThreadedCallback(boost::barrier* bar)
00047 : success(true)
00048 , calls(0)
00049 , inited_(false)
00050 , barrier_(bar)
00051 {}
00052
00053 ros::CallbackInterface::CallResult call()
00054 {
00055 ros::WallDuration(0.1).sleep();
00056 barrier_->wait();
00057
00058 {
00059 boost::mutex::scoped_lock lock(mutex_);
00060 if (!inited_)
00061 {
00062 initial_call_id = boost::this_thread::get_id();
00063 inited_ = true;
00064 }
00065
00066 if (initial_call_id != boost::this_thread::get_id())
00067 {
00068 success = false;
00069 }
00070
00071 ++calls;
00072 }
00073
00074 return Success;
00075 }
00076
00077 bool success;
00078 uint32_t calls;
00079 boost::thread::id initial_call_id;
00080
00081 private:
00082 bool inited_;
00083 boost::mutex mutex_;
00084 boost::barrier* barrier_;
00085 };
00086 typedef boost::shared_ptr<SingleThreadedCallback> SingleThreadedCallbackPtr;
00087
00088 TEST(CallbackQueueManager, singleThreaded)
00089 {
00090 CallbackQueueManager man;
00091 CallbackQueuePtr queue(new CallbackQueue(&man));
00092 man.addQueue(queue, false);
00093
00094 boost::barrier bar(1);
00095 SingleThreadedCallbackPtr cb(new SingleThreadedCallback(&bar));
00096 for (uint32_t i = 0; i < 10; ++i)
00097 {
00098 queue->addCallback(cb, 0);
00099 }
00100
00101 while (cb->calls < 10)
00102 {
00103 ros::WallDuration(0.01).sleep();
00104 }
00105
00106 EXPECT_EQ(cb->calls, 10U);
00107 ASSERT_TRUE(cb->success);
00108 }
00109
00110 TEST(CallbackQueueManager, multipleSingleThreaded)
00111 {
00112 uint32_t thread_count = boost::thread::hardware_concurrency();
00113
00114 if (thread_count == 1)
00115 {
00116 thread_count = 2;
00117 }
00118 CallbackQueueManager man(thread_count);
00119 CallbackQueuePtr queue1(new CallbackQueue(&man));
00120 CallbackQueuePtr queue2(new CallbackQueue(&man));
00121 man.addQueue(queue1, false);
00122 man.addQueue(queue2, false);
00123
00124 boost::barrier bar(2);
00125 SingleThreadedCallbackPtr cb1(new SingleThreadedCallback(&bar));
00126 SingleThreadedCallbackPtr cb2(new SingleThreadedCallback(&bar));
00127 for (uint32_t i = 0; i < 10; ++i)
00128 {
00129 queue1->addCallback(cb1, 1);
00130 queue2->addCallback(cb2, 1);
00131 }
00132
00133 while (cb1->calls < 10 || cb2->calls < 10)
00134 {
00135 ros::WallDuration(0.01).sleep();
00136 }
00137
00138 EXPECT_EQ(cb1->calls, 10U);
00139 EXPECT_EQ(cb2->calls, 10U);
00140 EXPECT_TRUE(cb1->success);
00141 EXPECT_TRUE(cb2->success);
00142 EXPECT_NE(cb1->initial_call_id, cb2->initial_call_id);
00143 }
00144
00145 class MultiThreadedCallback : public ros::CallbackInterface
00146 {
00147 public:
00148 MultiThreadedCallback(boost::barrier* bar)
00149 : barrier_(bar)
00150 {}
00151
00152 ros::CallbackInterface::CallResult call()
00153 {
00154 barrier_->wait();
00155
00156 return Success;
00157 }
00158
00159 private:
00160 boost::barrier* barrier_;
00161 };
00162 typedef boost::shared_ptr<MultiThreadedCallback> MultiThreadedCallbackPtr;
00163
00164 TEST(CallbackQueueManager, multiThreaded)
00165 {
00166 for (uint32_t j = 0; j < 1000; ++j)
00167 {
00168
00169 CallbackQueueManager man(1);
00170 CallbackQueuePtr queue(new CallbackQueue(&man));
00171 man.addQueue(queue, true);
00172
00173 uint32_t num_threads = man.getNumWorkerThreads();
00174 boost::barrier bar(num_threads + 1);
00175
00176 MultiThreadedCallbackPtr cb(new MultiThreadedCallback(&bar));
00177 for (uint32_t i = 0; i < num_threads; ++i)
00178 {
00179 queue->addCallback(cb, 0);
00180 }
00181
00182 bar.wait();
00183 queue->removeByID(0);
00184 }
00185 }
00186
00187 int main(int argc, char** argv)
00188 {
00189 ::testing::InitGoogleTest(&argc, argv);
00190 return RUN_ALL_TESTS();
00191 }