NonBlockingThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H


namespace Eigen {

template <typename Environment>
class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
      : env_(env),
        threads_(num_threads),
        queues_(num_threads),
        coprimes_(num_threads),
        waiters_(num_threads),
        blocked_(0),
        spinning_(0),
        done_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads);

    // Calculate coprimes of num_threads.
    // Coprimes are used for a random walk over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a walk starting at thread index t and calculate num_threads - 1
    // subsequent indices as (t + coprime) % num_threads, we will cover all
    // threads without repetitions (effectively getting a pseudo-random
    // permutation of thread indices).
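    // For example, with num_threads = 5 and coprime 2, a walk starting at
    // t = 0 visits 0, 2, 4, 1, 3, covering every thread exactly once.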
    for (int i = 1; i <= num_threads; i++) {
      unsigned a = i;
      unsigned b = num_threads;
      // If GCD(a, b) == 1, then a and b are coprimes.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes_.push_back(i);
      }
    }
    for (int i = 0; i < num_threads; i++) {
      queues_.push_back(new Queue());
    }
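    // All queues must be allocated before any worker starts, since
    // WorkerLoop indexes queues_ right away.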
    for (int i = 0; i < num_threads; i++) {
      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
  }

  ~NonBlockingThreadPoolTempl() {
    done_ = true;
    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work arbitrarily long,
    // block, submit new work, unblock and otherwise live a full life.
    ec_.Notify(true);

    // Join threads explicitly to avoid destruction order issues.
    for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
    for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
  }

  void Schedule(std::function<void()> fn) {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue* q = queues_[pt->thread_id];
      t = q->PushFront(std::move(t));
    } else {
      // A free-standing thread (or a worker of another pool), push onto a
      // random queue.
      Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
      t = q->PushBack(std::move(t));
    }
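    // If the queue is full, PushFront/PushBack hand the task back, so t.f
    // stays non-null and the task is executed inline below.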
    // Note: below we touch `this` after making the task available to worker
    // threads. Strictly speaking, this can lead to a racy use-after-free.
    // Consider that Schedule is called from a thread that is neither the main
    // thread nor a worker thread of this pool. Then, execution of the task
    // directly or indirectly completes the overall computation, which in turn
    // leads to destruction of `this`. We expect that such a scenario is
    // prevented by the program, that is, `this` is kept alive while any
    // thread can potentially be in Schedule.
    if (!t.f)
      ec_.Notify(false);
    else
      env_.ExecuteTask(t);  // Push failed, execute directly.
  }

  int NumThreads() const final {
    return static_cast<int>(threads_.size());
  }

  int CurrentThreadId() const final {
    const PerThread* pt =
        const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  typedef typename Environment::EnvThread Thread;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
    NonBlockingThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;                     // Random generator state.
    int thread_id;                     // Worker thread index in pool.
  };

  Environment env_;
  MaxSizeVector<Thread*> threads_;
  MaxSizeVector<Queue*> queues_;
  MaxSizeVector<unsigned> coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> spinning_;
  std::atomic<bool> done_;
  EventCount ec_;

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
    pt->thread_id = thread_id;
    Queue* q = queues_[thread_id];
    EventCount::Waiter* waiter = &waiters_[thread_id];
    for (;;) {
      Task t = q->PopFront();
      if (!t.f) {
        t = Steal();
        if (!t.f) {
          // Leave one thread spinning. This reduces latency.
          // TODO(dvyukov): 1000 iterations is based on a fair dice roll, tune
          // it. Also, the time it takes to attempt to steal work 1000 times
          // depends on the size of the thread pool. However, the speed at
          // which the user of the thread pool submits tasks is independent of
          // the size of the pool. Consider a time-based limit instead.
          if (!spinning_ && !spinning_.exchange(true)) {
            for (int i = 0; i < 1000 && !t.f; i++) {
              t = Steal();
            }
            spinning_ = false;
          }
          if (!t.f) {
            if (!WaitForWork(waiter, &t)) {
              return;
            }
          }
        }
      }
      if (t.f) {
        env_.ExecuteTask(t);
      }
    }
  }

  // Steal tries to steal work from other worker threads in a best-effort
  // manner.
  Task Steal() {
    PerThread* pt = GetPerThread();
    const size_t size = queues_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = coprimes_[r % coprimes_.size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      Task t = queues_[victim]->PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return Task();
  }

  // WaitForWork blocks until new work is available (returns true), or if it is
  // time to exit (returns false). Can optionally return a task to execute in t
  // (in that case t.f != nullptr on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_assert(!t->f);
    // We already did a best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait(waiter);
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait(waiter);
      *t = queues_[victim]->PopBack();
      return true;
    }
    // The number of blocked threads is used as the termination condition.
    // If we are shutting down and all worker threads are blocked without work,
    // then we are done.
    blocked_++;
    if (done_ && blocked_ == threads_.size()) {
      ec_.CancelWait(waiter);
      // Almost done, but need to re-check queues.
      // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
      // submits work and calls destructor (which sets done_). If we don't
      // re-check queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from queues.
        // Now other worker threads can start exiting, which is bad if the
        // work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    const size_t size = queues_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = coprimes_[r % coprimes_.size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!queues_[victim]->Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return -1;
  }

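  // Returns the calling thread's PerThread record. EIGEN_THREAD_LOCAL gives
  // each thread its own instance; threads that are not workers of this pool
  // keep the default state (pool == NULL, thread_id == -1).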
  static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme)
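    // The top 3 bits of the old state pick a shift in [22, 29]; the cast to
    // unsigned then keeps the low 32 bits of the xorshifted value.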
    return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
  }
};

typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
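Below is a minimal usage sketch, not part of this header. It assumes the pool
is reached through the unsupported Eigen ThreadPool module header; the include
path, thread count, and task body are illustrative only.

// Usage sketch (assumptions noted above).
#include <unsupported/Eigen/CXX11/ThreadPool>
#include <atomic>

int main() {
  Eigen::NonBlockingThreadPool pool(4);  // 4 worker threads
  std::atomic<int> done(0);
  for (int i = 0; i < 100; i++) {
    pool.Schedule([&done]() { done.fetch_add(1); });
  }
  // Crude completion wait for the sketch; a real program would use a proper
  // synchronization primitive before letting the pool be destroyed.
  while (done.load() < 100) {
  }
  return 0;  // The pool destructor joins the worker threads.
}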