TensorDeviceThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// The non-blocking thread pool is used by default; define
// EIGEN_USE_SIMPLE_THREAD_POOL to fall back to the SimpleThreadPool.
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool;
#else
template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
typedef SimpleThreadPool ThreadPool;
#endif


// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
 public:
  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
    eigen_assert(((count << 1) >> 1) == count);
  }
  ~Barrier() {
    eigen_assert((state_ >> 1) == 0);
  }

  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      eigen_assert(((v + 2) & ~1) != 0);
      return;  // either count has not dropped to 0, or waiter is not waiting
    }
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void Wait() {
    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
    if ((v >> 1) == 0) return;
    std::unique_lock<std::mutex> l(mu_);
    while (!notified_) {
      cv_.wait(l);
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<unsigned int> state_;  // low bit is waiter flag
  bool notified_;
};
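
// A minimal usage sketch (illustrative only, not part of this header),
// assuming <thread> and <vector> are available: each worker calls Notify()
// exactly once, and the main thread blocks in Wait() until all of them have.
// The per-worker body is a placeholder.
//
//   Eigen::Barrier barrier(4);
//   std::vector<std::thread> workers;
//   for (int i = 0; i < 4; ++i) {
//     workers.emplace_back([&barrier, i]() {
//       // ... do the i-th chunk of work ...
//       barrier.Notify();  // one Notify() per worker
//     });
//   }
//   barrier.Wait();  // returns once all four Notify() calls have happened
//   for (std::thread& t : workers) t.join();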


// Notification is an object that allows a user to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
  Notification() : Barrier(1) {};
};


// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}


// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...));
    return n;
  }
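
  // A minimal usage sketch (illustrative only): enqueue() returns a
  // heap-allocated Notification that the caller waits on and then deletes;
  // nothing in this class frees it. The lambda body and the "device" object
  // are placeholders.
  //
  //   Eigen::Notification* done = device.enqueue([]() { /* ... work ... */ });
  //   done->Wait();  // blocks until the task has run and called Notify()
  //   delete done;   // the caller owns the returned Notification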

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
                                                Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(std::bind(
        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...));
  }
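
  // A minimal fan-out sketch (illustrative only): schedule several tasks that
  // all report to a single Barrier, then wait once for the whole batch. The
  // task body and the "device" object are placeholders.
  //
  //   const unsigned int num_tasks = 8;
  //   Eigen::Barrier barrier(num_tasks);
  //   for (unsigned int i = 0; i < num_tasks; ++i) {
  //     device.enqueue_with_barrier(&barrier, [i]() { /* ... task i ... */ });
  //   }
  //   barrier.Wait();  // returns once all num_tasks tasks have finished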

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    pool_->Schedule(std::bind(f, args...));
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f over the range [0, n) in parallel and waits for
  // completion. f accepts a half-open interval [first, last).
  // The block size is chosen based on the iteration cost and the resulting
  // parallel efficiency. If block_align is not nullptr, it is called to round
  // up the block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    typedef TensorCostModel<ThreadPoolDevice> CostModel;
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Calculate block size based on (1) the iteration cost and (2) parallel
    // efficiency. We want blocks to be not too small, to mitigate
    // parallelization overheads, and not too large, to mitigate the tail
    // effect and potential load imbalance; we also want the number of blocks
    // to divide evenly across threads.

    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(divup<Index>(n, max_oversharding_factor * numThreads()),
                               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);
    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }
    Index block_count = divup(n, block_size);
    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());
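    // Worked example (hypothetical numbers): with block_count = 10 and
    // numThreads() = 4, the busiest thread runs divup(10, 4) = 3 blocks, so
    // the pool is occupied for 3 * 4 = 12 block-slots while only 10 do real
    // work: efficiency = 10 / 12 ≈ 0.83. Coarsening to 8 larger blocks gives
    // 8 / (divup(8, 4) * 4) = 8 / 8 = 1.0, which is what the loop below
    // searches for.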
    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block_count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
      if (last - first <= block_size) {
        // Single block or less, execute directly.
        f(first, last);
        barrier.Notify();
        return;
      }
      // Split into halves and submit to the pool.
      Index mid = first + divup((last - first) / 2, block_size) * block_size;
      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
    };
    handleRange(0, n);
    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
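
  // A minimal usage sketch (illustrative only): parallelize a simple
  // element-wise loop over a raw float buffer. The TensorOpCost values
  // (bytes loaded, bytes stored, compute cycles per iteration) are rough
  // estimates chosen for illustration, and "data"/"n" are placeholders.
  //
  //   Eigen::ThreadPool pool(4);
  //   Eigen::ThreadPoolDevice device(&pool, 4);
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
  //                            /*bytes_stored=*/sizeof(float),
  //                            /*compute_cycles=*/1);
  //   device.parallelFor(n, cost, [data](Eigen::Index first, Eigen::Index last) {
  //     for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
  //   });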

 private:
  ThreadPoolInterface* pool_;
  int num_threads_;
};


}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H