#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {
// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};
// Runs an arbitrary function and then calls Notify() on the passed-in Barrier.
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};
// Blocks until the given synchronization object signals readiness.
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) n->Wait();
}
// An abstract interface to a device-specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
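// A minimal sketch of a custom allocator, assuming plain ::operator new /
// ::operator delete are acceptable; the name PlainAllocator is illustrative,
// not part of Eigen.
//
//   class PlainAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       return ::operator new(num_bytes);
//     }
//     void deallocate(void* buffer) const override {
//       ::operator delete(buffer);
//     }
//   };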
// Builds a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
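  // A minimal usage sketch, assuming Eigen's ThreadPool from the CXX11
  // ThreadPool module and the Tensor expression API (the tensor names are
  // illustrative):
  //
  //   Eigen::ThreadPool pool(4);
  //   Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/4);
  //   c.device(device) = a + b;  // evaluate a Tensor expression on the pool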
  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) allocator_->deallocate(buffer);
    else internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }
  template <typename Type>
  EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // Use at most 4 threads for the copy: going beyond that mostly wastes CPU
    // cycles on memory-bandwidth contention.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the trailing blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on the main thread, then wait for the workers.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }

  // Number of threads available in the underlying thread pool; this may
  // differ from numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
                                            Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
                                                 Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }
  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the pool's threads, and -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }
  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
  // waits for completion. f accepts a half-open interval [firstIdx, lastIdx).
  // The block size is chosen based on the iteration cost and the resulting
  // parallel efficiency. If block_align is not nullptr, it is called to round
  // up the block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (n <= 0) {
      return;
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute the block size and the total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide the range into halves until we reach block.size.
    // The division rounds the midpoint to block.size, so we are guaranteed to
    // get block.count leaves that do the actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
                                                  Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on the thread pool.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less: execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }
  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
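  // A minimal usage sketch (the buffer names and the per-element cost estimate
  // below are illustrative, not prescriptive):
  //
  //   device.parallelFor(
  //       n, Eigen::TensorOpCost(sizeof(float), sizeof(float), /*compute_cycles=*/1),
  //       [&](Eigen::Index firstIdx, Eigen::Index lastIdx) {
  //         for (Eigen::Index i = firstIdx; i < lastIdx; ++i) dst[i] = 2.0f * src[i];
  //       });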
  // WARNING: This function is asynchronous and will not block the calling
  // thread.
  //
  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
  // without waiting for completion. When the last block finishes, it calls the
  // 'done' callback. f accepts a half-open interval [firstIdx, lastIdx).
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Small problems are executed inline on the calling thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute the block size and the total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide the range into halves until we reach block.size.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on the thread pool.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less: execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }
  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
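  // A minimal async usage sketch (names are illustrative). Note that 'done'
  // may be invoked on a thread-pool thread:
  //
  //   Eigen::Barrier all_done(1);
  //   device.parallelForAsync(
  //       n, cost,
  //       [&](Eigen::Index firstIdx, Eigen::Index lastIdx) { /* process [firstIdx, lastIdx) */ },
  //       [&]() { all_done.Notify(); });
  //   all_done.Wait();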
  ThreadPoolInterface* getPool() const { return pool_; }

  Allocator* allocator() const { return allocator_; }
 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;
  // For parallelForAsync we must keep the passed-in closures on the heap and
  // delete them only after the 'done' callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };
  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };
  // Calculates the block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks that are not too small (to amortize
  // parallelization overhead) and not too large (to limit tail effects and
  // load imbalance), and we want the number of blocks to divide evenly across
  // threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides the range into a smaller
      // number of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached the maximum block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Take the coarser block size.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H