socket_base.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 #include <string>
6 #include <algorithm>
7 #include <limits>
8 
9 #include "macros.hpp"
10 
11 #if defined ZMQ_HAVE_WINDOWS
12 #if defined _MSC_VER
13 #if defined _WIN32_WCE
14 #include <cmnintrin.h>
15 #else
16 #include <intrin.h>
17 #endif
18 #endif
19 #else
20 #include <unistd.h>
21 #include <ctype.h>
22 #endif
23 
24 #include "socket_base.hpp"
25 #include "tcp_listener.hpp"
26 #include "ws_listener.hpp"
27 #include "ipc_listener.hpp"
28 #include "tipc_listener.hpp"
29 #include "tcp_connecter.hpp"
30 #ifdef ZMQ_HAVE_WS
31 #include "ws_address.hpp"
32 #endif
33 #include "io_thread.hpp"
34 #include "session_base.hpp"
35 #include "config.hpp"
36 #include "pipe.hpp"
37 #include "err.hpp"
38 #include "ctx.hpp"
39 #include "likely.hpp"
40 #include "msg.hpp"
41 #include "address.hpp"
42 #include "ipc_address.hpp"
43 #include "tcp_address.hpp"
44 #include "udp_address.hpp"
45 #include "tipc_address.hpp"
46 #include "mailbox.hpp"
47 #include "mailbox_safe.hpp"
48 
49 #ifdef ZMQ_HAVE_WSS
50 #include "wss_address.hpp"
51 #endif
52 #if defined ZMQ_HAVE_VMCI
53 #include "vmci_address.hpp"
54 #include "vmci_listener.hpp"
55 #endif
56 
57 #ifdef ZMQ_HAVE_OPENPGM
58 #include "pgm_socket.hpp"
59 #endif
60 
61 #include "pair.hpp"
62 #include "pub.hpp"
63 #include "sub.hpp"
64 #include "req.hpp"
65 #include "rep.hpp"
66 #include "pull.hpp"
67 #include "push.hpp"
68 #include "dealer.hpp"
69 #include "router.hpp"
70 #include "xpub.hpp"
71 #include "xsub.hpp"
72 #include "stream.hpp"
73 #include "server.hpp"
74 #include "client.hpp"
75 #include "radio.hpp"
76 #include "dish.hpp"
77 #include "gather.hpp"
78 #include "scatter.hpp"
79 #include "dgram.hpp"
80 #include "peer.hpp"
81 #include "channel.hpp"
82 
83 void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
84  pipe_t *pipe_)
85 {
86  _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
87 }
88 
90  const std::string &endpoint_uri_str_)
91 {
92  const std::pair<map_t::iterator, map_t::iterator> range =
93  _inprocs.equal_range (endpoint_uri_str_);
94  if (range.first == range.second) {
95  errno = ENOENT;
96  return -1;
97  }
98 
99  for (map_t::iterator it = range.first; it != range.second; ++it) {
100  it->second->send_disconnect_msg ();
101  it->second->terminate (true);
102  }
103  _inprocs.erase (range.first, range.second);
104  return 0;
105 }
106 
108 {
109  for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
110  it != end; ++it)
111  if (it->second == pipe_) {
112  _inprocs.erase (it);
113  break;
114  }
115 }
116 
118 {
119  return _tag == 0xbaddecaf;
120 }
121 
123 {
124  return _thread_safe;
125 }
126 
128  class ctx_t *parent_,
129  uint32_t tid_,
130  int sid_)
131 {
132  socket_base_t *s = NULL;
133  switch (type_) {
134  case ZMQ_PAIR:
135  s = new (std::nothrow) pair_t (parent_, tid_, sid_);
136  break;
137  case ZMQ_PUB:
138  s = new (std::nothrow) pub_t (parent_, tid_, sid_);
139  break;
140  case ZMQ_SUB:
141  s = new (std::nothrow) sub_t (parent_, tid_, sid_);
142  break;
143  case ZMQ_REQ:
144  s = new (std::nothrow) req_t (parent_, tid_, sid_);
145  break;
146  case ZMQ_REP:
147  s = new (std::nothrow) rep_t (parent_, tid_, sid_);
148  break;
149  case ZMQ_DEALER:
150  s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
151  break;
152  case ZMQ_ROUTER:
153  s = new (std::nothrow) router_t (parent_, tid_, sid_);
154  break;
155  case ZMQ_PULL:
156  s = new (std::nothrow) pull_t (parent_, tid_, sid_);
157  break;
158  case ZMQ_PUSH:
159  s = new (std::nothrow) push_t (parent_, tid_, sid_);
160  break;
161  case ZMQ_XPUB:
162  s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
163  break;
164  case ZMQ_XSUB:
165  s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
166  break;
167  case ZMQ_STREAM:
168  s = new (std::nothrow) stream_t (parent_, tid_, sid_);
169  break;
170  case ZMQ_SERVER:
171  s = new (std::nothrow) server_t (parent_, tid_, sid_);
172  break;
173  case ZMQ_CLIENT:
174  s = new (std::nothrow) client_t (parent_, tid_, sid_);
175  break;
176  case ZMQ_RADIO:
177  s = new (std::nothrow) radio_t (parent_, tid_, sid_);
178  break;
179  case ZMQ_DISH:
180  s = new (std::nothrow) dish_t (parent_, tid_, sid_);
181  break;
182  case ZMQ_GATHER:
183  s = new (std::nothrow) gather_t (parent_, tid_, sid_);
184  break;
185  case ZMQ_SCATTER:
186  s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
187  break;
188  case ZMQ_DGRAM:
189  s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
190  break;
191  case ZMQ_PEER:
192  s = new (std::nothrow) peer_t (parent_, tid_, sid_);
193  break;
194  case ZMQ_CHANNEL:
195  s = new (std::nothrow) channel_t (parent_, tid_, sid_);
196  break;
197  default:
198  errno = EINVAL;
199  return NULL;
200  }
201 
202  alloc_assert (s);
203 
204  if (s->_mailbox == NULL) {
205  s->_destroyed = true;
206  LIBZMQ_DELETE (s);
207  return NULL;
208  }
209 
210  return s;
211 }
212 
214  uint32_t tid_,
215  int sid_,
216  bool thread_safe_) :
217  own_t (parent_, tid_),
218  _sync (),
219  _tag (0xbaddecaf),
221  _destroyed (false),
222  _poller (NULL),
223  _handle (static_cast<poller_t::handle_t> (NULL)),
224  _last_tsc (0),
225  _ticks (0),
226  _rcvmore (false),
228  _monitor_events (0),
229  _thread_safe (thread_safe_),
231  _monitor_sync (),
233 {
234  options.socket_id = sid_;
235  options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
236  options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
237  options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
238 
239  if (_thread_safe) {
240  _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
242  } else {
243  mailbox_t *m = new (std::nothrow) mailbox_t ();
244  zmq_assert (m);
245 
246  if (m->get_fd () != retired_fd)
247  _mailbox = m;
248  else {
249  LIBZMQ_DELETE (m);
250  _mailbox = NULL;
251  }
252  }
253 }
254 
255 int zmq::socket_base_t::get_peer_state (const void *routing_id_,
256  size_t routing_id_size_) const
257 {
258  LIBZMQ_UNUSED (routing_id_);
259  LIBZMQ_UNUSED (routing_id_size_);
260 
261  // Only ROUTER sockets support this
262  errno = ENOTSUP;
263  return -1;
264 }
265 
267 {
268  if (_mailbox)
269  LIBZMQ_DELETE (_mailbox);
270 
271  if (_reaper_signaler)
272  LIBZMQ_DELETE (_reaper_signaler);
273 
274  scoped_lock_t lock (_monitor_sync);
275  stop_monitor ();
276 
277  zmq_assert (_destroyed);
278 }
279 
281 {
282  return _mailbox;
283 }
284 
286 {
287  // Called by ctx when it is terminated (zmq_ctx_term).
288  // 'stop' command is sent from the threads that called zmq_ctx_term to
289  // the thread owning the socket. This way, blocking call in the
290  // owner thread can be interrupted.
291  send_stop ();
292 }
293 
294 // TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
295 // terminology, but this requires extensive changes to be consistent
296 int zmq::socket_base_t::parse_uri (const char *uri_,
297  std::string &protocol_,
298  std::string &path_)
299 {
300  zmq_assert (uri_ != NULL);
301 
302  const std::string uri (uri_);
303  const std::string::size_type pos = uri.find ("://");
304  if (pos == std::string::npos) {
305  errno = EINVAL;
306  return -1;
307  }
308  protocol_ = uri.substr (0, pos);
309  path_ = uri.substr (pos + 3);
310 
311  if (protocol_.empty () || path_.empty ()) {
312  errno = EINVAL;
313  return -1;
314  }
315  return 0;
316 }
317 
319 {
320  // First check out whether the protocol is something we are aware of.
321  if (protocol_ != protocol_name::inproc
322 #if defined ZMQ_HAVE_IPC
323  && protocol_ != protocol_name::ipc
324 #endif
325  && protocol_ != protocol_name::tcp
326 #ifdef ZMQ_HAVE_WS
327  && protocol_ != protocol_name::ws
328 #endif
329 #ifdef ZMQ_HAVE_WSS
330  && protocol_ != protocol_name::wss
331 #endif
332 #if defined ZMQ_HAVE_OPENPGM
333  // pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
334  && protocol_ != protocol_name::pgm
335  && protocol_ != protocol_name::epgm
336 #endif
337 #if defined ZMQ_HAVE_TIPC
338  // TIPC transport is only available on Linux.
339  && protocol_ != protocol_name::tipc
340 #endif
341 #if defined ZMQ_HAVE_NORM
342  && protocol_ != protocol_name::norm
343 #endif
344 #if defined ZMQ_HAVE_VMCI
345  && protocol_ != protocol_name::vmci
346 #endif
347  && protocol_ != protocol_name::udp) {
349  return -1;
350  }
351 
352  // Check whether socket type and transport protocol match.
353  // Specifically, multicast protocols can't be combined with
354  // bi-directional messaging patterns (socket types).
355 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
356 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
357  if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm
358  || protocol_ == protocol_name::norm)
359 #elif defined ZMQ_HAVE_OPENPGM
360  if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm)
361 #else // defined ZMQ_HAVE_NORM
362  if (protocol_ == protocol_name::norm
363 #endif
364  && options.type != ZMQ_PUB && options.type != ZMQ_SUB
365  && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
366  errno = ENOCOMPATPROTO;
367  return -1;
368  }
369 #endif
370 
371  if (protocol_ == protocol_name::udp
372  && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
373  && options.type != ZMQ_DGRAM)) {
374  errno = ENOCOMPATPROTO;
375  return -1;
376  }
377 
378  // Protocol is available.
379  return 0;
380 }
381 
383  bool subscribe_to_all_,
384  bool locally_initiated_)
385 {
386  // First, register the pipe so that we can terminate it later on.
387  pipe_->set_event_sink (this);
388  _pipes.push_back (pipe_);
389 
390  // Let the derived socket type know about new pipe.
391  xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
392 
393  // If the socket is already being closed, ask any new pipes to terminate
394  // straight away.
395  if (is_terminating ()) {
396  register_term_acks (1);
397  pipe_->terminate (false);
398  }
399 }
400 
402  const void *optval_,
403  size_t optvallen_)
404 {
405  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
406 
407  if (unlikely (_ctx_terminated)) {
408  errno = ETERM;
409  return -1;
410  }
411 
412  // First, check whether specific socket type overloads the option.
413  int rc = xsetsockopt (option_, optval_, optvallen_);
414  if (rc == 0 || errno != EINVAL) {
415  return rc;
416  }
417 
418  // If the socket type doesn't support the option, pass it to
419  // the generic option parser.
420  rc = options.setsockopt (option_, optval_, optvallen_);
421  update_pipe_options (option_);
422 
423  return rc;
424 }
425 
427  void *optval_,
428  size_t *optvallen_)
429 {
430  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
431 
432  if (unlikely (_ctx_terminated)) {
433  errno = ETERM;
434  return -1;
435  }
436 
437  // First, check whether specific socket type overloads the option.
438  int rc = xgetsockopt (option_, optval_, optvallen_);
439  if (rc == 0 || errno != EINVAL) {
440  return rc;
441  }
442 
443  if (option_ == ZMQ_RCVMORE) {
444  return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
445  }
446 
447  if (option_ == ZMQ_FD) {
448  if (_thread_safe) {
449  // thread safe socket doesn't provide file descriptor
450  errno = EINVAL;
451  return -1;
452  }
453 
454  return do_getsockopt<fd_t> (
455  optval_, optvallen_,
456  (static_cast<mailbox_t *> (_mailbox))->get_fd ());
457  }
458 
459  if (option_ == ZMQ_EVENTS) {
460  const int rc = process_commands (0, false);
461  if (rc != 0 && (errno == EINTR || errno == ETERM)) {
462  return -1;
463  }
464  errno_assert (rc == 0);
465 
466  return do_getsockopt<int> (optval_, optvallen_,
467  (has_out () ? ZMQ_POLLOUT : 0)
468  | (has_in () ? ZMQ_POLLIN : 0));
469  }
470 
471  if (option_ == ZMQ_LAST_ENDPOINT) {
472  return do_getsockopt (optval_, optvallen_, _last_endpoint);
473  }
474 
475  if (option_ == ZMQ_THREAD_SAFE) {
476  return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
477  }
478 
479  return options.getsockopt (option_, optval_, optvallen_);
480 }
481 
482 int zmq::socket_base_t::join (const char *group_)
483 {
484  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
485 
486  return xjoin (group_);
487 }
488 
489 int zmq::socket_base_t::leave (const char *group_)
490 {
491  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
492 
493  return xleave (group_);
494 }
495 
497 {
498  zmq_assert (_thread_safe);
499 
500  scoped_lock_t sync_lock (_sync);
501  (static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
502 }
503 
505 {
506  zmq_assert (_thread_safe);
507 
508  scoped_lock_t sync_lock (_sync);
509  (static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
510 }
511 
512 int zmq::socket_base_t::bind (const char *endpoint_uri_)
513 {
514  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
515 
516  if (unlikely (_ctx_terminated)) {
517  errno = ETERM;
518  return -1;
519  }
520 
521  // Process pending commands, if any.
522  int rc = process_commands (0, false);
523  if (unlikely (rc != 0)) {
524  return -1;
525  }
526 
527  // Parse endpoint_uri_ string.
528  std::string protocol;
530  if (parse_uri (endpoint_uri_, protocol, address)
531  || check_protocol (protocol)) {
532  return -1;
533  }
534 
535  if (protocol == protocol_name::inproc) {
536  const endpoint_t endpoint = {this, options};
537  rc = register_endpoint (endpoint_uri_, endpoint);
538  if (rc == 0) {
539  connect_pending (endpoint_uri_, this);
540  _last_endpoint.assign (endpoint_uri_);
541  options.connected = true;
542  }
543  return rc;
544  }
545 
546 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
547 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
548  if (protocol == protocol_name::pgm || protocol == protocol_name::epgm
549  || protocol == protocol_name::norm) {
550 #elif defined ZMQ_HAVE_OPENPGM
551  if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
552 #else // defined ZMQ_HAVE_NORM
553  if (protocol == protocol_name::norm) {
554 #endif
555  // For convenience's sake, bind can be used interchangeable with
556  // connect for PGM, EPGM, NORM transports.
557  rc = connect (endpoint_uri_);
558  if (rc != -1)
559  options.connected = true;
560  return rc;
561  }
562 #endif
563 
564  if (protocol == protocol_name::udp) {
565  if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
567  return -1;
568  }
569 
570  // Choose the I/O thread to run the session in.
571  io_thread_t *io_thread = choose_io_thread (options.affinity);
572  if (!io_thread) {
573  errno = EMTHREAD;
574  return -1;
575  }
576 
577  address_t *paddr =
578  new (std::nothrow) address_t (protocol, address, this->get_ctx ());
579  alloc_assert (paddr);
580 
581  paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
582  alloc_assert (paddr->resolved.udp_addr);
583  rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
584  options.ipv6);
585  if (rc != 0) {
586  LIBZMQ_DELETE (paddr);
587  return -1;
588  }
589 
590  session_base_t *session =
591  session_base_t::create (io_thread, true, this, options, paddr);
592  errno_assert (session);
593 
594  // Create a bi-directional pipe.
595  object_t *parents[2] = {this, session};
596  pipe_t *new_pipes[2] = {NULL, NULL};
597 
598  int hwms[2] = {options.sndhwm, options.rcvhwm};
599  bool conflates[2] = {false, false};
600  rc = pipepair (parents, new_pipes, hwms, conflates);
601  errno_assert (rc == 0);
602 
603  // Attach local end of the pipe to the socket object.
604  attach_pipe (new_pipes[0], true, true);
605  pipe_t *const newpipe = new_pipes[0];
606 
607  // Attach remote end of the pipe to the session object later on.
608  session->attach_pipe (new_pipes[1]);
609 
610  // Save last endpoint URI
611  paddr->to_string (_last_endpoint);
612 
613  // TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
614  add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
616  static_cast<own_t *> (session), newpipe);
617 
618  return 0;
619  }
620 
621  // Remaining transports require to be run in an I/O thread, so at this
622  // point we'll choose one.
623  io_thread_t *io_thread = choose_io_thread (options.affinity);
624  if (!io_thread) {
625  errno = EMTHREAD;
626  return -1;
627  }
628 
629  if (protocol == protocol_name::tcp) {
630  tcp_listener_t *listener =
631  new (std::nothrow) tcp_listener_t (io_thread, this, options);
632  alloc_assert (listener);
633  rc = listener->set_local_address (address.c_str ());
634  if (rc != 0) {
635  LIBZMQ_DELETE (listener);
636  event_bind_failed (make_unconnected_bind_endpoint_pair (address),
637  zmq_errno ());
638  return -1;
639  }
640 
641  // Save last endpoint URI
642  listener->get_local_address (_last_endpoint);
643 
644  add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
645  static_cast<own_t *> (listener), NULL);
646  options.connected = true;
647  return 0;
648  }
649 
650 #ifdef ZMQ_HAVE_WS
651 #ifdef ZMQ_HAVE_WSS
652  if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
653  ws_listener_t *listener = new (std::nothrow) ws_listener_t (
654  io_thread, this, options, protocol == protocol_name::wss);
655 #else
656  if (protocol == protocol_name::ws) {
657  ws_listener_t *listener =
658  new (std::nothrow) ws_listener_t (io_thread, this, options, false);
659 #endif
660  alloc_assert (listener);
661  rc = listener->set_local_address (address.c_str ());
662  if (rc != 0) {
663  LIBZMQ_DELETE (listener);
664  event_bind_failed (make_unconnected_bind_endpoint_pair (address),
665  zmq_errno ());
666  return -1;
667  }
668 
669  // Save last endpoint URI
670  listener->get_local_address (_last_endpoint);
671 
672  add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
673  static_cast<own_t *> (listener), NULL);
674  options.connected = true;
675  return 0;
676  }
677 #endif
678 
679 #if defined ZMQ_HAVE_IPC
680  if (protocol == protocol_name::ipc) {
681  ipc_listener_t *listener =
682  new (std::nothrow) ipc_listener_t (io_thread, this, options);
683  alloc_assert (listener);
684  int rc = listener->set_local_address (address.c_str ());
685  if (rc != 0) {
686  LIBZMQ_DELETE (listener);
687  event_bind_failed (make_unconnected_bind_endpoint_pair (address),
688  zmq_errno ());
689  return -1;
690  }
691 
692  // Save last endpoint URI
693  listener->get_local_address (_last_endpoint);
694 
695  add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
696  static_cast<own_t *> (listener), NULL);
697  options.connected = true;
698  return 0;
699  }
700 #endif
701 #if defined ZMQ_HAVE_TIPC
702  if (protocol == protocol_name::tipc) {
703  tipc_listener_t *listener =
704  new (std::nothrow) tipc_listener_t (io_thread, this, options);
705  alloc_assert (listener);
706  int rc = listener->set_local_address (address.c_str ());
707  if (rc != 0) {
708  LIBZMQ_DELETE (listener);
709  event_bind_failed (make_unconnected_bind_endpoint_pair (address),
710  zmq_errno ());
711  return -1;
712  }
713 
714  // Save last endpoint URI
715  listener->get_local_address (_last_endpoint);
716 
717  // TODO shouldn't this use _last_endpoint as in the other cases?
718  add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
719  static_cast<own_t *> (listener), NULL);
720  options.connected = true;
721  return 0;
722  }
723 #endif
724 #if defined ZMQ_HAVE_VMCI
725  if (protocol == protocol_name::vmci) {
726  vmci_listener_t *listener =
727  new (std::nothrow) vmci_listener_t (io_thread, this, options);
728  alloc_assert (listener);
729  int rc = listener->set_local_address (address.c_str ());
730  if (rc != 0) {
731  LIBZMQ_DELETE (listener);
732  event_bind_failed (make_unconnected_bind_endpoint_pair (address),
733  zmq_errno ());
734  return -1;
735  }
736 
737  listener->get_local_address (_last_endpoint);
738 
739  add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
740  static_cast<own_t *> (listener), NULL);
741  options.connected = true;
742  return 0;
743  }
744 #endif
745 
746  zmq_assert (false);
747  return -1;
748 }
749 
750 int zmq::socket_base_t::connect (const char *endpoint_uri_)
751 {
752  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
753  return connect_internal (endpoint_uri_);
754 }
755 
756 int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
757 {
758  if (unlikely (_ctx_terminated)) {
759  errno = ETERM;
760  return -1;
761  }
762 
763  // Process pending commands, if any.
764  int rc = process_commands (0, false);
765  if (unlikely (rc != 0)) {
766  return -1;
767  }
768 
769  // Parse endpoint_uri_ string.
770  std::string protocol;
772  if (parse_uri (endpoint_uri_, protocol, address)
773  || check_protocol (protocol)) {
774  return -1;
775  }
776 
777  if (protocol == protocol_name::inproc) {
778  // TODO: inproc connect is specific with respect to creating pipes
779  // as there's no 'reconnect' functionality implemented. Once that
780  // is in place we should follow generic pipe creation algorithm.
781 
782  // Find the peer endpoint.
783  const endpoint_t peer = find_endpoint (endpoint_uri_);
784 
785  // The total HWM for an inproc connection should be the sum of
786  // the binder's HWM and the connector's HWM.
787  const int sndhwm = peer.socket == NULL ? options.sndhwm
788  : options.sndhwm != 0 && peer.options.rcvhwm != 0
789  ? options.sndhwm + peer.options.rcvhwm
790  : 0;
791  const int rcvhwm = peer.socket == NULL ? options.rcvhwm
792  : options.rcvhwm != 0 && peer.options.sndhwm != 0
793  ? options.rcvhwm + peer.options.sndhwm
794  : 0;
795 
796  // Create a bi-directional pipe to connect the peers.
797  object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
798  pipe_t *new_pipes[2] = {NULL, NULL};
799 
800  const bool conflate = get_effective_conflate_option (options);
801 
802  int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
803  bool conflates[2] = {conflate, conflate};
804  rc = pipepair (parents, new_pipes, hwms, conflates);
805  if (!conflate) {
806  new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
807  peer.options.rcvhwm);
808  new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
809  }
810 
811  errno_assert (rc == 0);
812 
813  if (!peer.socket) {
814  // The peer doesn't exist yet so we don't know whether
815  // to send the routing id message or not. To resolve this,
816  // we always send our routing id and drop it later if
817  // the peer doesn't expect it.
818  send_routing_id (new_pipes[0], options);
819 
820 #ifdef ZMQ_BUILD_DRAFT_API
821  // If set, send the hello msg of the local socket to the peer.
822  if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
823  send_hello_msg (new_pipes[0], options);
824  }
825 #endif
826 
827  const endpoint_t endpoint = {this, options};
828  pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
829  } else {
830  // If required, send the routing id of the local socket to the peer.
831  if (peer.options.recv_routing_id) {
832  send_routing_id (new_pipes[0], options);
833  }
834 
835  // If required, send the routing id of the peer to the local socket.
836  if (options.recv_routing_id) {
837  send_routing_id (new_pipes[1], peer.options);
838  }
839 
840 #ifdef ZMQ_BUILD_DRAFT_API
841  // If set, send the hello msg of the local socket to the peer.
842  if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
843  send_hello_msg (new_pipes[0], options);
844  }
845 
846  // If set, send the hello msg of the peer to the local socket.
847  if (peer.options.can_send_hello_msg
848  && peer.options.hello_msg.size () > 0) {
849  send_hello_msg (new_pipes[1], peer.options);
850  }
851 
853  && peer.options.disconnect_msg.size () > 0)
854  new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
855 #endif
856 
857  // Attach remote end of the pipe to the peer socket. Note that peer's
858  // seqnum was incremented in find_endpoint function. We don't need it
859  // increased here.
860  send_bind (peer.socket, new_pipes[1], false);
861  }
862 
863  // Attach local end of the pipe to this socket object.
864  attach_pipe (new_pipes[0], false, true);
865 
866  // Save last endpoint URI
867  _last_endpoint.assign (endpoint_uri_);
868 
869  // remember inproc connections for disconnect
870  _inprocs.emplace (endpoint_uri_, new_pipes[0]);
871 
872  options.connected = true;
873  return 0;
874  }
875  const bool is_single_connect =
876  (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
877  || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
878  if (unlikely (is_single_connect)) {
879  if (0 != _endpoints.count (endpoint_uri_)) {
880  // There is no valid use for multiple connects for SUB-PUB nor
881  // DEALER-ROUTER nor REQ-REP. Multiple connects produces
882  // nonsensical results.
883  return 0;
884  }
885  }
886 
887  // Choose the I/O thread to run the session in.
888  io_thread_t *io_thread = choose_io_thread (options.affinity);
889  if (!io_thread) {
890  errno = EMTHREAD;
891  return -1;
892  }
893 
894  address_t *paddr =
895  new (std::nothrow) address_t (protocol, address, this->get_ctx ());
896  alloc_assert (paddr);
897 
898  // Resolve address (if needed by the protocol)
899  if (protocol == protocol_name::tcp) {
900  // Do some basic sanity checks on tcp:// address syntax
901  // - hostname starts with digit or letter, with embedded '-' or '.'
902  // - IPv6 address may contain hex chars and colons.
903  // - IPv6 link local address may contain % followed by interface name / zone_id
904  // (Reference: https://tools.ietf.org/html/rfc4007)
905  // - IPv4 address may contain decimal digits and dots.
906  // - Address must end in ":port" where port is *, or numeric
907  // - Address may contain two parts separated by ':'
908  // Following code is quick and dirty check to catch obvious errors,
909  // without trying to be fully accurate.
910  const char *check = address.c_str ();
911  if (isalnum (*check) || isxdigit (*check) || *check == '['
912  || *check == ':') {
913  check++;
914  while (isalnum (*check) || isxdigit (*check) || *check == '.'
915  || *check == '-' || *check == ':' || *check == '%'
916  || *check == ';' || *check == '[' || *check == ']'
917  || *check == '_' || *check == '*') {
918  check++;
919  }
920  }
921  // Assume the worst, now look for success
922  rc = -1;
923  // Did we reach the end of the address safely?
924  if (*check == 0) {
925  // Do we have a valid port string? (cannot be '*' in connect
926  check = strrchr (address.c_str (), ':');
927  if (check) {
928  check++;
929  if (*check && (isdigit (*check)))
930  rc = 0; // Valid
931  }
932  }
933  if (rc == -1) {
934  errno = EINVAL;
935  LIBZMQ_DELETE (paddr);
936  return -1;
937  }
938  // Defer resolution until a socket is opened
939  paddr->resolved.tcp_addr = NULL;
940  }
941 #ifdef ZMQ_HAVE_WS
942 #ifdef ZMQ_HAVE_WSS
943  else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
944  if (protocol == protocol_name::wss) {
945  paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
946  alloc_assert (paddr->resolved.wss_addr);
947  rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
948  options.ipv6);
949  } else
950 #else
951  else if (protocol == protocol_name::ws) {
952 #endif
953  {
954  paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
955  alloc_assert (paddr->resolved.ws_addr);
956  rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
957  options.ipv6);
958  }
959 
960  if (rc != 0) {
961  LIBZMQ_DELETE (paddr);
962  return -1;
963  }
964  }
965 #endif
966 
967 #if defined ZMQ_HAVE_IPC
968  else if (protocol == protocol_name::ipc) {
969  paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
970  alloc_assert (paddr->resolved.ipc_addr);
971  int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
972  if (rc != 0) {
973  LIBZMQ_DELETE (paddr);
974  return -1;
975  }
976  }
977 #endif
978 
979  if (protocol == protocol_name::udp) {
980  if (options.type != ZMQ_RADIO) {
982  LIBZMQ_DELETE (paddr);
983  return -1;
984  }
985 
986  paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
987  alloc_assert (paddr->resolved.udp_addr);
988  rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
989  options.ipv6);
990  if (rc != 0) {
991  LIBZMQ_DELETE (paddr);
992  return -1;
993  }
994  }
995 
996  // TBD - Should we check address for ZMQ_HAVE_NORM???
997 
998 #ifdef ZMQ_HAVE_OPENPGM
999  if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
1000  struct pgm_addrinfo_t *res = NULL;
1001  uint16_t port_number = 0;
1002  int rc =
1003  pgm_socket_t::init_address (address.c_str (), &res, &port_number);
1004  if (res != NULL)
1005  pgm_freeaddrinfo (res);
1006  if (rc != 0 || port_number == 0) {
1007  return -1;
1008  }
1009  }
1010 #endif
1011 #if defined ZMQ_HAVE_TIPC
1012  else if (protocol == protocol_name::tipc) {
1013  paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
1014  alloc_assert (paddr->resolved.tipc_addr);
1015  int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
1016  if (rc != 0) {
1017  LIBZMQ_DELETE (paddr);
1018  return -1;
1019  }
1020  const sockaddr_tipc *const saddr =
1021  reinterpret_cast<const sockaddr_tipc *> (
1022  paddr->resolved.tipc_addr->addr ());
1023  // Cannot connect to random Port Identity
1024  if (saddr->addrtype == TIPC_ADDR_ID
1025  && paddr->resolved.tipc_addr->is_random ()) {
1026  LIBZMQ_DELETE (paddr);
1027  errno = EINVAL;
1028  return -1;
1029  }
1030  }
1031 #endif
1032 #if defined ZMQ_HAVE_VMCI
1033  else if (protocol == protocol_name::vmci) {
1034  paddr->resolved.vmci_addr =
1035  new (std::nothrow) vmci_address_t (this->get_ctx ());
1036  alloc_assert (paddr->resolved.vmci_addr);
1037  int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
1038  if (rc != 0) {
1039  LIBZMQ_DELETE (paddr);
1040  return -1;
1041  }
1042  }
1043 #endif
1044 
1045  // Create session.
1046  session_base_t *session =
1047  session_base_t::create (io_thread, true, this, options, paddr);
1048  errno_assert (session);
1049 
1050  // PGM does not support subscription forwarding; ask for all data to be
1051  // sent to this pipe. (same for NORM, currently?)
1052 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
1053  const bool subscribe_to_all =
1054  protocol == protocol_name::pgm || protocol == protocol_name::epgm
1055  || protocol == protocol_name::norm || protocol == protocol_name::udp;
1056 #elif defined ZMQ_HAVE_OPENPGM
1057  const bool subscribe_to_all = protocol == protocol_name::pgm
1058  || protocol == protocol_name::epgm
1059  || protocol == protocol_name::udp;
1060 #elif defined ZMQ_HAVE_NORM
1061  const bool subscribe_to_all =
1062  protocol == protocol_name::norm || protocol == protocol_name::udp;
1063 #else
1064  const bool subscribe_to_all = protocol == protocol_name::udp;
1065 #endif
1066  pipe_t *newpipe = NULL;
1067 
1068  if (options.immediate != 1 || subscribe_to_all) {
1069  // Create a bi-directional pipe.
1070  object_t *parents[2] = {this, session};
1071  pipe_t *new_pipes[2] = {NULL, NULL};
1072 
1073  const bool conflate = get_effective_conflate_option (options);
1074 
1075  int hwms[2] = {conflate ? -1 : options.sndhwm,
1076  conflate ? -1 : options.rcvhwm};
1077  bool conflates[2] = {conflate, conflate};
1078  rc = pipepair (parents, new_pipes, hwms, conflates);
1079  errno_assert (rc == 0);
1080 
1081  // Attach local end of the pipe to the socket object.
1082  attach_pipe (new_pipes[0], subscribe_to_all, true);
1083  newpipe = new_pipes[0];
1084 
1085  // Attach remote end of the pipe to the session object later on.
1086  session->attach_pipe (new_pipes[1]);
1087  }
1088 
1089  // Save last endpoint URI
1090  paddr->to_string (_last_endpoint);
1091 
1092  add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
1093  static_cast<own_t *> (session), newpipe);
1094  return 0;
1095 }
1096 
1099  const char *tcp_address_)
1100 {
1101  // The resolved last_endpoint is used as a key in the endpoints map.
1102  // The address passed by the user might not match in the TCP case due to
1103  // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
1104  // resolve before giving up. Given at this stage we don't know whether a
1105  // socket is connected or bound, try with both.
1106  if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1107  tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
1108  alloc_assert (tcp_addr);
1109  int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
1110 
1111  if (rc == 0) {
1112  tcp_addr->to_string (endpoint_uri_pair_);
1113  if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1114  rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
1115  if (rc == 0) {
1116  tcp_addr->to_string (endpoint_uri_pair_);
1117  }
1118  }
1119  }
1120  LIBZMQ_DELETE (tcp_addr);
1121  }
1122  return endpoint_uri_pair_;
1123 }
1124 
1126  const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
1127 {
1128  // Activate the session. Make it a child of this socket.
1129  launch_child (endpoint_);
1130  _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
1131  endpoint_pipe_t (endpoint_, pipe_));
1132 
1133  if (pipe_ != NULL)
1134  pipe_->set_endpoint_pair (endpoint_pair_);
1135 }
1136 
1137 int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
1138 {
1139  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1140 
1141  // Check whether the context hasn't been shut down yet.
1142  if (unlikely (_ctx_terminated)) {
1143  errno = ETERM;
1144  return -1;
1145  }
1146 
1147  // Check whether endpoint address passed to the function is valid.
1148  if (unlikely (!endpoint_uri_)) {
1149  errno = EINVAL;
1150  return -1;
1151  }
1152 
1153  // Process pending commands, if any, since there could be pending unprocessed process_own()'s
1154  // (from launch_child() for example) we're asked to terminate now.
1155  const int rc = process_commands (0, false);
1156  if (unlikely (rc != 0)) {
1157  return -1;
1158  }
1159 
1160  // Parse endpoint_uri_ string.
1161  std::string uri_protocol;
1162  std::string uri_path;
1163  if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
1164  || check_protocol (uri_protocol)) {
1165  return -1;
1166  }
1167 
1168  const std::string endpoint_uri_str = std::string (endpoint_uri_);
1169 
1170  // Disconnect an inproc socket
1171  if (uri_protocol == protocol_name::inproc) {
1172  return unregister_endpoint (endpoint_uri_str, this) == 0
1173  ? 0
1174  : _inprocs.erase_pipes (endpoint_uri_str);
1175  }
1176 
1177  const std::string resolved_endpoint_uri =
1178  uri_protocol == protocol_name::tcp
1179  ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
1180  : endpoint_uri_str;
1181 
1182  // Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
1183  const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
1184  _endpoints.equal_range (resolved_endpoint_uri);
1185  if (range.first == range.second) {
1186  errno = ENOENT;
1187  return -1;
1188  }
1189 
1190  for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1191  // If we have an associated pipe, terminate it.
1192  if (it->second.second != NULL)
1193  it->second.second->terminate (false);
1194  term_child (it->second.first);
1195  }
1196  _endpoints.erase (range.first, range.second);
1197 
1198  if (options.reconnect_stop & ZMQ_RECONNECT_STOP_AFTER_DISCONNECT) {
1199  _disconnected = true;
1200  }
1201 
1202  return 0;
1203 }
1204 
1205 int zmq::socket_base_t::send (msg_t *msg_, int flags_)
1206 {
1207  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1208 
1209  // Check whether the context hasn't been shut down yet.
1210  if (unlikely (_ctx_terminated)) {
1211  errno = ETERM;
1212  return -1;
1213  }
1214 
1215  // Check whether message passed to the function is valid.
1216  if (unlikely (!msg_ || !msg_->check ())) {
1217  errno = EFAULT;
1218  return -1;
1219  }
1220 
1221  // Process pending commands, if any.
1222  int rc = process_commands (0, true);
1223  if (unlikely (rc != 0)) {
1224  return -1;
1225  }
1226 
1227  // Clear any user-visible flags that are set on the message.
1228  msg_->reset_flags (msg_t::more);
1229 
1230  // At this point we impose the flags on the message.
1231  if (flags_ & ZMQ_SNDMORE)
1232  msg_->set_flags (msg_t::more);
1233 
1234  msg_->reset_metadata ();
1235 
1236  // Try to send the message using method in each socket class
1237  rc = xsend (msg_);
1238  if (rc == 0) {
1239  return 0;
1240  }
1241  // Special case for ZMQ_PUSH: -2 means pipe is dead while a
1242  // multi-part send is in progress and can't be recovered, so drop
1243  // silently when in blocking mode to keep backward compatibility.
1244  if (unlikely (rc == -2)) {
1245  if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) {
1246  rc = msg_->close ();
1247  errno_assert (rc == 0);
1248  rc = msg_->init ();
1249  errno_assert (rc == 0);
1250  return 0;
1251  }
1252  }
1253  if (unlikely (errno != EAGAIN)) {
1254  return -1;
1255  }
1256 
1257  // In case of non-blocking send we'll simply propagate
1258  // the error - including EAGAIN - up the stack.
1259  if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
1260  return -1;
1261  }
1262 
1263  // Compute the time when the timeout should occur.
1264  // If the timeout is infinite, don't care.
1265  int timeout = options.sndtimeo;
1266  const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
1267 
1268  // Oops, we couldn't send the message. Wait for the next
1269  // command, process it and try to send the message again.
1270  // If timeout is reached in the meantime, return EAGAIN.
1271  while (true) {
1272  if (unlikely (process_commands (timeout, false) != 0)) {
1273  return -1;
1274  }
1275  rc = xsend (msg_);
1276  if (rc == 0)
1277  break;
1278  if (unlikely (errno != EAGAIN)) {
1279  return -1;
1280  }
1281  if (timeout > 0) {
1282  timeout = static_cast<int> (end - _clock.now_ms ());
1283  if (timeout <= 0) {
1284  errno = EAGAIN;
1285  return -1;
1286  }
1287  }
1288  }
1289 
1290  return 0;
1291 }
1292 
1293 int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1294 {
1295  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1296 
1297  // Check whether the context hasn't been shut down yet.
1298  if (unlikely (_ctx_terminated)) {
1299  errno = ETERM;
1300  return -1;
1301  }
1302 
1303  // Check whether message passed to the function is valid.
1304  if (unlikely (!msg_ || !msg_->check ())) {
1305  errno = EFAULT;
1306  return -1;
1307  }
1308 
1309  // Once every inbound_poll_rate messages check for signals and process
1310  // incoming commands. This happens only if we are not polling altogether
1311  // because there are messages available all the time. If poll occurs,
1312  // ticks is set to zero and thus we avoid this code.
1313  //
1314  // Note that 'recv' uses different command throttling algorithm (the one
1315  // described above) from the one used by 'send'. This is because counting
1316  // ticks is more efficient than doing RDTSC all the time.
1317  if (++_ticks == inbound_poll_rate) {
1318  if (unlikely (process_commands (0, false) != 0)) {
1319  return -1;
1320  }
1321  _ticks = 0;
1322  }
1323 
1324  // Get the message.
1325  int rc = xrecv (msg_);
1326  if (unlikely (rc != 0 && errno != EAGAIN)) {
1327  return -1;
1328  }
1329 
1330  // If we have the message, return immediately.
1331  if (rc == 0) {
1332  extract_flags (msg_);
1333  return 0;
1334  }
1335 
1336  // If the message cannot be fetched immediately, there are two scenarios.
1337  // For non-blocking recv, commands are processed in case there's an
1338  // activate_reader command already waiting in a command pipe.
1339  // If it's not, return EAGAIN.
1340  if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
1341  if (unlikely (process_commands (0, false) != 0)) {
1342  return -1;
1343  }
1344  _ticks = 0;
1345 
1346  rc = xrecv (msg_);
1347  if (rc < 0) {
1348  return rc;
1349  }
1350  extract_flags (msg_);
1351 
1352  return 0;
1353  }
1354 
1355  // Compute the time when the timeout should occur.
1356  // If the timeout is infinite, don't care.
1357  int timeout = options.rcvtimeo;
1358  const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
1359 
1360  // In blocking scenario, commands are processed over and over again until
1361  // we are able to fetch a message.
1362  bool block = (_ticks != 0);
1363  while (true) {
1364  if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
1365  return -1;
1366  }
1367  rc = xrecv (msg_);
1368  if (rc == 0) {
1369  _ticks = 0;
1370  break;
1371  }
1372  if (unlikely (errno != EAGAIN)) {
1373  return -1;
1374  }
1375  block = true;
1376  if (timeout > 0) {
1377  timeout = static_cast<int> (end - _clock.now_ms ());
1378  if (timeout <= 0) {
1379  errno = EAGAIN;
1380  return -1;
1381  }
1382  }
1383  }
1384 
1385  extract_flags (msg_);
1386  return 0;
1387 }
1388 
1390 {
1391  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1392 
1393  // Remove all existing signalers for thread safe sockets
1394  if (_thread_safe)
1395  (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
1396 
1397  // Mark the socket as dead
1398  _tag = 0xdeadbeef;
1399 
1400 
1401  // Transfer the ownership of the socket from this application thread
1402  // to the reaper thread which will take care of the rest of shutdown
1403  // process.
1404  send_reap (this);
1405 
1406  return 0;
1407 }
1408 
1410 {
1411  return xhas_in ();
1412 }
1413 
1415 {
1416  return xhas_out ();
1417 }
1418 
1419 void zmq::socket_base_t::start_reaping (poller_t *poller_)
1420 {
1421  // Plug the socket to the reaper thread.
1422  _poller = poller_;
1423 
1424  fd_t fd;
1425 
1426  if (!_thread_safe)
1427  fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
1428  else {
1429  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1430 
1431  _reaper_signaler = new (std::nothrow) signaler_t ();
1432  zmq_assert (_reaper_signaler);
1433 
1434  // Add signaler to the safe mailbox
1435  fd = _reaper_signaler->get_fd ();
1436  (static_cast<mailbox_safe_t *> (_mailbox))
1437  ->add_signaler (_reaper_signaler);
1438 
1439  // Send a signal to make sure reaper handle existing commands
1440  _reaper_signaler->send ();
1441  }
1442 
1443  _handle = _poller->add_fd (fd, this);
1444  _poller->set_pollin (_handle);
1445 
1446  // Initialise the termination and check whether it can be deallocated
1447  // immediately.
1448  terminate ();
1449  check_destroy ();
1450 }
1451 
1452 int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
1453 {
1454  if (timeout_ == 0) {
1455  // If we are asked not to wait, check whether we haven't processed
1456  // commands recently, so that we can throttle the new commands.
1457 
1458  // Get the CPU's tick counter. If 0, the counter is not available.
1459  const uint64_t tsc = zmq::clock_t::rdtsc ();
1460 
1461  // Optimised version of command processing - it doesn't have to check
1462  // for incoming commands each time. It does so only if certain time
1463  // elapsed since last command processing. Command delay varies
1464  // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
1465  // etc. The optimisation makes sense only on platforms where getting
1466  // a timestamp is a very cheap operation (tens of nanoseconds).
1467  if (tsc && throttle_) {
1468  // Check whether TSC haven't jumped backwards (in case of migration
1469  // between CPU cores) and whether certain time have elapsed since
1470  // last command processing. If it didn't do nothing.
1471  if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
1472  return 0;
1473  _last_tsc = tsc;
1474  }
1475  }
1476 
1477  // Check whether there are any commands pending for this thread.
1478  command_t cmd;
1479  int rc = _mailbox->recv (&cmd, timeout_);
1480 
1481  if (rc != 0 && errno == EINTR)
1482  return -1;
1483 
1484  // Process all available commands.
1485  while (rc == 0 || errno == EINTR) {
1486  if (rc == 0) {
1487  cmd.destination->process_command (cmd);
1488  }
1489  rc = _mailbox->recv (&cmd, 0);
1490  }
1491 
1492  zmq_assert (errno == EAGAIN);
1493 
1494  if (_ctx_terminated) {
1495  errno = ETERM;
1496  return -1;
1497  }
1498 
1499  return 0;
1500 }
1501 
1503 {
1504  // Here, someone have called zmq_ctx_term while the socket was still alive.
1505  // We'll remember the fact so that any blocking call is interrupted and any
1506  // further attempt to use the socket will return ETERM. The user is still
1507  // responsible for calling zmq_close on the socket though!
1508  scoped_lock_t lock (_monitor_sync);
1509  stop_monitor ();
1510 
1511  _ctx_terminated = true;
1512 }
1513 
1515 {
1516  attach_pipe (pipe_);
1517 }
1518 
1520 {
1521  // Unregister all inproc endpoints associated with this socket.
1522  // Doing this we make sure that no new pipes from other sockets (inproc)
1523  // will be initiated.
1524  unregister_endpoints (this);
1525 
1526  // Ask all attached pipes to terminate.
1527  for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1528  // Only inprocs might have a disconnect message set
1529  _pipes[i]->send_disconnect_msg ();
1530  _pipes[i]->terminate (false);
1531  }
1532  register_term_acks (static_cast<int> (_pipes.size ()));
1533 
1534  // Continue the termination process immediately.
1535  own_t::process_term (linger_);
1536 }
1537 
1539 {
1540  term_endpoint (endpoint_->c_str ());
1541  delete endpoint_;
1542 }
1543 
1545  uint64_t outbound_queue_count_,
1546  uint64_t inbound_queue_count_,
1547  endpoint_uri_pair_t *endpoint_pair_)
1548 {
1549  uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
1550  event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
1551  delete endpoint_pair_;
1552 }
1553 
1554 /*
1555  * There are 2 pipes per connection, and the inbound one _must_ be queried from
1556  * the I/O thread. So ask the outbound pipe, in the application thread, to send
1557  * a message (pipe_peer_stats) to its peer. The message will carry the outbound
1558  * pipe stats and endpoint, and the reference to the socket object.
1559  * The inbound pipe on the I/O thread will then add its own stats and endpoint,
1560  * and write back a message to the socket object (pipe_stats_publish) which
1561  * will raise an event with the data.
1562  */
1564 {
1565  {
1566  scoped_lock_t lock (_monitor_sync);
1567  if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
1568  errno = EINVAL;
1569  return -1;
1570  }
1571  }
1572  if (_pipes.size () == 0) {
1573  errno = EAGAIN;
1574  return -1;
1575  }
1576  for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1577  _pipes[i]->send_stats_to_peer (this);
1578  }
1579 
1580  return 0;
1581 }
1582 
1584 {
1585  if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
1586  for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1587  _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
1588  _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1589  }
1590  }
1591 }
1592 
1594 {
1595  _destroyed = true;
1596 }
1597 
1598 int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1599 {
1600  errno = EINVAL;
1601  return -1;
1602 }
1603 
1604 int zmq::socket_base_t::xgetsockopt (int, void *, size_t *)
1605 {
1606  errno = EINVAL;
1607  return -1;
1608 }
1609 
1611 {
1612  return false;
1613 }
1614 
1616 {
1617  errno = ENOTSUP;
1618  return -1;
1619 }
1620 
1622 {
1623  return false;
1624 }
1625 
1626 int zmq::socket_base_t::xjoin (const char *group_)
1627 {
1628  LIBZMQ_UNUSED (group_);
1629  errno = ENOTSUP;
1630  return -1;
1631 }
1632 
1633 int zmq::socket_base_t::xleave (const char *group_)
1634 {
1635  LIBZMQ_UNUSED (group_);
1636  errno = ENOTSUP;
1637  return -1;
1638 }
1639 
1641 {
1642  errno = ENOTSUP;
1643  return -1;
1644 }
1645 
1647 {
1648  zmq_assert (false);
1649 }
1651 {
1652  zmq_assert (false);
1653 }
1654 
1656 {
1657  zmq_assert (false);
1658 }
1659 
1661 {
1662  // This function is invoked only once the socket is running in the context
1663  // of the reaper thread. Process any commands from other threads/sockets
1664  // that may be available at the moment. Ultimately, the socket will
1665  // be destroyed.
1666  {
1667  scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1668 
1669  // If the socket is thread safe we need to unsignal the reaper signaler
1670  if (_thread_safe)
1671  _reaper_signaler->recv ();
1672 
1673  process_commands (0, false);
1674  }
1675  check_destroy ();
1676 }
1677 
1679 {
1680  zmq_assert (false);
1681 }
1682 
1684 {
1685  zmq_assert (false);
1686 }
1687 
1689 {
1690  // If the object was already marked as destroyed, finish the deallocation.
1691  if (_destroyed) {
1692  // Remove the socket from the reaper's poller.
1693  _poller->rm_fd (_handle);
1694 
1695  // Remove the socket from the context.
1696  destroy_socket (this);
1697 
1698  // Notify the reaper about the fact.
1699  send_reaped ();
1700 
1701  // Deallocate.
1703  }
1704 }
1705 
1707 {
1708  xread_activated (pipe_);
1709 }
1710 
1712 {
1713  xwrite_activated (pipe_);
1714 }
1715 
1716 void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
1717 {
1718  if (options.immediate == 1)
1719  pipe_->terminate (false);
1720  else
1721  // Notify derived sockets of the hiccup
1722  xhiccuped (pipe_);
1723 }
1724 
1726 {
1727  // Notify the specific socket type about the pipe termination.
1728  xpipe_terminated (pipe_);
1729 
1730  // Remove pipe from inproc pipes
1731  _inprocs.erase_pipe (pipe_);
1732 
1733  // Remove the pipe from the list of attached pipes and confirm its
1734  // termination if we are already shutting down.
1735  _pipes.erase (pipe_);
1736 
1737  // Remove the pipe from _endpoints (set it to NULL).
1738  const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
1739  if (!identifier.empty ()) {
1740  std::pair<endpoints_t::iterator, endpoints_t::iterator> range;
1741  range = _endpoints.equal_range (identifier);
1742 
1743  for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1744  if (it->second.second == pipe_) {
1745  it->second.second = NULL;
1746  break;
1747  }
1748  }
1749  }
1750 
1751  if (is_terminating ())
1752  unregister_term_ack ();
1753 }
1754 
1756 {
1757  // Test whether routing_id flag is valid for this socket type.
1758  if (unlikely (msg_->flags () & msg_t::routing_id))
1759  zmq_assert (options.recv_routing_id);
1760 
1761  // Remove MORE flag.
1762  _rcvmore = (msg_->flags () & msg_t::more) != 0;
1763 }
1764 
1765 int zmq::socket_base_t::monitor (const char *endpoint_,
1766  uint64_t events_,
1767  int event_version_,
1768  int type_)
1769 {
1770  scoped_lock_t lock (_monitor_sync);
1771 
1772  if (unlikely (_ctx_terminated)) {
1773  errno = ETERM;
1774  return -1;
1775  }
1776 
1777  // Event version 1 supports only first 16 events.
1778  if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
1779  errno = EINVAL;
1780  return -1;
1781  }
1782 
1783  // Support deregistering monitoring endpoints as well
1784  if (endpoint_ == NULL) {
1785  stop_monitor ();
1786  return 0;
1787  }
1788  // Parse endpoint_uri_ string.
1789  std::string protocol;
1791  if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
1792  return -1;
1793 
1794  // Event notification only supported over inproc://
1795  if (protocol != protocol_name::inproc) {
1797  return -1;
1798  }
1799 
1800  // already monitoring. Stop previous monitor before starting new one.
1801  if (_monitor_socket != NULL) {
1802  stop_monitor (true);
1803  }
1804 
1805  // Check if the specified socket type is supported. It must be a
1806  // one-way socket types that support the SNDMORE flag.
1807  switch (type_) {
1808  case ZMQ_PAIR:
1809  break;
1810  case ZMQ_PUB:
1811  break;
1812  case ZMQ_PUSH:
1813  break;
1814  default:
1815  errno = EINVAL;
1816  return -1;
1817  }
1818 
1819  // Register events to monitor
1820  _monitor_events = events_;
1821  options.monitor_event_version = event_version_;
1822  // Create a monitor socket of the specified type.
1823  _monitor_socket = zmq_socket (get_ctx (), type_);
1824  if (_monitor_socket == NULL)
1825  return -1;
1826 
1827  // Never block context termination on pending event messages
1828  int linger = 0;
1829  int rc =
1830  zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1831  if (rc == -1)
1832  stop_monitor (false);
1833 
1834  // Spawn the monitor socket endpoint
1835  rc = zmq_bind (_monitor_socket, endpoint_);
1836  if (rc == -1)
1837  stop_monitor (false);
1838  return rc;
1839 }
1840 
1842  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1843 {
1844  uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1845  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
1846 }
1847 
1849  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1850 {
1851  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1852  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
1853 }
1854 
1856  const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
1857 {
1858  uint64_t values[1] = {static_cast<uint64_t> (interval_)};
1859  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
1860 }
1861 
1863  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1864 {
1865  uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1866  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
1867 }
1868 
1870  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1871 {
1872  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1873  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
1874 }
1875 
1877  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1878 {
1879  uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1880  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
1881 }
1882 
1884  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1885 {
1886  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1887  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
1888 }
1889 
1891  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1892 {
1893  uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1894  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
1895 }
1896 
1898  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1899 {
1900  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1901  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
1902 }
1903 
1905  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1906 {
1907  uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1908  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
1909 }
1910 
1912  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1913 {
1914  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1915  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
1916 }
1917 
1919  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1920 {
1921  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1922  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1923 }
1924 
1926  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1927 {
1928  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1929  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1930 }
1931 
1933  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1934 {
1935  uint64_t values[1] = {static_cast<uint64_t> (err_)};
1936  event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
1937 }
1938 
1939 void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
1940  uint64_t values_[],
1941  uint64_t values_count_,
1942  uint64_t type_)
1943 {
1944  scoped_lock_t lock (_monitor_sync);
1945  if (_monitor_events & type_) {
1946  monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
1947  }
1948 }
1949 
1950 // Send a monitor event
1952  uint64_t event_,
1953  const uint64_t values_[],
1954  uint64_t values_count_,
1955  const endpoint_uri_pair_t &endpoint_uri_pair_) const
1956 {
1957  // this is a private method which is only called from
1958  // contexts where the _monitor_sync mutex has been locked before
1959 
1960  if (_monitor_socket) {
1961  zmq_msg_t msg;
1962 
1963  switch (options.monitor_event_version) {
1964  case 1: {
1965  // The API should not allow to activate unsupported events
1966  zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
1967  // v1 only allows one value
1968  zmq_assert (values_count_ == 1);
1969  zmq_assert (values_[0]
1970  <= std::numeric_limits<uint32_t>::max ());
1971 
1972  // Send event and value in first frame
1973  const uint16_t event = static_cast<uint16_t> (event_);
1974  const uint32_t value = static_cast<uint32_t> (values_[0]);
1975  zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
1976  uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
1977  // Avoid dereferencing uint32_t on unaligned address
1978  memcpy (data + 0, &event, sizeof (event));
1979  memcpy (data + sizeof (event), &value, sizeof (value));
1980  zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1981 
1982  const std::string &endpoint_uri =
1983  endpoint_uri_pair_.identifier ();
1984 
1985  // Send address in second frame
1986  zmq_msg_init_size (&msg, endpoint_uri.size ());
1987  memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
1988  endpoint_uri.size ());
1989  zmq_msg_send (&msg, _monitor_socket, 0);
1990  } break;
1991  case 2: {
1992  // Send event in first frame (64bit unsigned)
1993  zmq_msg_init_size (&msg, sizeof (event_));
1994  memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
1995  zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1996 
1997  // Send number of values that will follow in second frame
1998  zmq_msg_init_size (&msg, sizeof (values_count_));
1999  memcpy (zmq_msg_data (&msg), &values_count_,
2000  sizeof (values_count_));
2001  zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2002 
2003  // Send values in third-Nth frames (64bit unsigned)
2004  for (uint64_t i = 0; i < values_count_; ++i) {
2005  zmq_msg_init_size (&msg, sizeof (values_[i]));
2006  memcpy (zmq_msg_data (&msg), &values_[i],
2007  sizeof (values_[i]));
2008  zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2009  }
2010 
2011  // Send local endpoint URI in second-to-last frame (string)
2012  zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
2013  memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
2014  endpoint_uri_pair_.local.size ());
2015  zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2016 
2017  // Send remote endpoint URI in last frame (string)
2018  zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
2019  memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
2020  endpoint_uri_pair_.remote.size ());
2021  zmq_msg_send (&msg, _monitor_socket, 0);
2022  } break;
2023  }
2024  }
2025 }
2026 
2027 void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
2028 {
2029  // this is a private method which is only called from
2030  // contexts where the _monitor_sync mutex has been locked before
2031 
2032  if (_monitor_socket) {
2033  if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
2034  && send_monitor_stopped_event_) {
2035  uint64_t values[1] = {0};
2036  monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
2037  endpoint_uri_pair_t ());
2038  }
2039  zmq_close (_monitor_socket);
2040  _monitor_socket = NULL;
2041  _monitor_events = 0;
2042  }
2043 }
2044 
2046 {
2047  return _disconnected;
2048 }
2049 
2051  uint32_t tid_,
2052  int sid_) :
2053  socket_base_t (parent_, tid_, sid_)
2054 {
2055 }
2056 
2058 {
2059  zmq_assert (_out_pipes.empty ());
2060 }
2061 
2063  const void *optval_,
2064  size_t optvallen_)
2065 {
2066  switch (option_) {
2068  // TODO why isn't it possible to set an empty connect_routing_id
2069  // (which is the default value)
2070  if (optval_ && optvallen_) {
2071  _connect_routing_id.assign (static_cast<const char *> (optval_),
2072  optvallen_);
2073  return 0;
2074  }
2075  break;
2076  }
2077  errno = EINVAL;
2078  return -1;
2079 }
2080 
2082 {
2083  const out_pipes_t::iterator end = _out_pipes.end ();
2084  out_pipes_t::iterator it;
2085  for (it = _out_pipes.begin (); it != end; ++it)
2086  if (it->second.pipe == pipe_)
2087  break;
2088 
2089  zmq_assert (it != end);
2090  zmq_assert (!it->second.active);
2091  it->second.active = true;
2092 }
2093 
2095 {
2096  std::string res = ZMQ_MOVE (_connect_routing_id);
2097  _connect_routing_id.clear ();
2098  return res;
2099 }
2100 
2102 {
2103  return !_connect_routing_id.empty ();
2104 }
2105 
2107  pipe_t *pipe_)
2108 {
2109  // Add the record into output pipes lookup table
2110  const out_pipe_t outpipe = {pipe_, true};
2111  const bool ok =
2112  _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
2113  .second;
2114  zmq_assert (ok);
2115 }
2116 
2117 bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
2118 {
2119  return 0 != _out_pipes.count (routing_id_);
2120 }
2121 
2124 {
2125  // TODO we could probably avoid constructor a temporary blob_t to call this function
2126  out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2127  return it == _out_pipes.end () ? NULL : &it->second;
2128 }
2129 
2132 {
2133  // TODO we could probably avoid constructor a temporary blob_t to call this function
2134  const out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
2135  return it == _out_pipes.end () ? NULL : &it->second;
2136 }
2137 
2139 {
2140  const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
2141  zmq_assert (erased);
2142 }
2143 
2146 {
2147  const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2148  out_pipe_t res = {NULL, false};
2149  if (it != _out_pipes.end ()) {
2150  res = it->second;
2151  _out_pipes.erase (it);
2152  }
2153  return res;
2154 }
zmq::socket_base_t::query_pipes_stats
int query_pipes_stats()
Definition: socket_base.cpp:1563
zmq::socket_base_t::event_handshake_failed_no_detail
void event_handshake_failed_no_detail(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1911
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::socket_base_t::inprocs_t::_inprocs
map_t _inprocs
Definition: socket_base.hpp:229
gather.hpp
zmq::session_base_t
Definition: session_base.hpp:21
tipc_listener.hpp
zmq::options_t::hello_msg
std::vector< unsigned char > hello_msg
Definition: options.hpp:278
zmq::socket_base_t::extract_flags
void extract_flags(const msg_t *msg_)
Definition: socket_base.cpp:1755
zmq::tcp_address_t::to_string
int to_string(std::string &addr_) const
Definition: tcp_address.cpp:111
zmq::protocol_name::udp
static const char udp[]
Definition: address.hpp:39
ZMQ_RECONNECT_STOP_AFTER_DISCONNECT
#define ZMQ_RECONNECT_STOP_AFTER_DISCONNECT
Definition: zmq_draft.h:68
zmq::socket_base_t::xhas_in
virtual bool xhas_in()
Definition: socket_base.cpp:1621
zmq::server_t
Definition: server.hpp:21
zmq::endpoint_type_none
@ endpoint_type_none
Definition: endpoint.hpp:12
udp_address.hpp
zmq::routing_socket_base_t::erase_out_pipe
void erase_out_pipe(const pipe_t *pipe_)
Definition: socket_base.cpp:2138
client.hpp
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
zmq::socket_base_t::start_reaping
void start_reaping(poller_t *poller_)
Definition: socket_base.cpp:1419
ZMQ_SERVER
#define ZMQ_SERVER
Definition: zmq_draft.h:14
scatter.hpp
ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
#define ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
Definition: zmq.h:414
EMTHREAD
#define EMTHREAD
Definition: zmq.h:162
zmq::socket_base_t::out_event
void out_event() ZMQ_FINAL
Definition: socket_base.cpp:1678
zmq::socket_base_t::_sync
mutex_t _sync
Definition: socket_base.hpp:191
zmq::options_t::zero_copy
bool zero_copy
Definition: options.hpp:259
end
GLuint GLuint end
Definition: glcorearb.h:2858
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
#define ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
Definition: zmq.h:423
ENOTSUP
#define ENOTSUP
Definition: zmq.h:104
ZMQ_GATHER
#define ZMQ_GATHER
Definition: zmq_draft.h:18
NULL
NULL
Definition: test_security_zap.cpp:405
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
zmq::socket_base_t::remove_signaler
void remove_signaler(signaler_t *s_)
Definition: socket_base.cpp:504
zmq::options_t::rcvhwm
int rcvhwm
Definition: options.hpp:47
zmq::socket_base_t::check_protocol
int check_protocol(const std::string &protocol_) const
Definition: socket_base.cpp:318
rep.hpp
push.hpp
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
ZMQ_EVENTS
#define ZMQ_EVENTS
Definition: zmq.h:286
zmq::routing_socket_base_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_OVERRIDE
Definition: socket_base.cpp:2062
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
zmq::wss_address_t
Definition: wss_address.hpp:10
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
ZMQ_STREAM
#define ZMQ_STREAM
Definition: zmq.h:269
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
zmq::socket_base_t::add_endpoint
void add_endpoint(const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
Definition: socket_base.cpp:1125
zmq::socket_base_t::xhas_out
virtual bool xhas_out()
Definition: socket_base.cpp:1610
EINVAL
#define EINVAL
Definition: errno.hpp:25
zmq::atomic_value_t::store
void store(const int value_) ZMQ_NOEXCEPT
Definition: atomic_ptr.hpp:221
zmq::routing_socket_base_t::lookup_out_pipe
out_pipe_t * lookup_out_pipe(const blob_t &routing_id_)
Definition: socket_base.cpp:2123
zmq::session_base_t::create
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: session_base.cpp:27
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
ipc_address.hpp
zmq::endpoint_uri_pair_t::local
std::string local
Definition: endpoint.hpp:34
s
XmlRpcServer s
zmq_msg_send
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
zmq::socket_base_t::_disconnected
bool _disconnected
Definition: socket_base.hpp:330
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_CLIENT
#define ZMQ_CLIENT
Definition: zmq_draft.h:15
zmq::object_t::process_command
void process_command(const zmq::command_t &cmd_)
Definition: object.cpp:43
ZMQ_EVENT_CLOSED
#define ZMQ_EVENT_CLOSED
Definition: zmq.h:408
zmq::dealer_t
Definition: dealer.hpp:19
ZMQ_EVENT_BIND_FAILED
#define ZMQ_EVENT_BIND_FAILED
Definition: zmq.h:405
zmq::socket_base_t::attach_pipe
void attach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false, bool locally_initiated_=false)
Definition: socket_base.cpp:382
ZMQ_RADIO
#define ZMQ_RADIO
Definition: zmq_draft.h:16
zmq::socket_base_t::check_tag
bool check_tag() const
Definition: socket_base.cpp:117
zmq::socket_base_t::process_term
void process_term(int linger_) ZMQ_FINAL
Definition: socket_base.cpp:1519
precompiled.hpp
zmq::socket_base_t::xgetsockopt
virtual int xgetsockopt(int option_, void *optval_, size_t *optvallen_)
Definition: socket_base.cpp:1604
ZMQ_LAST_ENDPOINT
#define ZMQ_LAST_ENDPOINT
Definition: zmq.h:298
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
vmci_listener.hpp
zmq::socket_base_t::_reaper_signaler
signaler_t * _reaper_signaler
Definition: socket_base.hpp:322
pull.hpp
zmq::socket_base_t::_handle
poller_t::handle_t _handle
Definition: socket_base.hpp:295
zmq::session_base_t::attach_pipe
void attach_pipe(zmq::pipe_t *pipe_)
Definition: session_base.cpp:135
ZMQ_EVENT_CLOSE_FAILED
#define ZMQ_EVENT_CLOSE_FAILED
Definition: zmq.h:409
mailbox_safe.hpp
zmq::own_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: own.cpp:128
zmq::socket_base_t::getsockopt
int getsockopt(int option_, void *optval_, size_t *optvallen_)
Definition: socket_base.cpp:426
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq::socket_base_t::_rcvmore
bool _rcvmore
Definition: socket_base.hpp:304
zmq::do_getsockopt
int do_getsockopt(void *optval_, size_t *optvallen_, const void *value_, size_t value_len_)
Definition: options.cpp:39
if
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END if(!upb_strtable_init(&intern->table, UPB_CTYPE_UINT64))
Definition: php/ext/google/protobuf/map.c:232
errno
int errno
zmq::socket_base_t::connect_internal
int connect_internal(const char *endpoint_uri_)
Definition: socket_base.cpp:756
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
zmq::socket_base_t::setsockopt
int setsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: socket_base.cpp:401
zmq::udp_address_t::resolve
int resolve(const char *name_, bool bind_, bool ipv6_)
Definition: udp_address.cpp:32
zmq::socket_base_t
Definition: socket_base.hpp:31
ZMQ_EVENT_PIPES_STATS
#define ZMQ_EVENT_PIPES_STATS
Definition: zmq_draft.h:150
zmq::address_t::resolved
union zmq::address_t::@23 resolved
address
const char * address
Definition: builds/zos/test_fork.cpp:6
zmq::socket_base_t::process_pipe_stats_publish
void process_pipe_stats_publish(uint64_t outbound_queue_count_, uint64_t inbound_queue_count_, endpoint_uri_pair_t *endpoint_pair_) ZMQ_FINAL
Definition: socket_base.cpp:1544
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
zmq::socket_base_t::endpoint_pipe_t
std::pair< own_t *, pipe_t * > endpoint_pipe_t
Definition: socket_base.hpp:215
zmq::socket_base_t::xread_activated
virtual void xread_activated(pipe_t *pipe_)
Definition: socket_base.cpp:1646
zmq::socket_base_t::get_mailbox
i_mailbox * get_mailbox() const
Definition: socket_base.cpp:280
pub.hpp
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
zmq::socket_base_t::check_destroy
void check_destroy()
Definition: socket_base.cpp:1688
zmq_msg_data
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
get_fd
SETUP_TEARDOWN_TESTCONTEXT fd_t get_fd(void *socket_)
Definition: test_poller.cpp:17
ZMQ_EVENT_LISTENING
#define ZMQ_EVENT_LISTENING
Definition: zmq.h:404
ok
ROSCPP_DECL bool ok()
ZMQ_POLLIN
#define ZMQ_POLLIN
Definition: zmq.h:482
ctx.hpp
zmq::socket_base_t::event
void event(const endpoint_uri_pair_t &endpoint_uri_pair_, uint64_t values_[], uint64_t values_count_, uint64_t type_)
Definition: socket_base.cpp:1939
zmq::socket_base_t::add_signaler
void add_signaler(signaler_t *s_)
Definition: socket_base.cpp:496
values
GLenum GLsizei GLsizei GLint * values
Definition: glcorearb.h:3591
ZMQ_ZERO_COPY_RECV
#define ZMQ_ZERO_COPY_RECV
Definition: zmq_draft.h:71
router.hpp
zmq::udp_address_t
Definition: udp_address.hpp:17
zmq::socket_base_t::xwrite_activated
virtual void xwrite_activated(pipe_t *pipe_)
Definition: socket_base.cpp:1650
zmq::socket_base_t::event_accepted
void event_accepted(const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
Definition: socket_base.cpp:1876
range
GLenum GLint * range
Definition: glcorearb.h:3963
zmq::socket_base_t::resolve_tcp_addr
std::string resolve_tcp_addr(std::string endpoint_uri_, const char *tcp_address_)
Definition: socket_base.cpp:1098
zmq::options_t::can_send_hello_msg
bool can_send_hello_msg
Definition: options.hpp:279
zmq::socket_base_t::event_handshake_failed_auth
void event_handshake_failed_auth(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1925
zmq::socket_base_t::hiccuped
void hiccuped(pipe_t *pipe_) ZMQ_FINAL
Definition: socket_base.cpp:1716
zmq::socket_base_t::~socket_base_t
~socket_base_t() ZMQ_OVERRIDE
Definition: socket_base.cpp:266
ZMQ_CONNECT_ROUTING_ID
#define ZMQ_CONNECT_ROUTING_ID
Definition: zmq.h:323
ZMQ_POLLOUT
#define ZMQ_POLLOUT
Definition: zmq.h:483
zmq::socket_base_t::event_bind_failed
void event_bind_failed(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1869
zmq::tcp_address_t
Definition: tcp_address.hpp:15
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
#define ZMQ_EVENT_HANDSHAKE_SUCCEEDED
Definition: zmq.h:417
zmq::socket_base_t::process_destroy
void process_destroy() ZMQ_FINAL
Definition: socket_base.cpp:1593
zmq::routing_socket_base_t::routing_socket_base_t
routing_socket_base_t(class ctx_t *parent_, uint32_t tid_, int sid_)
Definition: socket_base.cpp:2050
ZMQ_RCVHWM
#define ZMQ_RCVHWM
Definition: zmq.h:294
zmq::socket_base_t::event_handshake_failed_protocol
void event_handshake_failed_protocol(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1918
zmq::socket_base_t::_destroyed
bool _destroyed
Definition: socket_base.hpp:250
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
zmq::router_t
Definition: router.hpp:21
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::socket_base_t::get_peer_state
virtual int get_peer_state(const void *routing_id_, size_t routing_id_size_) const
Definition: socket_base.cpp:255
peer.hpp
zmq::socket_base_t::_poller
poller_t * _poller
Definition: socket_base.hpp:294
ZMQ_CHANNEL
#define ZMQ_CHANNEL
Definition: zmq_draft.h:22
zmq::socket_base_t::_thread_safe
const bool _thread_safe
Definition: socket_base.hpp:319
server.hpp
macros.hpp
zmq_setsockopt
ZMQ_EXPORT int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
ZMQ_EVENT_CONNECTED
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:401
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
zmq::socket_base_t::_last_tsc
uint64_t _last_tsc
Definition: socket_base.hpp:298
event
struct _cl_event * event
Definition: glcorearb.h:4163
sub.hpp
zmq::socket_base_t::event_handshake_succeeded
void event_handshake_succeeded(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1932
zmq::socket_base_t::bind
int bind(const char *endpoint_uri_)
Definition: socket_base.cpp:512
zmq::socket_base_t::event_connected
void event_connected(const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
Definition: socket_base.cpp:1841
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
zmq::msg_t::close
int close()
Definition: msg.cpp:242
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
zmq::socket_base_t::term_endpoint
int term_endpoint(const char *endpoint_uri_)
Definition: socket_base.cpp:1137
zmq::own_t::options
options_t options
Definition: own.hpp:78
zmq::tcp_address_t::resolve
int resolve(const char *name_, bool local_, bool ipv6_)
Definition: tcp_address.cpp:46
zmq::make_unconnected_bind_endpoint_pair
endpoint_uri_pair_t make_unconnected_bind_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:14
zmq::socket_base_t::event_disconnected
void event_disconnected(const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
Definition: socket_base.cpp:1904
ETERM
#define ETERM
Definition: zmq.h:161
zmq_msg_t
Definition: zmq.h:218
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
s_
std::string s_
Definition: gmock-matchers_test.cc:4128
mailbox.hpp
tcp_address.hpp
zmq::scoped_optional_lock_t
Definition: mutex.hpp:156
zmq::routing_socket_base_t::out_pipe_t
Definition: socket_base.hpp:349
zmq::socket_base_t::process_term_endpoint
void process_term_endpoint(std::string *endpoint_) ZMQ_FINAL
Definition: socket_base.cpp:1538
zmq::socket_base_t::timer_event
void timer_event(int id_) ZMQ_FINAL
Definition: socket_base.cpp:1683
zmq::socket_base_t::process_stop
void process_stop() ZMQ_FINAL
Definition: socket_base.cpp:1502
pipe.hpp
zmq_bind
ZMQ_EXPORT int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
zmq::socket_base_t::_monitor_sync
mutex_t _monitor_sync
Definition: socket_base.hpp:325
pgm_socket.hpp
zmq::send_hello_msg
void send_hello_msg(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:64
zmq::socket_base_t::send
int send(zmq::msg_t *msg_, int flags_)
Definition: socket_base.cpp:1205
zmq::socket_base_t::event_connect_delayed
void event_connect_delayed(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1848
tcp_connecter.hpp
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
zmq::options_t::recv_routing_id
bool recv_routing_id
Definition: options.hpp:133
zmq_socket
ZMQ_EXPORT void * zmq_socket(void *, int type_)
Definition: zmq.cpp:230
zmq::msg_t::check
bool check() const
Definition: msg.cpp:24
zmq::endpoint_uri_pair_t::remote
std::string remote
Definition: endpoint.hpp:34
zmq::routing_socket_base_t::connect_routing_id_is_set
bool connect_routing_id_is_set() const
Definition: socket_base.cpp:2101
zmq::socket_base_t::connect
int connect(const char *endpoint_uri_)
Definition: socket_base.cpp:750
google::protobuf::isxdigit
bool isxdigit(char c)
Definition: strutil.cc:73
zmq::socket_base_t::event_close_failed
void event_close_failed(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1897
zmq::endpoint_uri_pair_t::identifier
const std::string & identifier() const
Definition: endpoint.hpp:27
address.hpp
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
req.hpp
zmq::protocol_name::inproc
static const char inproc[]
Definition: address.hpp:37
zmq::socket_base_t::inprocs_t::erase_pipes
int erase_pipes(const std::string &endpoint_uri_str_)
Definition: socket_base.cpp:89
zmq::socket_base_t::is_thread_safe
bool is_thread_safe() const
Definition: socket_base.cpp:122
zmq::i_mailbox
Definition: i_mailbox.hpp:13
zmq::socket_base_t::inprocs_t::emplace
void emplace(const char *endpoint_uri_, pipe_t *pipe_)
Definition: socket_base.cpp:83
zmq::socket_base_t::_monitor_events
int64_t _monitor_events
Definition: socket_base.hpp:313
zmq::options_t::linger
atomic_value_t linger
Definition: options.hpp:83
zmq::address_t::udp_addr
udp_address_t * udp_addr
Definition: address.hpp:82
ZMQ_EVENT_ACCEPT_FAILED
#define ZMQ_EVENT_ACCEPT_FAILED
Definition: zmq.h:407
zmq::endpoint_t
Definition: ctx.hpp:31
ZMQ_PEER
#define ZMQ_PEER
Definition: zmq_draft.h:21
zmq::array_t< pipe_t, 3 >::size_type
std::vector< pipe_t * >::size_type size_type
Definition: array.hpp:52
zmq::socket_base_t::stop
void stop()
Definition: socket_base.cpp:285
zmq::get_effective_conflate_option
bool get_effective_conflate_option(const options_t &options)
Definition: options.hpp:303
zmq::options_t::disconnect_msg
std::vector< unsigned char > disconnect_msg
Definition: options.hpp:282
zmq::ws_address_t
Definition: ws_address.hpp:15
zmq::clock_t::rdtsc
static uint64_t rdtsc()
Definition: clock.cpp:213
zmq_close
ZMQ_EXPORT int zmq_close(void *s_)
Definition: zmq.cpp:241
zmq::object_t
Definition: object.hpp:28
zmq::msg_t::init
int init()
Definition: msg.cpp:50
EPROTONOSUPPORT
#define EPROTONOSUPPORT
Definition: zmq.h:107
zmq::socket_base_t::process_bind
void process_bind(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: socket_base.cpp:1514
i
int i
Definition: gmock-matchers_test.cc:764
zmq::socket_base_t::stop_monitor
void stop_monitor(bool send_monitor_stopped_event_=true)
Definition: socket_base.cpp:2027
zmq::blob_t
Definition: blob.hpp:46
zmq::signaler_t
Definition: signaler.hpp:20
msg.hpp
zmq::xsub_t
Definition: xsub.hpp:22
zmq::socket_base_t::xleave
virtual int xleave(const char *group_)
Definition: socket_base.cpp:1633
zmq::address_t::to_string
int to_string(std::string &addr_) const
Definition: address.cpp:65
zmq::msg_t::reset_metadata
void reset_metadata()
Definition: msg.cpp:456
zmq::socket_base_t::update_pipe_options
void update_pipe_options(int option_)
Definition: socket_base.cpp:1583
ZMQ_SNDHWM
#define ZMQ_SNDHWM
Definition: zmq.h:293
ZMQ_LINGER
#define ZMQ_LINGER
Definition: zmq.h:288
zmq::msg_t::more
@ more
Definition: msg.hpp:55
ZMQ_EVENT_CONNECT_DELAYED
#define ZMQ_EVENT_CONNECT_DELAYED
Definition: zmq.h:402
zmq::address_t
Definition: address.hpp:64
dgram.hpp
zmq::msg_t::routing_id
uint32_t routing_id
Definition: msg.hpp:233
zmq::routing_socket_base_t::~routing_socket_base_t
~routing_socket_base_t() ZMQ_OVERRIDE
Definition: socket_base.cpp:2057
io_thread.hpp
zmq::msg_t::reset_flags
void reset_flags(unsigned char flags_)
Definition: msg.cpp:438
dish.hpp
ZMQ_DGRAM
#define ZMQ_DGRAM
Definition: zmq_draft.h:20
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
ZMQ_SCATTER
#define ZMQ_SCATTER
Definition: zmq_draft.h:19
zmq::socket_base_t::write_activated
void write_activated(pipe_t *pipe_) ZMQ_FINAL
Definition: socket_base.cpp:1711
zmq::socket_base_t::xsend
virtual int xsend(zmq::msg_t *msg_)
Definition: socket_base.cpp:1615
zmq::socket_base_t::parse_uri
static int parse_uri(const char *uri_, std::string &protocol_, std::string &path_)
Definition: socket_base.cpp:296
zmq::routing_socket_base_t::add_out_pipe
void add_out_pipe(blob_t routing_id_, pipe_t *pipe_)
Definition: socket_base.cpp:2106
zmq::inbound_poll_rate
@ inbound_poll_rate
Definition: config.hpp:26
ZMQ_EVENT_ACCEPTED
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:406
ZMQ_EVENT_CONNECT_RETRIED
#define ZMQ_EVENT_CONNECT_RETRIED
Definition: zmq.h:403
ZMQ_DISH
#define ZMQ_DISH
Definition: zmq_draft.h:17
zmq::options_t::ipv6
bool ipv6
Definition: options.hpp:118
socket_base.hpp
ZMQ_EVENT_DISCONNECTED
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:410
size
GLsizeiptr size
Definition: glcorearb.h:2943
xpub.hpp
ws_address.hpp
ZMQ_IPV6
#define ZMQ_IPV6
Definition: zmq.h:307
zmq::socket_base_t::leave
int leave(const char *group_)
Definition: socket_base.cpp:489
zmq::own_t
Definition: own.hpp:21
zmq::socket_base_t::has_in
bool has_in()
Definition: socket_base.cpp:1409
zmq::socket_base_t::close
int close()
Definition: socket_base.cpp:1389
zmq_msg_init_size
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
dealer.hpp
m
const upb_json_parsermethod * m
Definition: ruby/ext/google/protobuf_c/upb.h:10501
zmq::socket_base_t::_ctx_terminated
bool _ctx_terminated
Definition: socket_base.hpp:245
zmq::socket_base_t::create
static socket_base_t * create(int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: socket_base.cpp:127
err.hpp
zmq::socket_base_t::_monitor_socket
void * _monitor_socket
Definition: socket_base.hpp:310
tcp_listener.hpp
zmq::socket_base_t::pipe_terminated
void pipe_terminated(pipe_t *pipe_) ZMQ_FINAL
Definition: socket_base.cpp:1725
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
likely.hpp
zmq::socket_base_t::socket_base_t
socket_base_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_=false)
Definition: socket_base.cpp:213
ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
#define ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
Definition: zmq.h:420
zmq::options_t::socket_id
int socket_id
Definition: options.hpp:201
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
zmq::send_routing_id
void send_routing_id(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:52
zmq::socket_base_t::event_connect_retried
void event_connect_retried(const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
Definition: socket_base.cpp:1855
zmq::socket_base_t::event_closed
void event_closed(const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
Definition: socket_base.cpp:1890
stream.hpp
zmq::socket_base_t::is_disconnected
bool is_disconnected() const
Definition: socket_base.cpp:2045
zmq::command_t::destination
zmq::object_t * destination
Definition: command.hpp:24
zmq::routing_socket_base_t::has_out_pipe
bool has_out_pipe(const blob_t &routing_id_) const
Definition: socket_base.cpp:2117
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
zmq::socket_base_t::_ticks
int _ticks
Definition: socket_base.hpp:301
zmq::options_t::sndhwm
int sndhwm
Definition: options.hpp:46
zmq::socket_base_t::xrecv
virtual int xrecv(zmq::msg_t *msg_)
Definition: socket_base.cpp:1640
zmq::socket_base_t::process_commands
int process_commands(int timeout_, bool throttle_)
Definition: socket_base.cpp:1452
zmq::socket_base_t::_inprocs
inprocs_t _inprocs
Definition: socket_base.hpp:231
zmq::address_t::tcp_addr
tcp_address_t * tcp_addr
Definition: address.hpp:81
zmq::socket_base_t::xjoin
virtual int xjoin(const char *group_)
Definition: socket_base.cpp:1626
zmq::pipepair
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], const int hwms_[2], const bool conflate_[2])
zmq::socket_base_t::join
int join(const char *group_)
Definition: socket_base.cpp:482
zmq::own_t::process_destroy
virtual void process_destroy()
Definition: own.cpp:182
zmq::routing_socket_base_t::extract_connect_routing_id
std::string extract_connect_routing_id()
Definition: socket_base.cpp:2094
ZMQ_FD
#define ZMQ_FD
Definition: zmq.h:285
ENOCOMPATPROTO
#define ENOCOMPATPROTO
Definition: zmq.h:160
wss_address.hpp
vmci_address.hpp
xsub.hpp
zmq::socket_base_t::recv
int recv(zmq::msg_t *msg_, int flags_)
Definition: socket_base.cpp:1293
zmq::socket_base_t::event_accept_failed
void event_accept_failed(const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
Definition: socket_base.cpp:1883
zmq::options_t::can_recv_disconnect_msg
bool can_recv_disconnect_msg
Definition: options.hpp:283
zmq::routing_socket_base_t::xwrite_activated
void xwrite_activated(pipe_t *pipe_) ZMQ_FINAL
Definition: socket_base.cpp:2081
tipc_address.hpp
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
zmq::routing_socket_base_t::try_erase_out_pipe
out_pipe_t try_erase_out_pipe(const blob_t &routing_id_)
Definition: socket_base.cpp:2145
zmq::socket_base_t::in_event
void in_event() ZMQ_FINAL
Definition: socket_base.cpp:1660
zmq::socket_base_t::monitor
int monitor(const char *endpoint_, uint64_t events_, int event_version_, int type_)
Definition: socket_base.cpp:1765
zmq::socket_base_t::event_listening
void event_listening(const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
Definition: socket_base.cpp:1862
zmq::endpoint_t::options
options_t options
Definition: ctx.hpp:34
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
session_base.hpp
EFAULT
#define EFAULT
Definition: errno.hpp:17
zmq::socket_base_t::xhiccuped
virtual void xhiccuped(pipe_t *pipe_)
Definition: socket_base.cpp:1655
pair.hpp
false
#define false
Definition: cJSON.c:70
ZMQ_HAVE_TIPC
#define ZMQ_HAVE_TIPC
Definition: vxworks/platform.hpp:263
zmq::command_t
Definition: command.hpp:21
zmq::socket_base_t::xsetsockopt
virtual int xsetsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: socket_base.cpp:1598
ws_listener.hpp
ZMQ_RCVMORE
#define ZMQ_RCVMORE
Definition: zmq.h:284
check
static void check(upb_inttable *t)
Definition: php/ext/google/protobuf/upb.c:5088
zmq::msg_t::set_flags
void set_flags(unsigned char flags_)
Definition: msg.cpp:433
ipc_listener.hpp
zmq::socket_base_t::_mailbox
i_mailbox * _mailbox
Definition: socket_base.hpp:287
zmq::socket_base_t::read_activated
void read_activated(pipe_t *pipe_) ZMQ_FINAL
Definition: socket_base.cpp:1706
zmq::socket_base_t::has_out
bool has_out()
Definition: socket_base.cpp:1414
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
zmq::socket_base_t::inprocs_t::erase_pipe
void erase_pipe(const pipe_t *pipe_)
Definition: socket_base.cpp:107
zmq::max_command_delay
@ max_command_delay
Definition: config.hpp:43
channel.hpp
zmq::msg_t
Definition: msg.hpp:33
ZMQ_BLOCKY
#define ZMQ_BLOCKY
Definition: zmq.h:331
ZMQ_THREAD_SAFE
#define ZMQ_THREAD_SAFE
Definition: zmq.h:342
zmq::scoped_lock_t
Definition: mutex.hpp:143
zmq::socket_base_t::monitor_event
void monitor_event(uint64_t event_, const uint64_t values_[], uint64_t values_count_, const endpoint_uri_pair_t &endpoint_uri_pair_) const
Definition: socket_base.cpp:1951
radio.hpp
unlikely
#define unlikely(x)
Definition: likely.hpp:11
zmq::socket_base_t::_tag
uint32_t _tag
Definition: socket_base.hpp:242
ZMQ_EVENT_MONITOR_STOPPED
#define ZMQ_EVENT_MONITOR_STOPPED
Definition: zmq.h:411
zmq::endpoint_t::socket
socket_base_t * socket
Definition: ctx.hpp:33
zmq::xpub_t
Definition: xpub.hpp:20


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58