session_base.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "session_base.hpp"
6 #include "i_engine.hpp"
7 #include "err.hpp"
8 #include "pipe.hpp"
9 #include "likely.hpp"
10 #include "tcp_connecter.hpp"
11 #include "ws_connecter.hpp"
12 #include "ipc_connecter.hpp"
13 #include "tipc_connecter.hpp"
14 #include "socks_connecter.hpp"
15 #include "vmci_connecter.hpp"
16 #include "pgm_sender.hpp"
17 #include "pgm_receiver.hpp"
18 #include "address.hpp"
19 #include "norm_engine.hpp"
20 #include "udp_engine.hpp"
21 
22 #include "ctx.hpp"
23 #include "req.hpp"
24 #include "radio.hpp"
25 #include "dish.hpp"
26 
27 zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
28  bool active_,
29  class socket_base_t *socket_,
30  const options_t &options_,
31  address_t *addr_)
32 {
34  switch (options_.type) {
35  case ZMQ_REQ:
36  s = new (std::nothrow)
37  req_session_t (io_thread_, active_, socket_, options_, addr_);
38  break;
39  case ZMQ_RADIO:
40  s = new (std::nothrow)
41  radio_session_t (io_thread_, active_, socket_, options_, addr_);
42  break;
43  case ZMQ_DISH:
44  s = new (std::nothrow)
45  dish_session_t (io_thread_, active_, socket_, options_, addr_);
46  break;
47  case ZMQ_DEALER:
48  case ZMQ_REP:
49  case ZMQ_ROUTER:
50  case ZMQ_PUB:
51  case ZMQ_XPUB:
52  case ZMQ_SUB:
53  case ZMQ_XSUB:
54  case ZMQ_PUSH:
55  case ZMQ_PULL:
56  case ZMQ_PAIR:
57  case ZMQ_STREAM:
58  case ZMQ_SERVER:
59  case ZMQ_CLIENT:
60  case ZMQ_GATHER:
61  case ZMQ_SCATTER:
62  case ZMQ_DGRAM:
63  case ZMQ_PEER:
64  case ZMQ_CHANNEL:
65 #ifdef ZMQ_BUILD_DRAFT_API
66  if (options_.can_send_hello_msg && options_.hello_msg.size () > 0)
67  s = new (std::nothrow) hello_msg_session_t (
68  io_thread_, active_, socket_, options_, addr_);
69  else
70  s = new (std::nothrow) session_base_t (
71  io_thread_, active_, socket_, options_, addr_);
72 
73  break;
74 #else
75  s = new (std::nothrow)
76  session_base_t (io_thread_, active_, socket_, options_, addr_);
77  break;
78 #endif
79 
80  default:
81  errno = EINVAL;
82  return NULL;
83  }
84  alloc_assert (s);
85  return s;
86 }
87 
88 zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
89  bool active_,
90  class socket_base_t *socket_,
91  const options_t &options_,
92  address_t *addr_) :
93  own_t (io_thread_, options_),
94  io_object_t (io_thread_),
95  _active (active_),
96  _pipe (NULL),
97  _zap_pipe (NULL),
98  _incomplete_in (false),
99  _pending (false),
100  _engine (NULL),
101  _socket (socket_),
102  _io_thread (io_thread_),
103  _has_linger_timer (false),
104  _addr (addr_)
105 #ifdef ZMQ_HAVE_WSS
106  ,
107  _wss_hostname (options_.wss_hostname)
108 #endif
109 {
110 }
111 
113 {
114  return _engine->get_endpoint ();
115 }
116 
118 {
119  zmq_assert (!_pipe);
120  zmq_assert (!_zap_pipe);
121 
122  // If there's still a pending linger timer, remove it.
123  if (_has_linger_timer) {
124  cancel_timer (linger_timer_id);
125  _has_linger_timer = false;
126  }
127 
128  // Close the engine.
129  if (_engine)
130  _engine->terminate ();
131 
132  LIBZMQ_DELETE (_addr);
133 }
134 
136 {
137  zmq_assert (!is_terminating ());
138  zmq_assert (!_pipe);
139  zmq_assert (pipe_);
140  _pipe = pipe_;
141  _pipe->set_event_sink (this);
142 }
143 
145 {
146  if (!_pipe || !_pipe->read (msg_)) {
147  errno = EAGAIN;
148  return -1;
149  }
150 
151  _incomplete_in = (msg_->flags () & msg_t::more) != 0;
152 
153  return 0;
154 }
155 
157 {
158  // pass subscribe/cancel to the sockets
159  if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
160  && !msg_->is_cancel ())
161  return 0;
162  if (_pipe && _pipe->write (msg_)) {
163  const int rc = msg_->init ();
164  errno_assert (rc == 0);
165  return 0;
166  }
167 
168  errno = EAGAIN;
169  return -1;
170 }
171 
173 {
174  if (_zap_pipe == NULL) {
175  errno = ENOTCONN;
176  return -1;
177  }
178 
179  if (!_zap_pipe->read (msg_)) {
180  errno = EAGAIN;
181  return -1;
182  }
183 
184  return 0;
185 }
186 
188 {
189  if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
190  errno = ENOTCONN;
191  return -1;
192  }
193 
194  if ((msg_->flags () & msg_t::more) == 0)
195  _zap_pipe->flush ();
196 
197  const int rc = msg_->init ();
198  errno_assert (rc == 0);
199  return 0;
200 }
201 
203 {
204 }
205 
207 {
208  if (_pipe)
209  _pipe->flush ();
210 }
211 
213 {
214  if (_pipe)
215  _pipe->rollback ();
216 }
217 
219 {
220  zmq_assert (_pipe != NULL);
221 
222  // Get rid of half-processed messages in the out pipe. Flush any
223  // unflushed messages upstream.
224  _pipe->rollback ();
225  _pipe->flush ();
226 
227  // Remove any half-read message from the in pipe.
228  while (_incomplete_in) {
229  msg_t msg;
230  int rc = msg.init ();
231  errno_assert (rc == 0);
232  rc = pull_msg (&msg);
233  errno_assert (rc == 0);
234  rc = msg.close ();
235  errno_assert (rc == 0);
236  }
237 }
238 
240 {
241  // Drop the reference to the deallocated pipe if required.
242  zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
243  || _terminating_pipes.count (pipe_) == 1);
244 
245  if (pipe_ == _pipe) {
246  // If this is our current pipe, remove it
247  _pipe = NULL;
248  if (_has_linger_timer) {
249  cancel_timer (linger_timer_id);
250  _has_linger_timer = false;
251  }
252  } else if (pipe_ == _zap_pipe)
253  _zap_pipe = NULL;
254  else
255  // Remove the pipe from the detached pipes set
256  _terminating_pipes.erase (pipe_);
257 
258  if (!is_terminating () && options.raw_socket) {
259  if (_engine) {
260  _engine->terminate ();
261  _engine = NULL;
262  }
263  terminate ();
264  }
265 
266  // If we are waiting for pending messages to be sent, at this point
267  // we are sure that there will be no more messages and we can proceed
268  // with termination safely.
269  if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
270  _pending = false;
272  }
273 }
274 
276 {
277  // Skip activating if we're detaching this pipe
278  if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
279  zmq_assert (_terminating_pipes.count (pipe_) == 1);
280  return;
281  }
282 
283  if (unlikely (_engine == NULL)) {
284  if (_pipe)
285  _pipe->check_read ();
286  return;
287  }
288 
289  if (likely (pipe_ == _pipe))
290  _engine->restart_output ();
291  else {
292  // i.e. pipe_ == zap_pipe
293  _engine->zap_msg_available ();
294  }
295 }
296 
298 {
299  // Skip activating if we're detaching this pipe
300  if (_pipe != pipe_) {
301  zmq_assert (_terminating_pipes.count (pipe_) == 1);
302  return;
303  }
304 
305  if (_engine)
306  _engine->restart_input ();
307 }
308 
310 {
311  // Hiccups are always sent from session to socket, not the other
312  // way round.
313  zmq_assert (false);
314 }
315 
317 {
318  return _socket;
319 }
320 
322 {
323  if (_active)
324  start_connecting (false);
325 }
326 
327 // This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
328 // is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
329 // or it aborts on any other error. In other words, either ZAP is not
330 // configured or if it is configured it MUST be configured correctly and it
331 // MUST work, otherwise authentication cannot be guaranteed and it would be a
332 // security flaw.
334 {
335  if (_zap_pipe != NULL)
336  return 0;
337 
338  endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
339  if (peer.socket == NULL) {
341  return -1;
342  }
343  zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
344  || peer.options.type == ZMQ_SERVER);
345 
346  // Create a bi-directional pipe that will connect
347  // session with zap socket.
348  object_t *parents[2] = {this, peer.socket};
349  pipe_t *new_pipes[2] = {NULL, NULL};
350  int hwms[2] = {0, 0};
351  bool conflates[2] = {false, false};
352  int rc = pipepair (parents, new_pipes, hwms, conflates);
353  errno_assert (rc == 0);
354 
355  // Attach local end of the pipe to this socket object.
356  _zap_pipe = new_pipes[0];
357  _zap_pipe->set_nodelay ();
358  _zap_pipe->set_event_sink (this);
359 
360  send_bind (peer.socket, new_pipes[1], false);
361 
362  // Send empty routing id if required by the peer.
363  if (peer.options.recv_routing_id) {
364  msg_t id;
365  rc = id.init ();
366  errno_assert (rc == 0);
367  id.set_flags (msg_t::routing_id);
368  bool ok = _zap_pipe->write (&id);
369  zmq_assert (ok);
370  _zap_pipe->flush ();
371  }
372 
373  return 0;
374 }
375 
377 {
378  return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
379 }
380 
382 {
383  zmq_assert (engine_ != NULL);
384  zmq_assert (!_engine);
385  _engine = engine_;
386 
387  if (!engine_->has_handshake_stage ())
388  engine_ready ();
389 
390  // Plug in the engine.
391  _engine->plug (_io_thread, this);
392 }
393 
395 {
396  // Create the pipe if it does not exist yet.
397  if (!_pipe && !is_terminating ()) {
398  object_t *parents[2] = {this, _socket};
399  pipe_t *pipes[2] = {NULL, NULL};
400 
401  const bool conflate = get_effective_conflate_option (options);
402 
403  int hwms[2] = {conflate ? -1 : options.rcvhwm,
404  conflate ? -1 : options.sndhwm};
405  bool conflates[2] = {conflate, conflate};
406  const int rc = pipepair (parents, pipes, hwms, conflates);
407  errno_assert (rc == 0);
408 
409  // Plug the local end of the pipe.
410  pipes[0]->set_event_sink (this);
411 
412  // Remember the local end of the pipe.
413  zmq_assert (!_pipe);
414  _pipe = pipes[0];
415 
416  // The endpoints strings are not set on bind, set them here so that
417  // events can use them.
418  pipes[0]->set_endpoint_pair (_engine->get_endpoint ());
419  pipes[1]->set_endpoint_pair (_engine->get_endpoint ());
420 
421  // Ask socket to plug into the remote end of the pipe.
422  send_bind (_socket, pipes[1]);
423  }
424 }
425 
426 void zmq::session_base_t::engine_error (bool handshaked_,
428 {
429  // Engine is dead. Let's forget about it.
430  _engine = NULL;
431 
432  // Remove any half-done messages from the pipes.
433  if (_pipe) {
434  clean_pipes ();
435 
436  // Only send disconnect message if socket was accepted and handshake was completed
437  if (!_active && handshaked_ && options.can_recv_disconnect_msg
438  && !options.disconnect_msg.empty ()) {
439  _pipe->set_disconnect_msg (options.disconnect_msg);
440  _pipe->send_disconnect_msg ();
441  }
442 
443  // Only send hiccup message if socket was connected and handshake was completed
444  if (_active && handshaked_ && options.can_recv_hiccup_msg
445  && !options.hiccup_msg.empty ()) {
446  _pipe->send_hiccup_msg (options.hiccup_msg);
447  }
448  }
449 
451  || reason_ == i_engine::timeout_error
452  || reason_ == i_engine::protocol_error);
453 
454  switch (reason_) {
456  /* FALLTHROUGH */
458  if (_active) {
459  reconnect ();
460  break;
461  }
462 
464  if (_pending) {
465  if (_pipe)
466  _pipe->terminate (false);
467  if (_zap_pipe)
468  _zap_pipe->terminate (false);
469  } else {
470  terminate ();
471  }
472  break;
473  }
474 
475  // Just in case there's only a delimiter in the pipe.
476  if (_pipe)
477  _pipe->check_read ();
478 
479  if (_zap_pipe)
480  _zap_pipe->check_read ();
481 }
482 
484 {
485  zmq_assert (!_pending);
486 
487  // If the termination of the pipe happens before the term command is
488  // delivered there's nothing much to do. We can proceed with the
489  // standard termination immediately.
490  if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
492  return;
493  }
494 
495  _pending = true;
496 
497  if (_pipe != NULL) {
498  // If there's finite linger value, delay the termination.
499  // If linger is infinite (negative) we don't even have to set
500  // the timer.
501  if (linger_ > 0) {
502  zmq_assert (!_has_linger_timer);
503  add_timer (linger_, linger_timer_id);
504  _has_linger_timer = true;
505  }
506 
507  // Start pipe termination process. Delay the termination till all messages
508  // are processed in case the linger time is non-zero.
509  _pipe->terminate (linger_ != 0);
510 
511  // TODO: Should this go into pipe_t::terminate ?
512  // In case there's no engine and there's only delimiter in the
513  // pipe it wouldn't be ever read. Thus we check for it explicitly.
514  if (!_engine)
515  _pipe->check_read ();
516  }
517 
518  if (_zap_pipe != NULL)
519  _zap_pipe->terminate (false);
520 }
521 
523 {
524  // Linger period expired. We can proceed with termination even though
525  // there are still pending messages to be sent.
526  zmq_assert (id_ == linger_timer_id);
527  _has_linger_timer = false;
528 
529  // Ask pipe to terminate even though there may be pending messages in it.
530  zmq_assert (_pipe);
531  _pipe->terminate (false);
532 }
533 
535 {
536  std::string *ep = new (std::string);
537  _addr->to_string (*ep);
538  send_term_endpoint (_socket, ep);
539 }
540 
542 {
543  // For delayed connect situations, terminate the pipe
544  // and reestablish later on
545  if (_pipe && options.immediate == 1
546 #ifdef ZMQ_HAVE_OPENPGM
547  && _addr->protocol != protocol_name::pgm
548  && _addr->protocol != protocol_name::epgm
549 #endif
550 #ifdef ZMQ_HAVE_NORM
551  && _addr->protocol != protocol_name::norm
552 #endif
553  && _addr->protocol != protocol_name::udp) {
554  _pipe->hiccup ();
555  _pipe->terminate (false);
556  _terminating_pipes.insert (_pipe);
557  _pipe = NULL;
558 
559  if (_has_linger_timer) {
560  cancel_timer (linger_timer_id);
561  _has_linger_timer = false;
562  }
563  }
564 
565  reset ();
566 
567  // Reconnect.
568  if (options.reconnect_ivl > 0)
569  start_connecting (true);
570  else {
571  std::string *ep = new (std::string);
572  _addr->to_string (*ep);
573  send_term_endpoint (_socket, ep);
574  }
575 
576  // For subscriber sockets we hiccup the inbound pipe, which will cause
577  // the socket object to resend all the subscriptions.
578  if (_pipe
579  && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
580  || options.type == ZMQ_DISH))
581  _pipe->hiccup ();
582 }
583 
585 {
586  zmq_assert (_active);
587 
588  // Choose I/O thread to run connecter in. Given that we are already
589  // running in an I/O thread, there must be at least one available.
590  io_thread_t *io_thread = choose_io_thread (options.affinity);
591  zmq_assert (io_thread);
592 
593  // Create the connecter object.
594  own_t *connecter = NULL;
595  if (_addr->protocol == protocol_name::tcp) {
596  if (!options.socks_proxy_address.empty ()) {
597  address_t *proxy_address = new (std::nothrow)
598  address_t (protocol_name::tcp, options.socks_proxy_address,
599  this->get_ctx ());
600  alloc_assert (proxy_address);
601  connecter = new (std::nothrow) socks_connecter_t (
602  io_thread, this, options, _addr, proxy_address, wait_);
603  alloc_assert (connecter);
604  if (!options.socks_proxy_username.empty ()) {
605  reinterpret_cast<socks_connecter_t *> (connecter)
606  ->set_auth_method_basic (options.socks_proxy_username,
607  options.socks_proxy_password);
608  }
609  } else {
610  connecter = new (std::nothrow)
611  tcp_connecter_t (io_thread, this, options, _addr, wait_);
612  }
613  }
614 #if defined ZMQ_HAVE_IPC
615  else if (_addr->protocol == protocol_name::ipc) {
616  connecter = new (std::nothrow)
617  ipc_connecter_t (io_thread, this, options, _addr, wait_);
618  }
619 #endif
620 #if defined ZMQ_HAVE_TIPC
621  else if (_addr->protocol == protocol_name::tipc) {
622  connecter = new (std::nothrow)
623  tipc_connecter_t (io_thread, this, options, _addr, wait_);
624  }
625 #endif
626 #if defined ZMQ_HAVE_VMCI
627  else if (_addr->protocol == protocol_name::vmci) {
628  connecter = new (std::nothrow)
629  vmci_connecter_t (io_thread, this, options, _addr, wait_);
630  }
631 #endif
632 #if defined ZMQ_HAVE_WS
633  else if (_addr->protocol == protocol_name::ws) {
634  connecter = new (std::nothrow) ws_connecter_t (
635  io_thread, this, options, _addr, wait_, false, std::string ());
636  }
637 #endif
638 #if defined ZMQ_HAVE_WSS
639  else if (_addr->protocol == protocol_name::wss) {
640  connecter = new (std::nothrow) ws_connecter_t (
641  io_thread, this, options, _addr, wait_, true, _wss_hostname);
642  }
643 #endif
644  if (connecter != NULL) {
645  alloc_assert (connecter);
646  launch_child (connecter);
647  return;
648  }
649 
650  if (_addr->protocol == protocol_name::udp) {
651  zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
652  || options.type == ZMQ_DGRAM);
653 
654  udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
655  alloc_assert (engine);
656 
657  bool recv = false;
658  bool send = false;
659 
660  if (options.type == ZMQ_RADIO) {
661  send = true;
662  recv = false;
663  } else if (options.type == ZMQ_DISH) {
664  send = false;
665  recv = true;
666  } else if (options.type == ZMQ_DGRAM) {
667  send = true;
668  recv = true;
669  }
670 
671  int rc = engine->init (_addr, send, recv);
672  errno_assert (rc == 0);
673 
674  send_attach (this, engine);
675 
676  return;
677  }
678 
679 #ifdef ZMQ_HAVE_OPENPGM
680 
681  // Both PGM and EPGM transports are using the same infrastructure.
682  if (_addr->protocol == "pgm" || _addr->protocol == "epgm") {
683  zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
684  || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
685 
686  // For EPGM transport with UDP encapsulation of PGM is used.
687  bool const udp_encapsulation = _addr->protocol == "epgm";
688 
689  // At this point we'll create message pipes to the session straight
690  // away. There's no point in delaying it as no concept of 'connect'
691  // exists with PGM anyway.
692  if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
693  // PGM sender.
694  pgm_sender_t *pgm_sender =
695  new (std::nothrow) pgm_sender_t (io_thread, options);
696  alloc_assert (pgm_sender);
697 
698  int rc =
699  pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
700  errno_assert (rc == 0);
701 
702  send_attach (this, pgm_sender);
703  } else {
704  // PGM receiver.
705  pgm_receiver_t *pgm_receiver =
706  new (std::nothrow) pgm_receiver_t (io_thread, options);
707  alloc_assert (pgm_receiver);
708 
709  int rc =
710  pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
711  errno_assert (rc == 0);
712 
713  send_attach (this, pgm_receiver);
714  }
715 
716  return;
717  }
718 #endif
719 
720 #ifdef ZMQ_HAVE_NORM
721  if (_addr->protocol == "norm") {
722  // At this point we'll create message pipes to the session straight
723  // away. There's no point in delaying it as no concept of 'connect'
724  // exists with NORM anyway.
725  if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
726  // NORM sender.
727  norm_engine_t *norm_sender =
728  new (std::nothrow) norm_engine_t (io_thread, options);
729  alloc_assert (norm_sender);
730 
731  int rc = norm_sender->init (_addr->address.c_str (), true, false);
732  errno_assert (rc == 0);
733 
734  send_attach (this, norm_sender);
735  } else { // ZMQ_SUB or ZMQ_XSUB
736 
737  // NORM receiver.
738  norm_engine_t *norm_receiver =
739  new (std::nothrow) norm_engine_t (io_thread, options);
740  alloc_assert (norm_receiver);
741 
742  int rc = norm_receiver->init (_addr->address.c_str (), false, true);
743  errno_assert (rc == 0);
744 
745  send_attach (this, norm_receiver);
746  }
747  return;
748  }
749 #endif // ZMQ_HAVE_NORM
750 
751  zmq_assert (false);
752 }
753 
754 zmq::hello_msg_session_t::hello_msg_session_t (io_thread_t *io_thread_,
755  bool connect_,
756  socket_base_t *socket_,
757  const options_t &options_,
758  address_t *addr_) :
759  session_base_t (io_thread_, connect_, socket_, options_, addr_),
760  _new_pipe (true)
761 {
762 }
763 
764 zmq::hello_msg_session_t::~hello_msg_session_t ()
765 {
766 }
767 
768 
769 int zmq::hello_msg_session_t::pull_msg (msg_t *msg_)
770 {
771  if (_new_pipe) {
772  _new_pipe = false;
773 
774  const int rc =
775  msg_->init_buffer (&options.hello_msg[0], options.hello_msg.size ());
776  errno_assert (rc == 0);
777 
778  return 0;
779  }
780 
781  return session_base_t::pull_msg (msg_);
782 }
783 
784 void zmq::hello_msg_session_t::reset ()
785 {
787  _new_pipe = true;
788 }
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::msg_t::command
@ command
Definition: msg.hpp:56
zmq::session_base_t
Definition: session_base.hpp:21
pgm_receiver.hpp
zmq::protocol_name::udp
static const char udp[]
Definition: address.hpp:39
zmq::session_base_t::reset
virtual void reset()
Definition: session_base.cpp:202
zmq::session_base_t::push_msg
virtual int push_msg(msg_t *msg_)
Definition: session_base.cpp:156
ZMQ_SERVER
#define ZMQ_SERVER
Definition: zmq_draft.h:14
ws_connecter.hpp
zmq::i_engine::timeout_error
@ timeout_error
Definition: i_engine.hpp:21
zmq::io_object_t
Definition: io_object.hpp:20
ZMQ_GATHER
#define ZMQ_GATHER
Definition: zmq_draft.h:18
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t
Definition: options.hpp:34
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
ZMQ_STREAM
#define ZMQ_STREAM
Definition: zmq.h:269
EINVAL
#define EINVAL
Definition: errno.hpp:25
zmq::session_base_t::process_attach
void process_attach(zmq::i_engine *engine_) ZMQ_FINAL
Definition: session_base.cpp:381
zmq::session_base_t::clean_pipes
void clean_pipes()
Definition: session_base.cpp:218
zmq::session_base_t::create
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: session_base.cpp:27
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
s
XmlRpcServer s
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_CLIENT
#define ZMQ_CLIENT
Definition: zmq_draft.h:15
zmq::session_base_t::start_connecting
void start_connecting(bool wait_)
Definition: session_base.cpp:584
zmq::i_engine::error_reason_t
error_reason_t
Definition: i_engine.hpp:17
vmci_connecter.hpp
ZMQ_RADIO
#define ZMQ_RADIO
Definition: zmq_draft.h:16
zmq::session_base_t::get_endpoint
const endpoint_uri_pair_t & get_endpoint() const
Definition: session_base.cpp:112
precompiled.hpp
pgm_sender.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::session_base_t::attach_pipe
void attach_pipe(zmq::pipe_t *pipe_)
Definition: session_base.cpp:135
zmq::session_base_t::write_zap_msg
int write_zap_msg(msg_t *msg_)
Definition: session_base.cpp:187
socks_connecter.hpp
zmq::own_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: own.cpp:128
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
zmq::socket_base_t
Definition: socket_base.hpp:31
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
ENOTCONN
#define ENOTCONN
Definition: zmq.h:146
zmq::session_base_t::~session_base_t
~session_base_t() ZMQ_OVERRIDE
Definition: session_base.cpp:117
ZMQ_SUB
#define ZMQ_SUB
Definition: zmq.h:260
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
udp_engine.hpp
ipc_connecter.hpp
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
ok
ROSCPP_DECL bool ok()
ctx.hpp
zmq::session_base_t::process_plug
void process_plug() ZMQ_FINAL
Definition: session_base.cpp:321
zmq::session_base_t::engine_ready
void engine_ready()
Definition: session_base.cpp:394
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
ZMQ_DEALER
#define ZMQ_DEALER
Definition: zmq.h:263
likely
#define likely(x)
Definition: likely.hpp:10
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::session_base_t::get_socket
socket_base_t * get_socket() const
Definition: session_base.cpp:316
zmq::session_base_t::zap_enabled
bool zap_enabled() const
Definition: session_base.cpp:376
ZMQ_CHANNEL
#define ZMQ_CHANNEL
Definition: zmq_draft.h:22
macros.hpp
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
ZMQ_PUSH
#define ZMQ_PUSH
Definition: zmq.h:266
zmq::msg_t::close
int close()
Definition: msg.cpp:242
ZMQ_ROUTER
#define ZMQ_ROUTER
Definition: zmq.h:264
zmq::session_base_t::flush
void flush()
Definition: session_base.cpp:206
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
zmq::i_engine::connection_error
@ connection_error
Definition: i_engine.hpp:20
pipe.hpp
tcp_connecter.hpp
zmq::options_t::recv_routing_id
bool recv_routing_id
Definition: options.hpp:133
address.hpp
zmq::i_engine::has_handshake_stage
virtual bool has_handshake_stage()=0
req.hpp
tipc_connecter.hpp
zmq::endpoint_t
Definition: ctx.hpp:31
zmq::i_engine::protocol_error
@ protocol_error
Definition: i_engine.hpp:19
zmq::session_base_t::read_activated
void read_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:275
ZMQ_PEER
#define ZMQ_PEER
Definition: zmq_draft.h:21
zmq::get_effective_conflate_option
bool get_effective_conflate_option(const options_t &options)
Definition: options.hpp:303
zmq::session_base_t::pull_msg
virtual int pull_msg(msg_t *msg_)
Definition: session_base.cpp:144
zmq::session_base_t::process_term
void process_term(int linger_) ZMQ_FINAL
Definition: session_base.cpp:483
zmq::object_t
Definition: object.hpp:28
zmq::msg_t::init
int init()
Definition: msg.cpp:50
zmq::id
const char id[]
Definition: zap_client.cpp:14
zmq::session_base_t::read_zap_msg
int read_zap_msg(msg_t *msg_)
Definition: session_base.cpp:172
zmq::session_base_t::write_activated
void write_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:297
zmq::session_base_t::timer_event
void timer_event(int id_) ZMQ_FINAL
Definition: session_base.cpp:522
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::session_base_t::zap_connect
int zap_connect()
Definition: session_base.cpp:333
zmq::address_t
Definition: address.hpp:64
zmq::session_base_t::session_base_t
session_base_t(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: session_base.cpp:88
zmq::msg_t::routing_id
uint32_t routing_id
Definition: msg.hpp:233
dish.hpp
ZMQ_DGRAM
#define ZMQ_DGRAM
Definition: zmq_draft.h:20
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
ZMQ_SCATTER
#define ZMQ_SCATTER
Definition: zmq_draft.h:19
ZMQ_DISH
#define ZMQ_DISH
Definition: zmq_draft.h:17
zmq::session_base_t::rollback
void rollback()
Definition: session_base.cpp:212
zmq::session_base_t::process_conn_failed
void process_conn_failed() ZMQ_OVERRIDE
Definition: session_base.cpp:534
zmq::session_base_t::pipe_terminated
void pipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:239
zmq::own_t
Definition: own.hpp:21
err.hpp
ZMQ_NULL
#define ZMQ_NULL
Definition: zmq.h:362
likely.hpp
zmq::session_base_t::engine_error
void engine_error(bool handshaked_, zmq::i_engine::error_reason_t reason_)
Definition: session_base.cpp:426
true
#define true
Definition: cJSON.c:65
zmq::pipepair
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], const int hwms_[2], const bool conflate_[2])
zmq::session_base_t::reconnect
void reconnect()
Definition: session_base.cpp:541
zmq::i_engine
Definition: i_engine.hpp:15
zmq::endpoint_t::options
options_t options
Definition: ctx.hpp:34
ZMQ_PULL
#define ZMQ_PULL
Definition: zmq.h:265
zmq::msg_t::is_subscribe
bool is_subscribe() const
Definition: msg.hpp:113
session_base.hpp
i_engine.hpp
zmq::msg_t::is_cancel
bool is_cancel() const
Definition: msg.hpp:118
zmq::options_t::type
int8_t type
Definition: options.hpp:80
false
#define false
Definition: cJSON.c:70
ep
const SETUP_TEARDOWN_TESTCONTEXT char ep[]
Definition: test_term_endpoint_tipc.cpp:8
zmq::msg_t
Definition: msg.hpp:33
radio.hpp
unlikely
#define unlikely(x)
Definition: likely.hpp:11
norm_engine.hpp
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410
zmq::endpoint_t::socket
socket_base_t * socket
Definition: ctx.hpp:33
zmq::session_base_t::hiccuped
void hiccuped(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:309


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58