ypipe.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_YPIPE_HPP_INCLUDED__
4 #define __ZMQ_YPIPE_HPP_INCLUDED__
5 
6 #include "atomic_ptr.hpp"
7 #include "yqueue.hpp"
8 #include "ypipe_base.hpp"
9 
10 namespace zmq
11 {
12 // Lock-free queue implementation.
13 // Only a single thread can read from the pipe at any specific moment.
14 // Only a single thread can write to the pipe at any specific moment.
15 // T is the type of the object in the queue.
16 // N is granularity of the pipe, i.e. how many items are needed to
17 // perform next memory allocation.
18 
19 template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
20 {
21  public:
22  // Initialises the pipe.
24  {
25  // Insert terminator element into the queue.
26  _queue.push ();
27 
28  // Let all the pointers to point to the terminator.
29  // (unless pipe is dead, in which case c is set to NULL).
30  _r = _w = _f = &_queue.back ();
31  _c.set (&_queue.back ());
32  }
33 
34  // Following function (write) deliberately copies uninitialised data
35  // when used with zmq_msg. Initialising the VSM body for
36  // non-VSM messages won't be good for performance.
37 
38 #ifdef ZMQ_HAVE_OPENVMS
39 #pragma message save
40 #pragma message disable(UNINIT)
41 #endif
42 
43  // Write an item to the pipe. Don't flush it yet. If incomplete is
44  // set to true the item is assumed to be continued by items
45  // subsequently written to the pipe. Incomplete items are never
46  // flushed down the stream.
47  void write (const T &value_, bool incomplete_)
48  {
49  // Place the value to the queue, add new terminator element.
50  _queue.back () = value_;
51  _queue.push ();
52 
53  // Move the "flush up to here" pointer.
54  if (!incomplete_)
55  _f = &_queue.back ();
56  }
57 
58 #ifdef ZMQ_HAVE_OPENVMS
59 #pragma message restore
60 #endif
61 
62  // Pop an incomplete item from the pipe. Returns true if such
63  // item exists, false otherwise.
64  bool unwrite (T *value_)
65  {
66  if (_f == &_queue.back ())
67  return false;
68  _queue.unpush ();
69  *value_ = _queue.back ();
70  return true;
71  }
72 
73  // Flush all the completed items into the pipe. Returns false if
74  // the reader thread is sleeping. In that case, caller is obliged to
75  // wake the reader up before using the pipe again.
76  bool flush ()
77  {
78  // If there are no un-flushed items, do nothing.
79  if (_w == _f)
80  return true;
81 
82  // Try to set 'c' to 'f'.
83  if (_c.cas (_w, _f) != _w) {
84  // Compare-and-swap was unsuccessful because 'c' is NULL.
85  // This means that the reader is asleep. Therefore we don't
86  // care about thread-safeness and update c in non-atomic
87  // manner. We'll return false to let the caller know
88  // that reader is sleeping.
89  _c.set (_f);
90  _w = _f;
91  return false;
92  }
93 
94  // Reader is alive. Nothing special to do now. Just move
95  // the 'first un-flushed item' pointer to 'f'.
96  _w = _f;
97  return true;
98  }
99 
100  // Check whether item is available for reading.
101  bool check_read ()
102  {
103  // Was the value prefetched already? If so, return.
104  if (&_queue.front () != _r && _r)
105  return true;
106 
107  // There's no prefetched value, so let us prefetch more values.
108  // Prefetching is to simply retrieve the
109  // pointer from c in atomic fashion. If there are no
110  // items to prefetch, set c to NULL (using compare-and-swap).
111  _r = _c.cas (&_queue.front (), NULL);
112 
113  // If there are no elements prefetched, exit.
114  // During pipe's lifetime r should never be NULL, however,
115  // it can happen during pipe shutdown when items
116  // are being deallocated.
117  if (&_queue.front () == _r || !_r)
118  return false;
119 
120  // There was at least one value prefetched.
121  return true;
122  }
123 
124  // Reads an item from the pipe. Returns false if there is no value.
125  // available.
126  bool read (T *value_)
127  {
128  // Try to prefetch a value.
129  if (!check_read ())
130  return false;
131 
132  // There was at least one value prefetched.
133  // Return it to the caller.
134  *value_ = _queue.front ();
135  _queue.pop ();
136  return true;
137  }
138 
139  // Applies the function fn to the first element in the pipe
140  // and returns the value returned by the fn.
141  // The pipe mustn't be empty or the function crashes.
142  bool probe (bool (*fn_) (const T &))
143  {
144  const bool rc = check_read ();
145  zmq_assert (rc);
146 
147  return (*fn_) (_queue.front ());
148  }
149 
150  protected:
151  // Allocation-efficient queue to store pipe items.
152  // Front of the queue points to the first prefetched item, back of
153  // the pipe points to last un-flushed item. Front is used only by
154  // reader thread, while back is used only by writer thread.
156 
157  // Points to the first un-flushed item. This variable is used
158  // exclusively by writer thread.
159  T *_w;
160 
161  // Points to the first un-prefetched item. This variable is used
162  // exclusively by reader thread.
163  T *_r;
164 
165  // Points to the first item to be flushed in the future.
166  T *_f;
167 
168  // The single point of contention between writer and reader thread.
169  // Points past the last flushed item. If it is NULL,
170  // reader is asleep. This pointer should be always accessed using
171  // atomic operations.
173 
175 };
176 }
177 
178 #endif
zmq::ZMQ_FINAL::_c
atomic_ptr_t< T > _c
Definition: ypipe.hpp:172
zmq::ZMQ_FINAL::check_read
bool check_read()
Definition: ypipe.hpp:101
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::atomic_ptr_t
Definition: atomic_ptr.hpp:150
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
ypipe_base.hpp
T
#define T(upbtypeconst, upbtype, ctype, default_value)
yqueue.hpp
zmq
Definition: zmq.hpp:229
zmq::ZMQ_FINAL::_r
T * _r
Definition: ypipe.hpp:163
zmq::ZMQ_FINAL::flush
bool flush()
Definition: ypipe.hpp:76
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::ZMQ_FINAL::read
bool read(T *value_)
Definition: ypipe.hpp:126
value_
int value_
Definition: gmock-matchers_test.cc:571
zmq::ZMQ_FINAL::probe
bool probe(bool(*fn_)(const T &))
Definition: ypipe.hpp:142
zmq::ZMQ_FINAL::_w
T * _w
Definition: ypipe.hpp:159
zmq::ZMQ_FINAL::unwrite
bool unwrite(T *value_)
Definition: ypipe.hpp:64
zmq::ZMQ_FINAL::ypipe_t
ypipe_t()
Definition: ypipe.hpp:23
atomic_ptr.hpp
zmq::ZMQ_FINAL::write
void write(const T &value_, bool incomplete_)
Definition: ypipe.hpp:47
zmq::ZMQ_FINAL::_queue
yqueue_t< T, N > _queue
Definition: ypipe.hpp:155
zmq::ZMQ_FINAL::_f
T * _f
Definition: ypipe.hpp:166
ZMQ_FINAL
Definition: unittest_ip_resolver.cpp:26
zmq::yqueue_t
Definition: yqueue.hpp:36


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02