ctx.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #ifndef ZMQ_HAVE_WINDOWS
6 #include <unistd.h>
7 #endif
8 
9 #include <limits>
10 #include <climits>
11 #include <new>
12 #include <sstream>
13 #include <string.h>
14 
15 #include "ctx.hpp"
16 #include "socket_base.hpp"
17 #include "io_thread.hpp"
18 #include "reaper.hpp"
19 #include "pipe.hpp"
20 #include "err.hpp"
21 #include "msg.hpp"
22 #include "random.hpp"
23 
24 #ifdef ZMQ_HAVE_VMCI
25 #include <vmci_sockets.h>
26 #endif
27 
28 #ifdef ZMQ_USE_NSS
29 #include <nss.h>
30 #endif
31 
32 #ifdef ZMQ_USE_GNUTLS
33 #include <gnutls/gnutls.h>
34 #endif
35 
36 #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
37 #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
38 
39 static int clipped_maxsocket (int max_requested_)
40 {
41  if (max_requested_ >= zmq::poller_t::max_fds ()
42  && zmq::poller_t::max_fds () != -1)
43  // -1 because we need room for the reaper mailbox.
44  max_requested_ = zmq::poller_t::max_fds () - 1;
45 
46  return max_requested_;
47 }
48 
49 zmq::ctx_t::ctx_t () :
51  _starting (true),
52  _terminating (false),
53  _reaper (NULL),
54  _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
55  _max_msgsz (INT_MAX),
56  _io_thread_count (ZMQ_IO_THREADS_DFLT),
57  _blocky (true),
58  _ipv6 (false),
59  _zero_copy (true)
60 {
61 #ifdef HAVE_FORK
62  _pid = getpid ();
63 #endif
64 #ifdef ZMQ_HAVE_VMCI
65  _vmci_fd = -1;
66  _vmci_family = -1;
67 #endif
68 
69  // Initialise crypto library, if needed.
71 
72 #ifdef ZMQ_USE_NSS
73  NSS_NoDB_Init (NULL);
74 #endif
75 
76 #ifdef ZMQ_USE_GNUTLS
77  gnutls_global_init ();
78 #endif
79 }
80 
81 bool zmq::ctx_t::check_tag () const
82 {
83  return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
84 }
85 
86 zmq::ctx_t::~ctx_t ()
87 {
88  // Check that there are no remaining _sockets.
89  zmq_assert (_sockets.empty ());
90 
91  // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
92  // thread subsequent invocation of destructor would hang-up.
93  const io_threads_t::size_type io_threads_size = _io_threads.size ();
94  for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
95  _io_threads[i]->stop ();
96  }
97 
98  // Wait till I/O threads actually terminate.
99  for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
100  LIBZMQ_DELETE (_io_threads[i]);
101  }
102 
103  // Deallocate the reaper thread object.
104  LIBZMQ_DELETE (_reaper);
105 
106  // The mailboxes in _slots themselves were deallocated with their
107  // corresponding io_thread/socket objects.
108 
109  // De-initialise crypto library, if needed.
111 
112 #ifdef ZMQ_USE_NSS
113  NSS_Shutdown ();
114 #endif
115 
116 #ifdef ZMQ_USE_GNUTLS
117  gnutls_global_deinit ();
118 #endif
119 
120  // Remove the tag, so that the object is considered dead.
121  _tag = ZMQ_CTX_TAG_VALUE_BAD;
122 }
123 
124 bool zmq::ctx_t::valid () const
125 {
126  return _term_mailbox.valid ();
127 }
128 
129 int zmq::ctx_t::terminate ()
130 {
131  _slot_sync.lock ();
132 
133  const bool save_terminating = _terminating;
134  _terminating = false;
135 
136  // Connect up any pending inproc connections, otherwise we will hang
137  pending_connections_t copy = _pending_connections;
138  for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
139  p != end; ++p) {
140  zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
141  // create_socket might fail eg: out of memory/sockets limit reached
142  zmq_assert (s);
143  s->bind (p->first.c_str ());
144  s->close ();
145  }
146  _terminating = save_terminating;
147 
148  if (!_starting) {
149 #ifdef HAVE_FORK
150  if (_pid != getpid ()) {
151  // we are a forked child process. Close all file descriptors
152  // inherited from the parent.
153  for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
154  i++) {
155  _sockets[i]->get_mailbox ()->forked ();
156  }
157  _term_mailbox.forked ();
158  }
159 #endif
160 
161  // Check whether termination was already underway, but interrupted and now
162  // restarted.
163  const bool restarted = _terminating;
164  _terminating = true;
165 
166  // First attempt to terminate the context.
167  if (!restarted) {
168  // First send stop command to sockets so that any blocking calls
169  // can be interrupted. If there are no sockets we can ask reaper
170  // thread to stop.
171  for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
172  i++) {
173  _sockets[i]->stop ();
174  }
175  if (_sockets.empty ())
176  _reaper->stop ();
177  }
178  _slot_sync.unlock ();
179 
180  // Wait till reaper thread closes all the sockets.
181  command_t cmd;
182  const int rc = _term_mailbox.recv (&cmd, -1);
183  if (rc == -1 && errno == EINTR)
184  return -1;
185  errno_assert (rc == 0);
186  zmq_assert (cmd.type == command_t::done);
187  _slot_sync.lock ();
188  zmq_assert (_sockets.empty ());
189  }
190  _slot_sync.unlock ();
191 
192 #ifdef ZMQ_HAVE_VMCI
193  _vmci_sync.lock ();
194 
195  VMCISock_ReleaseAFValueFd (_vmci_fd);
196  _vmci_family = -1;
197  _vmci_fd = -1;
198 
199  _vmci_sync.unlock ();
200 #endif
201 
202  // Deallocate the resources.
203  delete this;
204 
205  return 0;
206 }
207 
208 int zmq::ctx_t::shutdown ()
209 {
210  scoped_lock_t locker (_slot_sync);
211 
212  if (!_terminating) {
213  _terminating = true;
214 
215  if (!_starting) {
216  // Send stop command to sockets so that any blocking calls
217  // can be interrupted. If there are no sockets we can ask reaper
218  // thread to stop.
219  for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
220  i++) {
221  _sockets[i]->stop ();
222  }
223  if (_sockets.empty ())
224  _reaper->stop ();
225  }
226  }
227 
228  return 0;
229 }
230 
231 int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
232 {
233  const bool is_int = (optvallen_ == sizeof (int));
234  int value = 0;
235  if (is_int)
236  memcpy (&value, optval_, sizeof (int));
237 
238  switch (option_) {
239  case ZMQ_MAX_SOCKETS:
240  if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
241  scoped_lock_t locker (_opt_sync);
242  _max_sockets = value;
243  return 0;
244  }
245  break;
246 
247  case ZMQ_IO_THREADS:
248  if (is_int && value >= 0) {
249  scoped_lock_t locker (_opt_sync);
250  _io_thread_count = value;
251  return 0;
252  }
253  break;
254 
255  case ZMQ_IPV6:
256  if (is_int && value >= 0) {
257  scoped_lock_t locker (_opt_sync);
258  _ipv6 = (value != 0);
259  return 0;
260  }
261  break;
262 
263  case ZMQ_BLOCKY:
264  if (is_int && value >= 0) {
265  scoped_lock_t locker (_opt_sync);
266  _blocky = (value != 0);
267  return 0;
268  }
269  break;
270 
271  case ZMQ_MAX_MSGSZ:
272  if (is_int && value >= 0) {
273  scoped_lock_t locker (_opt_sync);
274  _max_msgsz = value < INT_MAX ? value : INT_MAX;
275  return 0;
276  }
277  break;
278 
279  case ZMQ_ZERO_COPY_RECV:
280  if (is_int && value >= 0) {
281  scoped_lock_t locker (_opt_sync);
282  _zero_copy = (value != 0);
283  return 0;
284  }
285  break;
286 
287  default: {
288  return thread_ctx_t::set (option_, optval_, optvallen_);
289  }
290  }
291 
292  errno = EINVAL;
293  return -1;
294 }
295 
296 int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_)
297 {
298  const bool is_int = (*optvallen_ == sizeof (int));
299  int *value = static_cast<int *> (optval_);
300 
301  switch (option_) {
302  case ZMQ_MAX_SOCKETS:
303  if (is_int) {
304  scoped_lock_t locker (_opt_sync);
305  *value = _max_sockets;
306  return 0;
307  }
308  break;
309 
310  case ZMQ_SOCKET_LIMIT:
311  if (is_int) {
312  *value = clipped_maxsocket (65535);
313  return 0;
314  }
315  break;
316 
317  case ZMQ_IO_THREADS:
318  if (is_int) {
319  scoped_lock_t locker (_opt_sync);
320  *value = _io_thread_count;
321  return 0;
322  }
323  break;
324 
325  case ZMQ_IPV6:
326  if (is_int) {
327  scoped_lock_t locker (_opt_sync);
328  *value = _ipv6;
329  return 0;
330  }
331  break;
332 
333  case ZMQ_BLOCKY:
334  if (is_int) {
335  scoped_lock_t locker (_opt_sync);
336  *value = _blocky;
337  return 0;
338  }
339  break;
340 
341  case ZMQ_MAX_MSGSZ:
342  if (is_int) {
343  scoped_lock_t locker (_opt_sync);
344  *value = _max_msgsz;
345  return 0;
346  }
347  break;
348 
349  case ZMQ_MSG_T_SIZE:
350  if (is_int) {
351  scoped_lock_t locker (_opt_sync);
352  *value = sizeof (zmq_msg_t);
353  return 0;
354  }
355  break;
356 
357  case ZMQ_ZERO_COPY_RECV:
358  if (is_int) {
359  scoped_lock_t locker (_opt_sync);
360  *value = _zero_copy;
361  return 0;
362  }
363  break;
364 
365  default: {
366  return thread_ctx_t::get (option_, optval_, optvallen_);
367  }
368  }
369 
370  errno = EINVAL;
371  return -1;
372 }
373 
374 int zmq::ctx_t::get (int option_)
375 {
376  int optval = 0;
377  size_t optvallen = sizeof (int);
378 
379  if (get (option_, &optval, &optvallen) == 0)
380  return optval;
381 
382  errno = EINVAL;
383  return -1;
384 }
385 
386 bool zmq::ctx_t::start ()
387 {
388  // Initialise the array of mailboxes. Additional two slots are for
389  // zmq_ctx_term thread and reaper thread.
390  _opt_sync.lock ();
391  const int term_and_reaper_threads_count = 2;
392  const int mazmq = _max_sockets;
393  const int ios = _io_thread_count;
394  _opt_sync.unlock ();
395  const int slot_count = mazmq + ios + term_and_reaper_threads_count;
396  try {
397  _slots.reserve (slot_count);
398  _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
399  }
400  catch (const std::bad_alloc &) {
401  errno = ENOMEM;
402  return false;
403  }
404  _slots.resize (term_and_reaper_threads_count);
405 
406  // Initialise the infrastructure for zmq_ctx_term thread.
407  _slots[term_tid] = &_term_mailbox;
408 
409  // Create the reaper thread.
410  _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
411  if (!_reaper) {
412  errno = ENOMEM;
413  goto fail_cleanup_slots;
414  }
415  if (!_reaper->get_mailbox ()->valid ())
416  goto fail_cleanup_reaper;
417  _slots[reaper_tid] = _reaper->get_mailbox ();
418  _reaper->start ();
419 
420  // Create I/O thread objects and launch them.
421  _slots.resize (slot_count, NULL);
422 
423  for (int i = term_and_reaper_threads_count;
424  i != ios + term_and_reaper_threads_count; i++) {
425  io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
426  if (!io_thread) {
427  errno = ENOMEM;
428  goto fail_cleanup_reaper;
429  }
430  if (!io_thread->get_mailbox ()->valid ()) {
431  delete io_thread;
432  goto fail_cleanup_reaper;
433  }
434  _io_threads.push_back (io_thread);
435  _slots[i] = io_thread->get_mailbox ();
436  io_thread->start ();
437  }
438 
439  // In the unused part of the slot array, create a list of empty slots.
440  for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
441  i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
442  _empty_slots.push_back (i);
443  }
444 
445  _starting = false;
446  return true;
447 
448 fail_cleanup_reaper:
449  _reaper->stop ();
450  delete _reaper;
451  _reaper = NULL;
452 
453 fail_cleanup_slots:
454  _slots.clear ();
455  return false;
456 }
457 
458 zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
459 {
460  scoped_lock_t locker (_slot_sync);
461 
462  // Once zmq_ctx_term() or zmq_ctx_shutdown() was called, we can't create
463  // new sockets.
464  if (_terminating) {
465  errno = ETERM;
466  return NULL;
467  }
468 
469  if (unlikely (_starting)) {
470  if (!start ())
471  return NULL;
472  }
473 
474  // If max_sockets limit was reached, return error.
475  if (_empty_slots.empty ()) {
476  errno = EMFILE;
477  return NULL;
478  }
479 
480  // Choose a slot for the socket.
481  const uint32_t slot = _empty_slots.back ();
482  _empty_slots.pop_back ();
483 
484  // Generate new unique socket ID.
485  const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
486 
487  // Create the socket and register its mailbox.
488  socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
489  if (!s) {
490  _empty_slots.push_back (slot);
491  return NULL;
492  }
493  _sockets.push_back (s);
494  _slots[slot] = s->get_mailbox ();
495 
496  return s;
497 }
498 
499 void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
500 {
501  scoped_lock_t locker (_slot_sync);
502 
503  // Free the associated thread slot.
504  const uint32_t tid = socket_->get_tid ();
505  _empty_slots.push_back (tid);
506  _slots[tid] = NULL;
507 
508  // Remove the socket from the list of sockets.
509  _sockets.erase (socket_);
510 
511  // If zmq_ctx_term() was already called and there are no more socket
512  // we can ask reaper thread to terminate.
513  if (_terminating && _sockets.empty ())
514  _reaper->stop ();
515 }
516 
517 zmq::object_t *zmq::ctx_t::get_reaper () const
518 {
519  return _reaper;
520 }
521 
523  _thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
524  _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
525 {
526 }
527 
529  thread_fn *tfn_,
530  void *arg_,
531  const char *name_) const
532 {
533  thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy,
534  _thread_affinity_cpus);
535 
536  char namebuf[16] = "";
537  snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s",
538  _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (),
539  _thread_name_prefix.empty () ? "" : "/", name_ ? "/" : "",
540  name_ ? name_ : "");
541  thread_.start (tfn_, arg_, namebuf);
542 }
543 
544 int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
545 {
546  const bool is_int = (optvallen_ == sizeof (int));
547  int value = 0;
548  if (is_int)
549  memcpy (&value, optval_, sizeof (int));
550 
551  switch (option_) {
553  if (is_int && value >= 0) {
554  scoped_lock_t locker (_opt_sync);
555  _thread_sched_policy = value;
556  return 0;
557  }
558  break;
559 
561  if (is_int && value >= 0) {
562  scoped_lock_t locker (_opt_sync);
563  _thread_affinity_cpus.insert (value);
564  return 0;
565  }
566  break;
567 
569  if (is_int && value >= 0) {
570  scoped_lock_t locker (_opt_sync);
571  if (0 == _thread_affinity_cpus.erase (value)) {
572  errno = EINVAL;
573  return -1;
574  }
575  return 0;
576  }
577  break;
578 
579  case ZMQ_THREAD_PRIORITY:
580  if (is_int && value >= 0) {
581  scoped_lock_t locker (_opt_sync);
582  _thread_priority = value;
583  return 0;
584  }
585  break;
586 
588  // start_thread() allows max 16 chars for thread name
589  if (is_int) {
590  std::ostringstream s;
591  s << value;
592  scoped_lock_t locker (_opt_sync);
593  _thread_name_prefix = s.str ();
594  return 0;
595  } else if (optvallen_ > 0 && optvallen_ <= 16) {
596  scoped_lock_t locker (_opt_sync);
597  _thread_name_prefix.assign (static_cast<const char *> (optval_),
598  optvallen_);
599  return 0;
600  }
601  break;
602  }
603 
604  errno = EINVAL;
605  return -1;
606 }
607 
608 int zmq::thread_ctx_t::get (int option_,
609  void *optval_,
610  const size_t *optvallen_)
611 {
612  const bool is_int = (*optvallen_ == sizeof (int));
613  int *value = static_cast<int *> (optval_);
614 
615  switch (option_) {
617  if (is_int) {
618  scoped_lock_t locker (_opt_sync);
619  *value = _thread_sched_policy;
620  return 0;
621  }
622  break;
623 
625  if (is_int) {
626  scoped_lock_t locker (_opt_sync);
627  *value = atoi (_thread_name_prefix.c_str ());
628  return 0;
629  } else if (*optvallen_ >= _thread_name_prefix.size ()) {
630  scoped_lock_t locker (_opt_sync);
631  memcpy (optval_, _thread_name_prefix.data (),
632  _thread_name_prefix.size ());
633  return 0;
634  }
635  break;
636  }
637 
638  errno = EINVAL;
639  return -1;
640 }
641 
642 void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
643 {
644  _slots[tid_]->send (command_);
645 }
646 
647 zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
648 {
649  if (_io_threads.empty ())
650  return NULL;
651 
652  // Find the I/O thread with minimum load.
653  int min_load = -1;
654  io_thread_t *selected_io_thread = NULL;
655  for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
656  i++) {
657  if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
658  const int load = _io_threads[i]->get_load ();
659  if (selected_io_thread == NULL || load < min_load) {
660  min_load = load;
661  selected_io_thread = _io_threads[i];
662  }
663  }
664  }
665  return selected_io_thread;
666 }
667 
668 int zmq::ctx_t::register_endpoint (const char *addr_,
669  const endpoint_t &endpoint_)
670 {
671  scoped_lock_t locker (_endpoints_sync);
672 
673  const bool inserted =
674  _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_)
675  .second;
676  if (!inserted) {
677  errno = EADDRINUSE;
678  return -1;
679  }
680  return 0;
681 }
682 
683 int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
684  const socket_base_t *const socket_)
685 {
686  scoped_lock_t locker (_endpoints_sync);
687 
688  const endpoints_t::iterator it = _endpoints.find (addr_);
689  if (it == _endpoints.end () || it->second.socket != socket_) {
690  errno = ENOENT;
691  return -1;
692  }
693 
694  // Remove endpoint.
695  _endpoints.erase (it);
696 
697  return 0;
698 }
699 
700 void zmq::ctx_t::unregister_endpoints (const socket_base_t *const socket_)
701 {
702  scoped_lock_t locker (_endpoints_sync);
703 
704  for (endpoints_t::iterator it = _endpoints.begin (),
705  end = _endpoints.end ();
706  it != end;) {
707  if (it->second.socket == socket_)
708 #if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
709  it = _endpoints.erase (it);
710 #else
711  _endpoints.erase (it++);
712 #endif
713  else
714  ++it;
715  }
716 }
717 
718 zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
719 {
720  scoped_lock_t locker (_endpoints_sync);
721 
722  endpoints_t::iterator it = _endpoints.find (addr_);
723  if (it == _endpoints.end ()) {
725  endpoint_t empty = {NULL, options_t ()};
726  return empty;
727  }
728  endpoint_t endpoint = it->second;
729 
730  // Increment the command sequence number of the peer so that it won't
731  // get deallocated until "bind" command is issued by the caller.
732  // The subsequent 'bind' has to be called with inc_seqnum parameter
733  // set to false, so that the seqnum isn't incremented twice.
734  endpoint.socket->inc_seqnum ();
735 
736  return endpoint;
737 }
738 
739 void zmq::ctx_t::pend_connection (const std::string &addr_,
740  const endpoint_t &endpoint_,
741  pipe_t **pipes_)
742 {
743  scoped_lock_t locker (_endpoints_sync);
744 
745  const pending_connection_t pending_connection = {endpoint_, pipes_[0],
746  pipes_[1]};
747 
748  const endpoints_t::iterator it = _endpoints.find (addr_);
749  if (it == _endpoints.end ()) {
750  // Still no bind.
751  endpoint_.socket->inc_seqnum ();
752  _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
753  pending_connection);
754  } else {
755  // Bind has happened in the mean time, connect directly
756  connect_inproc_sockets (it->second.socket, it->second.options,
757  pending_connection, connect_side);
758  }
759 }
760 
761 void zmq::ctx_t::connect_pending (const char *addr_,
762  zmq::socket_base_t *bind_socket_)
763 {
764  scoped_lock_t locker (_endpoints_sync);
765 
766  const std::pair<pending_connections_t::iterator,
767  pending_connections_t::iterator>
768  pending = _pending_connections.equal_range (addr_);
769  for (pending_connections_t::iterator p = pending.first; p != pending.second;
770  ++p)
771  connect_inproc_sockets (bind_socket_, _endpoints[addr_].options,
772  p->second, bind_side);
773 
774  _pending_connections.erase (pending.first, pending.second);
775 }
776 
777 void zmq::ctx_t::connect_inproc_sockets (
778  zmq::socket_base_t *bind_socket_,
779  const options_t &bind_options_,
780  const pending_connection_t &pending_connection_,
781  side side_)
782 {
783  bind_socket_->inc_seqnum ();
784  pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
785 
786  if (!bind_options_.recv_routing_id) {
787  msg_t msg;
788  const bool ok = pending_connection_.bind_pipe->read (&msg);
789  zmq_assert (ok);
790  const int rc = msg.close ();
791  errno_assert (rc == 0);
792  }
793 
794  if (!get_effective_conflate_option (pending_connection_.endpoint.options)) {
795  pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
796  bind_options_.rcvhwm);
797  pending_connection_.bind_pipe->set_hwms_boost (
798  pending_connection_.endpoint.options.sndhwm,
799  pending_connection_.endpoint.options.rcvhwm);
800 
801  pending_connection_.connect_pipe->set_hwms (
802  pending_connection_.endpoint.options.rcvhwm,
803  pending_connection_.endpoint.options.sndhwm);
804  pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
805  bind_options_.sndhwm);
806  } else {
807  pending_connection_.connect_pipe->set_hwms (-1, -1);
808  pending_connection_.bind_pipe->set_hwms (-1, -1);
809  }
810 
811 #ifdef ZMQ_BUILD_DRAFT_API
812  if (bind_options_.can_recv_disconnect_msg
813  && !bind_options_.disconnect_msg.empty ())
814  pending_connection_.connect_pipe->set_disconnect_msg (
815  bind_options_.disconnect_msg);
816 #endif
817 
818  if (side_ == bind_side) {
819  command_t cmd;
820  cmd.type = command_t::bind;
821  cmd.args.bind.pipe = pending_connection_.bind_pipe;
822  bind_socket_->process_command (cmd);
823  bind_socket_->send_inproc_connected (
824  pending_connection_.endpoint.socket);
825  } else
826  pending_connection_.connect_pipe->send_bind (
827  bind_socket_, pending_connection_.bind_pipe, false);
828 
829  // When a ctx is terminated all pending inproc connection will be
830  // connected, but the socket will already be closed and the pipe will be
831  // in waiting_for_delimiter state, which means no more writes can be done
832  // and the routing id write fails and causes an assert. Check if the socket
833  // is open before sending.
834  if (pending_connection_.endpoint.options.recv_routing_id
835  && pending_connection_.endpoint.socket->check_tag ()) {
836  send_routing_id (pending_connection_.bind_pipe, bind_options_);
837  }
838 
839 #ifdef ZMQ_BUILD_DRAFT_API
840  // If set, send the hello msg of the bind socket to the pending connection.
841  if (bind_options_.can_send_hello_msg
842  && bind_options_.hello_msg.size () > 0) {
843  send_hello_msg (pending_connection_.bind_pipe, bind_options_);
844  }
845 #endif
846 }
847 
848 #ifdef ZMQ_HAVE_VMCI
849 
850 int zmq::ctx_t::get_vmci_socket_family ()
851 {
852  zmq::scoped_lock_t locker (_vmci_sync);
853 
854  if (_vmci_fd == -1) {
855  _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);
856 
857  if (_vmci_fd != -1) {
858 #ifdef FD_CLOEXEC
859  int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);
860  errno_assert (rc != -1);
861 #endif
862  }
863  }
864 
865  return _vmci_family;
866 }
867 
868 #endif
869 
870 // The last used socket ID, or 0 if no socket was used so far. Note that this
871 // is a global variable. Thus, even sockets created in different contexts have
872 // unique IDs.
873 zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
ZMQ_MAX_SOCKETS_DFLT
#define ZMQ_MAX_SOCKETS_DFLT
Definition: zmq.h:194
zmq::random_open
void random_open()
Definition: random.cpp:59
end
GLuint GLuint end
Definition: glcorearb.h:2858
ZMQ_MAX_SOCKETS
#define ZMQ_MAX_SOCKETS
Definition: zmq.h:182
ZMQ_IO_THREADS_DFLT
#define ZMQ_IO_THREADS_DFLT
Definition: zmq.h:193
ZMQ_MAX_MSGSZ
#define ZMQ_MAX_MSGSZ
Definition: zmq.h:186
NULL
NULL
Definition: test_security_zap.cpp:405
ZMQ_THREAD_AFFINITY_CPU_REMOVE
#define ZMQ_THREAD_AFFINITY_CPU_REMOVE
Definition: zmq.h:189
EINTR
#define EINTR
Definition: errno.hpp:7
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
zmq::thread_ctx_t::start_thread
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_, const char *name_=NULL) const
Definition: ctx.cpp:528
s
XmlRpcServer s
zmq::object_t::process_command
void process_command(const zmq::command_t &cmd_)
Definition: object.cpp:43
ZMQ_SOCKET_LIMIT
#define ZMQ_SOCKET_LIMIT
Definition: zmq.h:183
zmq::thread_fn
void() thread_fn(void *)
Definition: thread.hpp:17
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
zmq::socket_base_t
Definition: socket_base.hpp:31
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
zmq::thread_t::start
void start(thread_fn *tfn_, void *arg_, const char *name_)
Definition: thread.cpp:234
zmq::command_t::bind
@ bind
Definition: command.hpp:32
ZMQ_THREAD_SCHED_POLICY
#define ZMQ_THREAD_SCHED_POLICY
Definition: zmq.h:185
ok
ROSCPP_DECL bool ok()
ctx.hpp
ZMQ_ZERO_COPY_RECV
#define ZMQ_ZERO_COPY_RECV
Definition: zmq_draft.h:71
zmq::random_close
void random_close()
Definition: random.cpp:64
ZMQ_THREAD_SCHED_POLICY_DFLT
#define ZMQ_THREAD_SCHED_POLICY_DFLT
Definition: zmq.h:196
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
zmq_msg_t
struct zmq_msg_t zmq_msg_t
ZMQ_CTX_TAG_VALUE_GOOD
#define ZMQ_CTX_TAG_VALUE_GOOD
Definition: ctx.cpp:36
start
GLuint start
Definition: glcorearb.h:2858
zmq::atomic_counter_t
Definition: atomic_counter.hpp:61
clipped_maxsocket
static int clipped_maxsocket(int max_requested_)
Definition: ctx.cpp:39
ETERM
#define ETERM
Definition: zmq.h:161
p
const char * p
Definition: gmock-matchers_test.cc:3863
pipe.hpp
reaper.hpp
zmq::send_hello_msg
void send_hello_msg(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:64
zmq::thread_t
Definition: thread.hpp:26
name_
string name_
Definition: googletest.cc:182
ZMQ_THREAD_PRIORITY
#define ZMQ_THREAD_PRIORITY
Definition: zmq.h:184
send_command
void send_command(fd_t s_, char(&command_)[N])
Definition: test_security_curve.cpp:271
zmq::endpoint_t
Definition: ctx.hpp:31
zmq::get_effective_conflate_option
bool get_effective_conflate_option(const options_t &options)
Definition: options.hpp:303
zmq::object_t
Definition: object.hpp:28
i
int i
Definition: gmock-matchers_test.cc:764
msg.hpp
zmq::thread_ctx_t::thread_ctx_t
thread_ctx_t()
Definition: ctx.cpp:522
ZMQ_THREAD_AFFINITY_CPU_ADD
#define ZMQ_THREAD_AFFINITY_CPU_ADD
Definition: zmq.h:188
ZMQ_CTX_TAG_VALUE_BAD
#define ZMQ_CTX_TAG_VALUE_BAD
Definition: ctx.cpp:37
zmq::own_t::inc_seqnum
void inc_seqnum()
Definition: own.cpp:39
io_thread.hpp
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
zmq::thread_ctx_t::set
int set(int option_, const void *optval_, size_t optvallen_)
Definition: ctx.cpp:544
socket_base.hpp
size
GLsizeiptr size
Definition: glcorearb.h:2943
ZMQ_IPV6
#define ZMQ_IPV6
Definition: zmq.h:307
cpp.gmock_class.set
set
Definition: gmock_class.py:44
err.hpp
ZMQ_IO_THREADS
#define ZMQ_IO_THREADS
Definition: zmq.h:181
ZMQ_THREAD_NAME_PREFIX
#define ZMQ_THREAD_NAME_PREFIX
Definition: zmq.h:190
zmq::send_routing_id
void send_routing_id(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:52
true
#define true
Definition: cJSON.c:65
zmq::thread_ctx_t::get
int get(int option_, void *optval_, const size_t *optvallen_)
Definition: ctx.cpp:608
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
ZMQ_MSG_T_SIZE
#define ZMQ_MSG_T_SIZE
Definition: zmq.h:187
false
#define false
Definition: cJSON.c:70
zmq::command_t
Definition: command.hpp:21
EADDRINUSE
#define EADDRINUSE
Definition: zmq.h:116
EMFILE
#define EMFILE
Definition: errno.hpp:27
zmq::object_t::send_inproc_connected
void send_inproc_connected(zmq::socket_base_t *socket_)
Definition: object.cpp:395
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
ZMQ_THREAD_PRIORITY_DFLT
#define ZMQ_THREAD_PRIORITY_DFLT
Definition: zmq.h:195
ZMQ_BLOCKY
#define ZMQ_BLOCKY
Definition: zmq.h:331
zmq::scoped_lock_t
Definition: mutex.hpp:143
unlikely
#define unlikely(x)
Definition: likely.hpp:11
zmq::object_t::get_tid
uint32_t get_tid() const
Definition: object.cpp:28
zmq::thread_t::setSchedulingParameters
void setSchedulingParameters(int priority_, int scheduling_policy_, const std::set< int > &affinity_cpus_)
Definition: thread.cpp:258


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:49