SimpleThreadPool.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
11 #define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
12 
13 namespace Eigen {
14 
15 // The implementation of the ThreadPool type ensures that the Schedule method
16 // runs the functions it is provided in FIFO order when the scheduling is done
17 // by a single thread.
18 // Environment provides a way to create threads and also makes it possible to
19 // intercept task submission and execution.
20 template <typename Environment>
22  public:
23  // Construct a pool that contains "num_threads" threads.
24  explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment())
25  : env_(env), threads_(num_threads), waiters_(num_threads) {
26  for (int i = 0; i < num_threads; i++) {
27  threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); }));
28  }
29  }
30 
31  // Wait until all scheduled work has finished and then destroy the
32  // set of threads.
34  {
35  // Wait for all work to get done.
36  std::unique_lock<std::mutex> l(mu_);
37  while (!pending_.empty()) {
38  empty_.wait(l);
39  }
40  exiting_ = true;
41 
42  // Wakeup all waiters.
43  for (auto w : waiters_) {
44  w->ready = true;
45  w->task.f = nullptr;
46  w->cv.notify_one();
47  }
48  }
49 
50  // Wait for threads to finish.
51  for (auto t : threads_) {
52  delete t;
53  }
54  }
55 
56  // Schedule fn() for execution in the pool of threads. The functions are
57  // executed in the order in which they are scheduled.
58  void Schedule(std::function<void()> fn) final {
59  Task t = env_.CreateTask(std::move(fn));
60  std::unique_lock<std::mutex> l(mu_);
61  if (waiters_.empty()) {
62  pending_.push_back(std::move(t));
63  } else {
64  Waiter* w = waiters_.back();
65  waiters_.pop_back();
66  w->ready = true;
67  w->task = std::move(t);
68  w->cv.notify_one();
69  }
70  }
71 
72  int NumThreads() const final {
73  return static_cast<int>(threads_.size());
74  }
75 
76  int CurrentThreadId() const final {
77  const PerThread* pt = this->GetPerThread();
78  if (pt->pool == this) {
79  return pt->thread_id;
80  } else {
81  return -1;
82  }
83  }
84 
85  protected:
86  void WorkerLoop(int thread_id) {
87  std::unique_lock<std::mutex> l(mu_);
88  PerThread* pt = GetPerThread();
89  pt->pool = this;
90  pt->thread_id = thread_id;
91  Waiter w;
92  Task t;
93  while (!exiting_) {
94  if (pending_.empty()) {
95  // Wait for work to be assigned to me
96  w.ready = false;
97  waiters_.push_back(&w);
98  while (!w.ready) {
99  w.cv.wait(l);
100  }
101  t = w.task;
102  w.task.f = nullptr;
103  } else {
104  // Pick up pending work
105  t = std::move(pending_.front());
106  pending_.pop_front();
107  if (pending_.empty()) {
108  empty_.notify_all();
109  }
110  }
111  if (t.f) {
112  mu_.unlock();
113  env_.ExecuteTask(t);
114  t.f = nullptr;
115  mu_.lock();
116  }
117  }
118  }
119 
120  private:
121  typedef typename Environment::Task Task;
122  typedef typename Environment::EnvThread Thread;
123 
124  struct Waiter {
125  std::condition_variable cv;
126  Task task;
127  bool ready;
128  };
129 
130  struct PerThread {
131  constexpr PerThread() : pool(NULL), thread_id(-1) { }
132  SimpleThreadPoolTempl* pool; // Parent pool, or null for normal threads.
133  int thread_id; // Worker thread index in pool.
134  };
135 
136  Environment env_;
137  std::mutex mu_;
139  MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads.
140  std::deque<Task> pending_; // Queue of pending work
141  std::condition_variable empty_; // Signaled on pending_.empty()
142  bool exiting_ = false;
143 
145  EIGEN_THREAD_LOCAL PerThread per_thread;
146  return &per_thread;
147  }
148 };
149 
151 
152 } // namespace Eigen
153 
154 #endif // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
std::condition_variable empty_
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void push_back(const T &t)
Definition: MaxSizeVector.h:71
void WorkerLoop(int thread_id)
Definition: LDLT.h:16
Environment::EnvThread Thread
void Schedule(std::function< void()> fn) final
SimpleThreadPoolTempl(int num_threads, Environment env=Environment())
MaxSizeVector< Waiter * > waiters_
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t size() const
SimpleThreadPoolTempl< StlThreadEnvironment > SimpleThreadPool
TFSIMD_FORCE_INLINE const tfScalar & w() const
PerThread * GetPerThread() const
#define EIGEN_THREAD_LOCAL
Definition: ThreadLocal.h:19
int CurrentThreadId() const final
MaxSizeVector< Thread * > threads_


hebiros
Author(s): Xavier Artache , Matthew Tesch
autogenerated on Thu Sep 3 2020 04:08:47