5 #ifndef ZMQ_HAVE_WINDOWS
25 #include <vmci_sockets.h>
33 #include <gnutls/gnutls.h>
36 #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
37 #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
41 if (max_requested_ >= zmq::poller_t::max_fds ()
42 && zmq::poller_t::max_fds () != -1)
44 max_requested_ = zmq::poller_t::max_fds () - 1;
46 return max_requested_;
49 zmq::ctx_t::ctx_t () :
77 gnutls_global_init ();
81 bool zmq::ctx_t::check_tag ()
const
93 const io_threads_t::size_type io_threads_size = _io_threads.size ();
94 for (io_threads_t::size_type
i = 0;
i != io_threads_size;
i++) {
95 _io_threads[
i]->stop ();
99 for (io_threads_t::size_type
i = 0;
i != io_threads_size;
i++) {
116 #ifdef ZMQ_USE_GNUTLS
117 gnutls_global_deinit ();
124 bool zmq::ctx_t::valid ()
const
126 return _term_mailbox.valid ();
129 int zmq::ctx_t::terminate ()
133 const bool save_terminating = _terminating;
134 _terminating =
false;
137 pending_connections_t copy = _pending_connections;
138 for (pending_connections_t::iterator
p = copy.begin (),
end = copy.end ();
143 s->bind (
p->first.c_str ());
146 _terminating = save_terminating;
150 if (_pid != getpid ()) {
153 for (sockets_t::size_type
i = 0,
size = _sockets.size ();
i !=
size;
155 _sockets[
i]->get_mailbox ()->forked ();
157 _term_mailbox.forked ();
163 const bool restarted = _terminating;
171 for (sockets_t::size_type
i = 0,
size = _sockets.size ();
i !=
size;
173 _sockets[
i]->stop ();
175 if (_sockets.empty ())
178 _slot_sync.unlock ();
182 const int rc = _term_mailbox.recv (&cmd, -1);
190 _slot_sync.unlock ();
195 VMCISock_ReleaseAFValueFd (_vmci_fd);
199 _vmci_sync.unlock ();
208 int zmq::ctx_t::shutdown ()
210 scoped_lock_t locker (_slot_sync);
219 for (sockets_t::size_type
i = 0,
size = _sockets.size ();
i !=
size;
221 _sockets[
i]->stop ();
223 if (_sockets.empty ())
231 int zmq::ctx_t::set (
int option_,
const void *optval_,
size_t optvallen_)
233 const bool is_int = (optvallen_ ==
sizeof (int));
236 memcpy (&
value, optval_,
sizeof (
int));
241 scoped_lock_t locker (_opt_sync);
242 _max_sockets =
value;
248 if (is_int &&
value >= 0) {
249 scoped_lock_t locker (_opt_sync);
250 _io_thread_count =
value;
256 if (is_int &&
value >= 0) {
257 scoped_lock_t locker (_opt_sync);
258 _ipv6 = (
value != 0);
264 if (is_int &&
value >= 0) {
265 scoped_lock_t locker (_opt_sync);
266 _blocky = (
value != 0);
272 if (is_int &&
value >= 0) {
273 scoped_lock_t locker (_opt_sync);
274 _max_msgsz =
value < INT_MAX ?
value : INT_MAX;
280 if (is_int &&
value >= 0) {
281 scoped_lock_t locker (_opt_sync);
282 _zero_copy = (
value != 0);
296 int zmq::ctx_t::get (
int option_,
void *optval_,
const size_t *optvallen_)
298 const bool is_int = (*optvallen_ ==
sizeof (int));
299 int *
value =
static_cast<int *
> (optval_);
304 scoped_lock_t locker (_opt_sync);
305 *
value = _max_sockets;
319 scoped_lock_t locker (_opt_sync);
320 *
value = _io_thread_count;
327 scoped_lock_t locker (_opt_sync);
335 scoped_lock_t locker (_opt_sync);
343 scoped_lock_t locker (_opt_sync);
351 scoped_lock_t locker (_opt_sync);
359 scoped_lock_t locker (_opt_sync);
366 return thread_ctx_t::get (option_, optval_, optvallen_);
374 int zmq::ctx_t::get (
int option_)
377 size_t optvallen =
sizeof (int);
379 if (get (option_, &optval, &optvallen) == 0)
391 const int term_and_reaper_threads_count = 2;
392 const int mazmq = _max_sockets;
393 const int ios = _io_thread_count;
395 const int slot_count = mazmq + ios + term_and_reaper_threads_count;
397 _slots.reserve (slot_count);
398 _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
400 catch (
const std::bad_alloc &) {
404 _slots.resize (term_and_reaper_threads_count);
407 _slots[term_tid] = &_term_mailbox;
410 _reaper =
new (std::nothrow) reaper_t (
this, reaper_tid);
413 goto fail_cleanup_slots;
415 if (!_reaper->get_mailbox ()->valid ())
416 goto fail_cleanup_reaper;
417 _slots[reaper_tid] = _reaper->get_mailbox ();
421 _slots.resize (slot_count,
NULL);
423 for (
int i = term_and_reaper_threads_count;
424 i != ios + term_and_reaper_threads_count;
i++) {
425 io_thread_t *io_thread =
new (std::nothrow) io_thread_t (
this,
i);
428 goto fail_cleanup_reaper;
430 if (!io_thread->get_mailbox ()->valid ()) {
432 goto fail_cleanup_reaper;
434 _io_threads.push_back (io_thread);
435 _slots[
i] = io_thread->get_mailbox ();
440 for (int32_t
i =
static_cast<int32_t
> (_slots.size ()) - 1;
441 i >=
static_cast<int32_t
> (ios) + term_and_reaper_threads_count;
i--) {
442 _empty_slots.push_back (
i);
460 scoped_lock_t locker (_slot_sync);
475 if (_empty_slots.empty ()) {
481 const uint32_t slot = _empty_slots.back ();
482 _empty_slots.pop_back ();
485 const int sid = (
static_cast<int> (max_socket_id.add (1))) + 1;
488 socket_base_t *
s = socket_base_t::create (type_,
this, slot, sid);
490 _empty_slots.push_back (slot);
493 _sockets.push_back (s);
494 _slots[slot] =
s->get_mailbox ();
499 void zmq::ctx_t::destroy_socket (
class socket_base_t *socket_)
501 scoped_lock_t locker (_slot_sync);
504 const uint32_t tid = socket_->get_tid ();
505 _empty_slots.push_back (tid);
509 _sockets.erase (socket_);
513 if (_terminating && _sockets.empty ())
531 const char *
name_)
const
534 _thread_affinity_cpus);
536 char namebuf[16] =
"";
537 snprintf (namebuf,
sizeof (namebuf),
"%s%sZMQbg%s%s",
538 _thread_name_prefix.empty () ?
"" : _thread_name_prefix.c_str (),
539 _thread_name_prefix.empty () ?
"" :
"/",
name_ ?
"/" :
"",
541 thread_.
start (tfn_, arg_, namebuf);
546 const bool is_int = (optvallen_ ==
sizeof (int));
549 memcpy (&
value, optval_,
sizeof (
int));
553 if (is_int &&
value >= 0) {
555 _thread_sched_policy =
value;
561 if (is_int &&
value >= 0) {
563 _thread_affinity_cpus.insert (
value);
569 if (is_int &&
value >= 0) {
571 if (0 == _thread_affinity_cpus.erase (
value)) {
580 if (is_int &&
value >= 0) {
582 _thread_priority =
value;
590 std::ostringstream
s;
593 _thread_name_prefix =
s.str ();
595 }
else if (optvallen_ > 0 && optvallen_ <= 16) {
597 _thread_name_prefix.assign (
static_cast<const char *
> (optval_),
610 const size_t *optvallen_)
612 const bool is_int = (*optvallen_ ==
sizeof (int));
613 int *
value =
static_cast<int *
> (optval_);
619 *
value = _thread_sched_policy;
627 *
value = atoi (_thread_name_prefix.c_str ());
629 }
else if (*optvallen_ >= _thread_name_prefix.size ()) {
631 memcpy (optval_, _thread_name_prefix.data (),
632 _thread_name_prefix.size ());
644 _slots[tid_]->send (command_);
647 zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
649 if (_io_threads.empty ())
654 io_thread_t *selected_io_thread =
NULL;
655 for (io_threads_t::size_type
i = 0,
size = _io_threads.size ();
i !=
size;
657 if (!affinity_ || (affinity_ & (uint64_t (1) <<
i))) {
658 const int load = _io_threads[
i]->get_load ();
659 if (selected_io_thread ==
NULL || load < min_load) {
661 selected_io_thread = _io_threads[
i];
665 return selected_io_thread;
668 int zmq::ctx_t::register_endpoint (
const char *addr_,
669 const endpoint_t &endpoint_)
671 scoped_lock_t locker (_endpoints_sync);
673 const bool inserted =
674 _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (
std::string (addr_), endpoint_)
683 int zmq::ctx_t::unregister_endpoint (
const std::string &addr_,
684 const socket_base_t *
const socket_)
686 scoped_lock_t locker (_endpoints_sync);
688 const endpoints_t::iterator
it = _endpoints.find (addr_);
689 if (
it == _endpoints.end () ||
it->second.socket != socket_) {
695 _endpoints.erase (
it);
700 void zmq::ctx_t::unregister_endpoints (
const socket_base_t *
const socket_)
702 scoped_lock_t locker (_endpoints_sync);
704 for (endpoints_t::iterator
it = _endpoints.begin (),
705 end = _endpoints.end ();
707 if (
it->second.socket == socket_)
708 #if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
709 it = _endpoints.erase (
it);
711 _endpoints.erase (
it++);
720 scoped_lock_t locker (_endpoints_sync);
722 endpoints_t::iterator
it = _endpoints.find (addr_);
723 if (
it == _endpoints.end ()) {
725 endpoint_t empty = {
NULL, options_t ()};
728 endpoint_t endpoint =
it->second;
734 endpoint.socket->inc_seqnum ();
739 void zmq::ctx_t::pend_connection (
const std::string &addr_,
740 const endpoint_t &endpoint_,
743 scoped_lock_t locker (_endpoints_sync);
745 const pending_connection_t pending_connection = {endpoint_, pipes_[0],
748 const endpoints_t::iterator
it = _endpoints.find (addr_);
749 if (
it == _endpoints.end ()) {
751 endpoint_.socket->inc_seqnum ();
752 _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
756 connect_inproc_sockets (
it->second.socket,
it->second.options,
757 pending_connection, connect_side);
761 void zmq::ctx_t::connect_pending (
const char *addr_,
764 scoped_lock_t locker (_endpoints_sync);
766 const std::pair<pending_connections_t::iterator,
767 pending_connections_t::iterator>
768 pending = _pending_connections.equal_range (addr_);
769 for (pending_connections_t::iterator
p = pending.first;
p != pending.second;
771 connect_inproc_sockets (bind_socket_, _endpoints[addr_].
options,
772 p->second, bind_side);
774 _pending_connections.erase (pending.first, pending.second);
777 void zmq::ctx_t::connect_inproc_sockets (
779 const options_t &bind_options_,
780 const pending_connection_t &pending_connection_,
784 pending_connection_.bind_pipe->set_tid (bind_socket_->
get_tid ());
786 if (!bind_options_.recv_routing_id) {
788 const bool ok = pending_connection_.bind_pipe->read (&msg);
790 const int rc = msg.close ();
795 pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
796 bind_options_.rcvhwm);
797 pending_connection_.bind_pipe->set_hwms_boost (
798 pending_connection_.endpoint.options.sndhwm,
799 pending_connection_.endpoint.options.rcvhwm);
801 pending_connection_.connect_pipe->set_hwms (
802 pending_connection_.endpoint.options.rcvhwm,
803 pending_connection_.endpoint.options.sndhwm);
804 pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
805 bind_options_.sndhwm);
807 pending_connection_.connect_pipe->set_hwms (-1, -1);
808 pending_connection_.bind_pipe->set_hwms (-1, -1);
811 #ifdef ZMQ_BUILD_DRAFT_API
812 if (bind_options_.can_recv_disconnect_msg
813 && !bind_options_.disconnect_msg.empty ())
814 pending_connection_.connect_pipe->set_disconnect_msg (
815 bind_options_.disconnect_msg);
818 if (side_ == bind_side) {
821 cmd.args.bind.pipe = pending_connection_.bind_pipe;
824 pending_connection_.endpoint.socket);
826 pending_connection_.connect_pipe->send_bind (
827 bind_socket_, pending_connection_.bind_pipe,
false);
834 if (pending_connection_.endpoint.options.recv_routing_id
835 && pending_connection_.endpoint.socket->check_tag ()) {
839 #ifdef ZMQ_BUILD_DRAFT_API
841 if (bind_options_.can_send_hello_msg
842 && bind_options_.hello_msg.size () > 0) {
850 int zmq::ctx_t::get_vmci_socket_family ()
854 if (_vmci_fd == -1) {
855 _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);
857 if (_vmci_fd != -1) {
859 int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);