RunQueue.h
Go to the documentation of this file.
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

10 #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
11 #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
12 
13 namespace Eigen {
14 
15 // RunQueue is a fixed-size, partially non-blocking deque or Work items.
16 // Operations on front of the queue must be done by a single thread (owner),
17 // operations on back of the queue can be done by multiple threads concurrently.
18 //
19 // Algorithm outline:
20 // All remote threads operating on the queue back are serialized by a mutex.
21 // This ensures that at most two threads access state: owner and one remote
22 // thread (Size aside). The algorithm ensures that the occupied region of the
23 // underlying array is logically continuous (can wraparound, but no stray
24 // occupied elements). Owner operates on one end of this region, remote thread
25 // operates on the other end. Synchronization between these threads
26 // (potential consumption of the last element and take up of the last empty
27 // element) happens by means of state variable in each element. States are:
28 // empty, busy (in process of insertion of removal) and ready. Threads claim
29 // elements (empty->busy and ready->busy transitions) by means of a CAS
30 // operation. The finishing transition (busy->empty and busy->ready) are done
31 // with plain store as the element is exclusively owned by the current thread.
32 //
33 // Note: we could permit only pointers as elements, then we would not need
34 // separate state variable as null/non-null pointer value would serve as state,
35 // but that would require malloc/free per operation for large, complex values
36 // (and this is designed to store std::function<()>).
37 template <typename Work, unsigned kSize>
38 class RunQueue {
39  public:
40  RunQueue() : front_(0), back_(0) {
41  // require power-of-two for fast masking
42  eigen_plain_assert((kSize & (kSize - 1)) == 0);
43  eigen_plain_assert(kSize > 2); // why would you do this?
44  eigen_plain_assert(kSize <= (64 << 10)); // leave enough space for counter
45  for (unsigned i = 0; i < kSize; i++)
46  array_[i].state.store(kEmpty, std::memory_order_relaxed);
47  }
48 
50 
51  // PushFront inserts w at the beginning of the queue.
52  // If queue is full returns w, otherwise returns default-constructed Work.
53  Work PushFront(Work w) {
54  unsigned front = front_.load(std::memory_order_relaxed);
55  Elem* e = &array_[front & kMask];
56  uint8_t s = e->state.load(std::memory_order_relaxed);
57  if (s != kEmpty ||
58  !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
59  return w;
60  front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
61  e->w = std::move(w);
62  e->state.store(kReady, std::memory_order_release);
63  return Work();
64  }
65 
66  // PopFront removes and returns the first element in the queue.
67  // If the queue was empty returns default-constructed Work.
68  Work PopFront() {
69  unsigned front = front_.load(std::memory_order_relaxed);
70  Elem* e = &array_[(front - 1) & kMask];
71  uint8_t s = e->state.load(std::memory_order_relaxed);
72  if (s != kReady ||
73  !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
74  return Work();
75  Work w = std::move(e->w);
76  e->state.store(kEmpty, std::memory_order_release);
77  front = ((front - 1) & kMask2) | (front & ~kMask2);
78  front_.store(front, std::memory_order_relaxed);
79  return w;
80  }
81 
82  // PushBack adds w at the end of the queue.
83  // If queue is full returns w, otherwise returns default-constructed Work.
84  Work PushBack(Work w) {
85  std::unique_lock<std::mutex> lock(mutex_);
86  unsigned back = back_.load(std::memory_order_relaxed);
87  Elem* e = &array_[(back - 1) & kMask];
88  uint8_t s = e->state.load(std::memory_order_relaxed);
89  if (s != kEmpty ||
90  !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
91  return w;
92  back = ((back - 1) & kMask2) | (back & ~kMask2);
93  back_.store(back, std::memory_order_relaxed);
94  e->w = std::move(w);
95  e->state.store(kReady, std::memory_order_release);
96  return Work();
97  }
98 
99  // PopBack removes and returns the last elements in the queue.
100  Work PopBack() {
101  if (Empty()) return Work();
102  std::unique_lock<std::mutex> lock(mutex_);
103  unsigned back = back_.load(std::memory_order_relaxed);
104  Elem* e = &array_[back & kMask];
105  uint8_t s = e->state.load(std::memory_order_relaxed);
106  if (s != kReady ||
107  !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
108  return Work();
109  Work w = std::move(e->w);
110  e->state.store(kEmpty, std::memory_order_release);
111  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
112  return w;
113  }
114 
115  // PopBackHalf removes and returns half last elements in the queue.
116  // Returns number of elements removed.
117  unsigned PopBackHalf(std::vector<Work>* result) {
118  if (Empty()) return 0;
119  std::unique_lock<std::mutex> lock(mutex_);
120  unsigned back = back_.load(std::memory_order_relaxed);
121  unsigned size = Size();
122  unsigned mid = back;
123  if (size > 1) mid = back + (size - 1) / 2;
124  unsigned n = 0;
125  unsigned start = 0;
126  for (; static_cast<int>(mid - back) >= 0; mid--) {
127  Elem* e = &array_[mid & kMask];
128  uint8_t s = e->state.load(std::memory_order_relaxed);
129  if (n == 0) {
130  if (s != kReady || !e->state.compare_exchange_strong(
131  s, kBusy, std::memory_order_acquire))
132  continue;
133  start = mid;
134  } else {
135  // Note: no need to store temporal kBusy, we exclusively own these
136  // elements.
138  }
139  result->push_back(std::move(e->w));
140  e->state.store(kEmpty, std::memory_order_release);
141  n++;
142  }
143  if (n != 0)
144  back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
145  return n;
146  }
147 
148  // Size returns current queue size.
149  // Can be called by any thread at any time.
150  unsigned Size() const { return SizeOrNotEmpty<true>(); }
151 
152  // Empty tests whether container is empty.
153  // Can be called by any thread at any time.
154  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
155 
156  // Delete all the elements from the queue.
157  void Flush() {
158  while (!Empty()) {
159  PopFront();
160  }
161  }
162 
163  private:
164  static const unsigned kMask = kSize - 1;
165  static const unsigned kMask2 = (kSize << 1) - 1;
166  struct Elem {
167  std::atomic<uint8_t> state;
168  Work w;
169  };
170  enum {
174  };
175  std::mutex mutex_;
176  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
177  // front/back, respectively. The remaining bits contain modification counters
178  // that are incremented on Push operations. This allows us to (1) distinguish
179  // between empty and full conditions (if we would use log(kSize) bits for
180  // position, these conditions would be indistinguishable); (2) obtain
181  // consistent snapshot of front_/back_ for Size operation using the
182  // modification counters.
183  std::atomic<unsigned> front_;
184  std::atomic<unsigned> back_;
185  Elem array_[kSize];
186 
187  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
188  // only whether the size is 0 is guaranteed to be correct.
189  // Can be called by any thread at any time.
190  template<bool NeedSizeEstimate>
191  unsigned SizeOrNotEmpty() const {
192  // Emptiness plays critical role in thread pool blocking. So we go to great
193  // effort to not produce false positives (claim non-empty queue as empty).
194  unsigned front = front_.load(std::memory_order_acquire);
195  for (;;) {
196  // Capture a consistent snapshot of front/tail.
197  unsigned back = back_.load(std::memory_order_acquire);
198  unsigned front1 = front_.load(std::memory_order_relaxed);
199  if (front != front1) {
200  front = front1;
201  std::atomic_thread_fence(std::memory_order_acquire);
202  continue;
203  }
204  if (NeedSizeEstimate) {
205  return CalculateSize(front, back);
206  } else {
207  // This value will be 0 if the queue is empty, and undefined otherwise.
208  unsigned maybe_zero = ((front ^ back) & kMask2);
209  // Queue size estimate must agree with maybe zero check on the queue
210  // empty/non-empty state.
211  eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
212  return maybe_zero;
213  }
214  }
215  }
216 
218  unsigned CalculateSize(unsigned front, unsigned back) const {
219  int size = (front & kMask2) - (back & kMask2);
220  // Fix overflow.
221  if (size < 0) size += 2 * kSize;
222  // Order of modification in push/pop is crafted to make the queue look
223  // larger than it is during concurrent modifications. E.g. push can
224  // increment size before the corresponding pop has decremented it.
225  // So the computed size can be up to kSize + 1, fix it.
226  if (size > static_cast<int>(kSize)) size = kSize;
227  return static_cast<unsigned>(size);
228  }
229 
230  RunQueue(const RunQueue&) = delete;
231  void operator=(const RunQueue&) = delete;
232 };
233 
234 } // namespace Eigen
235 
236 #endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
w
RowVector3d w
Definition: Matrix_resize_int.cpp:3
Eigen::RunQueue::Elem::w
Work w
Definition: RunQueue.h:168
Eigen
Namespace containing all symbols from the Eigen library.
Definition: jet.h:637
s
RealScalar s
Definition: level1_cplx_impl.h:126
e
Array< double, 1, 3 > e(1./3., 0.5, 2.)
Eigen::RunQueue::SizeOrNotEmpty
unsigned SizeOrNotEmpty() const
Definition: RunQueue.h:191
Eigen::RunQueue::back_
std::atomic< unsigned > back_
Definition: RunQueue.h:184
eigen_assert
#define eigen_assert(x)
Definition: Macros.h:1037
eigen_plain_assert
#define eigen_plain_assert(x)
Definition: Macros.h:1007
Eigen::RunQueue
Definition: RunQueue.h:38
Eigen::RunQueue::array_
Elem array_[kSize]
Definition: RunQueue.h:185
Eigen::RunQueue::kMask2
static const unsigned kMask2
Definition: RunQueue.h:165
result
Values result
Definition: OdometryOptimize.cpp:8
uint8_t
unsigned char uint8_t
Definition: ms_stdint.h:83
Eigen::RunQueue::Flush
void Flush()
Definition: RunQueue.h:157
size
Scalar Scalar int size
Definition: benchVecAdd.cpp:17
Eigen::RunQueue::PopFront
Work PopFront()
Definition: RunQueue.h:68
n
int n
Definition: BiCGSTAB_simple.cpp:1
Eigen::RunQueue::operator=
void operator=(const RunQueue &)=delete
Eigen::RunQueue::kEmpty
@ kEmpty
Definition: RunQueue.h:171
Eigen::RunQueue::RunQueue
RunQueue()
Definition: RunQueue.h:40
Eigen::RunQueue::kReady
@ kReady
Definition: RunQueue.h:173
Eigen::RunQueue::front_
std::atomic< unsigned > front_
Definition: RunQueue.h:183
Eigen::RunQueue::Size
unsigned Size() const
Definition: RunQueue.h:150
Eigen::RunQueue::Empty
bool Empty() const
Definition: RunQueue.h:154
Eigen::RunQueue::Elem::state
std::atomic< uint8_t > state
Definition: RunQueue.h:167
EIGEN_ALWAYS_INLINE
#define EIGEN_ALWAYS_INLINE
Definition: Macros.h:932
Eigen::RunQueue::~RunQueue
~RunQueue()
Definition: RunQueue.h:49
Eigen::RunQueue::PushFront
Work PushFront(Work w)
Definition: RunQueue.h:53
Eigen::RunQueue::mutex_
std::mutex mutex_
Definition: RunQueue.h:175
Eigen::RunQueue::PopBackHalf
unsigned PopBackHalf(std::vector< Work > *result)
Definition: RunQueue.h:117
Eigen::RunQueue::kMask
static const unsigned kMask
Definition: RunQueue.h:164
Eigen::RunQueue::PushBack
Work PushBack(Work w)
Definition: RunQueue.h:84
Eigen::RunQueue::kBusy
@ kBusy
Definition: RunQueue.h:172
Eigen::RunQueue::PopBack
Work PopBack()
Definition: RunQueue.h:100
i
int i
Definition: BiCGSTAB_step_by_step.cpp:9
Eigen::RunQueue::CalculateSize
EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front, unsigned back) const
Definition: RunQueue.h:218
Eigen::RunQueue::Elem
Definition: RunQueue.h:166
make_changelog.state
state
Definition: make_changelog.py:29


gtsam
Author(s):
autogenerated on Wed Jan 1 2025 04:03:06