00001 /* 00002 * Copyright 2018 The Cartographer Authors 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00017 #include "cartographer/common/internal/testing/thread_pool_for_testing.h" 00018 00019 #ifndef WIN32 00020 #include <unistd.h> 00021 #endif 00022 #include <algorithm> 00023 #include <chrono> 00024 #include <numeric> 00025 00026 #include "absl/memory/memory.h" 00027 #include "cartographer/common/task.h" 00028 #include "cartographer/common/time.h" 00029 #include "glog/logging.h" 00030 00031 namespace cartographer { 00032 namespace common { 00033 namespace testing { 00034 00035 ThreadPoolForTesting::ThreadPoolForTesting() 00036 : thread_([this]() { ThreadPoolForTesting::DoWork(); }) {} 00037 00038 ThreadPoolForTesting::~ThreadPoolForTesting() { 00039 { 00040 absl::MutexLock locker(&mutex_); 00041 CHECK(running_); 00042 running_ = false; 00043 CHECK_EQ(task_queue_.size(), 0); 00044 CHECK_EQ(tasks_not_ready_.size(), 0); 00045 } 00046 thread_.join(); 00047 } 00048 00049 void ThreadPoolForTesting::NotifyDependenciesCompleted(Task* task) { 00050 absl::MutexLock locker(&mutex_); 00051 CHECK(running_); 00052 auto it = tasks_not_ready_.find(task); 00053 CHECK(it != tasks_not_ready_.end()); 00054 task_queue_.push_back(it->second); 00055 tasks_not_ready_.erase(it); 00056 } 00057 00058 std::weak_ptr<Task> ThreadPoolForTesting::Schedule(std::unique_ptr<Task> task) { 00059 std::shared_ptr<Task> shared_task; 00060 { 00061 absl::MutexLock locker(&mutex_); 00062 idle_ = false; 00063 CHECK(running_); 00064 auto insert_result = 00065 tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task))); 00066 CHECK(insert_result.second) << "ScheduleWhenReady called twice"; 00067 shared_task = insert_result.first->second; 00068 } 00069 SetThreadPool(shared_task.get()); 00070 return shared_task; 00071 } 00072 00073 void ThreadPoolForTesting::WaitUntilIdle() { 00074 const auto predicate = [this]() 00075 EXCLUSIVE_LOCKS_REQUIRED(mutex_) { return idle_; }; 00076 for (;;) { 00077 { 00078 absl::MutexLock locker(&mutex_); 00079 if (mutex_.AwaitWithTimeout(absl::Condition(&predicate), 00080 absl::FromChrono(common::FromSeconds(0.1)))) { 00081 return; 00082 } 00083 } 00084 } 00085 } 00086 00087 void ThreadPoolForTesting::DoWork() { 00088 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) { 00089 return !task_queue_.empty() || !running_; 00090 }; 00091 for (;;) { 00092 std::shared_ptr<Task> task; 00093 { 00094 absl::MutexLock locker(&mutex_); 00095 mutex_.AwaitWithTimeout(absl::Condition(&predicate), 00096 absl::FromChrono(common::FromSeconds(0.1))); 00097 if (!task_queue_.empty()) { 00098 task = task_queue_.front(); 00099 task_queue_.pop_front(); 00100 } 00101 if (!running_) { 00102 return; 00103 } 00104 if (tasks_not_ready_.empty() && task_queue_.empty() && !task) { 00105 idle_ = true; 00106 } 00107 } 00108 if (task) Execute(task.get()); 00109 } 00110 } 00111 00112 } // namespace testing 00113 } // namespace common 00114 } // namespace cartographer