36 s =
new (std::nothrow)
37 req_session_t (io_thread_, active_, socket_,
options_, addr_);
40 s =
new (std::nothrow)
41 radio_session_t (io_thread_, active_, socket_,
options_, addr_);
44 s =
new (std::nothrow)
45 dish_session_t (io_thread_, active_, socket_,
options_, addr_);
65 #ifdef ZMQ_BUILD_DRAFT_API
67 s =
new (std::nothrow) hello_msg_session_t (
68 io_thread_, active_, socket_,
options_, addr_);
71 io_thread_, active_, socket_,
options_, addr_);
75 s =
new (std::nothrow)
98 _incomplete_in (
false),
102 _io_thread (io_thread_),
103 _has_linger_timer (
false),
107 _wss_hostname (
options_.wss_hostname)
114 return _engine->get_endpoint ();
123 if (_has_linger_timer) {
124 cancel_timer (linger_timer_id);
125 _has_linger_timer =
false;
130 _engine->terminate ();
141 _pipe->set_event_sink (
this);
146 if (!_pipe || !_pipe->read (msg_)) {
162 if (_pipe && _pipe->write (msg_)) {
163 const int rc = msg_->
init ();
174 if (_zap_pipe ==
NULL) {
179 if (!_zap_pipe->read (msg_)) {
189 if (_zap_pipe ==
NULL || !_zap_pipe->write (msg_)) {
197 const int rc = msg_->
init ();
228 while (_incomplete_in) {
230 int rc = msg.
init ();
232 rc = pull_msg (&msg);
242 zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
243 || _terminating_pipes.count (pipe_) == 1);
245 if (pipe_ == _pipe) {
248 if (_has_linger_timer) {
249 cancel_timer (linger_timer_id);
250 _has_linger_timer =
false;
252 }
else if (pipe_ == _zap_pipe)
256 _terminating_pipes.erase (pipe_);
258 if (!is_terminating () &&
options.raw_socket) {
260 _engine->terminate ();
269 if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
278 if (
unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
279 zmq_assert (_terminating_pipes.count (pipe_) == 1);
285 _pipe->check_read ();
289 if (
likely (pipe_ == _pipe))
290 _engine->restart_output ();
293 _engine->zap_msg_available ();
300 if (_pipe != pipe_) {
301 zmq_assert (_terminating_pipes.count (pipe_) == 1);
306 _engine->restart_input ();
324 start_connecting (
false);
335 if (_zap_pipe !=
NULL)
338 endpoint_t peer = find_endpoint (
"inproc://zeromq.zap.01");
349 pipe_t *new_pipes[2] = {
NULL,
NULL};
350 int hwms[2] = {0, 0};
351 bool conflates[2] = {
false,
false};
352 int rc =
pipepair (parents, new_pipes, hwms, conflates);
356 _zap_pipe = new_pipes[0];
357 _zap_pipe->set_nodelay ();
358 _zap_pipe->set_event_sink (
this);
360 send_bind (peer.
socket, new_pipes[1],
false);
368 bool ok = _zap_pipe->write (&
id);
391 _engine->plug (_io_thread,
this);
397 if (!_pipe && !is_terminating ()) {
398 object_t *parents[2] = {
this, _socket};
403 int hwms[2] = {conflate ? -1 :
options.rcvhwm,
404 conflate ? -1 :
options.sndhwm};
405 bool conflates[2] = {conflate, conflate};
406 const int rc =
pipepair (parents, pipes, hwms, conflates);
410 pipes[0]->set_event_sink (
this);
418 pipes[0]->set_endpoint_pair (_engine->get_endpoint ());
419 pipes[1]->set_endpoint_pair (_engine->get_endpoint ());
422 send_bind (_socket, pipes[1]);
437 if (!_active && handshaked_ &&
options.can_recv_disconnect_msg
438 && !
options.disconnect_msg.empty ()) {
439 _pipe->set_disconnect_msg (
options.disconnect_msg);
440 _pipe->send_disconnect_msg ();
444 if (_active && handshaked_ &&
options.can_recv_hiccup_msg
445 && !
options.hiccup_msg.empty ()) {
446 _pipe->send_hiccup_msg (
options.hiccup_msg);
466 _pipe->terminate (
false);
468 _zap_pipe->terminate (
false);
477 _pipe->check_read ();
480 _zap_pipe->check_read ();
490 if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
503 add_timer (linger_, linger_timer_id);
504 _has_linger_timer =
true;
509 _pipe->terminate (linger_ != 0);
515 _pipe->check_read ();
518 if (_zap_pipe !=
NULL)
519 _zap_pipe->terminate (
false);
527 _has_linger_timer =
false;
531 _pipe->terminate (
false);
537 _addr->to_string (*
ep);
538 send_term_endpoint (_socket,
ep);
545 if (_pipe &&
options.immediate == 1
546 #ifdef ZMQ_HAVE_OPENPGM
547 && _addr->protocol != protocol_name::pgm
548 && _addr->protocol != protocol_name::epgm
551 && _addr->protocol != protocol_name::norm
555 _pipe->terminate (
false);
556 _terminating_pipes.insert (_pipe);
559 if (_has_linger_timer) {
560 cancel_timer (linger_timer_id);
561 _has_linger_timer =
false;
569 start_connecting (
true);
572 _addr->to_string (*
ep);
573 send_term_endpoint (_socket,
ep);
590 io_thread_t *io_thread = choose_io_thread (
options.affinity);
596 if (!
options.socks_proxy_address.empty ()) {
597 address_t *proxy_address =
new (std::nothrow)
601 connecter =
new (std::nothrow) socks_connecter_t (
602 io_thread,
this,
options, _addr, proxy_address, wait_);
604 if (!
options.socks_proxy_username.empty ()) {
605 reinterpret_cast<socks_connecter_t *
> (connecter)
606 ->set_auth_method_basic (
options.socks_proxy_username,
610 connecter =
new (std::nothrow)
611 tcp_connecter_t (io_thread,
this,
options, _addr, wait_);
614 #if defined ZMQ_HAVE_IPC
615 else if (_addr->protocol == protocol_name::ipc) {
616 connecter =
new (std::nothrow)
617 ipc_connecter_t (io_thread,
this,
options, _addr, wait_);
620 #if defined ZMQ_HAVE_TIPC
621 else if (_addr->protocol == protocol_name::tipc) {
622 connecter =
new (std::nothrow)
623 tipc_connecter_t (io_thread,
this,
options, _addr, wait_);
626 #if defined ZMQ_HAVE_VMCI
627 else if (_addr->protocol == protocol_name::vmci) {
628 connecter =
new (std::nothrow)
629 vmci_connecter_t (io_thread,
this,
options, _addr, wait_);
632 #if defined ZMQ_HAVE_WS
633 else if (_addr->protocol == protocol_name::ws) {
634 connecter =
new (std::nothrow) ws_connecter_t (
638 #if defined ZMQ_HAVE_WSS
639 else if (_addr->protocol == protocol_name::wss) {
640 connecter =
new (std::nothrow) ws_connecter_t (
641 io_thread,
this,
options, _addr, wait_,
true, _wss_hostname);
644 if (connecter !=
NULL) {
646 launch_child (connecter);
654 udp_engine_t *engine =
new (std::nothrow) udp_engine_t (
options);
671 int rc = engine->init (_addr,
send, recv);
674 send_attach (
this, engine);
679 #ifdef ZMQ_HAVE_OPENPGM
682 if (_addr->protocol ==
"pgm" || _addr->protocol ==
"epgm") {
687 bool const udp_encapsulation = _addr->protocol ==
"epgm";
694 pgm_sender_t *pgm_sender =
695 new (std::nothrow) pgm_sender_t (io_thread,
options);
699 pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
702 send_attach (
this, pgm_sender);
705 pgm_receiver_t *pgm_receiver =
706 new (std::nothrow) pgm_receiver_t (io_thread,
options);
710 pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
713 send_attach (
this, pgm_receiver);
721 if (_addr->protocol ==
"norm") {
727 norm_engine_t *norm_sender =
728 new (std::nothrow) norm_engine_t (io_thread,
options);
731 int rc = norm_sender->init (_addr->address.c_str (),
true,
false);
734 send_attach (
this, norm_sender);
738 norm_engine_t *norm_receiver =
739 new (std::nothrow) norm_engine_t (io_thread,
options);
742 int rc = norm_receiver->init (_addr->address.c_str (),
false,
true);
745 send_attach (
this, norm_receiver);
749 #endif // ZMQ_HAVE_NORM
754 zmq::hello_msg_session_t::hello_msg_session_t (io_thread_t *io_thread_,
764 zmq::hello_msg_session_t::~hello_msg_session_t ()
769 int zmq::hello_msg_session_t::pull_msg (msg_t *msg_)
775 msg_->init_buffer (&
options.hello_msg[0],
options.hello_msg.size ());
784 void zmq::hello_msg_session_t::reset ()