cxx11_non_blocking_thread_pool.cpp
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
6 //
7 // This Source Code Form is subject to the terms of the Mozilla
8 // Public License v. 2.0. If a copy of the MPL was not distributed
9 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 
11 #define EIGEN_USE_THREADS
12 #include "main.h"
13 #include "Eigen/CXX11/ThreadPool"
14 #include "Eigen/CXX11/Tensor"
15 
17 {
18  // Just create and destroy the pool. This will wind up and tear down worker
19  // threads. Ensure there are no issues in that logic.
20  for (int i = 0; i < 16; ++i) {
21  ThreadPool tp(i);
22  }
23 }
24 
25 
26 static void test_parallelism(bool allow_spinning)
27 {
28  // Test we never-ever fail to match available tasks with idle threads.
29  const int kThreads = 16; // code below expects that this is a multiple of 4
30  ThreadPool tp(kThreads, allow_spinning);
31  VERIFY_IS_EQUAL(tp.NumThreads(), kThreads);
33  for (int iter = 0; iter < 100; ++iter) {
34  std::atomic<int> running(0);
35  std::atomic<int> done(0);
36  std::atomic<int> phase(0);
37  // Schedule kThreads tasks and ensure that they all are running.
38  for (int i = 0; i < kThreads; ++i) {
39  tp.Schedule([&]() {
40  const int thread_id = tp.CurrentThreadId();
41  VERIFY_GE(thread_id, 0);
42  VERIFY_LE(thread_id, kThreads - 1);
43  running++;
44  while (phase < 1) {
45  }
46  done++;
47  });
48  }
49  while (running != kThreads) {
50  }
51  running = 0;
52  phase = 1;
53  // Now, while the previous tasks exit, schedule another kThreads tasks and
54  // ensure that they are running.
55  for (int i = 0; i < kThreads; ++i) {
56  tp.Schedule([&, i]() {
57  running++;
58  while (phase < 2) {
59  }
60  // When all tasks are running, half of tasks exit, quarter of tasks
61  // continue running and quarter of tasks schedule another 2 tasks each.
62  // Concurrently main thread schedules another quarter of tasks.
63  // This gives us another kThreads tasks and we ensure that they all
64  // are running.
65  if (i < kThreads / 2) {
66  } else if (i < 3 * kThreads / 4) {
67  running++;
68  while (phase < 3) {
69  }
70  done++;
71  } else {
72  for (int j = 0; j < 2; ++j) {
73  tp.Schedule([&]() {
74  running++;
75  while (phase < 3) {
76  }
77  done++;
78  });
79  }
80  }
81  done++;
82  });
83  }
84  while (running != kThreads) {
85  }
86  running = 0;
87  phase = 2;
88  for (int i = 0; i < kThreads / 4; ++i) {
89  tp.Schedule([&]() {
90  running++;
91  while (phase < 3) {
92  }
93  done++;
94  });
95  }
96  while (running != kThreads) {
97  }
98  phase = 3;
99  while (done != 3 * kThreads) {
100  }
101  }
102 }
103 
104 
105 static void test_cancel()
106 {
107  ThreadPool tp(2);
108 
109  // Schedule a large number of closure that each sleeps for one second. This
110  // will keep the thread pool busy for much longer than the default test timeout.
111  for (int i = 0; i < 1000; ++i) {
112  tp.Schedule([]() {
113  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
114  });
115  }
116 
117  // Cancel the processing of all the closures that are still pending.
118  tp.Cancel();
119 }
120 
121 static void test_pool_partitions() {
122  const int kThreads = 2;
123  ThreadPool tp(kThreads);
124 
125  // Assign each thread to its own partition, so that stealing other work only
126  // occurs globally when a thread is idle.
127  std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
128  for (int i = 0; i < kThreads; ++i) {
129  steal_partitions[i] = std::make_pair(i, i + 1);
130  }
131  tp.SetStealPartitions(steal_partitions);
132 
133  std::atomic<int> running(0);
134  std::atomic<int> done(0);
135  std::atomic<int> phase(0);
136 
137  // Schedule kThreads tasks and ensure that they all are running.
138  for (int i = 0; i < kThreads; ++i) {
139  tp.Schedule([&]() {
140  const int thread_id = tp.CurrentThreadId();
141  VERIFY_GE(thread_id, 0);
142  VERIFY_LE(thread_id, kThreads - 1);
143  ++running;
144  while (phase < 1) {
145  }
146  ++done;
147  });
148  }
149  while (running != kThreads) {
150  }
151  // Schedule each closure to only run on thread 'i' and verify that it does.
152  for (int i = 0; i < kThreads; ++i) {
153  tp.ScheduleWithHint(
154  [&, i]() {
155  ++running;
156  const int thread_id = tp.CurrentThreadId();
157  VERIFY_IS_EQUAL(thread_id, i);
158  while (phase < 2) {
159  }
160  ++done;
161  },
162  i, i + 1);
163  }
164  running = 0;
165  phase = 1;
166  while (running != kThreads) {
167  }
168  running = 0;
169  phase = 2;
170 }
171 
172 
173 EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
174 {
180 }
Eigen::ThreadPoolTempl::CurrentThreadId
int CurrentThreadId() const EIGEN_FINAL
Definition: NonBlockingThreadPool.h:155
VERIFY_LE
#define VERIFY_LE(a, b)
Definition: main.h:383
Eigen::ThreadPoolTempl::SetStealPartitions
void SetStealPartitions(const std::vector< std::pair< unsigned, unsigned >> &partitions)
Definition: NonBlockingThreadPool.h:88
VERIFY_GE
#define VERIFY_GE(a, b)
Definition: main.h:382
VERIFY_IS_EQUAL
#define VERIFY_IS_EQUAL(a, b)
Definition: main.h:386
test_pool_partitions
static void test_pool_partitions()
Definition: cxx11_non_blocking_thread_pool.cpp:121
test_cancel
static void test_cancel()
Definition: cxx11_non_blocking_thread_pool.cpp:105
Eigen::ThreadPoolTempl::ScheduleWithHint
void ScheduleWithHint(std::function< void()> fn, int start, int limit) override
Definition: NonBlockingThreadPool.h:105
j
std::ptrdiff_t j
Definition: tut_arithmetic_redux_minmax.cpp:2
Eigen::ThreadPoolTempl::Schedule
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
Definition: NonBlockingThreadPool.h:101
test_create_destroy_empty_pool
static void test_create_destroy_empty_pool()
Definition: cxx11_non_blocking_thread_pool.cpp:16
Eigen::ThreadPoolTempl::NumThreads
int NumThreads() const EIGEN_FINAL
Definition: NonBlockingThreadPool.h:153
Eigen::ThreadPoolTempl
Definition: NonBlockingThreadPool.h:16
main.h
iter
iterator iter(handle obj)
Definition: pytypes.h:2475
test_parallelism
static void test_parallelism(bool allow_spinning)
Definition: cxx11_non_blocking_thread_pool.cpp:26
Eigen::ThreadPoolTempl::Cancel
void Cancel() EIGEN_OVERRIDE
Definition: NonBlockingThreadPool.h:138
EIGEN_DECLARE_TEST
EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
Definition: cxx11_non_blocking_thread_pool.cpp:173
i
int i
Definition: BiCGSTAB_step_by_step.cpp:9
CALL_SUBTEST
#define CALL_SUBTEST(FUNC)
Definition: main.h:399


gtsam
Author(s):
autogenerated on Sat Nov 16 2024 04:02:07