readerwriterqueue.h
Go to the documentation of this file.
1 // ©2013-2016 Cameron Desrochers.
2 // Distributed under the simplified BSD license (see the license file that
3 // should have come with this header).
4 
5 #pragma once
6 
7 #include "atomicops.h"
8 #include <type_traits>
9 #include <utility>
10 #include <cassert>
11 #include <stdexcept>
12 #include <new>
13 #include <cstdint>
14 #include <cstdlib> // For malloc/free/abort & size_t
15 #include <memory>
16 #if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
17 #include <chrono>
18 #endif
19 
20 
21 // A lock-free queue for a single-consumer, single-producer architecture.
22 // The queue is also wait-free in the common path (except if more memory
23 // needs to be allocated, in which case malloc is called).
24 // Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
25 // the original maximum size estimate is never exceeded.
26 // Tested on x86/x64 processors, but semantics should be correct for all
27 // architectures (given the right implementations in atomicops.h), provided
28 // that aligned integer and pointer accesses are naturally atomic.
29 // Note that there should only be one consumer thread and producer thread;
30 // Switching roles of the threads, or using multiple consecutive threads for
31 // one role, is not safe unless properly synchronized.
32 // Using the queue exclusively from one thread is fine, though a bit silly.
33 
34 #ifndef MOODYCAMEL_CACHE_LINE_SIZE
35 #define MOODYCAMEL_CACHE_LINE_SIZE 64
36 #endif
37 
38 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
39 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
40 #define MOODYCAMEL_EXCEPTIONS_ENABLED
41 #endif
42 #endif
43 
44 #ifndef MOODYCAMEL_HAS_EMPLACE
45 #if !defined(_MSC_VER) || _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
46 #define MOODYCAMEL_HAS_EMPLACE 1
47 #endif
48 #endif
49 
50 #ifdef AE_VCPP
51 #pragma warning(push)
52 #pragma warning(disable: 4324) // structure was padded due to __declspec(align())
53 #pragma warning(disable: 4820) // padding was added
54 #pragma warning(disable: 4127) // conditional expression is constant
55 #endif
56 
57 namespace moodycamel {
58 
59 template<typename T, size_t MAX_BLOCK_SIZE = 512>
61 {
62  // Design: Based on a queue-of-queues. The low-level queues are just
63  // circular buffers with front and tail indices indicating where the
64  // next element to dequeue is and where the next element can be enqueued,
65  // respectively. Each low-level queue is called a "block". Each block
66  // wastes exactly one element's worth of space to keep the design simple
67  // (if front == tail then the queue is empty, and can't be full).
68  // The high-level queue is a circular linked list of blocks; again there
69  // is a front and tail, but this time they are pointers to the blocks.
70  // The front block is where the next element to be dequeued is, provided
71  // the block is not empty. The back block is where elements are to be
72  // enqueued, provided the block is not full.
73  // The producer thread owns all the tail indices/pointers. The consumer
74  // thread owns all the front indices/pointers. Both threads read each
75  // other's variables, but only the owning thread updates them. E.g. After
76  // the consumer reads the producer's tail, the tail may change before the
77  // consumer is done dequeuing an object, but the consumer knows the tail
78  // will never go backwards, only forwards.
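79  // If there is no room to enqueue an object, an allocating enqueue adds
80  // an additional block: twice the size of the largest block so far, or
     // the same size once the largest block has reached MAX_BLOCK_SIZE.
     // Blocks are never removed.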
81 
82 public:
83  typedef T value_type;
84 
85  // Constructs a queue that can hold maxSize elements without further
86  // allocations. maxSize must be greater than zero and MAX_BLOCK_SIZE must
87  // be a power of two that is at least 2 (all of these are asserted).
88  // A single block of ceilToPow2(maxSize + 1) slots is allocated unless
     // that size exceeds MAX_BLOCK_SIZE * 2; in that case a circular list of
     // several MAX_BLOCK_SIZE-slot blocks is reserved instead (including at
     // least one extra buffer block).
     // If a block cannot be allocated, std::bad_alloc is thrown when
     // MOODYCAMEL_EXCEPTIONS_ENABLED is defined; otherwise abort() is called.
89  AE_NO_TSAN explicit ReaderWriterQueue(size_t maxSize = 15)
90 #ifndef NDEBUG
91  : enqueuing(false)
92  ,dequeuing(false)
93 #endif
94  {
95  assert(maxSize > 0);
96  assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
97  assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
98
99  Block* firstBlock = nullptr;
100
101  largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
102  if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
103  // We need a spare block in case the producer is writing to a different block the consumer is reading from, and
104  // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity
105  // between front == tail meaning "empty" and "full".
106  // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
107  // number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying):
108  size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
109  largestBlockSize = MAX_BLOCK_SIZE;
110  Block* lastBlock = nullptr;
     // Build the ring one block at a time; after every iteration the newest
     // block already points back at firstBlock, so the list is always circular.
111  for (size_t i = 0; i != initialBlockCount; ++i) {
112  auto block = make_block(largestBlockSize);
113  if (block == nullptr) {
     // NOTE(review): blocks created by earlier iterations are only reachable
     // through firstBlock/lastBlock at this point and are not freed before
     // leaving the constructor.
114 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
115  throw std::bad_alloc();
116 #else
117  abort();
118 #endif
119  }
120  if (firstBlock == nullptr) {
121  firstBlock = block;
122  }
123  else {
124  lastBlock->next = block;
125  }
126  lastBlock = block;
127  block->next = firstBlock;
128  }
129  }
130  else {
     // Everything fits in one block, which is linked to itself to form a ring of one.
131  firstBlock = make_block(largestBlockSize);
132  if (firstBlock == nullptr) {
133 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
134  throw std::bad_alloc();
135 #else
136  abort();
137 #endif
138  }
139  firstBlock->next = firstBlock;
140  }
     // The queue starts empty: consumer and producer both begin at the first block.
141  frontBlock = firstBlock;
142  tailBlock = firstBlock;
143
144  // Make sure the reader/writer threads will have the initialized memory setup above:
     // NOTE(review): the statement the comment above refers to is not present
     // in this listing (the gutter skips a line here) - confirm against the
     // original header.
146  }
147 
148  // Note: The queue should not be accessed concurrently while it's
149  // being moved. It's up to the user to synchronize this.
151  : frontBlock(other.frontBlock.load()),
152  tailBlock(other.tailBlock.load()),
154 #ifndef NDEBUG
155  ,enqueuing(false)
156  ,dequeuing(false)
157 #endif
158  {
159  other.largestBlockSize = 32;
160  Block* b = other.make_block(other.largestBlockSize);
161  if (b == nullptr) {
162 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
163  throw std::bad_alloc();
164 #else
165  abort();
166 #endif
167  }
168  b->next = b;
169  other.frontBlock = b;
170  other.tailBlock = b;
171  }
172 
173  // Note: The queue should not be accessed concurrently while it's
174  // being moved. It's up to the user to synchronize this.
176  {
177  Block* b = frontBlock.load();
178  frontBlock = other.frontBlock.load();
179  other.frontBlock = b;
180  b = tailBlock.load();
181  tailBlock = other.tailBlock.load();
182  other.tailBlock = b;
183  std::swap(largestBlockSize, other.largestBlockSize);
184  return *this;
185  }
186 
187  // Note: The queue should not be accessed concurrently while it's
188  // being deleted. It's up to the user to synchronize this.
190  {
191  // Make sure we get the latest version of all variables from other CPUs:
193 
194  // Destroy any remaining objects in queue and free memory
195  Block* frontBlock_ = frontBlock;
196  Block* block = frontBlock_;
197  do {
198  Block* nextBlock = block->next;
199  size_t blockFront = block->front;
200  size_t blockTail = block->tail;
201 
202  for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
203  auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
204  element->~T();
205  (void)element;
206  }
207 
208  auto rawBlock = block->rawThis;
209  block->~Block();
210  std::free(rawBlock);
211  block = nextBlock;
212  } while (block != frontBlock_);
213  }
214 
215 
216  // Enqueues a copy of element if an already-allocated block has room.
217  // Returns true if the element was enqueued, false otherwise.
218  // Never allocates a new block: this forwards to
     // inner_enqueue<CannotAlloc>, which returns false instead of growing
     // the queue.
219  AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN
220  {
221  return inner_enqueue<CannotAlloc>(element);
222  }
223 
224  // Enqueues a moved copy of element if there is room in the queue.
225  // Returns true if the element was enqueued, false otherwise.
226  // Does not allocate memory.
228  {
229  return inner_enqueue<CannotAlloc>(std::forward<T>(element));
230  }
231 
232 #if MOODYCAMEL_HAS_EMPLACE
233  // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
234  template<typename... Args>
236  {
237  return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
238  }
239 #endif
240 
241  // Enqueues a copy of element on the queue.
242  // Forwards to inner_enqueue<CanAlloc>, which allocates an additional
243  // block of memory when no existing block can take the element.
     // Only fails (returns false) if that memory allocation fails.
244  AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN
245  {
246  return inner_enqueue<CanAlloc>(element);
247  }
248 
249  // Enqueues a moved copy of element on the queue.
250  // Allocates an additional block of memory if needed.
251  // Only fails (returns false) if memory allocation fails.
253  {
254  return inner_enqueue<CanAlloc>(std::forward<T>(element));
255  }
256 
257 #if MOODYCAMEL_HAS_EMPLACE
258  // Like enqueue() but with emplace semantics (i.e. construct-in-place).
259  template<typename... Args>
261  {
262  return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
263  }
264 #endif
265 
266  // Attempts to dequeue an element; if the queue is empty,
267  // returns false instead. If the queue has at least one element,
268  // move-assigns the front element to result (result = std::move(front)),
     // destroys the queued object, advances the front index and returns true.
269  template<typename U>
270  bool try_dequeue(U& result) AE_NO_TSAN
271  {
272 #ifndef NDEBUG
     // Debug-only check that no other dequeue-side call is in progress.
273  ReentrantGuard guard(this->dequeuing);
274 #endif
275
276  // High-level pseudocode:
277  // Remember where the tail block is
278  // If the front block has an element in it, dequeue it
279  // Else
280  // If front block was the tail block when we entered the function, return false
281  // Else advance to next block and dequeue the item there
282
283  // Note that we have to use the value of the tail block from before we check if the front
284  // block is empty or not, in case the front block is empty and then, before we check if the
285  // tail block is at the front block or not, the producer fills up the front block *and
286  // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
287  // reproducible in practice.
288  // In order to avoid overhead in the common case, though, we do a double-checked pattern
289  // where we have the fast path if the front block is not empty, then read the tail block,
290  // then re-read the front block and check if it's not empty again, then check if the tail
291  // block has advanced.
292
     // localTail is the consumer's cached copy of tail; the shared tail is only
     // re-read (and the cache refreshed) when the cached value makes the block
     // look empty.
293  Block* frontBlock_ = frontBlock.load();
294  size_t blockTail = frontBlock_->localTail;
295  size_t blockFront = frontBlock_->front.load();
296
297  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
299
300  non_empty_front_block:
301  // Front block not empty, dequeue from here
302  auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
303  result = std::move(*element);
304  element->~T();
305
     // Advance front by one slot, wrapping around via the power-of-two mask.
306  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
307
309  frontBlock_->front = blockFront;
310  }
311  else if (frontBlock_ != tailBlock.load()) {
313
     // The producer has moved on to another block; re-read this block's
     // indices before concluding that it is really empty.
314  frontBlock_ = frontBlock.load();
315  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
316  blockFront = frontBlock_->front.load();
318
319  if (blockFront != blockTail) {
320  // Oh look, the front block isn't empty after all
321  goto non_empty_front_block;
322  }
323
324  // Front block is empty but there's another block ahead, advance to it
325  Block* nextBlock = frontBlock_->next;
326  // Don't need an acquire fence here since next can only ever be set on the tailBlock,
327  // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
328  // ensures next is up-to-date on this CPU in case we recently were at tailBlock.
329
330  size_t nextBlockFront = nextBlock->front.load();
331  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
333
334  // Since the tailBlock is only ever advanced after being written to,
335  // we know there's for sure an element to dequeue on it
336  assert(nextBlockFront != nextBlockTail);
337  AE_UNUSED(nextBlockTail);
338
339  // We're done with this block, let the producer use it if it needs
340  fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
341  frontBlock = frontBlock_ = nextBlock;
342
343  compiler_fence(memory_order_release); // Not strictly needed
344
     // Same dequeue steps as on the fast path, applied to the block we just advanced to.
345  auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
346
347  result = std::move(*element);
348  element->~T();
349
350  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
351
353  frontBlock_->front = nextBlockFront;
354  }
355  else {
356  // No elements in current block and no other block to advance to
357  return false;
358  }
359
360  return true;
361  }
362 
363 
364  // Returns a pointer to the front element in the queue (the one that
365  // would be removed next by a call to `try_dequeue` or `pop`). If the
366  // queue appears empty at the time the method is called, nullptr is
367  // returned instead.
368  // Must be called only from the consumer thread.
370  {
371 #ifndef NDEBUG
372  ReentrantGuard guard(this->dequeuing);
373 #endif
374  // See try_dequeue() for reasoning
375 
376  Block* frontBlock_ = frontBlock.load();
377  size_t blockTail = frontBlock_->localTail;
378  size_t blockFront = frontBlock_->front.load();
379 
380  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
382  non_empty_front_block:
383  return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
384  }
385  else if (frontBlock_ != tailBlock.load()) {
387  frontBlock_ = frontBlock.load();
388  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
389  blockFront = frontBlock_->front.load();
391 
392  if (blockFront != blockTail) {
393  goto non_empty_front_block;
394  }
395 
396  Block* nextBlock = frontBlock_->next;
397 
398  size_t nextBlockFront = nextBlock->front.load();
400 
401  assert(nextBlockFront != nextBlock->tail.load());
402  return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
403  }
404 
405  return nullptr;
406  }
407 
408  // Removes the front element from the queue, if any, without returning it.
409  // Returns true on success, or false if the queue appeared empty at the time
410  // `pop` was called.
412  {
413 #ifndef NDEBUG
414  ReentrantGuard guard(this->dequeuing);
415 #endif
416  // See try_dequeue() for reasoning
417 
418  Block* frontBlock_ = frontBlock.load();
419  size_t blockTail = frontBlock_->localTail;
420  size_t blockFront = frontBlock_->front.load();
421 
422  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
424 
425  non_empty_front_block:
426  auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
427  element->~T();
428 
429  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
430 
432  frontBlock_->front = blockFront;
433  }
434  else if (frontBlock_ != tailBlock.load()) {
436  frontBlock_ = frontBlock.load();
437  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
438  blockFront = frontBlock_->front.load();
440 
441  if (blockFront != blockTail) {
442  goto non_empty_front_block;
443  }
444 
445  // Front block is empty but there's another block ahead, advance to it
446  Block* nextBlock = frontBlock_->next;
447 
448  size_t nextBlockFront = nextBlock->front.load();
449  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
451 
452  assert(nextBlockFront != nextBlockTail);
453  AE_UNUSED(nextBlockTail);
454 
456  frontBlock = frontBlock_ = nextBlock;
457 
459 
460  auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
461  element->~T();
462 
463  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
464 
466  frontBlock_->front = nextBlockFront;
467  }
468  else {
469  // No elements in current block and no other block to advance to
470  return false;
471  }
472 
473  return true;
474  }
475 
476  // Returns the approximate number of items currently in the queue.
477  // Safe to call from both the producer and consumer threads.
     // Walks the circular block list once, starting at frontBlock, and sums the
     // number of occupied slots in each block. The front/tail indices are read
     // one block at a time, so the total is only a snapshot-like estimate.
478  inline size_t size_approx() const AE_NO_TSAN
479  {
480  size_t result = 0;
481  Block* frontBlock_ = frontBlock.load();
482  Block* block = frontBlock_;
483  do {
485  size_t blockFront = block->front.load();
486  size_t blockTail = block->tail.load();
     // Masking the difference gives the right count even when tail has wrapped
     // around to a smaller index than front.
487  result += (blockTail - blockFront) & block->sizeMask;
488  block = block->next.load();
489  } while (block != frontBlock_);
490  return result;
491  }
492 
493 
494 private:
496
     // Shared implementation behind the enqueue/try_enqueue (and emplace) entry
     // points. Constructs a T in place from the forwarded argument(s) in the
     // first slot that can take it and returns true. Returns false when a new
     // block would be needed and either canAlloc is CannotAlloc or make_block
     // fails.
497 #if MOODYCAMEL_HAS_EMPLACE
498  template<AllocationMode canAlloc, typename... Args>
499  bool inner_enqueue(Args&&... args) AE_NO_TSAN
500 #else
501  template<AllocationMode canAlloc, typename U>
502  bool inner_enqueue(U&& element) AE_NO_TSAN
503 #endif
504  {
505 #ifndef NDEBUG
     // Debug-only check that no other enqueue is in progress.
506  ReentrantGuard guard(this->enqueuing);
507 #endif
508
509  // High-level pseudocode (assuming we're allowed to alloc a new block):
510  // If room in tail block, add to tail
511  // Else check next block
512  // If next block is not the head block, enqueue on next block
513  // Else create a new block and enqueue there
514  // Advance tail to the block we just enqueued to
515
     // localFront is the producer's cached copy of front; the shared front is
     // only re-read (and the cache refreshed) when the cached value makes the
     // block look full.
516  Block* tailBlock_ = tailBlock.load();
517  size_t blockFront = tailBlock_->localFront;
518  size_t blockTail = tailBlock_->tail.load();
519
     // The block is full when advancing tail by one (with wrap-around) would
     // make it equal to front.
520  size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
521  if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
523  // This block has room for at least one more element
524  char* location = tailBlock_->data + blockTail * sizeof(T);
525 #if MOODYCAMEL_HAS_EMPLACE
526  new (location) T(std::forward<Args>(args)...);
527 #else
528  new (location) T(std::forward<U>(element));
529 #endif
530
     // Publish the new element by storing the advanced tail index.
532  tailBlock_->tail = nextBlockTail;
533  }
534  else {
536  if (tailBlock_->next.load() != frontBlock) {
537  // Note that the reason we can't advance to the frontBlock and start adding new entries there
538  // is because if we did, then dequeue would stay in that block, eventually reading the new values,
539  // instead of advancing to the next full block (whose values were enqueued first and so should be
540  // consumed first).
541
542  fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock
543
544  // tailBlock is full, but there's a free block ahead, use it
545  Block* tailBlockNext = tailBlock_->next.load();
546  size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
547  nextBlockTail = tailBlockNext->tail.load();
549
550  // This block must be empty since it's not the head block and we
551  // go through the blocks in a circle
552  assert(nextBlockFront == nextBlockTail);
     // (localFront was already set to this value when nextBlockFront was read above.)
553  tailBlockNext->localFront = nextBlockFront;
554
555  char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
556 #if MOODYCAMEL_HAS_EMPLACE
557  new (location) T(std::forward<Args>(args)...);
558 #else
559  new (location) T(std::forward<U>(element));
560 #endif
561
562  tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
563
     // Only after the element is in place does the producer move on to the next block.
565  tailBlock = tailBlockNext;
566  }
567  else if (canAlloc == CanAlloc) {
568  // tailBlock is full and there's no free block ahead; create a new block
     // New blocks double in size until the largest block has reached MAX_BLOCK_SIZE.
569  auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
570  auto newBlock = make_block(newBlockSize);
571  if (newBlock == nullptr) {
572  // Could not allocate a block!
573  return false;
574  }
575  largestBlockSize = newBlockSize;
576
     // NOTE(review): if T's constructor throws here, newBlock is neither linked
     // into the list nor released.
577 #if MOODYCAMEL_HAS_EMPLACE
578  new (newBlock->data) T(std::forward<Args>(args)...);
579 #else
580  new (newBlock->data) T(std::forward<U>(element));
581 #endif
582  assert(newBlock->front == 0);
583  newBlock->tail = newBlock->localTail = 1;
584
     // Splice the new block into the ring directly after the current tail block.
585  newBlock->next = tailBlock_->next.load();
586  tailBlock_->next = newBlock;
587
588  // Might be possible for the dequeue thread to see the new tailBlock->next
589  // *without* seeing the new tailBlock value, but this is OK since it can't
590  // advance to the next block until tailBlock is set anyway (because the only
591  // case where it could try to read the next is if it's already at the tailBlock,
592  // and it won't advance past tailBlock in any circumstance).
593
595  tailBlock = newBlock;
596  }
597  else if (canAlloc == CannotAlloc) {
598  // Would have had to allocate a new block to enqueue, but not allowed
599  return false;
600  }
601  else {
602  assert(false && "Should be unreachable code");
603  return false;
604  }
605  }
606
607  return true;
608  }
609 
610 
611  // Disable copying
613 
614  // Disable assignment
616 
617 
618
     // Rounds x up to the nearest power of two; a value that already is a power
     // of two is returned unchanged. Because the arithmetic is unsigned and
     // wraps, an input of 0 (and any input above the largest representable
     // power of two) yields 0.
619  AE_FORCEINLINE static size_t ceilToPow2(size_t x)
620  {
621  // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
     // Decrement so exact powers of two map to themselves, smear the highest
     // set bit into every lower bit, then add one.
622  --x;
623  x |= x >> 1;
624  x |= x >> 2;
625  x |= x >> 4;
     // Remaining shifts are 8, 16, 32, ... bits, as many as sizeof(size_t) needs.
626  for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
627  x |= x >> (i << 3);
628  }
629  ++x;
630  return x;
631  }
632
     // Returns the lowest address that is >= ptr and a multiple of U's
     // alignment. The outer "% alignment" turns the padding into 0 when ptr is
     // already aligned, so ptr is then returned unchanged.
633  template<typename U>
634  static AE_FORCEINLINE char* align_for(char* ptr) AE_NO_TSAN
635  {
636  const std::size_t alignment = std::alignment_of<U>::value;
637  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
638  }
639 private:
640 #ifndef NDEBUG
642  {
643  AE_NO_TSAN ReentrantGuard(bool& _inSection)
644  : inSection(_inSection)
645  {
646  assert(!inSection && "Concurrent (or re-entrant) enqueue or dequeue operation detected (only one thread at a time may hold the producer or consumer role)");
647  inSection = true;
648  }
649 
651 
652  private:
654 
655  private:
656  bool& inSection;
657  };
658 #endif
659
     // One circular buffer in the queue's ring of blocks. The element storage
     // is not a member array; data points at it and make_block places it in the
     // same malloc'd allocation, after the Block object itself.
660  struct Block
661  {
662  // Avoid false-sharing by putting highly contended variables on their own cache lines
663  weak_atomic<size_t> front; // (Atomic) Elements are read from here
664  size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
665
666  char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
667  weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
668  size_t localFront; // A shadow copy of front, read and refreshed by the enqueue path
669
670  char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
     // NOTE(review): the declaration of `next` (initialized in the constructor
     // below and used throughout the queue) is not visible in this listing.
672
673  char* data; // Contents (on heap) are aligned to T's alignment
674
675  const size_t sizeMask; // Block size minus one; indices are wrapped with "& sizeMask"
676
677
678  // size must be a power of two (and greater than 0)
     // _rawThis is stored in rawThis and _data in data; all indices start at 0
     // and next starts as nullptr.
679  AE_NO_TSAN Block(size_t const& _size, char* _rawThis, char* _data)
680  : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
681  {
682  }
683
684  private:
685  // C4512 - Assignment operator could not be generated
     // Declared private and not defined in this listing, so blocks cannot be assigned.
686  Block& operator=(Block const&);
687
688  public:
     // Pointer originally returned by malloc for this block's allocation (may
     // differ from `this` because of alignment padding); this is the pointer
     // that is passed to free.
689  char* rawThis;
690  };
691 
692
     // Allocates and constructs a Block with room for `capacity` elements of T
     // in a single malloc'd region. Returns nullptr if malloc fails. capacity
     // is passed to Block's constructor as its size, which that constructor
     // documents as needing to be a power of two.
693  static Block* make_block(size_t capacity) AE_NO_TSAN
694  {
695  // Allocate enough memory for the block itself, as well as all the elements it will contain
     // The "alignment - 1" terms reserve worst-case padding so that both the
     // Block and the element array can be aligned inside the raw allocation.
     // NOTE(review): sizeof(T) * capacity is not checked for overflow.
696  auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
697  size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
698  auto newBlockRaw = static_cast<char*>(std::malloc(size));
699  if (newBlockRaw == nullptr) {
700  return nullptr;
701  }
702
     // Layout inside the allocation: [padding][Block][padding][capacity * T].
703  auto newBlockAligned = align_for<Block>(newBlockRaw);
704  auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
     // The raw pointer is handed to the Block so the allocation can be freed later.
705  return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
706  }
707 
708 private:
709  weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block
710
712  weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block
713
715
716 #ifndef NDEBUG
     // Debug-only flags handed to ReentrantGuard to assert that enqueue-side
     // and dequeue-side operations are not entered concurrently or re-entrantly.
717  bool enqueuing;
718  bool dequeuing;
719 #endif
720 };
721 
722 // Like ReaderWriterQueue, but also provides blocking operations
723 template<typename T, size_t MAX_BLOCK_SIZE = 512>
725 {
726 private:
727  typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
728 
729 public:
     // Constructs the wrapped ReaderWriterQueue with the given maxSize and
     // allocates the LightweightSemaphore used by the blocking and try-dequeue
     // operations of this class.
730  explicit BlockingReaderWriterQueue(size_t maxSize = 15) AE_NO_TSAN
731  : inner(maxSize), sema(new spsc_sema::LightweightSemaphore())
732  { }
733 
735  : inner(std::move(other.inner)), sema(std::move(other.sema))
736  { }
737 
739  {
740  std::swap(sema, other.sema);
741  std::swap(inner, other.inner);
742  return *this;
743  }
744 
745 
746  // Enqueues a copy of element if there is room in the queue.
747  // Returns true if the element was enqueued, false otherwise.
748  // Does not allocate memory (delegates to inner.try_enqueue).
     // The semaphore is signalled once for each successful enqueue and is
     // left untouched on failure.
749  AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN
750  {
751  if (inner.try_enqueue(element)) {
752  sema->signal();
753  return true;
754  }
755  return false;
756  }
757 
758  // Enqueues a moved copy of element if there is room in the queue.
759  // Returns true if the element was enqueued, false otherwise.
760  // Does not allocate memory.
762  {
763  if (inner.try_enqueue(std::forward<T>(element))) {
764  sema->signal();
765  return true;
766  }
767  return false;
768  }
769 
770 
771  // Enqueues a copy of element on the queue (delegates to inner.enqueue).
772  // Allocates an additional block of memory if needed.
773  // Only fails (returns false) if memory allocation fails.
     // The semaphore is signalled once for each successful enqueue and is
     // left untouched on failure.
774  AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN
775  {
776  if (inner.enqueue(element)) {
777  sema->signal();
778  return true;
779  }
780  return false;
781  }
782 
783  // Enqueues a moved copy of element on the queue.
784  // Allocates an additional block of memory if needed.
785  // Only fails (returns false) if memory allocation fails.
787  {
788  if (inner.enqueue(std::forward<T>(element))) {
789  sema->signal();
790  return true;
791  }
792  return false;
793  }
794 
795 
796  // Attempts to dequeue an element; if the queue is empty,
797  // returns false instead. If the queue has at least one element,
798  // moves front to result using operator=, then returns true.
     // Emptiness is decided by sema->tryWait(), not by inspecting the inner
     // queue: once tryWait() succeeds this returns true, and the result of
     // inner.try_dequeue is only checked by the assert.
799  template<typename U>
800  bool try_dequeue(U& result) AE_NO_TSAN
801  {
802  if (sema->tryWait()) {
803  bool success = inner.try_dequeue(result);
804  assert(success);
805  AE_UNUSED(success);
806  return true;
807  }
808  return false;
809  }
810 
811 
812  // Attempts to dequeue an element; if the queue is empty,
813  // waits until an element is available, then dequeues it.
     // Calls sema->wait() and then dequeues from the inner queue; the dequeue
     // is expected to succeed, which is only checked by the assert.
814  template<typename U>
815  void wait_dequeue(U& result) AE_NO_TSAN
816  {
817  sema->wait();
818  bool success = inner.try_dequeue(result);
     // Both AE_UNUSED uses presumably silence unused-variable warnings in
     // builds where assert compiles away - macro body not shown here.
819  AE_UNUSED(result);
820  assert(success);
821  AE_UNUSED(success);
822  }
823 
824 
825  // Attempts to dequeue an element; if the queue is empty,
826  // waits until an element is available up to the specified timeout,
827  // then dequeues it and returns true, or returns false if the timeout
828  // expires before an element can be dequeued.
829  // Using a negative timeout indicates an indefinite timeout,
830  // and is thus functionally equivalent to calling wait_dequeue.
     // timeout_usecs is passed unchanged to sema->wait(); the std::chrono
     // overload of this function supplies it in microseconds.
831  template<typename U>
832  bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) AE_NO_TSAN
833  {
     // result is left untouched when the wait reports failure.
834  if (!sema->wait(timeout_usecs)) {
835  return false;
836  }
837  bool success = inner.try_dequeue(result);
838  AE_UNUSED(result);
839  assert(success);
840  AE_UNUSED(success);
841  return true;
842  }
843 
844 
845 #if __cplusplus > 199711L || _MSC_VER >= 1700
846  // Attempts to dequeue an element; if the queue is empty,
847  // waits until an element is available up to the specified timeout,
848  // then dequeues it and returns true, or returns false if the timeout
849  // expires before an element can be dequeued.
850  // Using a negative timeout indicates an indefinite timeout,
851  // and is thus functionally equivalent to calling wait_dequeue.
     // Convenience overload: converts timeout to a microsecond count with
     // duration_cast and delegates to the std::int64_t overload.
852  template<typename U, typename Rep, typename Period>
853  inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout) AE_NO_TSAN
854  {
855  return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
856  }
857 #endif
858 
859 
860  // Returns a pointer to the front element in the queue (the one that
861  // would be removed next by a call to `try_dequeue` or `pop`). If the
862  // queue appears empty at the time the method is called, nullptr is
863  // returned instead.
864  // Must be called only from the consumer thread.
866  {
867  return inner.peek();
868  }
869 
870  // Removes the front element from the queue, if any, without returning it.
871  // Returns true on success, or false if the queue appeared empty at the time
872  // `pop` was called.
874  {
875  if (sema->tryWait()) {
876  bool result = inner.pop();
877  assert(result);
878  AE_UNUSED(result);
879  return true;
880  }
881  return false;
882  }
883 
884  // Returns the approximate number of items currently in the queue.
885  // Safe to call from both the producer and consumer threads.
887  {
888  return sema->availableApprox();
889  }
890 
891 
892 private:
893  // Disable copying & assignment
896 
897 private:
898  ReaderWriterQueue inner;
899  std::unique_ptr<spsc_sema::LightweightSemaphore> sema;
900 };
901 
902 } // end namespace moodycamel
903 
904 #ifdef AE_VCPP
905 #pragma warning(pop)
906 #endif
char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE-sizeof(weak_atomic< Block * >)]
AE_FORCEINLINE bool try_enqueue(T const &element) AE_NO_TSAN
AE_FORCEINLINE bool try_enqueue(T &&element) AE_NO_TSAN
::moodycamel::ReaderWriterQueue< T, MAX_BLOCK_SIZE > ReaderWriterQueue
weak_atomic< Block * > tailBlock
AE_NO_TSAN ReaderWriterQueue(size_t maxSize=15)
bool try_dequeue(U &result) AE_NO_TSAN
#define AE_UNUSED(x)
Definition: atomicops.h:43
AE_FORCEINLINE bool pop() AE_NO_TSAN
#define MOODYCAMEL_CACHE_LINE_SIZE
BlockingReaderWriterQueue(BlockingReaderWriterQueue const &)
bool wait_dequeue_timed(U &result, std::int64_t timeout_usecs) AE_NO_TSAN
AE_NO_TSAN Block(size_t const &_size, char *_rawThis, char *_data)
static AE_FORCEINLINE size_t ceilToPow2(size_t x)
AE_FORCEINLINE bool try_enqueue(T const &element) AE_NO_TSAN
AE_FORCEINLINE bool enqueue(T const &element) AE_NO_TSAN
AE_FORCEINLINE bool emplace(Args &&...args) AE_NO_TSAN
#define AE_NO_TSAN
Definition: atomicops.h:53
AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN
Definition: atomicops.h:206
ReaderWriterQueue(ReaderWriterQueue const &)
AE_FORCEINLINE T load() const AE_NO_TSAN
Definition: atomicops.h:307
BlockingReaderWriterQueue & operator=(BlockingReaderWriterQueue &&other) AE_NO_TSAN
AE_FORCEINLINE bool try_emplace(Args &&...args) AE_NO_TSAN
bool try_dequeue(U &result) AE_NO_TSAN
std::unique_ptr< spsc_sema::LightweightSemaphore > sema
static Block * make_block(size_t capacity) AE_NO_TSAN
AE_FORCEINLINE size_t size_approx() const AE_NO_TSAN
AE_FORCEINLINE bool try_enqueue(T &&element) AE_NO_TSAN
BlockingReaderWriterQueue(BlockingReaderWriterQueue &&other) AE_NO_TSAN
AE_FORCEINLINE bool enqueue(T const &element) AE_NO_TSAN
ReentrantGuard & operator=(ReentrantGuard const &)
AE_NO_TSAN ReaderWriterQueue(ReaderWriterQueue &&other)
size_t size_approx() const AE_NO_TSAN
bool inner_enqueue(Args &&...args) AE_NO_TSAN
BlockingReaderWriterQueue & operator=(BlockingReaderWriterQueue const &)
void wait_dequeue(U &result) AE_NO_TSAN
AE_FORCEINLINE bool enqueue(T &&element) AE_NO_TSAN
AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN
Definition: atomicops.h:194
AE_FORCEINLINE bool enqueue(T &&element) AE_NO_TSAN
ReaderWriterQueue & operator=(ReaderWriterQueue const &)
ReaderWriterQueue & operator=(ReaderWriterQueue &&other) AE_NO_TSAN
BlockingReaderWriterQueue(size_t maxSize=15) AE_NO_TSAN
AE_FORCEINLINE T * peek() AE_NO_TSAN
weak_atomic< Block * > frontBlock
static AE_FORCEINLINE char * align_for(char *ptr) AE_NO_TSAN
#define AE_FORCEINLINE
Definition: atomicops.h:64


orb_slam2_with_maps_odom
Author(s): teng zhang
autogenerated on Fri Sep 25 2020 03:24:47