pipe.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 #include <stddef.h>
6 
7 #include "macros.hpp"
8 #include "pipe.hpp"
9 #include "err.hpp"
10 
11 #include "ypipe.hpp"
12 #include "ypipe_conflate.hpp"
13 
14 int zmq::pipepair (object_t *parents_[2],
15  pipe_t *pipes_[2],
16  const int hwms_[2],
17  const bool conflate_[2])
18 {
19  // Creates two pipe objects. These objects are connected by two ypipes,
20  // each to pass messages in one direction.
21 
22  typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
23  typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
24 
25  pipe_t::upipe_t *upipe1;
26  if (conflate_[0])
27  upipe1 = new (std::nothrow) upipe_conflate_t ();
28  else
29  upipe1 = new (std::nothrow) upipe_normal_t ();
30  alloc_assert (upipe1);
31 
32  pipe_t::upipe_t *upipe2;
33  if (conflate_[1])
34  upipe2 = new (std::nothrow) upipe_conflate_t ();
35  else
36  upipe2 = new (std::nothrow) upipe_normal_t ();
37  alloc_assert (upipe2);
38 
39  pipes_[0] = new (std::nothrow)
40  pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]);
41  alloc_assert (pipes_[0]);
42  pipes_[1] = new (std::nothrow)
43  pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]);
44  alloc_assert (pipes_[1]);
45 
46  pipes_[0]->set_peer (pipes_[1]);
47  pipes_[1]->set_peer (pipes_[0]);
48 
49  return 0;
50 }
51 
52 void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_)
53 {
54  zmq::msg_t id;
55  const int rc = id.init_size (options_.routing_id_size);
56  errno_assert (rc == 0);
57  memcpy (id.data (), options_.routing_id, options_.routing_id_size);
58  id.set_flags (zmq::msg_t::routing_id);
59  const bool written = pipe_->write (&id);
60  zmq_assert (written);
61  pipe_->flush ();
62 }
63 
64 void zmq::send_hello_msg (pipe_t *pipe_, const options_t &options_)
65 {
66  zmq::msg_t hello;
67  const int rc =
68  hello.init_buffer (&options_.hello_msg[0], options_.hello_msg.size ());
69  errno_assert (rc == 0);
70  const bool written = pipe_->write (&hello);
71  zmq_assert (written);
72  pipe_->flush ();
73 }
74 
75 zmq::pipe_t::pipe_t (object_t *parent_,
76  upipe_t *inpipe_,
77  upipe_t *outpipe_,
78  int inhwm_,
79  int outhwm_,
80  bool conflate_) :
81  object_t (parent_),
82  _in_pipe (inpipe_),
83  _out_pipe (outpipe_),
84  _in_active (true),
85  _out_active (true),
86  _hwm (outhwm_),
87  _lwm (compute_lwm (inhwm_)),
88  _in_hwm_boost (-1),
89  _out_hwm_boost (-1),
90  _msgs_read (0),
91  _msgs_written (0),
92  _peers_msgs_read (0),
93  _peer (NULL),
94  _sink (NULL),
95  _state (active),
96  _delay (true),
97  _server_socket_routing_id (0),
98  _conflate (conflate_)
99 {
100  _disconnect_msg.init ();
101 }
102 
103 zmq::pipe_t::~pipe_t ()
104 {
105  _disconnect_msg.close ();
106 }
107 
108 void zmq::pipe_t::set_peer (pipe_t *peer_)
109 {
110  // Peer can be set once only.
111  zmq_assert (!_peer);
112  _peer = peer_;
113 }
114 
115 void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
116 {
117  // Sink can be set once only.
118  zmq_assert (!_sink);
119  _sink = sink_;
120 }
121 
122 void zmq::pipe_t::set_server_socket_routing_id (
123  uint32_t server_socket_routing_id_)
124 {
125  _server_socket_routing_id = server_socket_routing_id_;
126 }
127 
128 uint32_t zmq::pipe_t::get_server_socket_routing_id () const
129 {
130  return _server_socket_routing_id;
131 }
132 
133 void zmq::pipe_t::set_router_socket_routing_id (
134  const blob_t &router_socket_routing_id_)
135 {
136  _router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
137 }
138 
139 const zmq::blob_t &zmq::pipe_t::get_routing_id () const
140 {
141  return _router_socket_routing_id;
142 }
143 
144 bool zmq::pipe_t::check_read ()
145 {
146  if (unlikely (!_in_active))
147  return false;
148  if (unlikely (_state != active && _state != waiting_for_delimiter))
149  return false;
150 
151  // Check if there's an item in the pipe.
152  if (!_in_pipe->check_read ()) {
153  _in_active = false;
154  return false;
155  }
156 
157  // If the next item in the pipe is message delimiter,
158  // initiate termination process.
159  if (_in_pipe->probe (is_delimiter)) {
160  msg_t msg;
161  const bool ok = _in_pipe->read (&msg);
162  zmq_assert (ok);
163  process_delimiter ();
164  return false;
165  }
166 
167  return true;
168 }
169 
170 bool zmq::pipe_t::read (msg_t *msg_)
171 {
172  if (unlikely (!_in_active))
173  return false;
174  if (unlikely (_state != active && _state != waiting_for_delimiter))
175  return false;
176 
177  while (true) {
178  if (!_in_pipe->read (msg_)) {
179  _in_active = false;
180  return false;
181  }
182 
183  // If this is a credential, ignore it and receive next message.
184  if (unlikely (msg_->is_credential ())) {
185  const int rc = msg_->close ();
186  zmq_assert (rc == 0);
187  } else {
188  break;
189  }
190  }
191 
192  // If delimiter was read, start termination process of the pipe.
193  if (msg_->is_delimiter ()) {
194  process_delimiter ();
195  return false;
196  }
197 
198  if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
199  _msgs_read++;
200 
201  if (_lwm > 0 && _msgs_read % _lwm == 0)
202  send_activate_write (_peer, _msgs_read);
203 
204  return true;
205 }
206 
207 bool zmq::pipe_t::check_write ()
208 {
209  if (unlikely (!_out_active || _state != active))
210  return false;
211 
212  const bool full = !check_hwm ();
213 
214  if (unlikely (full)) {
215  _out_active = false;
216  return false;
217  }
218 
219  return true;
220 }
221 
222 bool zmq::pipe_t::write (const msg_t *msg_)
223 {
224  if (unlikely (!check_write ()))
225  return false;
226 
227  const bool more = (msg_->flags () & msg_t::more) != 0;
228  const bool is_routing_id = msg_->is_routing_id ();
229  _out_pipe->write (*msg_, more);
230  if (!more && !is_routing_id)
231  _msgs_written++;
232 
233  return true;
234 }
235 
236 void zmq::pipe_t::rollback () const
237 {
238  // Remove incomplete message from the outbound pipe.
239  msg_t msg;
240  if (_out_pipe) {
241  while (_out_pipe->unwrite (&msg)) {
242  zmq_assert (msg.flags () & msg_t::more);
243  const int rc = msg.close ();
244  errno_assert (rc == 0);
245  }
246  }
247 }
248 
249 void zmq::pipe_t::flush ()
250 {
251  // The peer does not exist anymore at this point.
252  if (_state == term_ack_sent)
253  return;
254 
255  if (_out_pipe && !_out_pipe->flush ())
256  send_activate_read (_peer);
257 }
258 
259 void zmq::pipe_t::process_activate_read ()
260 {
261  if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
262  _in_active = true;
263  _sink->read_activated (this);
264  }
265 }
266 
267 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
268 {
269  // Remember the peer's message sequence number.
270  _peers_msgs_read = msgs_read_;
271 
272  if (!_out_active && _state == active) {
273  _out_active = true;
274  _sink->write_activated (this);
275  }
276 }
277 
278 void zmq::pipe_t::process_hiccup (void *pipe_)
279 {
280  // Destroy old outpipe. Note that the read end of the pipe was already
281  // migrated to this thread.
282  zmq_assert (_out_pipe);
283  _out_pipe->flush ();
284  msg_t msg;
285  while (_out_pipe->read (&msg)) {
286  if (!(msg.flags () & msg_t::more))
287  _msgs_written--;
288  const int rc = msg.close ();
289  errno_assert (rc == 0);
290  }
291  LIBZMQ_DELETE (_out_pipe);
292 
293  // Plug in the new outpipe.
294  zmq_assert (pipe_);
295  _out_pipe = static_cast<upipe_t *> (pipe_);
296  _out_active = true;
297 
298  // If appropriate, notify the user about the hiccup.
299  if (_state == active)
300  _sink->hiccuped (this);
301 }
302 
303 void zmq::pipe_t::process_pipe_term ()
304 {
305  zmq_assert (_state == active || _state == delimiter_received
306  || _state == term_req_sent1);
307 
308  // This is the simple case of peer-induced termination. If there are no
309  // more pending messages to read, or if the pipe was configured to drop
310  // pending messages, we can move directly to the term_ack_sent state.
311  // Otherwise we'll hang up in waiting_for_delimiter state till all
312  // pending messages are read.
313  if (_state == active) {
314  if (_delay)
315  _state = waiting_for_delimiter;
316  else {
317  _state = term_ack_sent;
318  _out_pipe = NULL;
319  send_pipe_term_ack (_peer);
320  }
321  }
322 
323  // Delimiter happened to arrive before the term command. Now we have the
324  // term command as well, so we can move straight to term_ack_sent state.
325  else if (_state == delimiter_received) {
326  _state = term_ack_sent;
327  _out_pipe = NULL;
328  send_pipe_term_ack (_peer);
329  }
330 
331  // This is the case where both ends of the pipe are closed in parallel.
332  // We simply reply to the request by ack and continue waiting for our
333  // own ack.
334  else if (_state == term_req_sent1) {
335  _state = term_req_sent2;
336  _out_pipe = NULL;
337  send_pipe_term_ack (_peer);
338  }
339 }
340 
341 void zmq::pipe_t::process_pipe_term_ack ()
342 {
343  // Notify the user that all the references to the pipe should be dropped.
344  zmq_assert (_sink);
345  _sink->pipe_terminated (this);
346 
347  // In term_ack_sent and term_req_sent2 states there's nothing to do.
348  // Simply deallocate the pipe. In term_req_sent1 state we have to ack
349  // the peer before deallocating this side of the pipe.
350  // All the other states are invalid.
351  if (_state == term_req_sent1) {
352  _out_pipe = NULL;
353  send_pipe_term_ack (_peer);
354  } else
355  zmq_assert (_state == term_ack_sent || _state == term_req_sent2);
356 
357  // We'll deallocate the inbound pipe, the peer will deallocate the outbound
358  // pipe (which is an inbound pipe from its point of view).
359  // First, delete all the unread messages in the pipe. We have to do it by
360  // hand because msg_t doesn't have automatic destructor. Then deallocate
361  // the ypipe itself.
362 
363  if (!_conflate) {
364  msg_t msg;
365  while (_in_pipe->read (&msg)) {
366  const int rc = msg.close ();
367  errno_assert (rc == 0);
368  }
369  }
370 
371  LIBZMQ_DELETE (_in_pipe);
372 
373  // Deallocate the pipe object
374  delete this;
375 }
376 
377 void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
378 {
379  set_hwms (inhwm_, outhwm_);
380 }
381 
382 void zmq::pipe_t::set_nodelay ()
383 {
384  this->_delay = false;
385 }
386 
387 void zmq::pipe_t::terminate (bool delay_)
388 {
389  // Overload the value specified at pipe creation.
390  _delay = delay_;
391 
392  // If terminate was already called, we can ignore the duplicate invocation.
393  if (_state == term_req_sent1 || _state == term_req_sent2) {
394  return;
395  }
396  // If the pipe is in the final phase of async termination, it's going to
397  // closed anyway. No need to do anything special here.
398  if (_state == term_ack_sent) {
399  return;
400  }
401  // The simple sync termination case. Ask the peer to terminate and wait
402  // for the ack.
403  if (_state == active) {
404  send_pipe_term (_peer);
405  _state = term_req_sent1;
406  }
407  // There are still pending messages available, but the user calls
408  // 'terminate'. We can act as if all the pending messages were read.
409  else if (_state == waiting_for_delimiter && !_delay) {
410  // Drop any unfinished outbound messages.
411  rollback ();
412  _out_pipe = NULL;
413  send_pipe_term_ack (_peer);
414  _state = term_ack_sent;
415  }
416  // If there are pending messages still available, do nothing.
417  else if (_state == waiting_for_delimiter) {
418  }
419  // We've already got delimiter, but not term command yet. We can ignore
420  // the delimiter and ack synchronously terminate as if we were in
421  // active state.
422  else if (_state == delimiter_received) {
423  send_pipe_term (_peer);
424  _state = term_req_sent1;
425  }
426  // There are no other states.
427  else {
428  zmq_assert (false);
429  }
430 
431  // Stop outbound flow of messages.
432  _out_active = false;
433 
434  if (_out_pipe) {
435  // Drop any unfinished outbound messages.
436  rollback ();
437 
438  // Write the delimiter into the pipe. Note that watermarks are not
439  // checked; thus the delimiter can be written even when the pipe is full.
440  msg_t msg;
441  msg.init_delimiter ();
442  _out_pipe->write (msg, false);
443  flush ();
444  }
445 }
446 
447 bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
448 {
449  return msg_.is_delimiter ();
450 }
451 
452 int zmq::pipe_t::compute_lwm (int hwm_)
453 {
454  // Compute the low water mark. Following point should be taken
455  // into consideration:
456  //
457  // 1. LWM has to be less than HWM.
458  // 2. LWM cannot be set to very low value (such as zero) as after filling
459  // the queue it would start to refill only after all the messages are
460  // read from it and thus unnecessarily hold the progress back.
461  // 3. LWM cannot be set to very high value (such as HWM-1) as it would
462  // result in lock-step filling of the queue - if a single message is
463  // read from a full queue, writer thread is resumed to write exactly one
464  // message to the queue and go back to sleep immediately. This would
465  // result in low performance.
466  //
467  // Given the 3. it would be good to keep HWM and LWM as far apart as
468  // possible to reduce the thread switching overhead to almost zero.
469  // Let's make LWM 1/2 of HWM.
470  const int result = (hwm_ + 1) / 2;
471 
472  return result;
473 }
474 
475 void zmq::pipe_t::process_delimiter ()
476 {
477  zmq_assert (_state == active || _state == waiting_for_delimiter);
478 
479  if (_state == active)
480  _state = delimiter_received;
481  else {
482  rollback ();
483  _out_pipe = NULL;
484  send_pipe_term_ack (_peer);
485  _state = term_ack_sent;
486  }
487 }
488 
489 void zmq::pipe_t::hiccup ()
490 {
491  // If termination is already under way do nothing.
492  if (_state != active)
493  return;
494 
495  // We'll drop the pointer to the inpipe. From now on, the peer is
496  // responsible for deallocating it.
497 
498  // Create new inpipe.
499  _in_pipe =
500  _conflate
501  ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ())
502  : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> ();
503 
504  alloc_assert (_in_pipe);
505  _in_active = true;
506 
507  // Notify the peer about the hiccup.
508  send_hiccup (_peer, _in_pipe);
509 }
510 
511 void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
512 {
513  int in = inhwm_ + std::max (_in_hwm_boost, 0);
514  int out = outhwm_ + std::max (_out_hwm_boost, 0);
515 
516  // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
517  if (inhwm_ <= 0 || _in_hwm_boost == 0)
518  in = 0;
519 
520  if (outhwm_ <= 0 || _out_hwm_boost == 0)
521  out = 0;
522 
523  _lwm = compute_lwm (in);
524  _hwm = out;
525 }
526 
527 void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
528 {
529  _in_hwm_boost = inhwmboost_;
530  _out_hwm_boost = outhwmboost_;
531 }
532 
533 bool zmq::pipe_t::check_hwm () const
534 {
535  const bool full =
536  _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm);
537  return !full;
538 }
539 
540 void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
541 {
542  if (_state == active)
543  send_pipe_hwm (_peer, inhwm_, outhwm_);
544 }
545 
546 void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
547 {
548  _endpoint_pair = ZMQ_MOVE (endpoint_pair_);
549 }
550 
551 const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
552 {
553  return _endpoint_pair;
554 }
555 
556 void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
557 {
558  if (_state == active) {
559  endpoint_uri_pair_t *ep =
560  new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
561  send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read,
562  socket_base_, ep);
563  }
564 }
565 
566 void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
567  own_t *socket_base_,
568  endpoint_uri_pair_t *endpoint_pair_)
569 {
570  send_pipe_stats_publish (socket_base_, queue_count_,
571  _msgs_written - _peers_msgs_read, endpoint_pair_);
572 }
573 
574 void zmq::pipe_t::send_disconnect_msg ()
575 {
576  if (_disconnect_msg.size () > 0 && _out_pipe) {
577  // Rollback any incomplete message in the pipe, and push the disconnect message.
578  rollback ();
579 
580  _out_pipe->write (_disconnect_msg, false);
581  flush ();
582  _disconnect_msg.init ();
583  }
584 }
585 
586 void zmq::pipe_t::set_disconnect_msg (
587  const std::vector<unsigned char> &disconnect_)
588 {
589  _disconnect_msg.close ();
590  const int rc =
591  _disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
592  errno_assert (rc == 0);
593 }
594 
595 void zmq::pipe_t::send_hiccup_msg (const std::vector<unsigned char> &hiccup_)
596 {
597  if (!hiccup_.empty () && _out_pipe) {
598  msg_t msg;
599  const int rc = msg.init_buffer (&hiccup_[0], hiccup_.size ());
600  errno_assert (rc == 0);
601 
602  _out_pipe->write (msg, false);
603  flush ();
604  }
605 }
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::msg_t::init_buffer
int init_buffer(const void *buf_, size_t size_)
Definition: msg.cpp:97
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t
Definition: options.hpp:34
ypipe.hpp
active
@ active
Definition: proxy.cpp:141
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
ok
ROSCPP_DECL bool ok()
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
id
GLenum GLuint id
Definition: glcorearb.h:2695
pipe.hpp
zmq::send_hello_msg
void send_hello_msg(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:64
zmq::blob_t
Definition: blob.hpp:46
zmq::msg_t::routing_id
uint32_t routing_id
Definition: msg.hpp:233
err.hpp
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
zmq::send_routing_id
void send_routing_id(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:52
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
true
#define true
Definition: cJSON.c:65
zmq::pipepair
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], const int hwms_[2], const bool conflate_[2])
ep
const SETUP_TEARDOWN_TESTCONTEXT char ep[]
Definition: test_term_endpoint_tipc.cpp:8
zmq::msg_t
Definition: msg.hpp:33
ypipe_conflate.hpp
unlikely
#define unlikely(x)
Definition: likely.hpp:11
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57