lb.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "lb.hpp"
5 #include "pipe.hpp"
6 #include "err.hpp"
7 #include "msg.hpp"
8 
9 zmq::lb_t::lb_t () : _active (0), _current (0), _more (false), _dropping (false)
10 {
11 }
12 
14 {
15  zmq_assert (_pipes.empty ());
16 }
17 
18 void zmq::lb_t::attach (pipe_t *pipe_)
19 {
20  _pipes.push_back (pipe_);
21  activated (pipe_);
22 }
23 
24 void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
25 {
26  const pipes_t::size_type index = _pipes.index (pipe_);
27 
28  // If we are in the middle of multipart message and current pipe
29  // have disconnected, we have to drop the remainder of the message.
30  if (index == _current && _more)
31  _dropping = true;
32 
33  // Remove the pipe from the list; adjust number of active pipes
34  // accordingly.
35  if (index < _active) {
36  _active--;
37  _pipes.swap (index, _active);
38  if (_current == _active)
39  _current = 0;
40  }
41  _pipes.erase (pipe_);
42 }
43 
44 void zmq::lb_t::activated (pipe_t *pipe_)
45 {
46  // Move the pipe to the list of active pipes.
47  _pipes.swap (_pipes.index (pipe_), _active);
48  _active++;
49 }
50 
52 {
53  return sendpipe (msg_, NULL);
54 }
55 
56 int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
57 {
58  // Drop the message if required. If we are at the end of the message
59  // switch back to non-dropping mode.
60  if (_dropping) {
61  _more = (msg_->flags () & msg_t::more) != 0;
62  _dropping = _more;
63 
64  int rc = msg_->close ();
65  errno_assert (rc == 0);
66  rc = msg_->init ();
67  errno_assert (rc == 0);
68  return 0;
69  }
70 
71  while (_active > 0) {
72  if (_pipes[_current]->write (msg_)) {
73  if (pipe_)
74  *pipe_ = _pipes[_current];
75  break;
76  }
77 
78  // If send fails for multi-part msg rollback other
79  // parts sent earlier and return EAGAIN.
80  // Application should handle this as suitable
81  if (_more) {
82  _pipes[_current]->rollback ();
83  // At this point the pipe is already being deallocated
84  // and the first N frames are unreachable (_outpipe is
85  // most likely already NULL so rollback won't actually do
86  // anything and they can't be un-written to deliver later).
87  // Return EFAULT to socket_base caller to drop current message
88  // and any other subsequent frames to avoid them being
89  // "stuck" and received when a new client reconnects, which
90  // would break atomicity of multi-part messages (in blocking mode
91  // socket_base just tries again and again to send the same message)
92  // Note that given dropping mode returns 0, the user will
93  // never know that the message could not be delivered, but
94  // can't really fix it without breaking backward compatibility.
95  // -2/EAGAIN will make sure socket_base caller does not re-enter
96  // immediately or after a short sleep in blocking mode.
97  _dropping = (msg_->flags () & msg_t::more) != 0;
98  _more = false;
99  errno = EAGAIN;
100  return -2;
101  }
102 
103  _active--;
104  if (_current < _active)
105  _pipes.swap (_current, _active);
106  else
107  _current = 0;
108  }
109 
110  // If there are no pipes we cannot send the message.
111  if (_active == 0) {
112  errno = EAGAIN;
113  return -1;
114  }
115 
116  // If it's final part of the message we can flush it downstream and
117  // continue round-robining (load balance).
118  _more = (msg_->flags () & msg_t::more) != 0;
119  if (!_more) {
120  _pipes[_current]->flush ();
121 
122  if (++_current >= _active)
123  _current = 0;
124  }
125 
126  // Detach the message from the data buffer.
127  const int rc = msg_->init ();
128  errno_assert (rc == 0);
129 
130  return 0;
131 }
132 
134 {
135  // If one part of the message was already written we can definitely
136  // write the rest of the message.
137  if (_more)
138  return true;
139 
140  while (_active > 0) {
141  // Check whether a pipe has room for another message.
142  if (_pipes[_current]->check_write ())
143  return true;
144 
145  // Deactivate the pipe.
146  _active--;
147  _pipes.swap (_current, _active);
148  if (_current == _active)
149  _current = 0;
150  }
151 
152  return false;
153 }
zmq::lb_t::has_out
bool has_out()
Definition: lb.cpp:133
zmq::lb_t::attach
void attach(pipe_t *pipe_)
Definition: lb.cpp:18
lb.hpp
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::lb_t::sendpipe
int sendpipe(msg_t *msg_, pipe_t **pipe_)
Definition: lb.cpp:56
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
zmq::lb_t::send
int send(msg_t *msg_)
Definition: lb.cpp:51
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
zmq::lb_t::lb_t
lb_t()
Definition: lb.cpp:9
zmq::msg_t::close
int close()
Definition: msg.cpp:242
pipe.hpp
zmq::array_t< pipe_t, 2 >::size_type
std::vector< pipe_t * >::size_type size_type
Definition: array.hpp:52
zmq::msg_t::init
int init()
Definition: msg.cpp:50
msg.hpp
zmq::lb_t::~lb_t
~lb_t()
Definition: lb.cpp:13
zmq::msg_t::more
@ more
Definition: msg.hpp:55
err.hpp
zmq::lb_t::pipe_terminated
void pipe_terminated(pipe_t *pipe_)
Definition: lb.cpp:24
false
#define false
Definition: cJSON.c:70
index
GLuint index
Definition: glcorearb.h:3055
zmq::msg_t
Definition: msg.hpp:33
zmq::lb_t::activated
void activated(pipe_t *pipe_)
Definition: lb.cpp:44


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:55