00001 /* 00002 * Copyright 2016 The Cartographer Authors 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00017 #include "cartographer/common/thread_pool.h" 00018 00019 #ifndef WIN32 00020 #include <unistd.h> 00021 #endif 00022 #include <algorithm> 00023 #include <chrono> 00024 #include <numeric> 00025 00026 #include "absl/memory/memory.h" 00027 #include "cartographer/common/task.h" 00028 #include "glog/logging.h" 00029 00030 namespace cartographer { 00031 namespace common { 00032 00033 void ThreadPoolInterface::Execute(Task* task) { task->Execute(); } 00034 00035 void ThreadPoolInterface::SetThreadPool(Task* task) { 00036 task->SetThreadPool(this); 00037 } 00038 00039 ThreadPool::ThreadPool(int num_threads) { 00040 absl::MutexLock locker(&mutex_); 00041 for (int i = 0; i != num_threads; ++i) { 00042 pool_.emplace_back([this]() { ThreadPool::DoWork(); }); 00043 } 00044 } 00045 00046 ThreadPool::~ThreadPool() { 00047 { 00048 absl::MutexLock locker(&mutex_); 00049 CHECK(running_); 00050 running_ = false; 00051 } 00052 for (std::thread& thread : pool_) { 00053 thread.join(); 00054 } 00055 } 00056 00057 void ThreadPool::NotifyDependenciesCompleted(Task* task) { 00058 absl::MutexLock locker(&mutex_); 00059 auto it = tasks_not_ready_.find(task); 00060 CHECK(it != tasks_not_ready_.end()); 00061 task_queue_.push_back(it->second); 00062 tasks_not_ready_.erase(it); 00063 } 00064 00065 std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) { 00066 std::shared_ptr<Task> shared_task; 00067 { 00068 absl::MutexLock locker(&mutex_); 00069 auto insert_result = 00070 tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task))); 00071 CHECK(insert_result.second) << "Schedule called twice"; 00072 shared_task = insert_result.first->second; 00073 } 00074 SetThreadPool(shared_task.get()); 00075 return shared_task; 00076 } 00077 00078 void ThreadPool::DoWork() { 00079 #ifdef __linux__ 00080 // This changes the per-thread nice level of the current thread on Linux. We 00081 // do this so that the background work done by the thread pool is not taking 00082 // away CPU resources from more important foreground threads. 00083 CHECK_NE(nice(10), -1); 00084 #endif 00085 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) { 00086 return !task_queue_.empty() || !running_; 00087 }; 00088 for (;;) { 00089 std::shared_ptr<Task> task; 00090 { 00091 absl::MutexLock locker(&mutex_); 00092 mutex_.Await(absl::Condition(&predicate)); 00093 if (!task_queue_.empty()) { 00094 task = std::move(task_queue_.front()); 00095 task_queue_.pop_front(); 00096 } else if (!running_) { 00097 return; 00098 } 00099 } 00100 CHECK(task); 00101 CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED); 00102 Execute(task.get()); 00103 } 00104 } 00105 00106 } // namespace common 00107 } // namespace cartographer