Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "cartographer/common/thread_pool.h"
00018
00019 #include <vector>
00020
00021 #include "absl/memory/memory.h"
00022 #include "gtest/gtest.h"
00023
00024 namespace cartographer {
00025 namespace common {
00026 namespace {
00027
00028 class Receiver {
00029 public:
00030 void Receive(int number) {
00031 absl::MutexLock locker(&mutex_);
00032 received_numbers_.push_back(number);
00033 }
00034
00035 void WaitForNumberSequence(const std::vector<int>& expected_numbers) {
00036 const auto predicate =
00037 [this, &expected_numbers]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00038 return (received_numbers_.size() >= expected_numbers.size());
00039 };
00040 absl::MutexLock locker(&mutex_);
00041 mutex_.Await(absl::Condition(&predicate));
00042 EXPECT_EQ(expected_numbers, received_numbers_);
00043 }
00044
00045 absl::Mutex mutex_;
00046 std::vector<int> received_numbers_ GUARDED_BY(mutex_);
00047 };
00048
00049 TEST(ThreadPoolTest, RunTask) {
00050 ThreadPool pool(1);
00051 Receiver receiver;
00052 auto task = absl::make_unique<Task>();
00053 task->SetWorkItem([&receiver]() { receiver.Receive(1); });
00054 pool.Schedule(std::move(task));
00055 receiver.WaitForNumberSequence({1});
00056 }
00057
00058 TEST(ThreadPoolTest, ManyTasks) {
00059 for (int a = 0; a < 5; ++a) {
00060 ThreadPool pool(3);
00061 Receiver receiver;
00062 int kNumTasks = 10;
00063 for (int i = 0; i < kNumTasks; ++i) {
00064 auto task = absl::make_unique<Task>();
00065 task->SetWorkItem([&receiver]() { receiver.Receive(1); });
00066 pool.Schedule(std::move(task));
00067 }
00068 receiver.WaitForNumberSequence(std::vector<int>(kNumTasks, 1));
00069 }
00070 }
00071
00072 TEST(ThreadPoolTest, RunWithDependency) {
00073 ThreadPool pool(2);
00074 Receiver receiver;
00075 auto task_2 = absl::make_unique<Task>();
00076 task_2->SetWorkItem([&receiver]() { receiver.Receive(2); });
00077 auto task_1 = absl::make_unique<Task>();
00078 task_1->SetWorkItem([&receiver]() { receiver.Receive(1); });
00079 auto weak_task_1 = pool.Schedule(std::move(task_1));
00080 task_2->AddDependency(weak_task_1);
00081 pool.Schedule(std::move(task_2));
00082 receiver.WaitForNumberSequence({1, 2});
00083 }
00084
00085 TEST(ThreadPoolTest, RunWithOutOfScopeDependency) {
00086 ThreadPool pool(2);
00087 Receiver receiver;
00088 auto task_2 = absl::make_unique<Task>();
00089 task_2->SetWorkItem([&receiver]() { receiver.Receive(2); });
00090 {
00091 auto task_1 = absl::make_unique<Task>();
00092 task_1->SetWorkItem([&receiver]() { receiver.Receive(1); });
00093 auto weak_task_1 = pool.Schedule(std::move(task_1));
00094 task_2->AddDependency(weak_task_1);
00095 }
00096 pool.Schedule(std::move(task_2));
00097 receiver.WaitForNumberSequence({1, 2});
00098 }
00099
00100 TEST(ThreadPoolTest, ManyDependencies) {
00101 for (int a = 0; a < 5; ++a) {
00102 ThreadPool pool(5);
00103 Receiver receiver;
00104 int kNumDependencies = 10;
00105 auto task = absl::make_unique<Task>();
00106 task->SetWorkItem([&receiver]() { receiver.Receive(1); });
00107 for (int i = 0; i < kNumDependencies; ++i) {
00108 auto dependency_task = absl::make_unique<Task>();
00109 dependency_task->SetWorkItem([]() {});
00110 task->AddDependency(pool.Schedule(std::move(dependency_task)));
00111 }
00112 pool.Schedule(std::move(task));
00113 receiver.WaitForNumberSequence({1});
00114 }
00115 }
00116
00117 TEST(ThreadPoolTest, ManyDependants) {
00118 for (int a = 0; a < 5; ++a) {
00119 ThreadPool pool(5);
00120 Receiver receiver;
00121 int kNumDependants = 10;
00122 auto dependency_task = absl::make_unique<Task>();
00123 dependency_task->SetWorkItem([]() {});
00124 auto dependency_handle = pool.Schedule(std::move(dependency_task));
00125 for (int i = 0; i < kNumDependants; ++i) {
00126 auto task = absl::make_unique<Task>();
00127 task->AddDependency(dependency_handle);
00128 task->SetWorkItem([&receiver]() { receiver.Receive(1); });
00129 pool.Schedule(std::move(task));
00130 }
00131 receiver.WaitForNumberSequence(std::vector<int>(kNumDependants, 1));
00132 }
00133 }
00134
00135 TEST(ThreadPoolTest, RunWithMultipleDependencies) {
00136 ThreadPool pool(2);
00137 Receiver receiver;
00138 auto task_1 = absl::make_unique<Task>();
00139 task_1->SetWorkItem([&receiver]() { receiver.Receive(1); });
00140 auto task_2a = absl::make_unique<Task>();
00141 task_2a->SetWorkItem([&receiver]() { receiver.Receive(2); });
00142 auto task_2b = absl::make_unique<Task>();
00143 task_2b->SetWorkItem([&receiver]() { receiver.Receive(2); });
00144 auto task_3 = absl::make_unique<Task>();
00145 task_3->SetWorkItem([&receiver]() { receiver.Receive(3); });
00146
00147
00148
00149 auto weak_task_1 = pool.Schedule(std::move(task_1));
00150 task_2a->AddDependency(weak_task_1);
00151 auto weak_task_2a = pool.Schedule(std::move(task_2a));
00152 task_3->AddDependency(weak_task_1);
00153 task_3->AddDependency(weak_task_2a);
00154 task_2b->AddDependency(weak_task_1);
00155 auto weak_task_2b = pool.Schedule(std::move(task_2b));
00156 task_3->AddDependency(weak_task_2b);
00157 pool.Schedule(std::move(task_3));
00158 receiver.WaitForNumberSequence({1, 2, 2, 3});
00159 }
00160
00161 TEST(ThreadPoolTest, RunWithFinishedDependency) {
00162 ThreadPool pool(2);
00163 Receiver receiver;
00164 auto task_1 = absl::make_unique<Task>();
00165 task_1->SetWorkItem([&receiver]() { receiver.Receive(1); });
00166 auto task_2 = absl::make_unique<Task>();
00167 task_2->SetWorkItem([&receiver]() { receiver.Receive(2); });
00168 auto weak_task_1 = pool.Schedule(std::move(task_1));
00169 task_2->AddDependency(weak_task_1);
00170 receiver.WaitForNumberSequence({1});
00171 pool.Schedule(std::move(task_2));
00172 receiver.WaitForNumberSequence({1, 2});
00173 }
00174
00175 }
00176 }
00177 }