00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <nodelet/detail/callback_queue_manager.h>
00031 #include <nodelet/detail/callback_queue.h>
00032 #include <ros/callback_queue.h>
00033 #include <ros/time.h>
00034 #include <ros/console.h>
00035
00036 #include <boost/thread.hpp>
00037
00038 #include <gtest/gtest.h>
00039
00040 using namespace nodelet;
00041 using namespace nodelet::detail;
00042
00043 class SingleThreadedCallback : public ros::CallbackInterface
00044 {
00045 public:
00046 SingleThreadedCallback(boost::barrier* bar)
00047 : success(true)
00048 , calls(0)
00049 , inited_(false)
00050 , barrier_(bar)
00051 {}
00052
00053 ros::CallbackInterface::CallResult call()
00054 {
00055 ros::WallDuration(0.1).sleep();
00056 barrier_->wait();
00057
00058 {
00059 boost::mutex::scoped_lock lock(mutex_);
00060 if (!inited_)
00061 {
00062 initial_call_id = boost::this_thread::get_id();
00063 inited_ = true;
00064 }
00065
00066 if (initial_call_id != boost::this_thread::get_id())
00067 {
00068 success = false;
00069 }
00070
00071 ++calls;
00072 }
00073
00074 return Success;
00075 }
00076
00077 bool success;
00078 uint32_t calls;
00079 boost::thread::id initial_call_id;
00080
00081 private:
00082 bool inited_;
00083 boost::mutex mutex_;
00084 boost::barrier* barrier_;
00085 };
00086 typedef boost::shared_ptr<SingleThreadedCallback> SingleThreadedCallbackPtr;
00087
00088 TEST(CallbackQueueManager, singleThreaded)
00089 {
00090 CallbackQueueManager man;
00091 CallbackQueuePtr queue(new CallbackQueue(&man));
00092 man.addQueue(queue, false);
00093
00094 boost::barrier bar(1);
00095 SingleThreadedCallbackPtr cb(new SingleThreadedCallback(&bar));
00096 for (uint32_t i = 0; i < 10; ++i)
00097 {
00098 queue->addCallback(cb, 0);
00099 }
00100
00101 while (cb->calls < 10)
00102 {
00103 ros::WallDuration(0.01).sleep();
00104 }
00105
00106 EXPECT_EQ(cb->calls, 10U);
00107 ASSERT_TRUE(cb->success);
00108 }
00109
00110 TEST(CallbackQueueManager, multipleSingleThreaded)
00111 {
00112 uint32_t thread_count = boost::thread::hardware_concurrency();
00113
00114 if (thread_count == 1)
00115 {
00116 thread_count = 2;
00117 }
00118 CallbackQueueManager man(thread_count);
00119 CallbackQueuePtr queue1(new CallbackQueue(&man));
00120 CallbackQueuePtr queue2(new CallbackQueue(&man));
00121 man.addQueue(queue1, false);
00122 man.addQueue(queue2, false);
00123
00124 boost::barrier bar(2);
00125 SingleThreadedCallbackPtr cb1(new SingleThreadedCallback(&bar));
00126 SingleThreadedCallbackPtr cb2(new SingleThreadedCallback(&bar));
00127 for (uint32_t i = 0; i < 10; ++i)
00128 {
00129 queue1->addCallback(cb1, 1);
00130 queue2->addCallback(cb2, 1);
00131 }
00132
00133 while (cb1->calls < 10 || cb2->calls < 10)
00134 {
00135 ros::WallDuration(0.01).sleep();
00136 }
00137
00138 EXPECT_EQ(cb1->calls, 10U);
00139 EXPECT_EQ(cb2->calls, 10U);
00140 EXPECT_TRUE(cb1->success);
00141 EXPECT_TRUE(cb2->success);
00142 EXPECT_NE(cb1->initial_call_id, cb2->initial_call_id);
00143 }
00144
00145 class MultiThreadedCallback : public ros::CallbackInterface
00146 {
00147 public:
00148 MultiThreadedCallback(boost::barrier* bar)
00149 : barrier_(bar)
00150 {}
00151
00152 ros::CallbackInterface::CallResult call()
00153 {
00154 barrier_->wait();
00155
00156 return Success;
00157 }
00158
00159 private:
00160 boost::barrier* barrier_;
00161 };
00162 typedef boost::shared_ptr<MultiThreadedCallback> MultiThreadedCallbackPtr;
00163
00164 TEST(CallbackQueueManager, multiThreaded)
00165 {
00166 for (uint32_t j = 0; j < 1000; ++j)
00167 {
00168
00169 CallbackQueueManager man(1);
00170 CallbackQueuePtr queue(new CallbackQueue(&man));
00171 man.addQueue(queue, true);
00172
00173 uint32_t num_threads = man.getNumWorkerThreads();
00174 boost::barrier bar(num_threads + 1);
00175
00176 MultiThreadedCallbackPtr cb(new MultiThreadedCallback(&bar));
00177 for (uint32_t i = 0; i < num_threads; ++i)
00178 {
00179 queue->addCallback(cb, 0);
00180 }
00181
00182 bar.wait();
00183 queue->removeByID(0);
00184 }
00185 }
00186
00187 int main(int argc, char** argv)
00188 {
00189 ::testing::InitGoogleTest(&argc, argv);
00190 return RUN_ALL_TESTS();
00191 }