dist.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "dist.hpp"
5 #include "pipe.hpp"
6 #include "err.hpp"
7 #include "msg.hpp"
8 #include "likely.hpp"
9 
11  _matching (0), _active (0), _eligible (0), _more (false)
12 {
13 }
14 
16 {
17  zmq_assert (_pipes.empty ());
18 }
19 
20 void zmq::dist_t::attach (pipe_t *pipe_)
21 {
22  // If we are in the middle of sending a message, we'll add new pipe
23  // into the list of eligible pipes. Otherwise we add it to the list
24  // of active pipes.
25  if (_more) {
26  _pipes.push_back (pipe_);
27  _pipes.swap (_eligible, _pipes.size () - 1);
28  _eligible++;
29  } else {
30  _pipes.push_back (pipe_);
31  _pipes.swap (_active, _pipes.size () - 1);
32  _active++;
33  _eligible++;
34  }
35 }
36 
37 bool zmq::dist_t::has_pipe (pipe_t *pipe_)
38 {
39  std::size_t claimed_index = _pipes.index (pipe_);
40 
41  // If pipe claims to be outside the available index space it can't be in the distributor.
42  if (claimed_index >= _pipes.size ()) {
43  return false;
44  }
45 
46  return _pipes[claimed_index] == pipe_;
47 }
48 
49 void zmq::dist_t::match (pipe_t *pipe_)
50 {
51  // If pipe is already matching do nothing.
52  if (_pipes.index (pipe_) < _matching)
53  return;
54 
55  // If the pipe isn't eligible, ignore it.
56  if (_pipes.index (pipe_) >= _eligible)
57  return;
58 
59  // Mark the pipe as matching.
60  _pipes.swap (_pipes.index (pipe_), _matching);
61  _matching++;
62 }
63 
65 {
66  const pipes_t::size_type prev_matching = _matching;
67 
68  // Reset matching to 0
69  unmatch ();
70 
71  // Mark all matching pipes as not matching and vice-versa.
72  // To do this, push all pipes that are eligible but not
73  // matched - i.e. between "matching" and "eligible" -
74  // to the beginning of the queue.
75  for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
76  _pipes.swap (i, _matching++);
77  }
78 }
79 
81 {
82  _matching = 0;
83 }
84 
85 void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
86 {
87  // Remove the pipe from the list; adjust number of matching, active and/or
88  // eligible pipes accordingly.
89  if (_pipes.index (pipe_) < _matching) {
90  _pipes.swap (_pipes.index (pipe_), _matching - 1);
91  _matching--;
92  }
93  if (_pipes.index (pipe_) < _active) {
94  _pipes.swap (_pipes.index (pipe_), _active - 1);
95  _active--;
96  }
97  if (_pipes.index (pipe_) < _eligible) {
98  _pipes.swap (_pipes.index (pipe_), _eligible - 1);
99  _eligible--;
100  }
101 
102  _pipes.erase (pipe_);
103 }
104 
105 void zmq::dist_t::activated (pipe_t *pipe_)
106 {
107  // Move the pipe from passive to eligible state.
108  if (_eligible < _pipes.size ()) {
109  _pipes.swap (_pipes.index (pipe_), _eligible);
110  _eligible++;
111  }
112 
113  // If there's no message being sent at the moment, move it to
114  // the active state.
115  if (!_more && _active < _pipes.size ()) {
116  _pipes.swap (_eligible - 1, _active);
117  _active++;
118  }
119 }
120 
122 {
123  _matching = _active;
124  return send_to_matching (msg_);
125 }
126 
128 {
129  // Is this end of a multipart message?
130  const bool msg_more = (msg_->flags () & msg_t::more) != 0;
131 
132  // Push the message to matching pipes.
133  distribute (msg_);
134 
135  // If multipart message is fully sent, activate all the eligible pipes.
136  if (!msg_more)
137  _active = _eligible;
138 
139  _more = msg_more;
140 
141  return 0;
142 }
143 
145 {
146  // If there are no matching pipes available, simply drop the message.
147  if (_matching == 0) {
148  int rc = msg_->close ();
149  errno_assert (rc == 0);
150  rc = msg_->init ();
151  errno_assert (rc == 0);
152  return;
153  }
154 
155  if (msg_->is_vsm ()) {
156  for (pipes_t::size_type i = 0; i < _matching;) {
157  if (!write (_pipes[i], msg_)) {
158  // Use same index again because entry will have been removed.
159  } else {
160  ++i;
161  }
162  }
163  int rc = msg_->init ();
164  errno_assert (rc == 0);
165  return;
166  }
167 
168  // Add matching-1 references to the message. We already hold one reference,
169  // that's why -1.
170  msg_->add_refs (static_cast<int> (_matching) - 1);
171 
172  // Push copy of the message to each matching pipe.
173  int failed = 0;
174  for (pipes_t::size_type i = 0; i < _matching;) {
175  if (!write (_pipes[i], msg_)) {
176  ++failed;
177  // Use same index again because entry will have been removed.
178  } else {
179  ++i;
180  }
181  }
182  if (unlikely (failed))
183  msg_->rm_refs (failed);
184 
185  // Detach the original message from the data buffer. Note that we don't
186  // close the message. That's because we've already used all the references.
187  const int rc = msg_->init ();
188  errno_assert (rc == 0);
189 }
190 
192 {
193  return true;
194 }
195 
196 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
197 {
198  if (!pipe_->write (msg_)) {
199  _pipes.swap (_pipes.index (pipe_), _matching - 1);
200  _matching--;
201  _pipes.swap (_pipes.index (pipe_), _active - 1);
202  _active--;
203  _pipes.swap (_active, _eligible - 1);
204  _eligible--;
205  return false;
206  }
207  if (!(msg_->flags () & msg_t::more))
208  pipe_->flush ();
209  return true;
210 }
211 
213 {
214  for (pipes_t::size_type i = 0; i < _matching; ++i)
215  if (!_pipes[i]->check_hwm ())
216  return false;
217 
218  return true;
219 }
zmq::dist_t::activated
void activated(zmq::pipe_t *pipe_)
Definition: dist.cpp:105
zmq::msg_t::is_vsm
bool is_vsm() const
Definition: msg.cpp:481
zmq::dist_t::send_to_matching
int send_to_matching(zmq::msg_t *msg_)
Definition: dist.cpp:127
zmq::dist_t::attach
void attach(zmq::pipe_t *pipe_)
Definition: dist.cpp:20
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
dist.hpp
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::msg_t::add_refs
void add_refs(int refs_)
Definition: msg.cpp:561
pipe.hpp
zmq::dist_t::send_to_all
int send_to_all(zmq::msg_t *msg_)
Definition: dist.cpp:121
zmq::dist_t::write
bool write(zmq::pipe_t *pipe_, zmq::msg_t *msg_)
Definition: dist.cpp:196
zmq::dist_t::dist_t
dist_t()
Definition: dist.cpp:10
zmq::dist_t::has_out
static bool has_out()
Definition: dist.cpp:191
zmq::dist_t::unmatch
void unmatch()
Definition: dist.cpp:80
zmq::dist_t::check_hwm
bool check_hwm()
Definition: dist.cpp:212
zmq::array_t< zmq::pipe_t, 2 >::size_type
std::vector< zmq::pipe_t * >::size_type size_type
Definition: array.hpp:52
zmq::dist_t::reverse_match
void reverse_match()
Definition: dist.cpp:64
zmq::msg_t::init
int init()
Definition: msg.cpp:50
i
int i
Definition: gmock-matchers_test.cc:764
msg.hpp
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::dist_t::match
void match(zmq::pipe_t *pipe_)
Definition: dist.cpp:49
zmq::dist_t::pipe_terminated
void pipe_terminated(zmq::pipe_t *pipe_)
Definition: dist.cpp:85
err.hpp
likely.hpp
zmq::msg_t::rm_refs
bool rm_refs(int refs_)
Definition: msg.cpp:584
zmq::dist_t::~dist_t
~dist_t()
Definition: dist.cpp:15
zmq::dist_t::distribute
void distribute(zmq::msg_t *msg_)
Definition: dist.cpp:144
false
#define false
Definition: cJSON.c:70
zmq::msg_t
Definition: msg.hpp:33
zmq::dist_t::has_pipe
bool has_pipe(zmq::pipe_t *pipe_)
Definition: dist.cpp:37
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:50