10 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 11 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 16 template <
typename Environment>
19 typedef typename Environment::Task
Task;
41 for (
int i = 1; i <= num_threads; i++) {
43 unsigned b = num_threads;
54 for (
int i = 0; i < num_threads; i++) {
57 for (
int i = 0; i < num_threads; i++) {
75 Task t =
env_.CreateTask(std::move(fn));
77 if (pt->
pool ==
this) {
107 if (pt->
pool ==
this) {
115 typedef typename Environment::EnvThread
Thread;
138 pt->
rand = std::hash<std::thread::id>()(std::this_thread::get_id());
153 if (!spinning_ && !spinning_.exchange(
true)) {
154 for (
int i = 0; i < 1000 && !t.f; i++) {
177 unsigned inc = coprimes_[r % coprimes_.
size()];
178 unsigned victim = r %
size;
179 for (
unsigned i = 0; i <
size; i++) {
180 Task t = queues_[victim]->PopBack();
185 if (victim >= size) {
204 *t = queues_[victim]->PopBack();
211 if (done_ && blocked_ == threads_.
size()) {
241 unsigned inc = coprimes_[r % coprimes_.
size()];
242 unsigned victim = r %
size;
243 for (
unsigned i = 0; i <
size; i++) {
244 if (!queues_[victim]->Empty()) {
248 if (victim >= size) {
262 uint64_t current = *state;
264 *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
266 return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
274 #endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_STRONG_INLINE
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void push_back(const T &t)
NonBlockingThreadPoolTempl(int num_threads, Environment env=Environment())
void WorkerLoop(int thread_id)
static constexpr size_t size(Tuple< Args... > &)
Provides access to the number of elements in a tuple as a compile-time constant expression.
bool WaitForWork(EventCount::Waiter *waiter, Task *t)
static EIGEN_STRONG_INLINE PerThread * GetPerThread()
std::atomic< unsigned > blocked_
RunQueue< Task, 1024 > Queue
MaxSizeVector< Queue * > queues_
EIGEN_DEVICE_FUNC const Scalar & q
void CancelWait(Waiter *w)
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t size() const
NonBlockingThreadPoolTempl * pool
int CurrentThreadId() const final
MaxSizeVector< unsigned > coprimes_
std::atomic< bool > done_
Environment::EnvThread Thread
#define EIGEN_THREAD_LOCAL
int NumThreads() const final
void Schedule(std::function< void()> fn)
EIGEN_DEVICE_FUNC const Scalar & b
MaxSizeVector< EventCount::Waiter > waiters_
~NonBlockingThreadPoolTempl()
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t *state)
std::atomic< bool > spinning_
NonBlockingThreadPoolTempl< StlThreadEnvironment > NonBlockingThreadPool
MaxSizeVector< Thread * > threads_
void CommitWait(Waiter *w)