10 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 11 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 15 template <
typename Environment>
18 typedef typename Environment::Task
Task;
25 Environment
env = Environment())
51 #ifndef EIGEN_THREAD_LOCAL 58 env_.CreateThread([
this,
i]() { WorkerLoop(i); }));
60 #ifndef EIGEN_THREAD_LOCAL 93 const auto& pair = partitions[
i];
94 unsigned start = pair.first,
end = pair.second;
106 int limit)
override {
107 Task
t =
env_.CreateTask(std::move(fn));
109 if (pt->
pool ==
this) {
118 int num_queues = limit - start;
119 int rnd =
Rand(&pt->
rand) % num_queues;
143 #ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION 157 if (pt->
pool ==
this) {
176 return (start << kMaxPartitionBits) | limit;
180 *limit = val & (kMaxThreads - 1);
192 thread_data_[
i].steal_partition.store(val, std::memory_order_relaxed);
196 return thread_data_[
i].steal_partition.load(std::memory_order_relaxed);
200 for (
int i = 1;
i <=
N;
i++) {
215 typedef typename Environment::EnvThread
Thread;
222 #ifndef EIGEN_THREAD_LOCAL 229 constexpr
ThreadData() : thread(), steal_partition(0), queue() {}
247 #ifndef EIGEN_THREAD_LOCAL 255 #ifndef EIGEN_THREAD_LOCAL 256 std::unique_ptr<PerThread> new_pt(
new PerThread());
257 per_thread_map_mutex_.lock();
258 bool insertOK = per_thread_map_.emplace(
GlobalThreadIdHash(), std::move(new_pt)).second;
261 per_thread_map_mutex_.unlock();
262 init_barrier_->Notify();
263 init_barrier_->Wait();
275 const int spin_count =
276 allow_spinning_ && num_threads_ > 0 ? 5000 /
num_threads_ : 0;
277 if (num_threads_ == 1) {
284 while (!cancelled_) {
286 for (
int i = 0;
i < spin_count && !t.f;
i++) {
287 if (!cancelled_.load(std::memory_order_relaxed)) {
301 while (!cancelled_) {
309 if (allow_spinning_ && !spinning_ && !spinning_.exchange(
true)) {
310 for (
int i = 0;
i < spin_count && !t.f;
i++) {
311 if (!cancelled_.load(std::memory_order_relaxed)) {
336 Task
Steal(
unsigned start,
unsigned limit) {
338 const size_t size = limit - start;
345 unsigned inc = all_coprimes_[size - 1][index];
347 for (
unsigned i = 0;
i <
size;
i++) {
349 Task
t = thread_data_[start + victim].queue.PopBack();
354 if (victim >= size) {
367 if (global_steal_partition_ == partition)
return Task();
368 unsigned start, limit;
372 return Steal(start, limit);
377 return Steal(0, num_threads_);
396 *t = thread_data_[victim].queue.PopBack();
405 if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
436 const size_t size = thread_data_.
size();
438 unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].
size()];
439 unsigned victim = r %
size;
440 for (
unsigned i = 0;
i <
size;
i++) {
441 if (!thread_data_[victim].queue.Empty()) {
445 if (victim >= size) {
453 return std::hash<std::thread::id>()(std::this_thread::get_id());
457 #ifndef EIGEN_THREAD_LOCAL 460 if (it == per_thread_map_.end()) {
463 return it->second.get();
466 EIGEN_THREAD_LOCAL
PerThread per_thread_;
475 *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
477 return static_cast<unsigned>((current ^ (current >> 22)) >>
478 (22 + (current >> 61)));
486 #endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
void ScheduleWithHint(std::function< void()> fn, int start, int limit) override
std::atomic< bool > spinning_
void ComputeCoprimes(int N, MaxSizeVector< unsigned > *coprimes)
#define EIGEN_STRONG_INLINE
void Cancel() EIGEN_OVERRIDE
ThreadPoolTempl< StlThreadEnvironment > ThreadPool
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t size() const
std::unordered_map< uint64_t, std::unique_ptr< PerThread > > per_thread_map_
unsigned GetStealPartition(int i)
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void push_back(const T &t)
MaxSizeVector< MaxSizeVector< unsigned > > all_coprimes_
Namespace containing all symbols from the Eigen library.
EIGEN_STRONG_INLINE PerThread * GetPerThread()
std::atomic< unsigned > steal_partition
Task Steal(unsigned start, unsigned limit)
Environment::EnvThread Thread
static const Point3 pt(1.0, 2.0, 3.0)
std::atomic< bool > done_
std::unique_ptr< Barrier > init_barrier_
RunQueue< Task, 1024 > Queue
void WorkerLoop(int thread_id)
void AssertBounds(int start, int end)
int CurrentThreadId() const EIGEN_FINAL
MaxSizeVector< ThreadData > thread_data_
std::atomic< bool > cancelled_
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t *state)
std::mutex per_thread_map_mutex_
unsigned __int64 uint64_t
ThreadPoolTempl(int num_threads, Environment env=Environment())
EIGEN_DEVICE_FUNC const Scalar & q
unsigned EncodePartition(unsigned start, unsigned limit)
static const int kMaxThreads
#define eigen_plain_assert(x)
void DecodePartition(unsigned val, unsigned *start, unsigned *limit)
int NumThreads() const EIGEN_FINAL
ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env=Environment())
std::atomic< unsigned > blocked_
void SetStealPartitions(const std::vector< std::pair< unsigned, unsigned >> &partitions)
std::unique_ptr< Thread > thread
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
MaxSizeVector< EventCount::Waiter > waiters_
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash()
static EIGEN_DEPRECATED const end_t end
const bool allow_spinning_
void SetStealPartition(size_t i, unsigned val)
bool WaitForWork(EventCount::Waiter *waiter, Task *t)
static const int kMaxPartitionBits
unsigned global_steal_partition_
void Notify(bool notifyAll)
#define EIGEN_UNUSED_VARIABLE(var)
void CommitWait(Waiter *w)