RunQueue.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
11 #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
12 
13 
14 namespace Eigen {
15 
16 // RunQueue is a fixed-size, partially non-blocking deque of Work items.
17 // Operations on front of the queue must be done by a single thread (owner),
18 // operations on back of the queue can be done by multiple threads concurrently.
19 //
20 // Algorithm outline:
21 // All remote threads operating on the queue back are serialized by a mutex.
22 // This ensures that at most two threads access state: owner and one remote
23 // thread (Size aside). The algorithm ensures that the occupied region of the
24 // underlying array is logically continuous (can wraparound, but no stray
25 // occupied elements). Owner operates on one end of this region, remote thread
26 // operates on the other end. Synchronization between these threads
27 // (potential consumption of the last element and take up of the last empty
28 // element) happens by means of state variable in each element. States are:
29 // empty, busy (in process of insertion or removal) and ready. Threads claim
30 // elements (empty->busy and ready->busy transitions) by means of a CAS
31 // operation. The finishing transitions (busy->empty and busy->ready) are done
32 // with plain store as the element is exclusively owned by the current thread.
33 //
34 // Note: we could permit only pointers as elements, then we would not need
35 // separate state variable as null/non-null pointer value would serve as state,
36 // but that would require malloc/free per operation for large, complex values
37 // (and this is designed to store std::function<()>).
// Template parameters:
//   Work  - element type. The code below default-constructs it (as the
//           "nothing" return value), move-constructs it and move-assigns it.
//   kSize - capacity of the queue; must be a power of two, greater than 2 and
//           at most 64K (all three are checked at compile time).
template <typename Work, unsigned kSize>
class RunQueue {
  // Power-of-two capacity is required so that indices can be wrapped with a
  // bit mask instead of a modulo.
  static_assert((kSize & (kSize - 1)) == 0,
                "RunQueue: kSize must be a power of two");
  static_assert(kSize > 2, "RunQueue: kSize must be greater than 2");
  // The bits of front_/back_ above the index are a modification counter;
  // bounding kSize leaves enough space for it.
  static_assert(kSize <= (64 << 10),
                "RunQueue: kSize must not exceed 64K");

 public:
  // Creates an empty queue: both cursors are zero and every slot is kEmpty.
  RunQueue() : front_(0), back_(0) {
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  // The queue is expected to be drained before it is destroyed.
  ~RunQueue() { eigen_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (empty -> busy). Failure means the slot is still
    // occupied or is being worked on by the thread at the other end.
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Advance the index by one and bump the modification counter, which lives
    // in the bits above the index (hence the kSize << 1 increment).
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    // Publish the element (busy -> ready).
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    // front_ points one past the most recently pushed front element.
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (ready -> busy).
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Step the index back by one inside its masked range while leaving the
    // modification counter bits untouched.
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  // Takes mutex_ (blocking), which serializes it against the other back-side
  // operations.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (empty -> busy).
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Step the index back by one; the counter bits are preserved as is.
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // Returns default-constructed Work if nothing was removed. Can fail
  // spuriously: it gives up instead of waiting when mutex_ is already held.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return Work();
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (ready -> busy).
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Advance the index by one and bump the modification counter.
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes the last half of the elements (rounded up) from the
  // back of the queue and appends them to *result, starting with the element
  // farthest from the back and finishing with the back element itself.
  // Returns number of elements removed. But can also fail spuriously (returns
  // 0 when mutex_ is already held).
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return 0;
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    // mid is the element farthest from the back that we try to take.
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    // Walk from mid towards back. The signed difference ends the loop once
    // mid has moved past back, including across unsigned wraparound.
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        // Nothing claimed yet: this slot may be contended by the owner, so it
        // has to be claimed with a CAS. If that fails, retry one slot closer
        // to the back.
        if (s != kReady ||
            !e->state.compare_exchange_strong(s, kBusy,
                                              std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store temporal kBusy, we exclusively own these
        // elements.
        eigen_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    // The new back is one past the first element that was claimed; the
    // modification counter is bumped as in PopBack.
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const {
    // Emptiness plays critical role in thread pool blocking. So we go to great
    // effort to not produce false positives (claim non-empty queue as empty).
    for (;;) {
      // Capture a consistent snapshot of front/back: re-read front_ and retry
      // if it changed while back_ was being read.
      unsigned front = front_.load(std::memory_order_acquire);
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) continue;
      // Only the index bits take part in the difference.
      int size = static_cast<int>((front & kMask2) - (back & kMask2));
      // Fix overflow: indices roll over modulo 2 * kSize.
      if (size < 0) size += static_cast<int>(2 * kSize);
      // Order of modification in push/pop is crafted to make the queue look
      // larger than it is during concurrent modifications. E.g. push can
      // increment size before the corresponding pop has decremented it.
      // So the computed size can be up to kSize + 1, fix it.
      if (size > static_cast<int>(kSize)) size = static_cast<int>(kSize);
      return static_cast<unsigned>(size);
    }
  }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return Size() == 0; }

 private:
  // Mask that maps a cursor to a slot of array_.
  static const unsigned kMask = kSize - 1;
  // Mask that extracts the rolling index (log(kSize) + 1 bits) from a cursor.
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  // Per-slot states, see the algorithm outline preceding the class.
  enum {
    kEmpty,  // slot holds no element
    kBusy,   // slot is in the process of insertion or removal
    kReady,  // slot holds an element that can be removed
  };
  // Serializes the operations on the back of the queue.
  std::mutex mutex_;
  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented on Push operations. This allows us to (1) distinguish
  // between empty and full conditions (if we would use log(kSize) bits for
  // position, these conditions would be indistinguishable); (2) obtain
  // consistent snapshot of front_/back_ for Size operation using the
  // modification counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};
207 
208 } // namespace Eigen
209 
210 #endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
unsigned char uint8_t
Definition: ms_stdint.h:83
Work PushBack(Work w)
Definition: RunQueue.h:85
std::atomic< unsigned > back_
Definition: RunQueue.h:201
std::atomic< uint8_t > state
Definition: RunQueue.h:184
int n
Namespace containing all symbols from the Eigen library.
Definition: jet.h:637
void operator=(const RunQueue &)=delete
static const unsigned kMask2
Definition: RunQueue.h:182
Work PopFront()
Definition: RunQueue.h:69
bool Empty() const
Definition: RunQueue.h:178
std::atomic< unsigned > front_
Definition: RunQueue.h:200
unsigned Size() const
Definition: RunQueue.h:155
Scalar Scalar int size
Definition: benchVecAdd.cpp:17
Elem array_[kSize]
Definition: RunQueue.h:202
Values result
#define eigen_assert(x)
Definition: Macros.h:579
Array< double, 1, 3 > e(1./3., 0.5, 2.)
RealScalar s
std::mutex mutex_
Definition: RunQueue.h:192
RowVector3d w
Work PushFront(Work w)
Definition: RunQueue.h:54
static const unsigned kMask
Definition: RunQueue.h:181
unsigned PopBackHalf(std::vector< Work > *result)
Definition: RunQueue.h:120
Work PopBack()
Definition: RunQueue.h:102


gtsam
Author(s):
autogenerated on Sat May 8 2021 02:43:53