libzmq
src
yqueue.hpp
Go to the documentation of this file.
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
4
#define __ZMQ_YQUEUE_HPP_INCLUDED__
5
6
#include <stdlib.h>
7
#include <stddef.h>
8
9
#include "
err.hpp
"
10
#include "
atomic_ptr.hpp
"
11
#include "platform.hpp"
12
13
namespace
zmq
14
{
15
// yqueue is an efficient queue implementation. The main goal is
16
// to minimise number of allocations/deallocations needed. Thus yqueue
17
// allocates/deallocates elements in batches of N.
18
//
19
// yqueue allows one thread to use push/back function and another one
20
// to use pop/front functions. However, user must ensure that there's no
21
// pop on the empty queue and that both threads don't access the same
22
// element in unsynchronised manner.
23
//
24
// T is the type of the object in the queue.
25
// N is granularity of the queue (how many pushes have to be done till
26
// actual memory allocation is required).
27
#if defined HAVE_POSIX_MEMALIGN
28
// ALIGN is the memory alignment size to use in the case where we have
29
// posix_memalign available. Default value is 64, this alignment will
30
// prevent two queue chunks from occupying the same CPU cache line on
31
// architectures where cache lines are <= 64 bytes (e.g. most things
32
// except POWER). It is detected at build time to try to account for other
33
// platforms like POWER and s390x.
34
template
<
typename
T,
int
N,
size_t
ALIGN = ZMQ_CACHELINE_SIZE>
class
yqueue_t
35
#else
36
template
<typename
T
, int N> class
yqueue_t
37
#endif
38
{
39
public
:
40
// Create the queue.
41
inline
yqueue_t
()
42
{
43
_begin_chunk
=
allocate_chunk
();
44
alloc_assert
(
_begin_chunk
);
45
_begin_pos
= 0;
46
_back_chunk
=
NULL
;
47
_back_pos
= 0;
48
_end_chunk
=
_begin_chunk
;
49
_end_pos
= 0;
50
}
51
52
// Destroy the queue.
53
inline
~yqueue_t
()
54
{
55
while
(
true
) {
56
if
(
_begin_chunk
==
_end_chunk
) {
57
free (
_begin_chunk
);
58
break
;
59
}
60
chunk_t
*o =
_begin_chunk
;
61
_begin_chunk
=
_begin_chunk
->
next
;
62
free (o);
63
}
64
65
chunk_t
*
sc
=
_spare_chunk
.xchg (
NULL
);
66
free (
sc
);
67
}
68
69
// Returns reference to the front element of the queue.
70
// If the queue is empty, behaviour is undefined.
71
inline
T
&
front
() {
return
_begin_chunk
->
values
[
_begin_pos
]; }
72
73
// Returns reference to the back element of the queue.
74
// If the queue is empty, behaviour is undefined.
75
inline
T
&
back
() {
return
_back_chunk
->
values
[
_back_pos
]; }
76
77
// Adds an element to the back end of the queue.
78
inline
void
push
()
79
{
80
_back_chunk
=
_end_chunk
;
81
_back_pos
=
_end_pos
;
82
83
if
(++
_end_pos
!= N)
84
return
;
85
86
chunk_t
*
sc
=
_spare_chunk
.xchg (
NULL
);
87
if
(
sc
) {
88
_end_chunk
->
next
=
sc
;
89
sc
->prev =
_end_chunk
;
90
}
else
{
91
_end_chunk
->
next
=
allocate_chunk
();
92
alloc_assert
(
_end_chunk
->
next
);
93
_end_chunk
->
next
->
prev
=
_end_chunk
;
94
}
95
_end_chunk
=
_end_chunk
->
next
;
96
_end_pos
= 0;
97
}
98
99
// Removes element from the back end of the queue. In other words
100
// it rollbacks last push to the queue. Take care: Caller is
101
// responsible for destroying the object being unpushed.
102
// The caller must also guarantee that the queue isn't empty when
103
// unpush is called. It cannot be done automatically as the read
104
// side of the queue can be managed by different, completely
105
// unsynchronised thread.
106
inline
void
unpush
()
107
{
108
// First, move 'back' one position backwards.
109
if
(
_back_pos
)
110
--
_back_pos
;
111
else
{
112
_back_pos
= N - 1;
113
_back_chunk
=
_back_chunk
->
prev
;
114
}
115
116
// Now, move 'end' position backwards. Note that obsolete end chunk
117
// is not used as a spare chunk. The analysis shows that doing so
118
// would require free and atomic operation per chunk deallocated
119
// instead of a simple free.
120
if
(
_end_pos
)
121
--
_end_pos
;
122
else
{
123
_end_pos
= N - 1;
124
_end_chunk
=
_end_chunk
->
prev
;
125
free (
_end_chunk
->
next
);
126
_end_chunk
->
next
=
NULL
;
127
}
128
}
129
130
// Removes an element from the front end of the queue.
131
inline
void
pop
()
132
{
133
if
(++
_begin_pos
== N) {
134
chunk_t
*o =
_begin_chunk
;
135
_begin_chunk
=
_begin_chunk
->
next
;
136
_begin_chunk
->
prev
=
NULL
;
137
_begin_pos
= 0;
138
139
// 'o' has been more recently used than _spare_chunk,
140
// so for cache reasons we'll get rid of the spare and
141
// use 'o' as the spare.
142
chunk_t
*cs =
_spare_chunk
.xchg (o);
143
free (cs);
144
}
145
}
146
147
private
:
148
// Individual memory chunk to hold N elements.
149
struct
chunk_t
150
{
151
T
values
[N];
152
chunk_t
*
prev
;
153
chunk_t
*
next
;
154
};
155
156
static
inline
chunk_t
*
allocate_chunk
()
157
{
158
#if defined HAVE_POSIX_MEMALIGN
159
void
*pv;
160
if
(posix_memalign (&pv, ALIGN,
sizeof
(
chunk_t
)) == 0)
161
return
(
chunk_t
*) pv;
162
return
NULL
;
163
#else
164
return
static_cast<
chunk_t
*
>
(malloc (
sizeof
(
chunk_t
)));
165
#endif
166
}
167
168
// Back position may point to invalid memory if the queue is empty,
169
// while begin & end positions are always valid. Begin position is
170
// accessed exclusively be queue reader (front/pop), while back and
171
// end positions are accessed exclusively by queue writer (back/push).
172
chunk_t
*
_begin_chunk
;
173
int
_begin_pos
;
174
chunk_t
*
_back_chunk
;
175
int
_back_pos
;
176
chunk_t
*
_end_chunk
;
177
int
_end_pos
;
178
179
// People are likely to produce and consume at similar rates. In
180
// this scenario holding onto the most recently freed chunk saves
181
// us from having to call malloc/free.
182
atomic_ptr_t<chunk_t>
_spare_chunk
;
183
184
ZMQ_NON_COPYABLE_NOR_MOVABLE
(
yqueue_t
)
185
};
186
}
187
188
#endif
zmq::yqueue_t::_back_chunk
chunk_t * _back_chunk
Definition:
yqueue.hpp:174
zmq::yqueue_t::unpush
void unpush()
Definition:
yqueue.hpp:106
zmq::yqueue_t::_begin_chunk
chunk_t * _begin_chunk
Definition:
yqueue.hpp:172
zmq::yqueue_t::chunk_t::prev
chunk_t * prev
Definition:
yqueue.hpp:152
zmq::yqueue_t::yqueue_t
yqueue_t()
Definition:
yqueue.hpp:41
zmq::yqueue_t::_spare_chunk
atomic_ptr_t< chunk_t > _spare_chunk
Definition:
yqueue.hpp:182
NULL
NULL
Definition:
test_security_zap.cpp:405
zmq::atomic_ptr_t
Definition:
atomic_ptr.hpp:150
zmq::yqueue_t::front
T & front()
Definition:
yqueue.hpp:71
template
string template
zmq::yqueue_t::pop
void pop()
Definition:
yqueue.hpp:131
zmq::yqueue_t::push
void push()
Definition:
yqueue.hpp:78
T
#define T(upbtypeconst, upbtype, ctype, default_value)
zmq::yqueue_t::chunk_t
Definition:
yqueue.hpp:149
zmq::yqueue_t::~yqueue_t
~yqueue_t()
Definition:
yqueue.hpp:53
values
GLenum GLsizei GLsizei GLint * values
Definition:
glcorearb.h:3591
zmq
Definition:
zmq.hpp:229
alloc_assert
#define alloc_assert(x)
Definition:
err.hpp:146
zmq::yqueue_t::chunk_t::next
chunk_t * next
Definition:
yqueue.hpp:153
zmq::yqueue_t::_end_pos
int _end_pos
Definition:
yqueue.hpp:177
zmq::yqueue_t::chunk_t::values
T values[N]
Definition:
yqueue.hpp:151
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition:
macros.hpp:58
zmq::yqueue_t::allocate_chunk
static chunk_t * allocate_chunk()
Definition:
yqueue.hpp:156
sc
void * sc
Definition:
test_channel.cpp:9
err.hpp
zmq::yqueue_t::back
T & back()
Definition:
yqueue.hpp:75
atomic_ptr.hpp
zmq::yqueue_t::_end_chunk
chunk_t * _end_chunk
Definition:
yqueue.hpp:176
zmq::yqueue_t::_begin_pos
int _begin_pos
Definition:
yqueue.hpp:173
zmq::yqueue_t
Definition:
yqueue.hpp:36
zmq::yqueue_t::_back_pos
int _back_pos
Definition:
yqueue.hpp:175
libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02