thread_pool_for_testing.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2018 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer/common/internal/testing/thread_pool_for_testing.h"
00018 
00019 #ifndef WIN32
00020 #include <unistd.h>
00021 #endif
00022 #include <algorithm>
00023 #include <chrono>
00024 #include <numeric>
00025 
00026 #include "absl/memory/memory.h"
00027 #include "cartographer/common/task.h"
00028 #include "cartographer/common/time.h"
00029 #include "glog/logging.h"
00030 
00031 namespace cartographer {
00032 namespace common {
00033 namespace testing {
00034 
00035 ThreadPoolForTesting::ThreadPoolForTesting()
00036     : thread_([this]() { ThreadPoolForTesting::DoWork(); }) {}
00037 
00038 ThreadPoolForTesting::~ThreadPoolForTesting() {
00039   {
00040     absl::MutexLock locker(&mutex_);
00041     CHECK(running_);
00042     running_ = false;
00043     CHECK_EQ(task_queue_.size(), 0);
00044     CHECK_EQ(tasks_not_ready_.size(), 0);
00045   }
00046   thread_.join();
00047 }
00048 
00049 void ThreadPoolForTesting::NotifyDependenciesCompleted(Task* task) {
00050   absl::MutexLock locker(&mutex_);
00051   CHECK(running_);
00052   auto it = tasks_not_ready_.find(task);
00053   CHECK(it != tasks_not_ready_.end());
00054   task_queue_.push_back(it->second);
00055   tasks_not_ready_.erase(it);
00056 }
00057 
00058 std::weak_ptr<Task> ThreadPoolForTesting::Schedule(std::unique_ptr<Task> task) {
00059   std::shared_ptr<Task> shared_task;
00060   {
00061     absl::MutexLock locker(&mutex_);
00062     idle_ = false;
00063     CHECK(running_);
00064     auto insert_result =
00065         tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
00066     CHECK(insert_result.second) << "ScheduleWhenReady called twice";
00067     shared_task = insert_result.first->second;
00068   }
00069   SetThreadPool(shared_task.get());
00070   return shared_task;
00071 }
00072 
00073 void ThreadPoolForTesting::WaitUntilIdle() {
00074   const auto predicate = [this]()
00075                              EXCLUSIVE_LOCKS_REQUIRED(mutex_) { return idle_; };
00076   for (;;) {
00077     {
00078       absl::MutexLock locker(&mutex_);
00079       if (mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00080                                   absl::FromChrono(common::FromSeconds(0.1)))) {
00081         return;
00082       }
00083     }
00084   }
00085 }
00086 
00087 void ThreadPoolForTesting::DoWork() {
00088   const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00089     return !task_queue_.empty() || !running_;
00090   };
00091   for (;;) {
00092     std::shared_ptr<Task> task;
00093     {
00094       absl::MutexLock locker(&mutex_);
00095       mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00096                               absl::FromChrono(common::FromSeconds(0.1)));
00097       if (!task_queue_.empty()) {
00098         task = task_queue_.front();
00099         task_queue_.pop_front();
00100       }
00101       if (!running_) {
00102         return;
00103       }
00104       if (tasks_not_ready_.empty() && task_queue_.empty() && !task) {
00105         idle_ = true;
00106       }
00107     }
00108     if (task) Execute(task.get());
00109   }
00110 }
00111 
00112 }  // namespace testing
00113 }  // namespace common
00114 }  // namespace cartographer


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:36