fq.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "fq.hpp"
5 #include "pipe.hpp"
6 #include "err.hpp"
7 #include "msg.hpp"
8 
9 zmq::fq_t::fq_t () : _active (0), _current (0), _more (false)
10 {
11 }
12 
14 {
15  zmq_assert (_pipes.empty ());
16 }
17 
18 void zmq::fq_t::attach (pipe_t *pipe_)
19 {
20  _pipes.push_back (pipe_);
21  _pipes.swap (_active, _pipes.size () - 1);
22  _active++;
23 }
24 
25 void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
26 {
27  const pipes_t::size_type index = _pipes.index (pipe_);
28 
29  // Remove the pipe from the list; adjust number of active pipes
30  // accordingly.
31  if (index < _active) {
32  _active--;
33  _pipes.swap (index, _active);
34  if (_current == _active)
35  _current = 0;
36  }
37  _pipes.erase (pipe_);
38 }
39 
40 void zmq::fq_t::activated (pipe_t *pipe_)
41 {
42  // Move the pipe to the list of active pipes.
43  _pipes.swap (_pipes.index (pipe_), _active);
44  _active++;
45 }
46 
48 {
49  return recvpipe (msg_, NULL);
50 }
51 
52 int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
53 {
54  // Deallocate old content of the message.
55  int rc = msg_->close ();
56  errno_assert (rc == 0);
57 
58  // Round-robin over the pipes to get the next message.
59  while (_active > 0) {
60  // Try to fetch new message. If we've already read part of the message
61  // subsequent part should be immediately available.
62  const bool fetched = _pipes[_current]->read (msg_);
63 
64  // Note that when message is not fetched, current pipe is deactivated
65  // and replaced by another active pipe. Thus we don't have to increase
66  // the 'current' pointer.
67  if (fetched) {
68  if (pipe_)
69  *pipe_ = _pipes[_current];
70  _more = (msg_->flags () & msg_t::more) != 0;
71  if (!_more) {
72  _current = (_current + 1) % _active;
73  }
74  return 0;
75  }
76 
77  // Check the atomicity of the message.
78  // If we've already received the first part of the message
79  // we should get the remaining parts without blocking.
80  zmq_assert (!_more);
81 
82  _active--;
83  _pipes.swap (_current, _active);
84  if (_current == _active)
85  _current = 0;
86  }
87 
88  // No message is available. Initialise the output parameter
89  // to be a 0-byte message.
90  rc = msg_->init ();
91  errno_assert (rc == 0);
92  errno = EAGAIN;
93  return -1;
94 }
95 
97 {
98  // There are subsequent parts of the partly-read message available.
99  if (_more)
100  return true;
101 
102  // Note that messing with current doesn't break the fairness of fair
103  // queueing algorithm. If there are no messages available current will
104  // get back to its original value. Otherwise it'll point to the first
105  // pipe holding messages, skipping only pipes with no messages available.
106  while (_active > 0) {
107  if (_pipes[_current]->check_read ())
108  return true;
109 
110  // Deactivate the pipe.
111  _active--;
112  _pipes.swap (_current, _active);
113  if (_current == _active)
114  _current = 0;
115  }
116 
117  return false;
118 }
NULL
NULL
Definition: test_security_zap.cpp:405
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
zmq::fq_t::~fq_t
~fq_t()
Definition: fq.cpp:13
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
zmq::fq_t::fq_t
fq_t()
Definition: fq.cpp:9
zmq::fq_t::activated
void activated(pipe_t *pipe_)
Definition: fq.cpp:40
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::fq_t::attach
void attach(pipe_t *pipe_)
Definition: fq.cpp:18
zmq::fq_t::pipe_terminated
void pipe_terminated(pipe_t *pipe_)
Definition: fq.cpp:25
pipe.hpp
zmq::array_t< pipe_t, 1 >::size_type
std::vector< pipe_t * >::size_type size_type
Definition: array.hpp:52
fq.hpp
zmq::msg_t::init
int init()
Definition: msg.cpp:50
msg.hpp
zmq::fq_t::has_in
bool has_in()
Definition: fq.cpp:96
zmq::fq_t::recvpipe
int recvpipe(msg_t *msg_, pipe_t **pipe_)
Definition: fq.cpp:52
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::fq_t::recv
int recv(msg_t *msg_)
Definition: fq.cpp:47
err.hpp
false
#define false
Definition: cJSON.c:70
index
GLuint index
Definition: glcorearb.h:3055
zmq::msg_t
Definition: msg.hpp:33


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:51