17 const bool conflate_[2])
22 typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
23 typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
25 pipe_t::upipe_t *upipe1;
27 upipe1 =
new (std::nothrow) upipe_conflate_t ();
29 upipe1 =
new (std::nothrow) upipe_normal_t ();
32 pipe_t::upipe_t *upipe2;
34 upipe2 =
new (std::nothrow) upipe_conflate_t ();
36 upipe2 =
new (std::nothrow) upipe_normal_t ();
39 pipes_[0] =
new (std::nothrow)
40 pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]);
42 pipes_[1] =
new (std::nothrow)
43 pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]);
46 pipes_[0]->set_peer (pipes_[1]);
47 pipes_[1]->set_peer (pipes_[0]);
55 const int rc =
id.init_size (
options_.routing_id_size);
59 const bool written = pipe_->write (&
id);
70 const bool written = pipe_->write (&hello);
75 zmq::pipe_t::pipe_t (object_t *parent_,
87 _lwm (compute_lwm (inhwm_)),
97 _server_socket_routing_id (0),
100 _disconnect_msg.init ();
103 zmq::pipe_t::~pipe_t ()
105 _disconnect_msg.close ();
108 void zmq::pipe_t::set_peer (pipe_t *peer_)
115 void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
122 void zmq::pipe_t::set_server_socket_routing_id (
123 uint32_t server_socket_routing_id_)
125 _server_socket_routing_id = server_socket_routing_id_;
128 uint32_t zmq::pipe_t::get_server_socket_routing_id ()
const
130 return _server_socket_routing_id;
133 void zmq::pipe_t::set_router_socket_routing_id (
134 const blob_t &router_socket_routing_id_)
136 _router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
139 const zmq::blob_t &zmq::pipe_t::get_routing_id ()
const
141 return _router_socket_routing_id;
144 bool zmq::pipe_t::check_read ()
148 if (
unlikely (_state !=
active && _state != waiting_for_delimiter))
152 if (!_in_pipe->check_read ()) {
159 if (_in_pipe->probe (is_delimiter)) {
161 const bool ok = _in_pipe->read (&msg);
163 process_delimiter ();
170 bool zmq::pipe_t::read (msg_t *msg_)
174 if (
unlikely (_state !=
active && _state != waiting_for_delimiter))
178 if (!_in_pipe->read (msg_)) {
184 if (
unlikely (msg_->is_credential ())) {
185 const int rc = msg_->close ();
193 if (msg_->is_delimiter ()) {
194 process_delimiter ();
198 if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
201 if (_lwm > 0 && _msgs_read % _lwm == 0)
202 send_activate_write (_peer, _msgs_read);
207 bool zmq::pipe_t::check_write ()
212 const bool full = !check_hwm ();
222 bool zmq::pipe_t::write (
const msg_t *msg_)
227 const bool more = (msg_->flags () & msg_t::more) != 0;
228 const bool is_routing_id = msg_->is_routing_id ();
229 _out_pipe->write (*msg_, more);
230 if (!more && !is_routing_id)
236 void zmq::pipe_t::rollback ()
const
241 while (_out_pipe->unwrite (&msg)) {
243 const int rc = msg.close ();
249 void zmq::pipe_t::flush ()
252 if (_state == term_ack_sent)
255 if (_out_pipe && !_out_pipe->flush ())
256 send_activate_read (_peer);
259 void zmq::pipe_t::process_activate_read ()
261 if (!_in_active && (_state ==
active || _state == waiting_for_delimiter)) {
263 _sink->read_activated (
this);
267 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
270 _peers_msgs_read = msgs_read_;
272 if (!_out_active && _state ==
active) {
274 _sink->write_activated (
this);
278 void zmq::pipe_t::process_hiccup (
void *pipe_)
285 while (_out_pipe->read (&msg)) {
286 if (!(msg.flags () & msg_t::more))
288 const int rc = msg.close ();
295 _out_pipe =
static_cast<upipe_t *
> (pipe_);
300 _sink->hiccuped (
this);
303 void zmq::pipe_t::process_pipe_term ()
306 || _state == term_req_sent1);
315 _state = waiting_for_delimiter;
317 _state = term_ack_sent;
319 send_pipe_term_ack (_peer);
325 else if (_state == delimiter_received) {
326 _state = term_ack_sent;
328 send_pipe_term_ack (_peer);
334 else if (_state == term_req_sent1) {
335 _state = term_req_sent2;
337 send_pipe_term_ack (_peer);
341 void zmq::pipe_t::process_pipe_term_ack ()
345 _sink->pipe_terminated (
this);
351 if (_state == term_req_sent1) {
353 send_pipe_term_ack (_peer);
355 zmq_assert (_state == term_ack_sent || _state == term_req_sent2);
365 while (_in_pipe->read (&msg)) {
366 const int rc = msg.close ();
377 void zmq::pipe_t::process_pipe_hwm (
int inhwm_,
int outhwm_)
379 set_hwms (inhwm_, outhwm_);
382 void zmq::pipe_t::set_nodelay ()
384 this->_delay =
false;
387 void zmq::pipe_t::terminate (
bool delay_)
393 if (_state == term_req_sent1 || _state == term_req_sent2) {
398 if (_state == term_ack_sent) {
404 send_pipe_term (_peer);
405 _state = term_req_sent1;
409 else if (_state == waiting_for_delimiter && !_delay) {
413 send_pipe_term_ack (_peer);
414 _state = term_ack_sent;
417 else if (_state == waiting_for_delimiter) {
422 else if (_state == delimiter_received) {
423 send_pipe_term (_peer);
424 _state = term_req_sent1;
441 msg.init_delimiter ();
442 _out_pipe->write (msg,
false);
447 bool zmq::pipe_t::is_delimiter (
const msg_t &msg_)
449 return msg_.is_delimiter ();
452 int zmq::pipe_t::compute_lwm (
int hwm_)
470 const int result = (hwm_ + 1) / 2;
475 void zmq::pipe_t::process_delimiter ()
480 _state = delimiter_received;
484 send_pipe_term_ack (_peer);
485 _state = term_ack_sent;
489 void zmq::pipe_t::hiccup ()
501 ?
static_cast<upipe_t *
> (
new (std::nothrow) ypipe_conflate_t<msg_t> ())
502 :
new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> ();
508 send_hiccup (_peer, _in_pipe);
511 void zmq::pipe_t::set_hwms (
int inhwm_,
int outhwm_)
513 int in = inhwm_ + std::max (_in_hwm_boost, 0);
514 int out = outhwm_ + std::max (_out_hwm_boost, 0);
517 if (inhwm_ <= 0 || _in_hwm_boost == 0)
520 if (outhwm_ <= 0 || _out_hwm_boost == 0)
523 _lwm = compute_lwm (in);
527 void zmq::pipe_t::set_hwms_boost (
int inhwmboost_,
int outhwmboost_)
529 _in_hwm_boost = inhwmboost_;
530 _out_hwm_boost = outhwmboost_;
533 bool zmq::pipe_t::check_hwm ()
const
536 _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm);
540 void zmq::pipe_t::send_hwms_to_peer (
int inhwm_,
int outhwm_)
543 send_pipe_hwm (_peer, inhwm_, outhwm_);
548 _endpoint_pair =
ZMQ_MOVE (endpoint_pair_);
553 return _endpoint_pair;
556 void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
559 endpoint_uri_pair_t *
ep =
560 new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
561 send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read,
566 void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
568 endpoint_uri_pair_t *endpoint_pair_)
570 send_pipe_stats_publish (socket_base_, queue_count_,
571 _msgs_written - _peers_msgs_read, endpoint_pair_);
574 void zmq::pipe_t::send_disconnect_msg ()
576 if (_disconnect_msg.size () > 0 && _out_pipe) {
580 _out_pipe->write (_disconnect_msg,
false);
582 _disconnect_msg.init ();
586 void zmq::pipe_t::set_disconnect_msg (
587 const std::vector<unsigned char> &disconnect_)
589 _disconnect_msg.close ();
591 _disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
595 void zmq::pipe_t::send_hiccup_msg (
const std::vector<unsigned char> &hiccup_)
597 if (!hiccup_.empty () && _out_pipe) {
599 const int rc = msg.init_buffer (&hiccup_[0], hiccup_.size ());
602 _out_pipe->write (msg,
false);