TensorDeviceThreadPool.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
11 #define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
12 
13 namespace Eigen {
14 
15 // Runs an arbitrary function and then calls Notify() on the passed in
16 // Notification.
17 template <typename Function, typename... Args> struct FunctionWrapperWithNotification
18 {
19  static void run(Notification* n, Function f, Args... args) {
20  f(args...);
21  if (n) {
22  n->Notify();
23  }
24  }
25 };
26 
27 template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
28 {
29  static void run(Barrier* b, Function f, Args... args) {
30  f(args...);
31  if (b) {
32  b->Notify();
33  }
34  }
35 };
36 
37 template <typename SyncType>
38 static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
39  if (n) {
40  n->Wait();
41  }
42 }
43 
// Abstract interface through which a device-specific memory allocator can be
// plugged in. Both operations are const, so implementations are usable
// through a pointer or reference to const Allocator.
class Allocator {
 public:
  virtual ~Allocator() = default;

  // Requests a buffer of `num_bytes` bytes.
  virtual void* allocate(size_t num_bytes) const = 0;

  // Hands `buffer` back to the allocator.
  virtual void deallocate(void* buffer) const = 0;
};
51 
52 // Build a thread pool device on top the an existing pool of threads.
53 struct ThreadPoolDevice {
54  // The ownership of the thread pool remains with the caller.
55  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
56  : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }
57 
58  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
59  return allocator_ ? allocator_->allocate(num_bytes)
60  : internal::aligned_malloc(num_bytes);
61  }
62 
63  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
64  if (allocator_) {
65  allocator_->deallocate(buffer);
66  } else {
68  }
69  }
70 
71  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
72  return allocate(num_bytes);
73  }
74 
75  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
76  deallocate(buffer);
77  }
78 
79  template<typename Type>
81  return data;
82  }
83 
84  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
85 #ifdef __ANDROID__
86  ::memcpy(dst, src, n);
87 #else
88  // TODO(rmlarsen): Align blocks on cache lines.
89  // We have observed that going beyond 4 threads usually just wastes
90  // CPU cycles due to the threads competing for memory bandwidth, so we
91  // statically schedule at most 4 block copies here.
92  const size_t kMinBlockSize = 32768;
93  const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
94  if (n <= kMinBlockSize || num_threads < 2) {
95  ::memcpy(dst, src, n);
96  } else {
97  const char* src_ptr = static_cast<const char*>(src);
98  char* dst_ptr = static_cast<char*>(dst);
99  const size_t blocksize = (n + (num_threads - 1)) / num_threads;
100  Barrier barrier(static_cast<int>(num_threads - 1));
101  // Launch the last 3 blocks on worker threads.
102  for (size_t i = 1; i < num_threads; ++i) {
103  enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
104  ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
105  numext::mini(blocksize, n - (i * blocksize)));
106  });
107  }
108  // Launch the first block on the main thread.
109  ::memcpy(dst_ptr, src_ptr, blocksize);
110  barrier.Wait();
111  }
112 #endif
113  }
114  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
115  memcpy(dst, src, n);
116  }
117  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
118  memcpy(dst, src, n);
119  }
120 
121  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
122  ::memset(buffer, c, n);
123  }
124 
125  EIGEN_STRONG_INLINE int numThreads() const {
126  return num_threads_;
127  }
128 
129  // Number of theads available in the underlying thread pool. This number can
130  // be different from the value returned by numThreads().
131  EIGEN_STRONG_INLINE int numThreadsInPool() const {
132  return pool_->NumThreads();
133  }
134 
135  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
136  return l1CacheSize();
137  }
138 
139  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
140  // The l3 cache size is shared between all the cores.
141  return l3CacheSize() / num_threads_;
142  }
143 
144  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
145  // Should return an enum that encodes the ISA supported by the CPU
146  return 1;
147  }
148 
149  template <class Function, class... Args>
150  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
151  Args&&... args) const {
152  Notification* n = new Notification();
153  pool_->Schedule(
155  std::move(f), args...));
156  return n;
157  }
158 
159  template <class Function, class... Args>
160  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
161  Args&&... args) const {
162  pool_->Schedule(
164  std::move(f), args...));
165  }
166 
167  template <class Function, class... Args>
168  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
169  Args&&... args) const {
170  if (sizeof...(args) > 0) {
171  pool_->Schedule(std::bind(std::move(f), args...));
172  } else {
173  pool_->Schedule(std::move(f));
174  }
175  }
176 
177  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
178  // called from one of the threads in pool_. Returns -1 otherwise.
179  EIGEN_STRONG_INLINE int currentThreadId() const {
180  return pool_->CurrentThreadId();
181  }
182 
183  // WARNING: This function is synchronous and will block the calling thread.
184  //
185  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
186  // waits for completion. F accepts a half-open interval [first, last). Block
187  // size is chosen based on the iteration cost and resulting parallel
188  // efficiency. If block_align is not nullptr, it is called to round up the
189  // block size.
190  void parallelFor(Index n, const TensorOpCost& cost,
191  std::function<Index(Index)> block_align,
192  std::function<void(Index, Index)> f) const {
193  if (EIGEN_PREDICT_FALSE(n <= 0)){
194  return;
195  // Compute small problems directly in the caller thread.
196  } else if (n == 1 || numThreads() == 1 ||
197  CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
198  f(0, n);
199  return;
200  }
201 
202  // Compute block size and total count of blocks.
203  ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
204 
205  // Recursively divide size into halves until we reach block_size.
206  // Division code rounds mid to block_size, so we are guaranteed to get
207  // block_count leaves that do actual computations.
208  Barrier barrier(static_cast<unsigned int>(block.count));
209  std::function<void(Index, Index)> handleRange;
210  handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
211  Index lastIdx) {
212  while (lastIdx - firstIdx > block.size) {
213  // Split into halves and schedule the second half on a different thread.
214  const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
215  pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
216  lastIdx = midIdx;
217  }
218  // Single block or less, execute directly.
219  f(firstIdx, lastIdx);
220  barrier.Notify();
221  };
222 
223  if (block.count <= numThreads()) {
224  // Avoid a thread hop by running the root of the tree and one block on the
225  // main thread.
226  handleRange(0, n);
227  } else {
228  // Execute the root in the thread pool to avoid running work on more than
229  // numThreads() threads.
230  pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
231  }
232 
233  barrier.Wait();
234  }
235 
236  // Convenience wrapper for parallelFor that does not align blocks.
237  void parallelFor(Index n, const TensorOpCost& cost,
238  std::function<void(Index, Index)> f) const {
239  parallelFor(n, cost, nullptr, std::move(f));
240  }
241 
242  // WARNING: This function is asynchronous and will not block the calling thread.
243  //
244  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
245  // without waiting for completion. When the last block finished, it will call
246  // 'done' callback. F accepts a half-open interval [first, last). Block size
247  // is chosen based on the iteration cost and resulting parallel efficiency. If
248  // block_align is not nullptr, it is called to round up the block size.
249  void parallelForAsync(Index n, const TensorOpCost& cost,
250  std::function<Index(Index)> block_align,
251  std::function<void(Index, Index)> f,
252  std::function<void()> done) const {
253  // Compute small problems directly in the caller thread.
254  if (n <= 1 || numThreads() == 1 ||
255  CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
256  f(0, n);
257  done();
258  return;
259  }
260 
261  // Compute block size and total count of blocks.
262  ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
263 
264  ParallelForAsyncContext* const ctx =
265  new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
266 
267  // Recursively divide size into halves until we reach block_size.
268  // Division code rounds mid to block_size, so we are guaranteed to get
269  // block_count leaves that do actual computations.
270  ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
271  while (lastIdx - firstIdx > block.size) {
272  // Split into halves and schedule the second half on a different thread.
273  const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
274  pool_->Schedule(
275  [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
276  lastIdx = midIdx;
277  }
278 
279  // Single block or less, execute directly.
280  ctx->f(firstIdx, lastIdx);
281 
282  // Delete async context if it was the last block.
283  if (ctx->count.fetch_sub(1) == 1) delete ctx;
284  };
285 
286  if (block.count <= numThreads()) {
287  // Avoid a thread hop by running the root of the tree and one block on the
288  // main thread.
289  ctx->handle_range(0, n);
290  } else {
291  // Execute the root in the thread pool to avoid running work on more than
292  // numThreads() threads.
293  pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
294  }
295  }
296 
297  // Convenience wrapper for parallelForAsync that does not align blocks.
298  void parallelForAsync(Index n, const TensorOpCost& cost,
299  std::function<void(Index, Index)> f,
300  std::function<void()> done) const {
301  parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
302  }
303 
304  // Thread pool accessor.
305  ThreadPoolInterface* getPool() const { return pool_; }
306 
307  // Allocator accessor.
308  Allocator* allocator() const { return allocator_; }
309 
310  private:
311  typedef TensorCostModel<ThreadPoolDevice> CostModel;
312 
313  // For parallelForAsync we must keep passed in closures on the heap, and
314  // delete them only after `done` callback finished.
315  struct ParallelForAsyncContext {
316  ParallelForAsyncContext(Index block_count,
317  std::function<void(Index, Index)> block_f,
318  std::function<void()> done_callback)
319  : count(block_count),
320  f(std::move(block_f)),
321  done(std::move(done_callback)) {}
322  ~ParallelForAsyncContext() { done(); }
323 
324  std::atomic<Index> count;
325  std::function<void(Index, Index)> f;
326  std::function<void()> done;
327 
328  std::function<void(Index, Index)> handle_range;
329  };
330 
331  struct ParallelForBlock {
332  Index size; // block size
333  Index count; // number of blocks
334  };
335 
336  // Calculates block size based on (1) the iteration cost and (2) parallel
337  // efficiency. We want blocks to be not too small to mitigate parallelization
338  // overheads; not too large to mitigate tail effect and potential load
339  // imbalance and we also want number of blocks to be evenly dividable across
340  // threads.
341  ParallelForBlock CalculateParallelForBlock(
342  const Index n, const TensorOpCost& cost,
343  std::function<Index(Index)> block_align) const {
344  const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
345  const Index max_oversharding_factor = 4;
346  Index block_size = numext::mini(
347  n, numext::maxi<Index>(
348  divup<Index>(n, max_oversharding_factor * numThreads()),
349  block_size_f));
350  const Index max_block_size = numext::mini(n, 2 * block_size);
351 
352  if (block_align) {
353  Index new_block_size = block_align(block_size);
354  eigen_assert(new_block_size >= block_size);
355  block_size = numext::mini(n, new_block_size);
356  }
357 
358  Index block_count = divup(n, block_size);
359 
360  // Calculate parallel efficiency as fraction of total CPU time used for
361  // computations:
362  double max_efficiency =
363  static_cast<double>(block_count) /
364  (divup<int>(block_count, numThreads()) * numThreads());
365 
366  // Now try to increase block size up to max_block_size as long as it
367  // doesn't decrease parallel efficiency.
368  for (Index prev_block_count = block_count;
369  max_efficiency < 1.0 && prev_block_count > 1;) {
370  // This is the next block size that divides size into a smaller number
371  // of blocks than the current block_size.
372  Index coarser_block_size = divup(n, prev_block_count - 1);
373  if (block_align) {
374  Index new_block_size = block_align(coarser_block_size);
375  eigen_assert(new_block_size >= coarser_block_size);
376  coarser_block_size = numext::mini(n, new_block_size);
377  }
378  if (coarser_block_size > max_block_size) {
379  break; // Reached max block size. Stop.
380  }
381  // Recalculate parallel efficiency.
382  const Index coarser_block_count = divup(n, coarser_block_size);
383  eigen_assert(coarser_block_count < prev_block_count);
384  prev_block_count = coarser_block_count;
385  const double coarser_efficiency =
386  static_cast<double>(coarser_block_count) /
387  (divup<int>(coarser_block_count, numThreads()) * numThreads());
388  if (coarser_efficiency + 0.01 >= max_efficiency) {
389  // Taking it.
390  block_size = coarser_block_size;
391  block_count = coarser_block_count;
392  if (max_efficiency < coarser_efficiency) {
393  max_efficiency = coarser_efficiency;
394  }
395  }
396  }
397 
398  return {block_size, block_count};
399  }
400 
401  ThreadPoolInterface* pool_;
402  int num_threads_;
403  Allocator* allocator_;
404 };
405 
406 
407 } // end namespace Eigen
408 
409 #endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
EIGEN_DEVICE_FUNC
#define EIGEN_DEVICE_FUNC
Definition: Macros.h:976
Eigen
Namespace containing all symbols from the Eigen library.
Definition: jet.h:637
Eigen::l3CacheSize
std::ptrdiff_t l3CacheSize()
Definition: products/GeneralBlockPanelKernel.h:2626
c
Scalar Scalar * c
Definition: benchVecAdd.cpp:17
b
Scalar * b
Definition: benchVecAdd.cpp:17
EIGEN_PREDICT_FALSE
#define EIGEN_PREDICT_FALSE(x)
Definition: Macros.h:1321
eigen_assert
#define eigen_assert(x)
Definition: Macros.h:1037
buffer
Definition: pytypes.h:2270
Eigen::internal::aligned_malloc
EIGEN_DEVICE_FUNC void * aligned_malloc(std::size_t size)
Definition: Memory.h:174
block
m m block(1, 0, 2, 2)<< 4
size
Scalar Scalar int size
Definition: benchVecAdd.cpp:17
n
int n
Definition: BiCGSTAB_simple.cpp:1
Eigen::internal::aligned_free
EIGEN_DEVICE_FUNC void aligned_free(void *ptr)
Definition: Memory.h:198
Type
Definition: typing.h:67
data
int data[]
Definition: Map_placement_new.cpp:1
Eigen::numext::mini
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE T mini(const T &x, const T &y)
Definition: Eigen/src/Core/MathFunctions.h:1085
EIGEN_STRONG_INLINE
#define EIGEN_STRONG_INLINE
Definition: Macros.h:917
gtsam.examples.DogLegOptimizerExample.run
def run(args)
Definition: DogLegOptimizerExample.py:21
tree::f
Point2(* f)(const Point3 &, OptionalJacobian< 2, 3 >)
Definition: testExpression.cpp:218
move
detail::enable_if_t<!detail::move_never< T >::value, T > move(object &&obj)
Definition: cast.h:1243
std
Definition: BFloat16.h:88
args
Definition: pytypes.h:2210
Eigen::divup
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE T divup(const X x, const Y y)
Definition: TensorMeta.h:30
Eigen::l1CacheSize
std::ptrdiff_t l1CacheSize()
Definition: products/GeneralBlockPanelKernel.h:2607
get
Container::iterator get(Container &c, Position position)
Definition: stdlist_overload.cpp:29
i
int i
Definition: BiCGSTAB_step_by_step.cpp:9
Eigen::Index
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition: Meta.h:74


gtsam
Author(s):
autogenerated on Tue Jan 7 2025 04:05:38