11 #if defined ZMQ_HAVE_WINDOWS
13 #if defined _WIN32_WCE
14 #include <cmnintrin.h>
52 #if defined ZMQ_HAVE_VMCI
57 #ifdef ZMQ_HAVE_OPENPGM
92 const std::pair<map_t::iterator, map_t::iterator>
range =
93 _inprocs.equal_range (endpoint_uri_str_);
100 it->second->send_disconnect_msg ();
101 it->second->terminate (
true);
111 if (
it->second == pipe_) {
119 return _tag == 0xbaddecaf;
128 class ctx_t *parent_,
135 s =
new (std::nothrow) pair_t (parent_, tid_, sid_);
138 s =
new (std::nothrow) pub_t (parent_, tid_, sid_);
141 s =
new (std::nothrow) sub_t (parent_, tid_, sid_);
144 s =
new (std::nothrow) req_t (parent_, tid_, sid_);
147 s =
new (std::nothrow) rep_t (parent_, tid_, sid_);
150 s =
new (std::nothrow)
dealer_t (parent_, tid_, sid_);
153 s =
new (std::nothrow)
router_t (parent_, tid_, sid_);
156 s =
new (std::nothrow) pull_t (parent_, tid_, sid_);
159 s =
new (std::nothrow) push_t (parent_, tid_, sid_);
162 s =
new (std::nothrow)
xpub_t (parent_, tid_, sid_);
165 s =
new (std::nothrow)
xsub_t (parent_, tid_, sid_);
168 s =
new (std::nothrow) stream_t (parent_, tid_, sid_);
171 s =
new (std::nothrow)
server_t (parent_, tid_, sid_);
174 s =
new (std::nothrow) client_t (parent_, tid_, sid_);
177 s =
new (std::nothrow) radio_t (parent_, tid_, sid_);
180 s =
new (std::nothrow) dish_t (parent_, tid_, sid_);
183 s =
new (std::nothrow) gather_t (parent_, tid_, sid_);
186 s =
new (std::nothrow) scatter_t (parent_, tid_, sid_);
189 s =
new (std::nothrow) dgram_t (parent_, tid_, sid_);
192 s =
new (std::nothrow) peer_t (parent_, tid_, sid_);
195 s =
new (std::nothrow) channel_t (parent_, tid_, sid_);
204 if (
s->_mailbox ==
NULL) {
205 s->_destroyed =
true;
217 own_t (parent_, tid_),
243 mailbox_t *
m =
new (std::nothrow) mailbox_t ();
256 size_t routing_id_size_)
const
271 if (_reaper_signaler)
303 const std::string::size_type pos = uri.find (
"://");
304 if (pos == std::string::npos) {
308 protocol_ = uri.substr (0, pos);
309 path_ = uri.substr (pos + 3);
311 if (protocol_.empty () || path_.empty ()) {
322 #
if defined ZMQ_HAVE_IPC
323 && protocol_ != protocol_name::ipc
327 && protocol_ != protocol_name::ws
330 && protocol_ != protocol_name::wss
332 #
if defined ZMQ_HAVE_OPENPGM
334 && protocol_ != protocol_name::pgm
335 && protocol_ != protocol_name::epgm
339 && protocol_ != protocol_name::tipc
341 #
if defined ZMQ_HAVE_NORM
342 && protocol_ != protocol_name::norm
344 #
if defined ZMQ_HAVE_VMCI
345 && protocol_ != protocol_name::vmci
355 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
356 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
357 if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm
358 || protocol_ == protocol_name::norm)
359 #elif defined ZMQ_HAVE_OPENPGM
360 if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm)
362 if (protocol_ == protocol_name::norm
366 errno = ENOCOMPATPROTO;
374 errno = ENOCOMPATPROTO;
383 bool subscribe_to_all_,
384 bool locally_initiated_)
387 pipe_->set_event_sink (
this);
388 _pipes.push_back (pipe_);
391 xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
395 if (is_terminating ()) {
396 register_term_acks (1);
397 pipe_->terminate (
false);
413 int rc = xsetsockopt (option_, optval_, optvallen_);
420 rc =
options.setsockopt (option_, optval_, optvallen_);
421 update_pipe_options (option_);
438 int rc = xgetsockopt (option_, optval_, optvallen_);
444 return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
454 return do_getsockopt<fd_t> (
456 (
static_cast<mailbox_t *
> (_mailbox))->
get_fd ());
460 const int rc = process_commands (0,
false);
466 return do_getsockopt<int> (optval_, optvallen_,
476 return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
479 return options.getsockopt (option_, optval_, optvallen_);
486 return xjoin (group_);
493 return xleave (group_);
501 (
static_cast<mailbox_safe_t *
> (_mailbox))->add_signaler (
s_);
509 (
static_cast<mailbox_safe_t *
> (_mailbox))->remove_signaler (
s_);
522 int rc = process_commands (0,
false);
530 if (parse_uri (endpoint_uri_, protocol,
address)
531 || check_protocol (protocol)) {
537 rc = register_endpoint (endpoint_uri_, endpoint);
539 connect_pending (endpoint_uri_,
this);
540 _last_endpoint.assign (endpoint_uri_);
546 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
547 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
548 if (protocol == protocol_name::pgm || protocol == protocol_name::epgm
549 || protocol == protocol_name::norm) {
550 #elif defined ZMQ_HAVE_OPENPGM
551 if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
552 #else // defined ZMQ_HAVE_NORM
553 if (protocol == protocol_name::norm) {
557 rc = connect (endpoint_uri_);
571 io_thread_t *io_thread = choose_io_thread (
options.affinity);
595 object_t *parents[2] = {
this, session};
596 pipe_t *new_pipes[2] = {
NULL,
NULL};
599 bool conflates[2] = {
false,
false};
600 rc =
pipepair (parents, new_pipes, hwms, conflates);
604 attach_pipe (new_pipes[0],
true,
true);
605 pipe_t *
const newpipe = new_pipes[0];
616 static_cast<own_t *
> (session), newpipe);
623 io_thread_t *io_thread = choose_io_thread (
options.affinity);
630 tcp_listener_t *listener =
631 new (std::nothrow) tcp_listener_t (io_thread,
this,
options);
633 rc = listener->set_local_address (
address.c_str ());
642 listener->get_local_address (_last_endpoint);
652 if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
653 ws_listener_t *listener =
new (std::nothrow) ws_listener_t (
654 io_thread,
this,
options, protocol == protocol_name::wss);
656 if (protocol == protocol_name::ws) {
657 ws_listener_t *listener =
658 new (std::nothrow) ws_listener_t (io_thread,
this,
options,
false);
661 rc = listener->set_local_address (
address.c_str ());
670 listener->get_local_address (_last_endpoint);
679 #if defined ZMQ_HAVE_IPC
680 if (protocol == protocol_name::ipc) {
681 ipc_listener_t *listener =
682 new (std::nothrow) ipc_listener_t (io_thread,
this,
options);
684 int rc = listener->set_local_address (
address.c_str ());
693 listener->get_local_address (_last_endpoint);
701 #if defined ZMQ_HAVE_TIPC
702 if (protocol == protocol_name::tipc) {
703 tipc_listener_t *listener =
704 new (std::nothrow) tipc_listener_t (io_thread,
this,
options);
706 int rc = listener->set_local_address (
address.c_str ());
715 listener->get_local_address (_last_endpoint);
724 #if defined ZMQ_HAVE_VMCI
725 if (protocol == protocol_name::vmci) {
726 vmci_listener_t *listener =
727 new (std::nothrow) vmci_listener_t (io_thread,
this,
options);
729 int rc = listener->set_local_address (
address.c_str ());
737 listener->get_local_address (_last_endpoint);
753 return connect_internal (endpoint_uri_);
764 int rc = process_commands (0,
false);
772 if (parse_uri (endpoint_uri_, protocol,
address)
773 || check_protocol (protocol)) {
783 const endpoint_t peer = find_endpoint (endpoint_uri_);
798 pipe_t *new_pipes[2] = {
NULL,
NULL};
802 int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
803 bool conflates[2] = {conflate, conflate};
804 rc =
pipepair (parents, new_pipes, hwms, conflates);
820 #ifdef ZMQ_BUILD_DRAFT_API
822 if (
options.can_send_hello_msg &&
options.hello_msg.size () > 0) {
828 pend_connection (
std::string (endpoint_uri_), endpoint, new_pipes);
840 #ifdef ZMQ_BUILD_DRAFT_API
842 if (
options.can_send_hello_msg &&
options.hello_msg.size () > 0) {
860 send_bind (peer.
socket, new_pipes[1],
false);
864 attach_pipe (new_pipes[0],
false,
true);
867 _last_endpoint.assign (endpoint_uri_);
870 _inprocs.emplace (endpoint_uri_, new_pipes[0]);
875 const bool is_single_connect =
879 if (0 != _endpoints.count (endpoint_uri_)) {
888 io_thread_t *io_thread = choose_io_thread (
options.affinity);
943 else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
944 if (protocol == protocol_name::wss) {
951 else if (protocol == protocol_name::ws) {
967 #if defined ZMQ_HAVE_IPC
968 else if (protocol == protocol_name::ipc) {
969 paddr->
resolved.ipc_addr =
new (std::nothrow) ipc_address_t ();
998 #ifdef ZMQ_HAVE_OPENPGM
999 if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
1000 struct pgm_addrinfo_t *res =
NULL;
1001 uint16_t port_number = 0;
1003 pgm_socket_t::init_address (
address.c_str (), &res, &port_number);
1005 pgm_freeaddrinfo (res);
1006 if (rc != 0 || port_number == 0) {
1011 #if defined ZMQ_HAVE_TIPC
1012 else if (protocol == protocol_name::tipc) {
1013 paddr->
resolved.tipc_addr =
new (std::nothrow) tipc_address_t ();
1020 const sockaddr_tipc *
const saddr =
1021 reinterpret_cast<const sockaddr_tipc *
> (
1022 paddr->
resolved.tipc_addr->addr ());
1024 if (saddr->addrtype == TIPC_ADDR_ID
1025 && paddr->
resolved.tipc_addr->is_random ()) {
1032 #if defined ZMQ_HAVE_VMCI
1033 else if (protocol == protocol_name::vmci) {
1035 new (std::nothrow) vmci_address_t (this->get_ctx ());
1052 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
1053 const bool subscribe_to_all =
1054 protocol == protocol_name::pgm || protocol == protocol_name::epgm
1056 #elif defined ZMQ_HAVE_OPENPGM
1057 const bool subscribe_to_all = protocol == protocol_name::pgm
1058 || protocol == protocol_name::epgm
1060 #elif defined ZMQ_HAVE_NORM
1061 const bool subscribe_to_all =
1066 pipe_t *newpipe =
NULL;
1068 if (
options.immediate != 1 || subscribe_to_all) {
1070 object_t *parents[2] = {
this, session};
1071 pipe_t *new_pipes[2] = {
NULL,
NULL};
1075 int hwms[2] = {conflate ? -1 :
options.sndhwm,
1076 conflate ? -1 :
options.rcvhwm};
1077 bool conflates[2] = {conflate, conflate};
1078 rc =
pipepair (parents, new_pipes, hwms, conflates);
1082 attach_pipe (new_pipes[0], subscribe_to_all,
true);
1083 newpipe = new_pipes[0];
1093 static_cast<own_t *
> (session), newpipe);
1099 const char *tcp_address_)
1106 if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1112 tcp_addr->
to_string (endpoint_uri_pair_);
1113 if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1116 tcp_addr->
to_string (endpoint_uri_pair_);
1122 return endpoint_uri_pair_;
1129 launch_child (endpoint_);
1130 _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.
identifier (),
1134 pipe_->set_endpoint_pair (endpoint_pair_);
1155 const int rc = process_commands (0,
false);
1163 if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
1164 || check_protocol (uri_protocol)) {
1172 return unregister_endpoint (endpoint_uri_str,
this) == 0
1174 : _inprocs.erase_pipes (endpoint_uri_str);
1179 ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
1183 const std::pair<endpoints_t::iterator, endpoints_t::iterator>
range =
1184 _endpoints.equal_range (resolved_endpoint_uri);
1192 if (
it->second.second !=
NULL)
1193 it->second.second->terminate (
false);
1194 term_child (
it->second.first);
1196 _endpoints.erase (
range.first,
range.second);
1199 _disconnected =
true;
1222 int rc = process_commands (0,
true);
1246 rc = msg_->
close ();
1282 timeout =
static_cast<int> (
end - _clock.now_ms ());
1318 if (
unlikely (process_commands (0,
false) != 0)) {
1325 int rc = xrecv (msg_);
1332 extract_flags (msg_);
1341 if (
unlikely (process_commands (0,
false) != 0)) {
1350 extract_flags (msg_);
1362 bool block = (_ticks != 0);
1377 timeout =
static_cast<int> (
end - _clock.now_ms ());
1385 extract_flags (msg_);
1395 (
static_cast<mailbox_safe_t *
> (_mailbox))->clear_signalers ();
1427 fd = (
static_cast<mailbox_t *
> (_mailbox))->get_fd ();
1431 _reaper_signaler =
new (std::nothrow)
signaler_t ();
1435 fd = _reaper_signaler->get_fd ();
1436 (
static_cast<mailbox_safe_t *
> (_mailbox))
1437 ->add_signaler (_reaper_signaler);
1440 _reaper_signaler->send ();
1443 _handle = _poller->add_fd (fd,
this);
1444 _poller->set_pollin (_handle);
1454 if (timeout_ == 0) {
1467 if (tsc && throttle_) {
1479 int rc = _mailbox->recv (&cmd, timeout_);
1489 rc = _mailbox->recv (&cmd, 0);
1494 if (_ctx_terminated) {
1511 _ctx_terminated =
true;
1516 attach_pipe (pipe_);
1524 unregister_endpoints (
this);
1529 _pipes[
i]->send_disconnect_msg ();
1530 _pipes[
i]->terminate (
false);
1532 register_term_acks (
static_cast<int> (_pipes.size ()));
1540 term_endpoint (endpoint_->c_str ());
1545 uint64_t outbound_queue_count_,
1546 uint64_t inbound_queue_count_,
1549 uint64_t
values[2] = {outbound_queue_count_, inbound_queue_count_};
1551 delete endpoint_pair_;
1572 if (_pipes.size () == 0) {
1577 _pipes[
i]->send_stats_to_peer (
this);
1671 _reaper_signaler->recv ();
1673 process_commands (0,
false);
1693 _poller->rm_fd (_handle);
1696 destroy_socket (
this);
1708 xread_activated (pipe_);
1713 xwrite_activated (pipe_);
1719 pipe_->terminate (
false);
1728 xpipe_terminated (pipe_);
1731 _inprocs.erase_pipe (pipe_);
1735 _pipes.erase (pipe_);
1738 const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
1739 if (!identifier.empty ()) {
1740 std::pair<endpoints_t::iterator, endpoints_t::iterator>
range;
1741 range = _endpoints.equal_range (identifier);
1744 if (
it->second.second == pipe_) {
1745 it->second.second =
NULL;
1751 if (is_terminating ())
1752 unregister_term_ack ();
1778 if (
unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
1784 if (endpoint_ ==
NULL) {
1791 if (parse_uri (endpoint_, protocol,
address) || check_protocol (protocol))
1801 if (_monitor_socket !=
NULL) {
1802 stop_monitor (
true);
1820 _monitor_events = events_;
1821 options.monitor_event_version = event_version_;
1823 _monitor_socket =
zmq_socket (get_ctx (), type_);
1824 if (_monitor_socket ==
NULL)
1832 stop_monitor (
false);
1835 rc =
zmq_bind (_monitor_socket, endpoint_);
1837 stop_monitor (
false);
1844 uint64_t
values[1] = {
static_cast<uint64_t
> (fd_)};
1851 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1858 uint64_t
values[1] = {
static_cast<uint64_t
> (interval_)};
1865 uint64_t
values[1] = {
static_cast<uint64_t
> (fd_)};
1872 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1879 uint64_t
values[1] = {
static_cast<uint64_t
> (fd_)};
1886 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1893 uint64_t
values[1] = {
static_cast<uint64_t
> (fd_)};
1900 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1907 uint64_t
values[1] = {
static_cast<uint64_t
> (fd_)};
1914 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1921 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1928 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1935 uint64_t
values[1] = {
static_cast<uint64_t
> (err_)};
1941 uint64_t values_count_,
1945 if (_monitor_events & type_) {
1946 monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
1953 const uint64_t values_[],
1954 uint64_t values_count_,
1960 if (_monitor_socket) {
1963 switch (
options.monitor_event_version) {
1966 zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
1970 <= std::numeric_limits<uint32_t>::max ());
1973 const uint16_t
event =
static_cast<uint16_t
> (event_);
1974 const uint32_t
value =
static_cast<uint32_t
> (values_[0]);
1988 endpoint_uri.size ());
1994 memcpy (
zmq_msg_data (&msg), &event_,
sizeof (event_));
2000 sizeof (values_count_));
2004 for (uint64_t
i = 0;
i < values_count_; ++
i) {
2007 sizeof (values_[
i]));
2014 endpoint_uri_pair_.
local.size ());
2020 endpoint_uri_pair_.
remote.size ());
2032 if (_monitor_socket) {
2034 && send_monitor_stopped_event_) {
2035 uint64_t
values[1] = {0};
2040 _monitor_socket =
NULL;
2041 _monitor_events = 0;
2047 return _disconnected;
2063 const void *optval_,
2070 if (optval_ && optvallen_) {
2071 _connect_routing_id.assign (
static_cast<const char *
> (optval_),
2083 const out_pipes_t::iterator
end = _out_pipes.end ();
2084 out_pipes_t::iterator
it;
2085 for (
it = _out_pipes.begin ();
it !=
end; ++
it)
2086 if (
it->second.pipe == pipe_)
2091 it->second.active =
true;
2097 _connect_routing_id.clear ();
2103 return !_connect_routing_id.empty ();
2112 _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (
ZMQ_MOVE (routing_id_), outpipe)
2119 return 0 != _out_pipes.count (routing_id_);
2126 out_pipes_t::iterator
it = _out_pipes.find (routing_id_);
2127 return it == _out_pipes.end () ?
NULL : &
it->second;
2134 const out_pipes_t::const_iterator
it = _out_pipes.find (routing_id_);
2135 return it == _out_pipes.end () ?
NULL : &
it->second;
2140 const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
2147 const out_pipes_t::iterator
it = _out_pipes.find (routing_id_);
2149 if (
it != _out_pipes.end ()) {
2151 _out_pipes.erase (
it);