ThreadLocal.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
11 #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
12 
13 #ifdef EIGEN_AVOID_THREAD_LOCAL
14 
15 #ifdef EIGEN_THREAD_LOCAL
16 #undef EIGEN_THREAD_LOCAL
17 #endif
18 
19 #else
20 
21 #if EIGEN_MAX_CPP_VER >= 11 && \
22  ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \
23  __has_feature(cxx_thread_local) || \
24  (EIGEN_COMP_MSVC >= 1900) )
25 #define EIGEN_THREAD_LOCAL static thread_local
26 #endif
27 
28 // Disable TLS for Apple and Android builds with older toolchains.
29 #if defined(__APPLE__)
30 // Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
31 // __IPHONE_8_0.
32 #include <Availability.h>
33 #include <TargetConditionals.h>
34 #endif
35 // Checks whether C++11's `thread_local` storage duration specifier is
36 // supported.
37 #if defined(__apple_build_version__) && \
38  ((__apple_build_version__ < 8000042) || \
39  (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
40 // Notes: Xcode's clang did not support `thread_local` until version
41 // 8, and even then not for all iOS < 9.0.
42 #undef EIGEN_THREAD_LOCAL
43 
44 #elif defined(__ANDROID__) && EIGEN_COMP_CLANG
45 // There are platforms for which TLS should not be used even though the compiler
46 // makes it seem like it's supported (Android NDK < r12b for example).
47 // This is primarily because of linker problems and toolchain misconfiguration:
48 // TLS isn't supported until NDK r12b per
49 // https://developer.android.com/ndk/downloads/revision_history.html
50 // Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
51 // <android/ndk-version.h>. For NDK < r16, users should define these macros,
52 // e.g. `-D__NDK_MAJOR__=11 -D__NKD_MINOR__=0` for NDK r11.
53 #if __has_include(<android/ndk-version.h>)
54 #include <android/ndk-version.h>
55 #endif // __has_include(<android/ndk-version.h>)
56 #if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \
57  defined(__NDK_MINOR__) && \
58  ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
59 #undef EIGEN_THREAD_LOCAL
60 #endif
61 #endif // defined(__ANDROID__) && defined(__clang__)
62 
63 #endif // EIGEN_AVOID_THREAD_LOCAL
64 
65 namespace Eigen {
66 
67 namespace internal {
68 template <typename T>
70  void operator()(T&) const {}
71 };
72 
73 template <typename T>
75  void operator()(T&) const {}
76 };
77 
78 } // namespace internal
79 
80 // Thread local container for elements of type T, that does not use thread local
81 // storage. As long as the number of unique threads accessing this storage
82 // is smaller than `capacity_`, it is lock-free and wait-free. Otherwise it will
83 // use a mutex for synchronization.
84 //
85 // Type `T` has to be default constructible, and by default each thread will get
86 // a default constructed value. It is possible to specify custom `initialize`
87 // callable, that will be called lazily from each thread accessing this object,
88 // and will be passed a default initialized object of type `T`. Also it's
89 // possible to pass a custom `release` callable, that will be invoked before
90 // calling ~T().
91 //
92 // Example:
93 //
94 // struct Counter {
95 // int value = 0;
96 // }
97 //
98 // Eigen::ThreadLocal<Counter> counter(10);
99 //
100 // // Each thread will have access to it's own counter object.
101 // Counter& cnt = counter.local();
102 // cnt++;
103 //
104 // WARNING: Eigen::ThreadLocal uses the OS-specific value returned by
105 // std::this_thread::get_id() to identify threads. This value is not guaranteed
106 // to be unique except for the life of the thread. A newly created thread may
107 // get an OS-specific ID equal to that of an already destroyed thread.
108 //
109 // Somewhat similar to TBB thread local storage, with similar restrictions:
110 // https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html
111 //
112 template <typename T,
113  typename Initialize = internal::ThreadLocalNoOpInitialize<T>,
114  typename Release = internal::ThreadLocalNoOpRelease<T>>
115 class ThreadLocal {
116  // We preallocate default constructed elements in MaxSizedVector.
118  "ThreadLocal data type must be default constructible");
119 
120  public:
121  explicit ThreadLocal(int capacity)
122  : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(),
123  internal::ThreadLocalNoOpRelease<T>()) {}
124 
125  ThreadLocal(int capacity, Initialize initialize)
126  : ThreadLocal(capacity, std::move(initialize),
127  internal::ThreadLocalNoOpRelease<T>()) {}
128 
129  ThreadLocal(int capacity, Initialize initialize, Release release)
130  : initialize_(std::move(initialize)),
131  release_(std::move(release)),
132  capacity_(capacity),
133  data_(capacity_),
134  ptr_(capacity_),
135  filled_records_(0) {
136  eigen_assert(capacity_ >= 0);
137  data_.resize(capacity_);
138  for (int i = 0; i < capacity_; ++i) {
139  ptr_.emplace_back(nullptr);
140  }
141  }
142 
143  T& local() {
144  std::thread::id this_thread = std::this_thread::get_id();
145  if (capacity_ == 0) return SpilledLocal(this_thread);
146 
147  std::size_t h = std::hash<std::thread::id>()(this_thread);
148  const int start_idx = h % capacity_;
149 
150  // NOTE: From the definition of `std::this_thread::get_id()` it is
151  // guaranteed that we never can have concurrent insertions with the same key
152  // to our hash-map like data structure. If we didn't find an element during
153  // the initial traversal, it's guaranteed that no one else could have
154  // inserted it while we are in this function. This allows to massively
155  // simplify out lock-free insert-only hash map.
156 
157  // Check if we already have an element for `this_thread`.
158  int idx = start_idx;
159  while (ptr_[idx].load() != nullptr) {
160  ThreadIdAndValue& record = *(ptr_[idx].load());
161  if (record.thread_id == this_thread) return record.value;
162 
163  idx += 1;
164  if (idx >= capacity_) idx -= capacity_;
165  if (idx == start_idx) break;
166  }
167 
168  // If we are here, it means that we found an insertion point in lookup
169  // table at `idx`, or we did a full traversal and table is full.
170 
171  // If lock-free storage is full, fallback on mutex.
172  if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);
173 
174  // We double check that we still have space to insert an element into a lock
175  // free storage. If old value in `filled_records_` is larger than the
176  // records capacity, it means that some other thread added an element while
177  // we were traversing lookup table.
178  int insertion_index =
179  filled_records_.fetch_add(1, std::memory_order_relaxed);
180  if (insertion_index >= capacity_) return SpilledLocal(this_thread);
181 
182  // At this point it's guaranteed that we can access to
183  // data_[insertion_index_] without a data race.
184  data_[insertion_index].thread_id = this_thread;
185  initialize_(data_[insertion_index].value);
186 
187  // That's the pointer we'll put into the lookup table.
188  ThreadIdAndValue* inserted = &data_[insertion_index];
189 
190  // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop.
191  ThreadIdAndValue* empty = nullptr;
192 
193  // Now we have to find an insertion point into the lookup table. We start
194  // from the `idx` that was identified as an insertion point above, it's
195  // guaranteed that we will have an empty record somewhere in a lookup table
196  // (because we created a record in the `data_`).
197  const int insertion_idx = idx;
198 
199  do {
200  // Always start search from the original insertion candidate.
201  idx = insertion_idx;
202  while (ptr_[idx].load() != nullptr) {
203  idx += 1;
204  if (idx >= capacity_) idx -= capacity_;
205  // If we did a full loop, it means that we don't have any free entries
206  // in the lookup table, and this means that something is terribly wrong.
207  eigen_assert(idx != insertion_idx);
208  }
209  // Atomic CAS of the pointer guarantees that any other thread, that will
210  // follow this pointer will see all the mutations in the `data_`.
211  } while (!ptr_[idx].compare_exchange_weak(empty, inserted));
212 
213  return inserted->value;
214  }
215 
216  // WARN: It's not thread safe to call it concurrently with `local()`.
217  void ForEach(std::function<void(std::thread::id, T&)> f) {
218  // Reading directly from `data_` is unsafe, because only CAS to the
219  // record in `ptr_` makes all changes visible to other threads.
220  for (auto& ptr : ptr_) {
221  ThreadIdAndValue* record = ptr.load();
222  if (record == nullptr) continue;
223  f(record->thread_id, record->value);
224  }
225 
226  // We did not spill into the map based storage.
227  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
228 
229  // Adds a happens before edge from the last call to SpilledLocal().
230  std::unique_lock<std::mutex> lock(mu_);
231  for (auto& kv : per_thread_map_) {
232  f(kv.first, kv.second);
233  }
234  }
235 
236  // WARN: It's not thread safe to call it concurrently with `local()`.
238  // Reading directly from `data_` is unsafe, because only CAS to the record
239  // in `ptr_` makes all changes visible to other threads.
240  for (auto& ptr : ptr_) {
241  ThreadIdAndValue* record = ptr.load();
242  if (record == nullptr) continue;
243  release_(record->value);
244  }
245 
246  // We did not spill into the map based storage.
247  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
248 
249  // Adds a happens before edge from the last call to SpilledLocal().
250  std::unique_lock<std::mutex> lock(mu_);
251  for (auto& kv : per_thread_map_) {
252  release_(kv.second);
253  }
254  }
255 
256  private:
260  };
261 
262  // Use unordered map guarded by a mutex when lock free storage is full.
264  std::unique_lock<std::mutex> lock(mu_);
265 
266  auto it = per_thread_map_.find(this_thread);
267  if (it == per_thread_map_.end()) {
268  auto result = per_thread_map_.emplace(this_thread, T());
269  eigen_assert(result.second);
270  initialize_((*result.first).second);
271  return (*result.first).second;
272  } else {
273  return it->second;
274  }
275  }
276 
277  Initialize initialize_;
278  Release release_;
279  const int capacity_;
280 
281  // Storage that backs lock-free lookup table `ptr_`. Records stored in this
282  // storage contiguously starting from index 0.
284 
285  // Atomic pointers to the data stored in `data_`. Used as a lookup table for
286  // linear probing hash map (https://en.wikipedia.org/wiki/Linear_probing).
288 
289  // Number of records stored in the `data_`.
290  std::atomic<int> filled_records_;
291 
292  // We fallback on per thread map if lock-free storage is full. In practice
293  // this should never happen, if `capacity_` is a reasonable estimate of the
294  // number of threads running in a system.
295  std::mutex mu_; // Protects per_thread_map_.
296  std::unordered_map<std::thread::id, T> per_thread_map_;
297 };
298 
299 } // namespace Eigen
300 
301 #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
Initialize initialize_
Definition: ThreadLocal.h:277
std::unordered_map< std::thread::id, T > per_thread_map_
Definition: ThreadLocal.h:296
Namespace containing all symbols from the Eigen library.
Definition: jet.h:637
ThreadLocal(int capacity, Initialize initialize, Release release)
Definition: ThreadLocal.h:129
const int capacity_
Definition: ThreadLocal.h:279
Definition: BFloat16.h:88
void ForEach(std::function< void(std::thread::id, T &)> f)
Definition: ThreadLocal.h:217
MaxSizeVector< std::atomic< ThreadIdAndValue * > > ptr_
Definition: ThreadLocal.h:287
static const Similarity3 id
detail::enable_if_t<!detail::move_never< T >::value, T > move(object &&obj)
Definition: cast.h:1080
MaxSizeVector< ThreadIdAndValue > data_
Definition: ThreadLocal.h:283
Values result
ThreadLocal(int capacity)
Definition: ThreadLocal.h:121
std::atomic< int > filled_records_
Definition: ThreadLocal.h:290
#define eigen_assert(x)
Definition: Macros.h:1037
Eigen::Triplet< double > T
Point2(* f)(const Point3 &, OptionalJacobian< 2, 3 >)
T & SpilledLocal(std::thread::id this_thread)
Definition: ThreadLocal.h:263
ThreadLocal(int capacity, Initialize initialize)
Definition: ThreadLocal.h:125
const double h
The MaxSizeVector class.
Definition: MaxSizeVector.h:31
Values initialize(const NonlinearFactorGraph &graph, bool useOdometricPath)
Definition: lago.cpp:375


gtsam
Author(s):
autogenerated on Tue Jul 4 2023 02:39:57