readerwriterqueue.h
// ©2013-2020 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).

#pragma once

#include "atomicops.h"
#include <new>
#include <type_traits>
#include <utility>
#include <cassert>
#include <stdexcept>
#include <cstdint>
#include <cstdlib> // For malloc/free/abort & size_t
#include <memory>
#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
#include <chrono>
#endif

// A lock-free queue for a single-consumer, single-producer architecture.
// The queue is also wait-free in the common path (except if more memory
// needs to be allocated, in which case malloc is called).
// Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
// the original maximum size estimate is never exceeded.
// Tested on x86/x64 processors, but semantics should be correct for all
// architectures (given the right implementations in atomicops.h), provided
// that aligned integer and pointer accesses are naturally atomic.
// Note that there should only be one consumer thread and one producer thread;
// switching the roles of the threads, or using multiple consecutive threads for
// one role, is not safe unless properly synchronized.
// Using the queue exclusively from one thread is fine, though a bit silly.

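// A minimal usage sketch (illustrative only, not part of the header proper):
// one producer thread enqueues while one consumer thread dequeues, using only
// the public API declared below. The names here are hypothetical example code.
//
//     moodycamel::ReaderWriterQueue<int> q(100); // reserve room for ~100 ints
//     q.enqueue(17);                             // producer thread; may allocate
//     int item;
//     if (q.try_dequeue(item)) {                 // consumer thread; item == 17
//         /* use item */
//     }
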
#ifndef MOODYCAMEL_CACHE_LINE_SIZE
#define MOODYCAMEL_CACHE_LINE_SIZE 64
#endif

#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
    (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif

#ifndef MOODYCAMEL_HAS_EMPLACE
#if !defined(_MSC_VER) || _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
#define MOODYCAMEL_HAS_EMPLACE 1
#endif
#endif

#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable : 4324) // structure was padded due to __declspec(align())
#pragma warning(disable : 4820) // padding was added
#pragma warning(disable : 4127) // conditional expression is constant
#endif

namespace moodycamel
{
template <typename T, size_t MAX_BLOCK_SIZE = 512>
class ReaderWriterQueue
{
    // Design: Based on a queue-of-queues. The low-level queues are just
    // circular buffers with front and tail indices indicating where the
    // next element to dequeue is and where the next element can be enqueued,
    // respectively. Each low-level queue is called a "block". Each block
    // wastes exactly one element's worth of space to keep the design simple
    // (if front == tail then the queue is empty, and can't be full).
    // The high-level queue is a circular linked list of blocks; again there
    // is a front and tail, but this time they are pointers to the blocks.
    // The front block is where the next element to be dequeued is, provided
    // the block is not empty. The back block is where elements are to be
    // enqueued, provided the block is not full.
    // The producer thread owns all the tail indices/pointers. The consumer
    // thread owns all the front indices/pointers. Both threads read each
    // other's variables, but only the owning thread updates them. E.g. after
    // the consumer reads the producer's tail, the tail may change before the
    // consumer is done dequeuing an object, but the consumer knows the tail
    // will never go backwards, only forwards.
    // If there is no room to enqueue an object, an additional block (of
    // equal size to the last block) is added. Blocks are never removed.

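    // An illustrative sketch of the structure described above (block count
    // and arrangement are arbitrary; added here only for clarity):
    //
    //      frontBlock               tailBlock
    //          |                        |
    //          v                        v
    //      +--------+   next   +--------+   next   +--------+
    //      | block0 | -------> | block1 | -------> | block2 | --+
    //      +--------+          +--------+          +--------+   |
    //          ^    (each block has its own front/tail indices) |
    //          +-------------------------------------------------+
    //                       (circular linked list)
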
public:
    typedef T value_type;

    // Constructs a queue that can hold at least `size` elements without further
    // allocations. If more than MAX_BLOCK_SIZE elements are requested,
    // then several blocks of MAX_BLOCK_SIZE each are reserved (including
    // at least one extra buffer block).
    AE_NO_TSAN explicit ReaderWriterQueue(size_t size = 15)
#ifndef NDEBUG
        : enqueuing(false), dequeuing(false)
#endif
    {
        assert(size > 0);
        assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
        assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");

        Block* firstBlock = nullptr;

        largestBlockSize = ceilToPow2(size + 1); // We need a spare slot to fit size elements in the block
        if (largestBlockSize > MAX_BLOCK_SIZE * 2)
        {
            // We need a spare block in case the producer is writing to a different block the consumer is reading
            // from, and wants to enqueue the maximum number of elements. We also need a spare element in each block
            // to avoid the ambiguity between front == tail meaning "empty" and "full". So the effective number of
            // slots that are guaranteed to be usable at any time is (block size - 1) * (number of blocks - 1).
            // Solving for size and applying a ceiling to the division gives us (after simplifying):
            size_t initialBlockCount = (size + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
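            // Worked example (an illustrative check, not normative): with
            // MAX_BLOCK_SIZE == 512 and size == 2000, this computes
            // (2000 + 1024 - 3) / 511 == 3021 / 511 == 5 blocks (integer
            // division), giving (512 - 1) * (5 - 1) == 2044 usable slots >= 2000.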
            largestBlockSize = MAX_BLOCK_SIZE;
            Block* lastBlock = nullptr;
            for (size_t i = 0; i != initialBlockCount; ++i)
            {
                auto block = make_block(largestBlockSize);
                if (block == nullptr)
                {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
                    throw std::bad_alloc();
#else
                    abort();
#endif
                }
                if (firstBlock == nullptr)
                {
                    firstBlock = block;
                }
                else
                {
                    lastBlock->next = block;
                }
                lastBlock = block;
                block->next = firstBlock;
            }
        }
        else
        {
            firstBlock = make_block(largestBlockSize);
            if (firstBlock == nullptr)
            {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
                throw std::bad_alloc();
#else
                abort();
#endif
            }
            firstBlock->next = firstBlock;
        }
        frontBlock = firstBlock;
        tailBlock = firstBlock;

        // Make sure the reader/writer threads will have the initialized memory setup above:
        fence(memory_order_sync);
    }

    // Note: The queue should not be accessed concurrently while it's
    // being moved. It's up to the user to synchronize this.
    AE_NO_TSAN ReaderWriterQueue(ReaderWriterQueue&& other)
        : frontBlock(other.frontBlock.load())
        , tailBlock(other.tailBlock.load())
        , largestBlockSize(other.largestBlockSize)
#ifndef NDEBUG
        , enqueuing(false)
        , dequeuing(false)
#endif
    {
        other.largestBlockSize = 32;
        Block* b = other.make_block(other.largestBlockSize);
        if (b == nullptr)
        {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
            throw std::bad_alloc();
#else
            abort();
#endif
        }
        b->next = b;
        other.frontBlock = b;
        other.tailBlock = b;
    }

    // Note: The queue should not be accessed concurrently while it's
    // being moved. It's up to the user to synchronize this.
    ReaderWriterQueue& operator=(ReaderWriterQueue&& other) AE_NO_TSAN
    {
        Block* b = frontBlock.load();
        frontBlock = other.frontBlock.load();
        other.frontBlock = b;
        b = tailBlock.load();
        tailBlock = other.tailBlock.load();
        other.tailBlock = b;
        std::swap(largestBlockSize, other.largestBlockSize);
        return *this;
    }

    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    AE_NO_TSAN ~ReaderWriterQueue()
    {
        // Make sure we get the latest version of all variables from other CPUs:
        fence(memory_order_sync);

        // Destroy any remaining objects in queue and free memory
        Block* frontBlock_ = frontBlock;
        Block* block = frontBlock_;
        do
        {
            Block* nextBlock = block->next;
            size_t blockFront = block->front;
            size_t blockTail = block->tail;

            for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask)
            {
                auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
                element->~T();
                (void)element;
            }

            auto rawBlock = block->rawThis;
            block->~Block();
            std::free(rawBlock);
            block = nextBlock;
        } while (block != frontBlock_);
    }

    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN
    {
        return inner_enqueue<CannotAlloc>(element);
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T&& element) AE_NO_TSAN
    {
        return inner_enqueue<CannotAlloc>(std::forward<T>(element));
    }

#if MOODYCAMEL_HAS_EMPLACE
    // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
    template <typename... Args>
    AE_FORCEINLINE bool try_emplace(Args&&... args) AE_NO_TSAN
    {
        return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
    }
#endif

    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN
    {
        return inner_enqueue<CanAlloc>(element);
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T&& element) AE_NO_TSAN
    {
        return inner_enqueue<CanAlloc>(std::forward<T>(element));
    }

#if MOODYCAMEL_HAS_EMPLACE
    // Like enqueue() but with emplace semantics (i.e. construct-in-place).
    template <typename... Args>
    AE_FORCEINLINE bool emplace(Args&&... args) AE_NO_TSAN
    {
        return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
    }
#endif

    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    template <typename U>
    bool try_dequeue(U& result) AE_NO_TSAN
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif

        // High-level pseudocode:
        // Remember where the tail block is
        // If the front block has an element in it, dequeue it
        // Else
        //     If front block was the tail block when we entered the function, return false
        //     Else advance to next block and dequeue the item there

        // Note that we have to use the value of the tail block from before we check if the front
        // block is full or not, in case the front block is empty and then, before we check if the
        // tail block is at the front block or not, the producer fills up the front block *and
        // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
        // reproducible in practice.
        // In order to avoid overhead in the common case, though, we do a double-checked pattern
        // where we have the fast path if the front block is not empty, then read the tail block,
        // then re-read the front block and check if it's not empty again, then check if the tail
        // block has advanced.

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load()))
        {
            fence(memory_order_acquire);

        non_empty_front_block:
            // Front block not empty, dequeue from here
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            result = std::move(*element);
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load())
        {
            fence(memory_order_acquire);

            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail)
            {
                // Oh look, the front block isn't empty after all
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;
            // Don't need an acquire fence here since next can only ever be set on the tailBlock,
            // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
            // ensures next is up-to-date on this CPU in case we recently were at tailBlock.

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            // Since the tailBlock is only ever advanced after being written to,
            // we know there's for sure an element to dequeue on it
            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            // We're done with this block, let the producer use it if it needs
            fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));

            result = std::move(*element);
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else
        {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }

    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    T* peek() const AE_NO_TSAN
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load()))
        {
            fence(memory_order_acquire);
        non_empty_front_block:
            return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
        }
        else if (frontBlock_ != tailBlock.load())
        {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail)
            {
                goto non_empty_front_block;
            }

            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlock->tail.load());
            return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
        }

        return nullptr;
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    AE_FORCEINLINE bool pop() AE_NO_TSAN
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load()))
        {
            fence(memory_order_acquire);

        non_empty_front_block:
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load())
        {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail)
            {
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            fence(memory_order_release);
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else
        {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    inline size_t size_approx() const AE_NO_TSAN
    {
        size_t result = 0;
        Block* frontBlock_ = frontBlock.load();
        Block* block = frontBlock_;
        do
        {
            fence(memory_order_acquire);
            size_t blockFront = block->front.load();
            size_t blockTail = block->tail.load();
            result += (blockTail - blockFront) & block->sizeMask;
            block = block->next.load();
        } while (block != frontBlock_);
        return result;
    }

    // Returns the total number of items that could be enqueued without incurring
    // an allocation when this queue is empty.
    // Safe to call from both the producer and consumer threads.
    //
    // NOTE: The actual capacity during usage may be different depending on the consumer.
    // If the consumer is removing elements concurrently, the producer cannot add to
    // the block the consumer is removing from until it's completely empty, except in
    // the case where the producer was writing to the same block the consumer was
    // reading from the whole time.
    inline size_t max_capacity() const
    {
        size_t result = 0;
        Block* frontBlock_ = frontBlock.load();
        Block* block = frontBlock_;
        do
        {
            fence(memory_order_acquire);
            result += block->sizeMask;
            block = block->next.load();
        } while (block != frontBlock_);
        return result;
    }

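    // Worked example (illustrative, not normative): a freshly constructed
    // ReaderWriterQueue<int> q(100) rounds 100 + 1 up to a single block of
    // 128 slots (sizeMask == 127), so q.max_capacity() returns 127.
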
private:
    enum AllocationMode
    {
        CanAlloc,
        CannotAlloc
    };

#if MOODYCAMEL_HAS_EMPLACE
    template <AllocationMode canAlloc, typename... Args>
    bool inner_enqueue(Args&&... args) AE_NO_TSAN
#else
    template <AllocationMode canAlloc, typename U>
    bool inner_enqueue(U&& element) AE_NO_TSAN
#endif
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->enqueuing);
#endif

        // High-level pseudocode (assuming we're allowed to alloc a new block):
        // If room in tail block, add to tail
        // Else check next block
        //     If next block is not the head block, enqueue on next block
        //     Else create a new block and enqueue there
        //     Advance tail to the block we just enqueued to

        Block* tailBlock_ = tailBlock.load();
        size_t blockFront = tailBlock_->localFront;
        size_t blockTail = tailBlock_->tail.load();

        size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
        if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load()))
        {
            fence(memory_order_acquire);
            // This block has room for at least one more element
            char* location = tailBlock_->data + blockTail * sizeof(T);
#if MOODYCAMEL_HAS_EMPLACE
            new (location) T(std::forward<Args>(args)...);
#else
            new (location) T(std::forward<U>(element));
#endif

            fence(memory_order_release);
            tailBlock_->tail = nextBlockTail;
        }
        else
        {
            fence(memory_order_acquire);
            if (tailBlock_->next.load() != frontBlock)
            {
                // Note that the reason we can't advance to the frontBlock and start adding new entries there
                // is because if we did, then dequeue would stay in that block, eventually reading the new values,
                // instead of advancing to the next full block (whose values were enqueued first and so should be
                // consumed first).

                fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock

                // tailBlock is full, but there's a free block ahead, use it
                Block* tailBlockNext = tailBlock_->next.load();
                size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
                nextBlockTail = tailBlockNext->tail.load();
                fence(memory_order_acquire);

                // This block must be empty since it's not the head block and we
                // go through the blocks in a circle
                assert(nextBlockFront == nextBlockTail);
                tailBlockNext->localFront = nextBlockFront;

                char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
#if MOODYCAMEL_HAS_EMPLACE
                new (location) T(std::forward<Args>(args)...);
#else
                new (location) T(std::forward<U>(element));
#endif

                tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;

                fence(memory_order_release);
                tailBlock = tailBlockNext;
            }
            else if (canAlloc == CanAlloc)
            {
                // tailBlock is full and there's no free block ahead; create a new block
                auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
                auto newBlock = make_block(newBlockSize);
                if (newBlock == nullptr)
                {
                    // Could not allocate a block!
                    return false;
                }
                largestBlockSize = newBlockSize;

#if MOODYCAMEL_HAS_EMPLACE
                new (newBlock->data) T(std::forward<Args>(args)...);
#else
                new (newBlock->data) T(std::forward<U>(element));
#endif
                assert(newBlock->front == 0);
                newBlock->tail = newBlock->localTail = 1;

                newBlock->next = tailBlock_->next.load();
                tailBlock_->next = newBlock;

                // Might be possible for the dequeue thread to see the new tailBlock->next
                // *without* seeing the new tailBlock value, but this is OK since it can't
                // advance to the next block until tailBlock is set anyway (because the only
                // case where it could try to read the next is if it's already at the tailBlock,
                // and it won't advance past tailBlock in any circumstance).

                fence(memory_order_release);
                tailBlock = newBlock;
            }
            else if (canAlloc == CannotAlloc)
            {
                // Would have had to allocate a new block to enqueue, but not allowed
                return false;
            }
            else
            {
                assert(false && "Should be unreachable code");
                return false;
            }
        }

        return true;
    }

    // Disable copying
    ReaderWriterQueue(ReaderWriterQueue const&)
    {
    }

    // Disable assignment
    ReaderWriterQueue& operator=(ReaderWriterQueue const&)
    {
    }

    AE_FORCEINLINE static size_t ceilToPow2(size_t x)
    {
        // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (size_t i = 1; i < sizeof(size_t); i <<= 1)
        {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }
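    // Worked trace (illustrative): ceilToPow2(100) -> --x gives 99
    // (0b1100011); the shift-or cascade smears the top bit down, yielding
    // 127 (0b1111111); ++x gives 128, the next power of two. Powers of two
    // map to themselves (e.g. ceilToPow2(128) == 128).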

    template <typename U>
    static AE_FORCEINLINE char* align_for(char* ptr) AE_NO_TSAN
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
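    // For example (illustrative): if U has alignment 8 and ptr is 0x1003,
    // then (8 - (0x1003 % 8)) % 8 == 5, so the returned pointer is 0x1008;
    // an already-aligned pointer is returned unchanged (the outer % makes
    // the adjustment 0 in that case).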

private:
#ifndef NDEBUG
    struct ReentrantGuard
    {
        AE_NO_TSAN ReentrantGuard(bool& _inSection) : inSection(_inSection)
        {
            assert(!inSection && "Concurrent (or re-entrant) enqueue or dequeue operation detected (only one thread at a "
                                 "time may hold the producer or consumer role)");
            inSection = true;
        }

        AE_NO_TSAN ~ReentrantGuard()
        {
            inSection = false;
        }

    private:
        ReentrantGuard& operator=(ReentrantGuard const&);

    private:
        bool& inSection;
    };
#endif

    struct Block
    {
        // Avoid false-sharing by putting highly contended variables on their own cache lines
        weak_atomic<size_t> front; // (Atomic) Elements are read from here
        size_t localTail;          // An uncontended shadow copy of tail, owned by the consumer

        char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
        weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
        size_t localFront;

        char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) -
                              sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as
                                               // tail (which is)
        weak_atomic<Block*> next; // (Atomic)

        char* data; // Contents (on heap) are aligned to T's alignment

        const size_t sizeMask;

        // size must be a power of two (and greater than 0)
        AE_NO_TSAN Block(size_t const& _size, char* _rawThis, char* _data)
            : front(0)
            , localTail(0)
            , tail(0)
            , localFront(0)
            , next(nullptr)
            , data(_data)
            , sizeMask(_size - 1)
            , rawThis(_rawThis)
        {
        }

    private:
        // C4512 - Assignment operator could not be generated
        Block& operator=(Block const&);

    public:
        char* rawThis;
    };

    static Block* make_block(size_t capacity) AE_NO_TSAN
    {
        // Allocate enough memory for the block itself, as well as all the elements it will contain
        auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
        size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
        auto newBlockRaw = static_cast<char*>(std::malloc(size));
        if (newBlockRaw == nullptr)
        {
            return nullptr;
        }

        auto newBlockAligned = align_for<Block>(newBlockRaw);
        auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
        return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
    }
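    // The single malloc'd region is laid out (illustratively) as:
    //   [padding to alignof(Block)][Block header][padding to alignof(T)][capacity * sizeof(T) element slots]
    // rawThis remembers the unaligned pointer so the destructor can free it.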

private:
    weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block

    char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
    weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block

    size_t largestBlockSize;

#ifndef NDEBUG
    bool enqueuing;
    mutable bool dequeuing;
#endif
};

// Like ReaderWriterQueue, but also provides blocking operations
template <typename T, size_t MAX_BLOCK_SIZE = 512>
class BlockingReaderWriterQueue
{
private:
    typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;

public:
    explicit BlockingReaderWriterQueue(size_t size = 15) AE_NO_TSAN
        : inner(size)
        , sema(new spsc_sema::LightweightSemaphore())
    {
    }

    BlockingReaderWriterQueue(BlockingReaderWriterQueue&& other) AE_NO_TSAN
        : inner(std::move(other.inner))
        , sema(std::move(other.sema))
    {
    }

    BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue&& other) AE_NO_TSAN
    {
        std::swap(sema, other.sema);
        std::swap(inner, other.inner);
        return *this;
    }


    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN
    {
        if (inner.try_enqueue(element))
        {
            sema->signal();
            return true;
        }
        return false;
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T&& element) AE_NO_TSAN
    {
        if (inner.try_enqueue(std::forward<T>(element)))
        {
            sema->signal();
            return true;
        }
        return false;
    }

    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN
    {
        if (inner.enqueue(element))
        {
            sema->signal();
            return true;
        }
        return false;
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T&& element) AE_NO_TSAN
    {
        if (inner.enqueue(std::forward<T>(element)))
        {
            sema->signal();
            return true;
        }
        return false;
    }

    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    template <typename U>
    bool try_dequeue(U& result) AE_NO_TSAN
    {
        if (sema->tryWait())
        {
            bool success = inner.try_dequeue(result);
            assert(success);
            AE_UNUSED(success);
            return true;
        }
        return false;
    }

    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available, then dequeues it.
    template <typename U>
    void wait_dequeue(U& result) AE_NO_TSAN
    {
        while (!sema->wait())
            ;
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
    }
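    // Typical usage (an illustrative sketch, not part of the header): a
    // dedicated consumer thread blocks on wait_dequeue while a single
    // producer thread enqueues. Names below are hypothetical example code
    // (and would require <thread> if compiled).
    //
    //     moodycamel::BlockingReaderWriterQueue<int> q;
    //     std::thread consumer([&] {
    //         int item;
    //         for (int i = 0; i != 100; ++i) {
    //             q.wait_dequeue(item); // blocks until an element arrives
    //         }
    //     });
    //     for (int i = 0; i != 100; ++i) {
    //         q.enqueue(i); // producer side; wakes the consumer via the semaphore
    //     }
    //     consumer.join();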

    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available up to the specified timeout,
    // then dequeues it and returns true, or returns false if the timeout
    // expires before an element can be dequeued.
    // Using a negative timeout indicates an indefinite timeout,
    // and is thus functionally equivalent to calling wait_dequeue.
    template <typename U>
    bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) AE_NO_TSAN
    {
        if (!sema->wait(timeout_usecs))
        {
            return false;
        }
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
        return true;
    }

#if __cplusplus > 199711L || _MSC_VER >= 1700
    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available up to the specified timeout,
    // then dequeues it and returns true, or returns false if the timeout
    // expires before an element can be dequeued.
    // Using a negative timeout indicates an indefinite timeout,
    // and is thus functionally equivalent to calling wait_dequeue.
    template <typename U, typename Rep, typename Period>
    inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout) AE_NO_TSAN
    {
        return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
    }
#endif
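    // For example (illustrative): wait up to 5 milliseconds for an element:
    //
    //     int item;
    //     if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
    //         /* got one within the timeout */
    //     }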

    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    AE_FORCEINLINE T* peek() const AE_NO_TSAN
    {
        return inner.peek();
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    AE_FORCEINLINE bool pop() AE_NO_TSAN
    {
        if (sema->tryWait())
        {
            bool result = inner.pop();
            assert(result);
            AE_UNUSED(result);
            return true;
        }
        return false;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    AE_FORCEINLINE size_t size_approx() const AE_NO_TSAN
    {
        return sema->availableApprox();
    }

    // Returns the total number of items that could be enqueued without incurring
    // an allocation when this queue is empty.
    // Safe to call from both the producer and consumer threads.
    //
    // NOTE: The actual capacity during usage may be different depending on the consumer.
    // If the consumer is removing elements concurrently, the producer cannot add to
    // the block the consumer is removing from until it's completely empty, except in
    // the case where the producer was writing to the same block the consumer was
    // reading from the whole time.
    AE_FORCEINLINE size_t max_capacity() const
    {
        return inner.max_capacity();
    }

private:
    // Disable copying & assignment
    BlockingReaderWriterQueue(BlockingReaderWriterQueue const&)
    {
    }
    BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue const&)
    {
    }

private:
    ReaderWriterQueue inner;
    std::unique_ptr<spsc_sema::LightweightSemaphore> sema;
};

} // end namespace moodycamel

#ifdef AE_VCPP
#pragma warning(pop)
#endif