thread_pool.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2016 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer/common/thread_pool.h"
00018 
00019 #ifndef WIN32
00020 #include <unistd.h>
00021 #endif
00022 #include <algorithm>
00023 #include <chrono>
00024 #include <numeric>
00025 
00026 #include "absl/memory/memory.h"
00027 #include "cartographer/common/task.h"
00028 #include "glog/logging.h"
00029 
00030 namespace cartographer {
00031 namespace common {
00032 
00033 void ThreadPoolInterface::Execute(Task* task) { task->Execute(); }
00034 
00035 void ThreadPoolInterface::SetThreadPool(Task* task) {
00036   task->SetThreadPool(this);
00037 }
00038 
00039 ThreadPool::ThreadPool(int num_threads) {
00040   absl::MutexLock locker(&mutex_);
00041   for (int i = 0; i != num_threads; ++i) {
00042     pool_.emplace_back([this]() { ThreadPool::DoWork(); });
00043   }
00044 }
00045 
00046 ThreadPool::~ThreadPool() {
00047   {
00048     absl::MutexLock locker(&mutex_);
00049     CHECK(running_);
00050     running_ = false;
00051   }
00052   for (std::thread& thread : pool_) {
00053     thread.join();
00054   }
00055 }
00056 
00057 void ThreadPool::NotifyDependenciesCompleted(Task* task) {
00058   absl::MutexLock locker(&mutex_);
00059   auto it = tasks_not_ready_.find(task);
00060   CHECK(it != tasks_not_ready_.end());
00061   task_queue_.push_back(it->second);
00062   tasks_not_ready_.erase(it);
00063 }
00064 
00065 std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) {
00066   std::shared_ptr<Task> shared_task;
00067   {
00068     absl::MutexLock locker(&mutex_);
00069     auto insert_result =
00070         tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
00071     CHECK(insert_result.second) << "Schedule called twice";
00072     shared_task = insert_result.first->second;
00073   }
00074   SetThreadPool(shared_task.get());
00075   return shared_task;
00076 }
00077 
00078 void ThreadPool::DoWork() {
00079 #ifdef __linux__
00080   // This changes the per-thread nice level of the current thread on Linux. We
00081   // do this so that the background work done by the thread pool is not taking
00082   // away CPU resources from more important foreground threads.
00083   CHECK_NE(nice(10), -1);
00084 #endif
00085   const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00086     return !task_queue_.empty() || !running_;
00087   };
00088   for (;;) {
00089     std::shared_ptr<Task> task;
00090     {
00091       absl::MutexLock locker(&mutex_);
00092       mutex_.Await(absl::Condition(&predicate));
00093       if (!task_queue_.empty()) {
00094         task = std::move(task_queue_.front());
00095         task_queue_.pop_front();
00096       } else if (!running_) {
00097         return;
00098       }
00099     }
00100     CHECK(task);
00101     CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
00102     Execute(task.get());
00103   }
00104 }
00105 
00106 }  // namespace common
00107 }  // namespace cartographer


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:36