#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {
// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};
 
// Runs an arbitrary function and then calls Notify() on the passed-in Barrier.
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};
 
// Blocks until the synchronization object (e.g. a Notification) is ready.
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) n->Wait();
}
 
// An abstract interface to a device-specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
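// Example (illustrative sketch, not part of the original header): a minimal
// Allocator implementation that forwards to Eigen's aligned heap helpers.
// The class name AlignedHeapAllocator is hypothetical.
//
//   class AlignedHeapAllocator : public Eigen::Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       return Eigen::internal::aligned_malloc(num_bytes);
//     }
//     void deallocate(void* buffer) const override {
//       Eigen::internal::aligned_free(buffer);
//     }
//   };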
 
// Builds a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores,
                   Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
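  // Example (usage sketch, not part of the original header): building a
  // ThreadPoolDevice on top of Eigen::ThreadPool and evaluating a tensor
  // expression on it. Assumes EIGEN_USE_THREADS is defined and
  // <unsupported/Eigen/CXX11/Tensor> is included; variable names are
  // illustrative.
  //
  //   Eigen::ThreadPool pool(4 /*threads*/);
  //   Eigen::ThreadPoolDevice device(&pool, 4 /*num_cores*/);
  //
  //   Eigen::Tensor<float, 2> a(1024, 1024), b(1024, 1024), c(1024, 1024);
  //   a.setRandom();
  //   b.setRandom();
  //   c.device(device) = a + b;  // the sum is evaluated on the thread pool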
 
  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) allocator_->deallocate(buffer);
    else internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }
  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }
 
  template <typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }
 
  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // Block-based parallel copy. Going beyond a handful of threads usually
    // just wastes CPU cycles because the threads compete for memory
    // bandwidth, so at most 4 block copies are scheduled here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch all but the first block on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on the calling thread.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
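  // Worked example (illustrative numbers, not part of the original header):
  // copying n = 1048576 bytes with num_threads = 4 gives
  // blocksize = (1048576 + 3) / 4 = 262144. Workers i = 1..3 copy the blocks
  // at offsets 262144, 524288 and 786432 and notify the 3-count barrier,
  // while the calling thread copies block 0 and then waits on the barrier.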
 
  // Number of threads used by this device for expression evaluation.
  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }

  // Number of threads available in the underlying thread pool; this can be
  // different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }
 
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
                                            Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }
 
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }
 
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
                                                 Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }
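  // Example (usage sketch, not part of the original header): scheduling work
  // through the enqueue helpers; `device` is a ThreadPoolDevice as above and
  // `do_work` is a hypothetical callable.
  //
  //   // Fire-and-forget:
  //   device.enqueueNoNotification([] { do_work(); });
  //
  //   // With a completion notification (the caller owns the returned object):
  //   Eigen::Notification* done = device.enqueue([] { do_work(); });
  //   done->Wait();
  //   delete done;
  //
  //   // With a barrier shared by several tasks:
  //   Eigen::Barrier barrier(2);
  //   device.enqueue_with_barrier(&barrier, [] { do_work(); });
  //   device.enqueue_with_barrier(&barrier, [] { do_work(); });
  //   barrier.Wait();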
 
  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }
 
  // parallelFor executes f with [0, n) arguments in parallel and waits for
  // completion. f accepts a half-open interval [firstIdx, lastIdx). The block
  // size is chosen based on the iteration cost and the resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (n <= 0) {
      return;
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide the range into halves until we reach block.size;
    // the division rounds midpoints to multiples of block.size, so exactly
    // block.count leaves do actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
                                                  Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx =
            firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less: execute directly and notify the barrier.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper: parallelFor without block alignment.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
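  // Example (usage sketch, not part of the original header): applying a
  // functor over the half-open ranges produced by parallelFor. The
  // TensorOpCost arguments (bytes loaded, bytes stored, compute cycles per
  // element) are illustrative and drive the block-size heuristic.
  //
  //   std::vector<float> data(1 << 20, 1.0f);
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
  //                            /*bytes_stored=*/sizeof(float),
  //                            /*compute_cycles=*/1);
  //   device.parallelFor(static_cast<Eigen::Index>(data.size()), cost,
  //                      [&](Eigen::Index first, Eigen::Index last) {
  //                        for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
  //                      });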
 
  // Similar to parallelFor(), but instead of waiting for completion it
  // returns immediately and invokes `done` once all blocks have finished.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Trivially small problems are executed synchronously.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide the range into halves until we reach block.size.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx =
            firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less: execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }
 
  // Convenience wrapper: parallelForAsync without block alignment.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
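  // Example (usage sketch, not part of the original header): the asynchronous
  // variant returns immediately, so a Notification is used here to wait for
  // the `done` callback; names are illustrative.
  //
  //   const Eigen::Index n = 1 << 20;
  //   Eigen::TensorOpCost cost(sizeof(float), sizeof(float), 1);
  //   Eigen::Notification all_done;
  //   device.parallelForAsync(n, cost,
  //                           [](Eigen::Index first, Eigen::Index last) {
  //                             /* process [first, last) */
  //                           },
  //                           [&]() { all_done.Notify(); });
  //   all_done.Wait();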
 
  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }
 
 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;
 
  // For parallelForAsync the passed-in closures must be kept on the heap and
  // deleted only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };
 
  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };
 
  // Calculates the block size based on (1) the iteration cost and (2) the
  // parallel efficiency. Blocks should not be too small (to amortize
  // parallelization overhead) and not too large (to limit tail effects and
  // load imbalance), and the number of blocks should divide evenly across
  // the threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time used
    // for computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides the range into a smaller
      // number of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Take the coarser block size and track the best efficiency seen.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
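  // Worked example (illustrative numbers, not part of the original header):
  // with numThreads() == 4 and an initial block_count of 6, the efficiency is
  // 6 / (divup(6, 4) * 4) = 6 / 8 = 0.75, since the second scheduling round
  // leaves two threads idle. Coarsening to 5 blocks yields 5 / 8 = 0.625 and
  // is rejected, while coarsening further to 4 blocks yields 4 / 4 = 1.0 and
  // is accepted, so the loop settles on 4 larger blocks provided they do not
  // exceed max_block_size.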
 
  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H