thread_pool_for_testing.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2018 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <unistd.h>
20 #include <algorithm>
21 #include <chrono>
22 #include <numeric>
23 
27 #include "glog/logging.h"
28 
29 namespace cartographer {
30 namespace common {
31 namespace testing {
32 
34  : thread_([this]() { ThreadPoolForTesting::DoWork(); }) {}
35 
37  {
38  MutexLocker locker(&mutex_);
39  CHECK(running_);
40  running_ = false;
41  CHECK_EQ(task_queue_.size(), 0);
42  CHECK_EQ(tasks_not_ready_.size(), 0);
43  }
44  thread_.join();
45 }
46 
48  MutexLocker locker(&mutex_);
49  CHECK(running_);
50  auto it = tasks_not_ready_.find(task);
51  CHECK(it != tasks_not_ready_.end());
52  task_queue_.push_back(it->second);
53  tasks_not_ready_.erase(it);
54 }
55 
56 std::weak_ptr<Task> ThreadPoolForTesting::Schedule(std::unique_ptr<Task> task) {
57  std::shared_ptr<Task> shared_task;
58  {
59  MutexLocker locker(&mutex_);
60  idle_ = false;
61  CHECK(running_);
62  auto insert_result =
63  tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
64  CHECK(insert_result.second) << "ScheduleWhenReady called twice";
65  shared_task = insert_result.first->second;
66  }
67  SetThreadPool(shared_task.get());
68  return shared_task;
69 }
70 
72  for (;;) {
73  {
74  common::MutexLocker locker(&mutex_);
75  if (locker.AwaitWithTimeout([this]() REQUIRES(mutex_) { return idle_; },
76  common::FromSeconds(0.1))) {
77  return;
78  }
79  }
80  }
81 }
82 
84  for (;;) {
85  std::shared_ptr<Task> task;
86  {
87  MutexLocker locker(&mutex_);
88  locker.AwaitWithTimeout(
89  [this]()
90  REQUIRES(mutex_) { return !task_queue_.empty() || !running_; },
91  common::FromSeconds(0.1));
92  if (!task_queue_.empty()) {
93  task = task_queue_.front();
94  task_queue_.pop_front();
95  }
96  if (!running_) {
97  return;
98  }
99  if (tasks_not_ready_.empty() && task_queue_.empty() && !task) {
100  idle_ = true;
101  }
102  }
103  if (task) Execute(task.get());
104  }
105 }
106 
107 } // namespace testing
108 } // namespace common
109 } // namespace cartographer
void NotifyDependenciesCompleted(Task *task) EXCLUDES(mutex_) override
std::map< Task *, std::shared_ptr< Task > > tasks_not_ready_
Definition: task_test.cc:64
Duration FromSeconds(const double seconds)
Definition: time.cc:24
#define REQUIRES(...)
Definition: mutex.h:44
std::weak_ptr< Task > Schedule(std::unique_ptr< Task > task) EXCLUDES(mutex_) override
std::deque< std::shared_ptr< Task > > task_queue_
Definition: task_test.cc:63
Mutex::Locker MutexLocker
Definition: mutex.h:95


cartographer
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:00:58