00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
#include "cartographer/common/task.h"

#include <deque>
#include <map>
#include <memory>
#include <queue>
#include <utility>

#include "absl/memory/memory.h"
#include "cartographer/common/thread_pool.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
00026
00027 namespace cartographer {
00028 namespace common {
00029 namespace {
00030
00031 class MockCallback {
00032 public:
00033 MOCK_METHOD0(Run, void());
00034 };
00035
00036 class FakeThreadPool : public ThreadPoolInterface {
00037 public:
00038 void NotifyDependenciesCompleted(Task* task) override {
00039 auto it = tasks_not_ready_.find(task);
00040 ASSERT_NE(it, tasks_not_ready_.end());
00041 task_queue_.push_back(it->second);
00042 tasks_not_ready_.erase(it);
00043 }
00044
00045 std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task) override {
00046 std::shared_ptr<Task> shared_task;
00047 auto it =
00048 tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
00049 EXPECT_TRUE(it.second);
00050 shared_task = it.first->second;
00051 SetThreadPool(shared_task.get());
00052 return shared_task;
00053 }
00054
00055 void RunNext() {
00056 ASSERT_GE(task_queue_.size(), 1);
00057 Execute(task_queue_.front().get());
00058 task_queue_.pop_front();
00059 }
00060
00061 bool IsEmpty() { return task_queue_.empty(); }
00062
00063 private:
00064 std::deque<std::shared_ptr<Task>> task_queue_;
00065 std::map<Task*, std::shared_ptr<Task>> tasks_not_ready_;
00066 };
00067
00068 class TaskTest : public ::testing::Test {
00069 protected:
00070 FakeThreadPool* thread_pool() { return &thread_pool_; }
00071 FakeThreadPool thread_pool_;
00072 };
00073
00074 TEST_F(TaskTest, RunTask) {
00075 auto a = absl::make_unique<Task>();
00076 MockCallback callback;
00077 a->SetWorkItem([&callback]() { callback.Run(); });
00078 EXPECT_EQ(a->GetState(), Task::NEW);
00079 auto shared_a = thread_pool()->Schedule(std::move(a)).lock();
00080 EXPECT_NE(shared_a, nullptr);
00081 EXPECT_EQ(shared_a->GetState(), Task::DEPENDENCIES_COMPLETED);
00082 EXPECT_CALL(callback, Run()).Times(1);
00083 thread_pool()->RunNext();
00084 EXPECT_EQ(shared_a->GetState(), Task::COMPLETED);
00085 EXPECT_TRUE(thread_pool()->IsEmpty());
00086 }
00087
00088 TEST_F(TaskTest, RunTaskWithDependency) {
00089 auto a = absl::make_unique<Task>();
00090 auto b = absl::make_unique<Task>();
00091 MockCallback callback_a;
00092 a->SetWorkItem([&callback_a]() { callback_a.Run(); });
00093 MockCallback callback_b;
00094 b->SetWorkItem([&callback_b]() { callback_b.Run(); });
00095 EXPECT_EQ(a->GetState(), Task::NEW);
00096 EXPECT_EQ(b->GetState(), Task::NEW);
00097 {
00098 ::testing::InSequence dummy;
00099 EXPECT_CALL(callback_a, Run()).Times(1);
00100 EXPECT_CALL(callback_b, Run()).Times(1);
00101 }
00102 auto shared_a = thread_pool()->Schedule(std::move(a)).lock();
00103 EXPECT_NE(shared_a, nullptr);
00104 b->AddDependency(shared_a);
00105 auto shared_b = thread_pool()->Schedule(std::move(b)).lock();
00106 EXPECT_NE(shared_b, nullptr);
00107 EXPECT_EQ(shared_b->GetState(), Task::DISPATCHED);
00108 EXPECT_EQ(shared_a->GetState(), Task::DEPENDENCIES_COMPLETED);
00109 thread_pool()->RunNext();
00110 EXPECT_EQ(shared_b->GetState(), Task::DEPENDENCIES_COMPLETED);
00111 thread_pool()->RunNext();
00112 EXPECT_EQ(shared_a->GetState(), Task::COMPLETED);
00113 EXPECT_EQ(shared_b->GetState(), Task::COMPLETED);
00114 }
00115
00116 TEST_F(TaskTest, RunTaskWithTwoDependency) {
00117
00118
00119
00120 auto a = absl::make_unique<Task>();
00121 auto b = absl::make_unique<Task>();
00122 auto c = absl::make_unique<Task>();
00123 auto d = absl::make_unique<Task>();
00124 MockCallback callback_a;
00125 a->SetWorkItem([&callback_a]() { callback_a.Run(); });
00126 MockCallback callback_b;
00127 b->SetWorkItem([&callback_b]() { callback_b.Run(); });
00128 MockCallback callback_c;
00129 c->SetWorkItem([&callback_c]() { callback_c.Run(); });
00130 MockCallback callback_d;
00131 d->SetWorkItem([&callback_d]() { callback_d.Run(); });
00132 EXPECT_CALL(callback_a, Run()).Times(1);
00133 EXPECT_CALL(callback_b, Run()).Times(1);
00134 EXPECT_CALL(callback_c, Run()).Times(1);
00135 EXPECT_CALL(callback_d, Run()).Times(1);
00136 auto shared_a = thread_pool()->Schedule(std::move(a)).lock();
00137 EXPECT_NE(shared_a, nullptr);
00138 b->AddDependency(shared_a);
00139 auto shared_b = thread_pool()->Schedule(std::move(b)).lock();
00140 EXPECT_NE(shared_b, nullptr);
00141 auto shared_c = thread_pool()->Schedule(std::move(c)).lock();
00142 EXPECT_NE(shared_c, nullptr);
00143 d->AddDependency(shared_b);
00144 d->AddDependency(shared_c);
00145 auto shared_d = thread_pool()->Schedule(std::move(d)).lock();
00146 EXPECT_NE(shared_d, nullptr);
00147 EXPECT_EQ(shared_b->GetState(), Task::DISPATCHED);
00148 EXPECT_EQ(shared_d->GetState(), Task::DISPATCHED);
00149 thread_pool()->RunNext();
00150 EXPECT_EQ(shared_a->GetState(), Task::COMPLETED);
00151 EXPECT_EQ(shared_b->GetState(), Task::DEPENDENCIES_COMPLETED);
00152 EXPECT_EQ(shared_c->GetState(), Task::DEPENDENCIES_COMPLETED);
00153 thread_pool()->RunNext();
00154 thread_pool()->RunNext();
00155 EXPECT_EQ(shared_b->GetState(), Task::COMPLETED);
00156 EXPECT_EQ(shared_c->GetState(), Task::COMPLETED);
00157 EXPECT_EQ(shared_d->GetState(), Task::DEPENDENCIES_COMPLETED);
00158 thread_pool()->RunNext();
00159 EXPECT_EQ(shared_d->GetState(), Task::COMPLETED);
00160 }
00161
00162 TEST_F(TaskTest, RunWithCompletedDependency) {
00163 auto a = absl::make_unique<Task>();
00164 MockCallback callback_a;
00165 a->SetWorkItem([&callback_a]() { callback_a.Run(); });
00166 auto shared_a = thread_pool()->Schedule(std::move(a)).lock();
00167 EXPECT_NE(shared_a, nullptr);
00168 EXPECT_EQ(shared_a->GetState(), Task::DEPENDENCIES_COMPLETED);
00169 EXPECT_CALL(callback_a, Run()).Times(1);
00170 thread_pool()->RunNext();
00171 EXPECT_EQ(shared_a->GetState(), Task::COMPLETED);
00172 auto b = absl::make_unique<Task>();
00173 MockCallback callback_b;
00174 b->SetWorkItem([&callback_b]() { callback_b.Run(); });
00175 b->AddDependency(shared_a);
00176 EXPECT_EQ(b->GetState(), Task::NEW);
00177 auto shared_b = thread_pool()->Schedule(std::move(b)).lock();
00178 EXPECT_NE(shared_b, nullptr);
00179 EXPECT_EQ(shared_b->GetState(), Task::DEPENDENCIES_COMPLETED);
00180 EXPECT_CALL(callback_b, Run()).Times(1);
00181 thread_pool()->RunNext();
00182 EXPECT_EQ(shared_b->GetState(), Task::COMPLETED);
00183 }
00184
00185 }
00186 }
00187 }