yqueue.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
4 #define __ZMQ_YQUEUE_HPP_INCLUDED__
5 
6 #include <stdlib.h>
7 #include <stddef.h>
8 
9 #include "err.hpp"
10 #include "atomic_ptr.hpp"
11 #include "platform.hpp"
12 
13 namespace zmq
14 {
15 // yqueue is an efficient queue implementation. The main goal is
16 // to minimise number of allocations/deallocations needed. Thus yqueue
17 // allocates/deallocates elements in batches of N.
18 //
19 // yqueue allows one thread to use push/back function and another one
20 // to use pop/front functions. However, user must ensure that there's no
21 // pop on the empty queue and that both threads don't access the same
22 // element in unsynchronised manner.
23 //
24 // T is the type of the object in the queue.
25 // N is granularity of the queue (how many pushes have to be done till
26 // actual memory allocation is required).
27 #if defined HAVE_POSIX_MEMALIGN
28 // ALIGN is the memory alignment size to use in the case where we have
29 // posix_memalign available. Default value is 64, this alignment will
30 // prevent two queue chunks from occupying the same CPU cache line on
31 // architectures where cache lines are <= 64 bytes (e.g. most things
32 // except POWER). It is detected at build time to try to account for other
33 // platforms like POWER and s390x.
34 template <typename T, int N, size_t ALIGN = ZMQ_CACHELINE_SIZE> class yqueue_t
35 #else
36 template <typename T, int N> class yqueue_t
37 #endif
38 {
39  public:
40  // Create the queue.
41  inline yqueue_t ()
42  {
45  _begin_pos = 0;
46  _back_chunk = NULL;
47  _back_pos = 0;
49  _end_pos = 0;
50  }
51 
52  // Destroy the queue.
53  inline ~yqueue_t ()
54  {
55  while (true) {
56  if (_begin_chunk == _end_chunk) {
57  free (_begin_chunk);
58  break;
59  }
60  chunk_t *o = _begin_chunk;
62  free (o);
63  }
64 
65  chunk_t *sc = _spare_chunk.xchg (NULL);
66  free (sc);
67  }
68 
69  // Returns reference to the front element of the queue.
70  // If the queue is empty, behaviour is undefined.
71  inline T &front () { return _begin_chunk->values[_begin_pos]; }
72 
73  // Returns reference to the back element of the queue.
74  // If the queue is empty, behaviour is undefined.
75  inline T &back () { return _back_chunk->values[_back_pos]; }
76 
77  // Adds an element to the back end of the queue.
78  inline void push ()
79  {
82 
83  if (++_end_pos != N)
84  return;
85 
86  chunk_t *sc = _spare_chunk.xchg (NULL);
87  if (sc) {
88  _end_chunk->next = sc;
89  sc->prev = _end_chunk;
90  } else {
94  }
96  _end_pos = 0;
97  }
98 
99  // Removes element from the back end of the queue. In other words
100  // it rollbacks last push to the queue. Take care: Caller is
101  // responsible for destroying the object being unpushed.
102  // The caller must also guarantee that the queue isn't empty when
103  // unpush is called. It cannot be done automatically as the read
104  // side of the queue can be managed by different, completely
105  // unsynchronised thread.
106  inline void unpush ()
107  {
108  // First, move 'back' one position backwards.
109  if (_back_pos)
110  --_back_pos;
111  else {
112  _back_pos = N - 1;
114  }
115 
116  // Now, move 'end' position backwards. Note that obsolete end chunk
117  // is not used as a spare chunk. The analysis shows that doing so
118  // would require free and atomic operation per chunk deallocated
119  // instead of a simple free.
120  if (_end_pos)
121  --_end_pos;
122  else {
123  _end_pos = N - 1;
125  free (_end_chunk->next);
126  _end_chunk->next = NULL;
127  }
128  }
129 
130  // Removes an element from the front end of the queue.
131  inline void pop ()
132  {
133  if (++_begin_pos == N) {
134  chunk_t *o = _begin_chunk;
137  _begin_pos = 0;
138 
139  // 'o' has been more recently used than _spare_chunk,
140  // so for cache reasons we'll get rid of the spare and
141  // use 'o' as the spare.
142  chunk_t *cs = _spare_chunk.xchg (o);
143  free (cs);
144  }
145  }
146 
147  private:
148  // Individual memory chunk to hold N elements.
149  struct chunk_t
150  {
151  T values[N];
154  };
155 
156  static inline chunk_t *allocate_chunk ()
157  {
158 #if defined HAVE_POSIX_MEMALIGN
159  void *pv;
160  if (posix_memalign (&pv, ALIGN, sizeof (chunk_t)) == 0)
161  return (chunk_t *) pv;
162  return NULL;
163 #else
164  return static_cast<chunk_t *> (malloc (sizeof (chunk_t)));
165 #endif
166  }
167 
168  // Back position may point to invalid memory if the queue is empty,
169  // while begin & end positions are always valid. Begin position is
170  // accessed exclusively be queue reader (front/pop), while back and
171  // end positions are accessed exclusively by queue writer (back/push).
177  int _end_pos;
178 
179  // People are likely to produce and consume at similar rates. In
180  // this scenario holding onto the most recently freed chunk saves
181  // us from having to call malloc/free.
183 
185 };
186 }
187 
188 #endif
zmq::yqueue_t::_back_chunk
chunk_t * _back_chunk
Definition: yqueue.hpp:174
zmq::yqueue_t::unpush
void unpush()
Definition: yqueue.hpp:106
zmq::yqueue_t::_begin_chunk
chunk_t * _begin_chunk
Definition: yqueue.hpp:172
zmq::yqueue_t::chunk_t::prev
chunk_t * prev
Definition: yqueue.hpp:152
zmq::yqueue_t::yqueue_t
yqueue_t()
Definition: yqueue.hpp:41
zmq::yqueue_t::_spare_chunk
atomic_ptr_t< chunk_t > _spare_chunk
Definition: yqueue.hpp:182
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::atomic_ptr_t
Definition: atomic_ptr.hpp:150
zmq::yqueue_t::front
T & front()
Definition: yqueue.hpp:71
template
string template
zmq::yqueue_t::pop
void pop()
Definition: yqueue.hpp:131
zmq::yqueue_t::push
void push()
Definition: yqueue.hpp:78
T
#define T(upbtypeconst, upbtype, ctype, default_value)
zmq::yqueue_t::chunk_t
Definition: yqueue.hpp:149
zmq::yqueue_t::~yqueue_t
~yqueue_t()
Definition: yqueue.hpp:53
values
GLenum GLsizei GLsizei GLint * values
Definition: glcorearb.h:3591
zmq
Definition: zmq.hpp:229
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
zmq::yqueue_t::chunk_t::next
chunk_t * next
Definition: yqueue.hpp:153
zmq::yqueue_t::_end_pos
int _end_pos
Definition: yqueue.hpp:177
zmq::yqueue_t::chunk_t::values
T values[N]
Definition: yqueue.hpp:151
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::yqueue_t::allocate_chunk
static chunk_t * allocate_chunk()
Definition: yqueue.hpp:156
sc
void * sc
Definition: test_channel.cpp:9
err.hpp
zmq::yqueue_t::back
T & back()
Definition: yqueue.hpp:75
atomic_ptr.hpp
zmq::yqueue_t::_end_chunk
chunk_t * _end_chunk
Definition: yqueue.hpp:176
zmq::yqueue_t::_begin_pos
int _begin_pos
Definition: yqueue.hpp:173
zmq::yqueue_t
Definition: yqueue.hpp:36
zmq::yqueue_t::_back_pos
int _back_pos
Definition: yqueue.hpp:175


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02