cxx11_tensor_thread_local.cpp
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // This Source Code Form is subject to the terms of the Mozilla
5 // Public License v. 2.0. If a copy of the MPL was not distributed
6 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 
8 #define EIGEN_USE_THREADS
9 
10 #include <iostream>
11 #include <unordered_set>
12 
13 #include "main.h"
14 #include <Eigen/CXX11/ThreadPool>
15 
16 struct Counter {
17  Counter() = default;
18 
19  void inc() {
20  // Check that mutation happens only in a thread that created this counter.
21  VERIFY_IS_EQUAL(std::this_thread::get_id(), created_by);
22  counter_value++;
23  }
24  int value() { return counter_value; }
25 
27  int counter_value = 0;
28 };
29 
30 struct InitCounter {
31  void operator()(Counter& counter) {
32  counter.created_by = std::this_thread::get_id();
33  }
34 };
35 
37  int num_threads = internal::random<int>(4, 32);
38  Eigen::ThreadPool thread_pool(num_threads);
40 
41  int num_tasks = 3 * num_threads;
42  Eigen::Barrier barrier(num_tasks);
43 
44  for (int i = 0; i < num_tasks; ++i) {
45  thread_pool.Schedule([&counter, &barrier]() {
46  Counter& local = counter.local();
47  local.inc();
48 
49  std::this_thread::sleep_for(std::chrono::milliseconds(100));
50  barrier.Notify();
51  });
52  }
53 
54  barrier.Wait();
55 
56  counter.ForEach(
57  [](std::thread::id, Counter& cnt) { VERIFY_IS_EQUAL(cnt.value(), 3); });
58 }
59 
62 
63  Counter& local = counter.local();
64  local.inc();
65 
66  int total = 0;
67  counter.ForEach([&total](std::thread::id, Counter& cnt) {
68  total += cnt.value();
69  VERIFY_IS_EQUAL(cnt.value(), 1);
70  });
71 
72  VERIFY_IS_EQUAL(total, 1);
73 }
74 
75 // All thread local values fits into the lock-free storage.
77  int num_threads = internal::random<int>(4, 32);
78  Eigen::ThreadPool thread_pool(num_threads);
80 
81  int num_tasks = 10000;
82  Eigen::Barrier barrier(num_tasks);
83 
84  for (int i = 0; i < num_tasks; ++i) {
85  thread_pool.Schedule([&counter, &barrier]() {
86  Counter& local = counter.local();
87  local.inc();
88  barrier.Notify();
89  });
90  }
91 
92  barrier.Wait();
93 
94  int total = 0;
95  std::unordered_set<std::thread::id> unique_threads;
96 
97  counter.ForEach([&](std::thread::id id, Counter& cnt) {
98  total += cnt.value();
99  unique_threads.insert(id);
100  });
101 
102  VERIFY_IS_EQUAL(total, num_tasks);
103  // Not all threads in a pool might be woken up to execute submitted tasks.
104  // Also thread_pool.Schedule() might use current thread if queue is full.
106  unique_threads.size() <= (static_cast<size_t>(num_threads + 1)), true);
107 }
108 
109 // Lock free thread local storage is too small to fit all the unique threads,
110 // and it spills to a map guarded by a mutex.
112  int num_threads = internal::random<int>(4, 32);
113  Eigen::ThreadPool thread_pool(num_threads);
115 
116  int num_tasks = 10000;
117  Eigen::Barrier barrier(num_tasks);
118 
119  for (int i = 0; i < num_tasks; ++i) {
120  thread_pool.Schedule([&counter, &barrier]() {
121  Counter& local = counter.local();
122  local.inc();
123  barrier.Notify();
124  });
125  }
126 
127  barrier.Wait();
128 
129  int total = 0;
130  std::unordered_set<std::thread::id> unique_threads;
131 
132  counter.ForEach([&](std::thread::id id, Counter& cnt) {
133  total += cnt.value();
134  unique_threads.insert(id);
135  });
136 
137  VERIFY_IS_EQUAL(total, num_tasks);
138  // Not all threads in a pool might be woken up to execute submitted tasks.
139  // Also thread_pool.Schedule() might use current thread if queue is full.
141  unique_threads.size() <= (static_cast<size_t>(num_threads + 1)), true);
142 }
143 
144 EIGEN_DECLARE_TEST(cxx11_tensor_thread_local) {
149 }
test_simple_thread_local
void test_simple_thread_local()
Definition: cxx11_tensor_thread_local.cpp:36
Eigen::ThreadLocal::ForEach
void ForEach(std::function< void(std::thread::id, T &)> f)
Definition: ThreadLocal.h:217
test_large_number_of_tasks_no_spill
void test_large_number_of_tasks_no_spill()
Definition: cxx11_tensor_thread_local.cpp:76
VERIFY_IS_EQUAL
#define VERIFY_IS_EQUAL(a, b)
Definition: main.h:386
Counter::value
int value()
Definition: cxx11_tensor_thread_local.cpp:24
Counter::counter_value
int counter_value
Definition: cxx11_tensor_thread_local.cpp:27
Eigen::Barrier::Wait
void Wait()
Definition: Barrier.h:40
gtsam::utils.numerical_derivative.local
np.ndarray local(Y a, Y b)
Definition: numerical_derivative.py:33
Counter::created_by
std::thread::id created_by
Definition: cxx11_tensor_thread_local.cpp:26
id
static const Similarity3 id
Definition: testSimilarity3.cpp:44
Counter::inc
void inc()
Definition: cxx11_tensor_thread_local.cpp:19
Eigen::ThreadPoolTempl::Schedule
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
Definition: NonBlockingThreadPool.h:101
Counter::Counter
Counter()=default
EIGEN_DECLARE_TEST
EIGEN_DECLARE_TEST(cxx11_tensor_thread_local)
Definition: cxx11_tensor_thread_local.cpp:144
Eigen::ThreadPoolTempl
Definition: NonBlockingThreadPool.h:16
Eigen::ThreadLocal::local
T & local()
Definition: ThreadLocal.h:143
InitCounter
Definition: cxx11_tensor_thread_local.cpp:30
test_large_number_of_tasks_with_spill
void test_large_number_of_tasks_with_spill()
Definition: cxx11_tensor_thread_local.cpp:111
main.h
test_zero_sized_thread_local
void test_zero_sized_thread_local()
Definition: cxx11_tensor_thread_local.cpp:60
InitCounter::operator()
void operator()(Counter &counter)
Definition: cxx11_tensor_thread_local.cpp:31
Eigen::Barrier
Definition: Barrier.h:18
Eigen::ThreadLocal
Definition: ThreadLocal.h:115
i
int i
Definition: BiCGSTAB_step_by_step.cpp:9
Eigen::Barrier::Notify
void Notify()
Definition: Barrier.h:25
CALL_SUBTEST
#define CALL_SUBTEST(FUNC)
Definition: main.h:399


gtsam
Author(s):
autogenerated on Tue Jan 7 2025 04:02:07