thread_pool.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2016 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <unistd.h>
20 #include <algorithm>
21 #include <chrono>
22 #include <numeric>
23 
26 #include "glog/logging.h"
27 
28 namespace cartographer {
29 namespace common {
30 
31 void ThreadPoolInterface::Execute(Task* task) { task->Execute(); }
32 
34  task->SetThreadPool(this);
35 }
36 
37 ThreadPool::ThreadPool(int num_threads) {
38  MutexLocker locker(&mutex_);
39  for (int i = 0; i != num_threads; ++i) {
40  pool_.emplace_back([this]() { ThreadPool::DoWork(); });
41  }
42 }
43 
45  {
46  MutexLocker locker(&mutex_);
47  CHECK(running_);
48  running_ = false;
49  CHECK_EQ(task_queue_.size(), 0);
50  CHECK_EQ(tasks_not_ready_.size(), 0);
51  }
52  for (std::thread& thread : pool_) {
53  thread.join();
54  }
55 }
56 
58  MutexLocker locker(&mutex_);
59  CHECK(running_);
60  auto it = tasks_not_ready_.find(task);
61  CHECK(it != tasks_not_ready_.end());
62  task_queue_.push_back(it->second);
63  tasks_not_ready_.erase(it);
64 }
65 
66 std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) {
67  std::shared_ptr<Task> shared_task;
68  {
69  MutexLocker locker(&mutex_);
70  CHECK(running_);
71  auto insert_result =
72  tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
73  CHECK(insert_result.second) << "Schedule called twice";
74  shared_task = insert_result.first->second;
75  }
76  SetThreadPool(shared_task.get());
77  return shared_task;
78 }
79 
81 #ifdef __linux__
82  // This changes the per-thread nice level of the current thread on Linux. We
83  // do this so that the background work done by the thread pool is not taking
84  // away CPU resources from more important foreground threads.
85  CHECK_NE(nice(10), -1);
86 #endif
87  for (;;) {
88  std::shared_ptr<Task> task;
89  {
90  MutexLocker locker(&mutex_);
91  locker.Await([this]() REQUIRES(mutex_) {
92  return !task_queue_.empty() || !running_;
93  });
94  if (!task_queue_.empty()) {
95  task = std::move(task_queue_.front());
96  task_queue_.pop_front();
97  } else if (!running_) {
98  return;
99  }
100  }
101  CHECK(task);
102  CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
103  Execute(task.get());
104  }
105 }
106 
107 } // namespace common
108 } // namespace cartographer
void NotifyDependenciesCompleted(Task *task) EXCLUDES(mutex_) override
Definition: thread_pool.cc:57
std::map< Task *, std::shared_ptr< Task > > tasks_not_ready_
Definition: task_test.cc:64
#define REQUIRES(...)
Definition: mutex.h:44
void Execute() EXCLUDES(mutex_)
Definition: task.cc:87
void SetThreadPool(ThreadPoolInterface *thread_pool) EXCLUDES(mutex_)
Definition: task.cc:54
std::deque< std::shared_ptr< Task > > task_queue_
Definition: task_test.cc:63
std::weak_ptr< Task > Schedule(std::unique_ptr< Task > task) EXCLUDES(mutex_) override
Definition: thread_pool.cc:66
Mutex::Locker MutexLocker
Definition: mutex.h:95
Mutex mutex_


cartographer
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:00:58