router.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "router.hpp"
6 #include "pipe.hpp"
7 #include "wire.hpp"
8 #include "random.hpp"
9 #include "likely.hpp"
10 #include "err.hpp"
11 
12 zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13  routing_socket_base_t (parent_, tid_, sid_),
14  _prefetched (false),
15  _routing_id_sent (false),
16  _current_in (NULL),
17  _terminate_current_in (false),
18  _more_in (false),
19  _current_out (NULL),
20  _more_out (false),
21  _next_integral_routing_id (generate_random ()),
22  _mandatory (false),
23  // raw_socket functionality in ROUTER is deprecated
24  _raw_socket (false),
25  _probe_router (false),
26  _handover (false)
27 {
29  options.recv_routing_id = true;
30  options.raw_socket = false;
33 
36 }
37 
39 {
40  zmq_assert (_anonymous_pipes.empty ());
41  _prefetched_id.close ();
42  _prefetched_msg.close ();
43 }
44 
45 void zmq::router_t::xattach_pipe (pipe_t *pipe_,
46  bool subscribe_to_all_,
47  bool locally_initiated_)
48 {
49  LIBZMQ_UNUSED (subscribe_to_all_);
50 
51  zmq_assert (pipe_);
52 
53  if (_probe_router) {
54  msg_t probe_msg;
55  int rc = probe_msg.init ();
56  errno_assert (rc == 0);
57 
58  rc = pipe_->write (&probe_msg);
59  // zmq_assert (rc) is not applicable here, since it is not a bug.
60  LIBZMQ_UNUSED (rc);
61 
62  pipe_->flush ();
63 
64  rc = probe_msg.close ();
65  errno_assert (rc == 0);
66  }
67 
68  const bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
69  if (routing_id_ok)
70  _fq.attach (pipe_);
71  else
72  _anonymous_pipes.insert (pipe_);
73 }
74 
75 int zmq::router_t::xsetsockopt (int option_,
76  const void *optval_,
77  size_t optvallen_)
78 {
79  const bool is_int = (optvallen_ == sizeof (int));
80  int value = 0;
81  if (is_int)
82  memcpy (&value, optval_, sizeof (int));
83 
84  switch (option_) {
85  case ZMQ_ROUTER_RAW:
86  if (is_int && value >= 0) {
87  _raw_socket = (value != 0);
88  if (_raw_socket) {
89  options.recv_routing_id = false;
90  options.raw_socket = true;
91  }
92  return 0;
93  }
94  break;
95 
97  if (is_int && value >= 0) {
98  _mandatory = (value != 0);
99  return 0;
100  }
101  break;
102 
103  case ZMQ_PROBE_ROUTER:
104  if (is_int && value >= 0) {
105  _probe_router = (value != 0);
106  return 0;
107  }
108  break;
109 
110  case ZMQ_ROUTER_HANDOVER:
111  if (is_int && value >= 0) {
112  _handover = (value != 0);
113  return 0;
114  }
115  break;
116 
117 #ifdef ZMQ_BUILD_DRAFT_API
118  case ZMQ_ROUTER_NOTIFY:
119  if (is_int && value >= 0
121  options.router_notify = value;
122  return 0;
123  }
124  break;
125 #endif
126 
127  default:
128  return routing_socket_base_t::xsetsockopt (option_, optval_,
129  optvallen_);
130  }
131  errno = EINVAL;
132  return -1;
133 }
134 
135 
137 {
138  if (0 == _anonymous_pipes.erase (pipe_)) {
139  erase_out_pipe (pipe_);
140  _fq.pipe_terminated (pipe_);
141  pipe_->rollback ();
142  if (pipe_ == _current_out)
143  _current_out = NULL;
144  }
145 }
146 
147 void zmq::router_t::xread_activated (pipe_t *pipe_)
148 {
149  const std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
150  if (it == _anonymous_pipes.end ())
151  _fq.activated (pipe_);
152  else {
153  const bool routing_id_ok = identify_peer (pipe_, false);
154  if (routing_id_ok) {
155  _anonymous_pipes.erase (it);
156  _fq.attach (pipe_);
157  }
158  }
159 }
160 
162 {
163  // If this is the first part of the message it's the ID of the
164  // peer to send the message to.
165  if (!_more_out) {
166  zmq_assert (!_current_out);
167 
168  // If we have malformed message (prefix with no subsequent message)
169  // then just silently ignore it.
170  // TODO: The connections should be killed instead.
171  if (msg_->flags () & msg_t::more) {
172  _more_out = true;
173 
174  // Find the pipe associated with the routing id stored in the prefix.
175  // If there's no such pipe just silently ignore the message, unless
176  // router_mandatory is set.
177  out_pipe_t *out_pipe = lookup_out_pipe (
178  blob_t (static_cast<unsigned char *> (msg_->data ()),
179  msg_->size (), zmq::reference_tag_t ()));
180 
181  if (out_pipe) {
182  _current_out = out_pipe->pipe;
183 
184  // Check whether pipe is closed or not
185  if (!_current_out->check_write ()) {
186  // Check whether pipe is full or not
187  const bool pipe_full = !_current_out->check_hwm ();
188  out_pipe->active = false;
189  _current_out = NULL;
190 
191  if (_mandatory) {
192  _more_out = false;
193  if (pipe_full)
194  errno = EAGAIN;
195  else
197  return -1;
198  }
199  }
200  } else if (_mandatory) {
201  _more_out = false;
203  return -1;
204  }
205  }
206 
207  int rc = msg_->close ();
208  errno_assert (rc == 0);
209  rc = msg_->init ();
210  errno_assert (rc == 0);
211  return 0;
212  }
213 
214  // Ignore the MORE flag for raw-sock or assert?
215  if (options.raw_socket)
216  msg_->reset_flags (msg_t::more);
217 
218  // Check whether this is the last part of the message.
219  _more_out = (msg_->flags () & msg_t::more) != 0;
220 
221  // Push the message into the pipe. If there's no out pipe, just drop it.
222  if (_current_out) {
223  // Close the remote connection if user has asked to do so
224  // by sending zero length message.
225  // Pending messages in the pipe will be dropped (on receiving term- ack)
226  if (_raw_socket && msg_->size () == 0) {
227  _current_out->terminate (false);
228  int rc = msg_->close ();
229  errno_assert (rc == 0);
230  rc = msg_->init ();
231  errno_assert (rc == 0);
232  _current_out = NULL;
233  return 0;
234  }
235 
236  const bool ok = _current_out->write (msg_);
237  if (unlikely (!ok)) {
238  // Message failed to send - we must close it ourselves.
239  const int rc = msg_->close ();
240  errno_assert (rc == 0);
241  // HWM was checked before, so the pipe must be gone. Roll back
242  // messages that were piped, for example REP labels.
243  _current_out->rollback ();
244  _current_out = NULL;
245  } else {
246  if (!_more_out) {
247  _current_out->flush ();
248  _current_out = NULL;
249  }
250  }
251  } else {
252  const int rc = msg_->close ();
253  errno_assert (rc == 0);
254  }
255 
256  // Detach the message from the data buffer.
257  const int rc = msg_->init ();
258  errno_assert (rc == 0);
259 
260  return 0;
261 }
262 
264 {
265  if (_prefetched) {
266  if (!_routing_id_sent) {
267  const int rc = msg_->move (_prefetched_id);
268  errno_assert (rc == 0);
269  _routing_id_sent = true;
270  } else {
271  const int rc = msg_->move (_prefetched_msg);
272  errno_assert (rc == 0);
273  _prefetched = false;
274  }
275  _more_in = (msg_->flags () & msg_t::more) != 0;
276 
277  if (!_more_in) {
278  if (_terminate_current_in) {
279  _current_in->terminate (true);
280  _terminate_current_in = false;
281  }
282  _current_in = NULL;
283  }
284  return 0;
285  }
286 
287  pipe_t *pipe = NULL;
288  int rc = _fq.recvpipe (msg_, &pipe);
289 
290  // It's possible that we receive peer's routing id. That happens
291  // after reconnection. The current implementation assumes that
292  // the peer always uses the same routing id.
293  while (rc == 0 && msg_->is_routing_id ())
294  rc = _fq.recvpipe (msg_, &pipe);
295 
296  if (rc != 0)
297  return -1;
298 
299  zmq_assert (pipe != NULL);
300 
301  // If we are in the middle of reading a message, just return the next part.
302  if (_more_in) {
303  _more_in = (msg_->flags () & msg_t::more) != 0;
304 
305  if (!_more_in) {
306  if (_terminate_current_in) {
307  _current_in->terminate (true);
308  _terminate_current_in = false;
309  }
310  _current_in = NULL;
311  }
312  } else {
313  // We are at the beginning of a message.
314  // Keep the message part we have in the prefetch buffer
315  // and return the ID of the peer instead.
316  rc = _prefetched_msg.move (*msg_);
317  errno_assert (rc == 0);
318  _prefetched = true;
319  _current_in = pipe;
320 
321  const blob_t &routing_id = pipe->get_routing_id ();
322  rc = msg_->init_size (routing_id.size ());
323  errno_assert (rc == 0);
324  memcpy (msg_->data (), routing_id.data (), routing_id.size ());
325  msg_->set_flags (msg_t::more);
326  if (_prefetched_msg.metadata ())
327  msg_->set_metadata (_prefetched_msg.metadata ());
328  _routing_id_sent = true;
329  }
330 
331  return 0;
332 }
333 
335 {
336  if (_current_out) {
337  _current_out->rollback ();
338  _current_out = NULL;
339  _more_out = false;
340  }
341  return 0;
342 }
343 
345 {
346  // If we are in the middle of reading the messages, there are
347  // definitely more parts available.
348  if (_more_in)
349  return true;
350 
351  // We may already have a message pre-fetched.
352  if (_prefetched)
353  return true;
354 
355  // Try to read the next message.
356  // The message, if read, is kept in the pre-fetch buffer.
357  pipe_t *pipe = NULL;
358  int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
359 
360  // It's possible that we receive peer's routing id. That happens
361  // after reconnection. The current implementation assumes that
362  // the peer always uses the same routing id.
363  // TODO: handle the situation when the peer changes its routing id.
364  while (rc == 0 && _prefetched_msg.is_routing_id ())
365  rc = _fq.recvpipe (&_prefetched_msg, &pipe);
366 
367  if (rc != 0)
368  return false;
369 
370  zmq_assert (pipe != NULL);
371 
372  const blob_t &routing_id = pipe->get_routing_id ();
373  rc = _prefetched_id.init_size (routing_id.size ());
374  errno_assert (rc == 0);
375  memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ());
376  _prefetched_id.set_flags (msg_t::more);
377  if (_prefetched_msg.metadata ())
378  _prefetched_id.set_metadata (_prefetched_msg.metadata ());
379 
380  _prefetched = true;
381  _routing_id_sent = false;
382  _current_in = pipe;
383 
384  return true;
385 }
386 
387 static bool check_pipe_hwm (const zmq::pipe_t &pipe_)
388 {
389  return pipe_.check_hwm ();
390 }
391 
393 {
394  // In theory, ROUTER socket is always ready for writing (except when
395  // MANDATORY is set). Whether actual attempt to write succeeds depends
396  // on which pipe the message is going to be routed to.
397 
398  if (!_mandatory)
399  return true;
400 
401  return any_of_out_pipes (check_pipe_hwm);
402 }
403 
404 int zmq::router_t::get_peer_state (const void *routing_id_,
405  size_t routing_id_size_) const
406 {
407  int res = 0;
408 
409  // TODO remove the const_cast, see comment in lookup_out_pipe
410  const blob_t routing_id_blob (
411  static_cast<unsigned char *> (const_cast<void *> (routing_id_)),
412  routing_id_size_, reference_tag_t ());
413  const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob);
414  if (!out_pipe) {
416  return -1;
417  }
418 
419  if (out_pipe->pipe->check_hwm ())
420  res |= ZMQ_POLLOUT;
421 
424  return res;
425 }
426 
427 bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
428 {
429  msg_t msg;
430  blob_t routing_id;
431 
432  if (locally_initiated_ && connect_routing_id_is_set ()) {
433  const std::string connect_routing_id = extract_connect_routing_id ();
434  routing_id.set (
435  reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
436  connect_routing_id.length ());
437  // Not allowed to duplicate an existing rid
438  zmq_assert (!has_out_pipe (routing_id));
439  } else if (
440  options
441  .raw_socket) { // Always assign an integral routing id for raw-socket
442  unsigned char buf[5];
443  buf[0] = 0;
444  put_uint32 (buf + 1, _next_integral_routing_id++);
445  routing_id.set (buf, sizeof buf);
446  } else if (!options.raw_socket) {
447  // Pick up handshake cases and also case where next integral routing id is set
448  msg.init ();
449  const bool ok = pipe_->read (&msg);
450  if (!ok)
451  return false;
452 
453  if (msg.size () == 0) {
454  // Fall back on the auto-generation
455  unsigned char buf[5];
456  buf[0] = 0;
457  put_uint32 (buf + 1, _next_integral_routing_id++);
458  routing_id.set (buf, sizeof buf);
459  msg.close ();
460  } else {
461  routing_id.set (static_cast<unsigned char *> (msg.data ()),
462  msg.size ());
463  msg.close ();
464 
465  // Try to remove an existing routing id entry to allow the new
466  // connection to take the routing id.
467  const out_pipe_t *const existing_outpipe =
468  lookup_out_pipe (routing_id);
469 
470  if (existing_outpipe) {
471  if (!_handover)
472  // Ignore peers with duplicate ID
473  return false;
474 
475  // We will allow the new connection to take over this
476  // routing id. Temporarily assign a new routing id to the
477  // existing pipe so we can terminate it asynchronously.
478  unsigned char buf[5];
479  buf[0] = 0;
480  put_uint32 (buf + 1, _next_integral_routing_id++);
481  blob_t new_routing_id (buf, sizeof buf);
482 
483  pipe_t *const old_pipe = existing_outpipe->pipe;
484 
485  erase_out_pipe (old_pipe);
486  old_pipe->set_router_socket_routing_id (new_routing_id);
487  add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe);
488 
489  if (old_pipe == _current_in)
490  _terminate_current_in = true;
491  else
492  old_pipe->terminate (true);
493  }
494  }
495  }
496 
497  pipe_->set_router_socket_routing_id (routing_id);
498  add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
499 
500  return true;
501 }
zmq::router_t::get_peer_state
int get_peer_state(const void *routing_id_, size_t routing_id_size_) const ZMQ_FINAL
Definition: router.cpp:404
zmq::router_t::xsend
int xsend(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: router.cpp:161
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t::raw_socket
bool raw_socket
Definition: options.hpp:136
zmq::put_uint32
void put_uint32(unsigned char *buffer_, uint32_t value_)
Definition: wire.hpp:35
zmq::routing_socket_base_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_OVERRIDE
Definition: socket_base.cpp:2062
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq::router_t::~router_t
~router_t() ZMQ_OVERRIDE
Definition: router.cpp:38
EINVAL
#define EINVAL
Definition: errno.hpp:25
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
check_pipe_hwm
static bool check_pipe_hwm(const zmq::pipe_t &pipe_)
Definition: router.cpp:387
zmq::router_t::rollback
int rollback()
Definition: router.cpp:334
zmq::router_t::router_t
router_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: router.cpp:12
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq::router_t::_prefetched_msg
msg_t _prefetched_msg
Definition: router.hpp:64
errno
int errno
zmq::router_t::xhas_in
bool xhas_in() ZMQ_OVERRIDE
Definition: router.cpp:344
zmq::msg_t::move
int move(msg_t &src_)
Definition: msg.cpp:305
zmq::msg_t::init_size
int init_size(size_t size_)
Definition: msg.cpp:62
zmq::router_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_, bool locally_initiated_) ZMQ_FINAL
Definition: router.cpp:45
ok
ROSCPP_DECL bool ok()
wire.hpp
router.hpp
raw_socket
int raw_socket
Definition: test_heartbeats.cpp:13
zmq::options_t::can_send_hello_msg
bool can_send_hello_msg
Definition: options.hpp:279
ZMQ_POLLOUT
#define ZMQ_POLLOUT
Definition: zmq.h:483
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
ZMQ_ROUTER_HANDOVER
#define ZMQ_ROUTER_HANDOVER
Definition: zmq.h:321
zmq::msg_t::is_routing_id
bool is_routing_id() const
Definition: msg.cpp:466
macros.hpp
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::msg_t::close
int close()
Definition: msg.cpp:242
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
zmq::blob_t::data
const unsigned char * data() const
Definition: blob.hpp:86
zmq::own_t::options
options_t options
Definition: own.hpp:78
zmq::router_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL
Definition: router.cpp:75
zmq::blob_t::size
size_t size() const
Definition: blob.hpp:83
zmq::routing_socket_base_t::out_pipe_t
Definition: socket_base.hpp:349
pipe.hpp
zmq::blob_t::set
void set(const unsigned char *const data_, const size_t size_)
Definition: blob.hpp:113
zmq::msg_t::set_metadata
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:448
ZMQ_ROUTER_RAW
#define ZMQ_ROUTER_RAW
Definition: zmq.h:306
zmq::routing_socket_base_t
Definition: socket_base.hpp:333
zmq::options_t::recv_routing_id
bool recv_routing_id
Definition: options.hpp:133
zmq::router_t::xhas_out
bool xhas_out() ZMQ_OVERRIDE
Definition: router.cpp:392
buf
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition: glcorearb.h:4175
zmq::router_t::identify_peer
bool identify_peer(pipe_t *pipe_, bool locally_initiated_)
Definition: router.cpp:427
ZMQ_NOTIFY_DISCONNECT
#define ZMQ_NOTIFY_DISCONNECT
Definition: zmq_draft.h:102
zmq::msg_t::init
int init()
Definition: msg.cpp:50
ZMQ_ROUTER_MANDATORY
#define ZMQ_ROUTER_MANDATORY
Definition: zmq.h:299
zmq::reference_tag_t
Definition: blob.hpp:38
zmq::blob_t
Definition: blob.hpp:46
ZMQ_NOTIFY_CONNECT
#define ZMQ_NOTIFY_CONNECT
Definition: zmq_draft.h:101
zmq::router_t::xrecv
int xrecv(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: router.cpp:263
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::router_t::_prefetched_id
msg_t _prefetched_id
Definition: router.hpp:61
zmq::msg_t::reset_flags
void reset_flags(unsigned char flags_)
Definition: msg.cpp:438
ZMQ_PROBE_ROUTER
#define ZMQ_PROBE_ROUTER
Definition: zmq.h:316
zmq::generate_random
uint32_t generate_random()
Definition: random.cpp:30
err.hpp
zmq::routing_socket_base_t::out_pipe_t::active
bool active
Definition: socket_base.hpp:352
likely.hpp
zmq::routing_socket_base_t::out_pipe_t::pipe
pipe_t * pipe
Definition: socket_base.hpp:351
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
EHOSTUNREACH
#define EHOSTUNREACH
Definition: zmq.h:152
ZMQ_ROUTER_NOTIFY
#define ZMQ_ROUTER_NOTIFY
Definition: zmq_draft.h:29
zmq::options_t::can_recv_disconnect_msg
bool can_recv_disconnect_msg
Definition: options.hpp:283
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
zmq::msg_t::size
unsigned char size
Definition: msg.hpp:240
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
zmq::options_t::type
int8_t type
Definition: options.hpp:80
false
#define false
Definition: cJSON.c:70
zmq::msg_t::set_flags
void set_flags(unsigned char flags_)
Definition: msg.cpp:433
zmq::router_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: router.cpp:147
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
zmq::msg_t
Definition: msg.hpp:33
unlikely
#define unlikely(x)
Definition: likely.hpp:11
zmq::router_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: router.cpp:136


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58