gtsam
3rdparty
Eigen
unsupported
test
cxx11_non_blocking_thread_pool.cpp
Go to the documentation of this file.
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
10
11
#define EIGEN_USE_THREADS
12
#include "
main.h
"
13
#include "Eigen/CXX11/ThreadPool"
14
#include "Eigen/CXX11/Tensor"
15
16
static
void
test_create_destroy_empty_pool
()
17
{
18
// Just create and destroy the pool. This will wind up and tear down worker
19
// threads. Ensure there are no issues in that logic.
20
for
(
int
i
= 0;
i
< 16; ++
i
) {
21
ThreadPool
tp(
i
);
22
}
23
}
24
25
26
static
void
test_parallelism
(
bool
allow_spinning)
27
{
28
// Test we never-ever fail to match available tasks with idle threads.
29
const
int
kThreads = 16;
// code below expects that this is a multiple of 4
30
ThreadPool
tp(kThreads, allow_spinning);
31
VERIFY_IS_EQUAL
(tp.
NumThreads
(), kThreads);
32
VERIFY_IS_EQUAL
(tp.
CurrentThreadId
(), -1);
33
for
(
int
iter
= 0;
iter
< 100; ++
iter
) {
34
std::atomic<int> running(0);
35
std::atomic<int> done(0);
36
std::atomic<int> phase(0);
37
// Schedule kThreads tasks and ensure that they all are running.
38
for
(
int
i
= 0;
i
< kThreads; ++
i
) {
39
tp.
Schedule
([&]() {
40
const
int
thread_id = tp.
CurrentThreadId
();
41
VERIFY_GE
(thread_id, 0);
42
VERIFY_LE
(thread_id, kThreads - 1);
43
running++;
44
while
(phase < 1) {
45
}
46
done++;
47
});
48
}
49
while
(running != kThreads) {
50
}
51
running = 0;
52
phase = 1;
53
// Now, while the previous tasks exit, schedule another kThreads tasks and
54
// ensure that they are running.
55
for
(
int
i
= 0;
i
< kThreads; ++
i
) {
56
tp.
Schedule
([&,
i
]() {
57
running++;
58
while
(phase < 2) {
59
}
60
// When all tasks are running, half of tasks exit, quarter of tasks
61
// continue running and quarter of tasks schedule another 2 tasks each.
62
// Concurrently main thread schedules another quarter of tasks.
63
// This gives us another kThreads tasks and we ensure that they all
64
// are running.
65
if
(
i
< kThreads / 2) {
66
}
else
if
(
i
< 3 * kThreads / 4) {
67
running++;
68
while
(phase < 3) {
69
}
70
done++;
71
}
else
{
72
for
(
int
j
= 0;
j
< 2; ++
j
) {
73
tp.
Schedule
([&]() {
74
running++;
75
while
(phase < 3) {
76
}
77
done++;
78
});
79
}
80
}
81
done++;
82
});
83
}
84
while
(running != kThreads) {
85
}
86
running = 0;
87
phase = 2;
88
for
(
int
i
= 0;
i
< kThreads / 4; ++
i
) {
89
tp.
Schedule
([&]() {
90
running++;
91
while
(phase < 3) {
92
}
93
done++;
94
});
95
}
96
while
(running != kThreads) {
97
}
98
phase = 3;
99
while
(done != 3 * kThreads) {
100
}
101
}
102
}
103
104
105
static
void
test_cancel
()
106
{
107
ThreadPool
tp(2);
108
109
// Schedule a large number of closure that each sleeps for one second. This
110
// will keep the thread pool busy for much longer than the default test timeout.
111
for
(
int
i
= 0;
i
< 1000; ++
i
) {
112
tp.
Schedule
([]() {
113
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
114
});
115
}
116
117
// Cancel the processing of all the closures that are still pending.
118
tp.
Cancel
();
119
}
120
121
static
void
test_pool_partitions
() {
122
const
int
kThreads = 2;
123
ThreadPool
tp(kThreads);
124
125
// Assign each thread to its own partition, so that stealing other work only
126
// occurs globally when a thread is idle.
127
std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
128
for
(
int
i
= 0;
i
< kThreads; ++
i
) {
129
steal_partitions[
i
] = std::make_pair(
i
,
i
+ 1);
130
}
131
tp.
SetStealPartitions
(steal_partitions);
132
133
std::atomic<int> running(0);
134
std::atomic<int> done(0);
135
std::atomic<int> phase(0);
136
137
// Schedule kThreads tasks and ensure that they all are running.
138
for
(
int
i
= 0;
i
< kThreads; ++
i
) {
139
tp.
Schedule
([&]() {
140
const
int
thread_id = tp.
CurrentThreadId
();
141
VERIFY_GE
(thread_id, 0);
142
VERIFY_LE
(thread_id, kThreads - 1);
143
++running;
144
while
(phase < 1) {
145
}
146
++done;
147
});
148
}
149
while
(running != kThreads) {
150
}
151
// Schedule each closure to only run on thread 'i' and verify that it does.
152
for
(
int
i
= 0;
i
< kThreads; ++
i
) {
153
tp.
ScheduleWithHint
(
154
[&,
i
]() {
155
++running;
156
const
int
thread_id = tp.
CurrentThreadId
();
157
VERIFY_IS_EQUAL
(thread_id,
i
);
158
while
(phase < 2) {
159
}
160
++done;
161
},
162
i
,
i
+ 1);
163
}
164
running = 0;
165
phase = 1;
166
while
(running != kThreads) {
167
}
168
running = 0;
169
phase = 2;
170
}
171
172
173
// Test entry point, declared through EIGEN_DECLARE_TEST under the name
// cxx11_non_blocking_thread_pool. Invokes every sub-test defined in this file
// in order; test_parallelism is run twice, first with allow_spinning set to
// true and then with it set to false.
EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
{
  // Pool construction/destruction with no work scheduled.
  CALL_SUBTEST(test_create_destroy_empty_pool());
  // Task/thread matching, with and without spinning.
  CALL_SUBTEST(test_parallelism(true));
  CALL_SUBTEST(test_parallelism(false));
  // Cancellation of pending closures.
  CALL_SUBTEST(test_cancel());
  // Steal partitions and hinted scheduling.
  CALL_SUBTEST(test_pool_partitions());
}
Eigen::ThreadPoolTempl::CurrentThreadId
int CurrentThreadId() const EIGEN_FINAL
Definition:
NonBlockingThreadPool.h:155
VERIFY_LE
#define VERIFY_LE(a, b)
Definition:
main.h:383
Eigen::ThreadPoolTempl::SetStealPartitions
void SetStealPartitions(const std::vector< std::pair< unsigned, unsigned >> &partitions)
Definition:
NonBlockingThreadPool.h:88
VERIFY_GE
#define VERIFY_GE(a, b)
Definition:
main.h:382
VERIFY_IS_EQUAL
#define VERIFY_IS_EQUAL(a, b)
Definition:
main.h:386
test_pool_partitions
static void test_pool_partitions()
Definition:
cxx11_non_blocking_thread_pool.cpp:121
test_cancel
static void test_cancel()
Definition:
cxx11_non_blocking_thread_pool.cpp:105
Eigen::ThreadPoolTempl::ScheduleWithHint
void ScheduleWithHint(std::function< void()> fn, int start, int limit) override
Definition:
NonBlockingThreadPool.h:105
j
std::ptrdiff_t j
Definition:
tut_arithmetic_redux_minmax.cpp:2
Eigen::ThreadPoolTempl::Schedule
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
Definition:
NonBlockingThreadPool.h:101
test_create_destroy_empty_pool
static void test_create_destroy_empty_pool()
Definition:
cxx11_non_blocking_thread_pool.cpp:16
Eigen::ThreadPoolTempl::NumThreads
int NumThreads() const EIGEN_FINAL
Definition:
NonBlockingThreadPool.h:153
Eigen::ThreadPoolTempl
Definition:
NonBlockingThreadPool.h:16
main.h
iter
iterator iter(handle obj)
Definition:
pytypes.h:2475
test_parallelism
static void test_parallelism(bool allow_spinning)
Definition:
cxx11_non_blocking_thread_pool.cpp:26
Eigen::ThreadPoolTempl::Cancel
void Cancel() EIGEN_OVERRIDE
Definition:
NonBlockingThreadPool.h:138
EIGEN_DECLARE_TEST
EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
Definition:
cxx11_non_blocking_thread_pool.cpp:173
i
int i
Definition:
BiCGSTAB_step_by_step.cpp:9
CALL_SUBTEST
#define CALL_SUBTEST(FUNC)
Definition:
main.h:399
gtsam
Author(s):
autogenerated on Wed Jan 1 2025 04:01:22