stream_engine_base.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 
6 #include <limits.h>
7 #include <string.h>
8 
9 #ifndef ZMQ_HAVE_WINDOWS
10 #include <unistd.h>
11 #endif
12 
13 #include <new>
14 #include <sstream>
15 
16 #include "stream_engine_base.hpp"
17 #include "io_thread.hpp"
18 #include "session_base.hpp"
19 #include "v1_encoder.hpp"
20 #include "v1_decoder.hpp"
21 #include "v2_encoder.hpp"
22 #include "v2_decoder.hpp"
23 #include "null_mechanism.hpp"
24 #include "plain_client.hpp"
25 #include "plain_server.hpp"
26 #include "gssapi_client.hpp"
27 #include "gssapi_server.hpp"
28 #include "curve_client.hpp"
29 #include "curve_server.hpp"
30 #include "raw_decoder.hpp"
31 #include "raw_encoder.hpp"
32 #include "config.hpp"
33 #include "err.hpp"
34 #include "ip.hpp"
35 #include "tcp.hpp"
36 #include "likely.hpp"
37 #include "wire.hpp"
38 
// Body of get_peer_address (zmq::fd_t s_); the signature line is not
// present in this listing.
//
// Builds a textual description of the peer connected to socket s_.
// The address is filled in by zmq::get_peer_ip_address (); when that call
// reports family 0 the result is an empty string. For PF_UNIX peers the
// peer credentials are appended if the platform provides SO_PEERCRED or
// LOCAL_PEERCRED.
40 {
41  std::string peer_address;
42 
43  const int family = zmq::get_peer_ip_address (s_, peer_address);
44  if (family == 0)
45  peer_address.clear ();
46 #if defined ZMQ_HAVE_SO_PEERCRED
47  else if (family == PF_UNIX) {
    // Append ":uid:gid:pid" when the credentials can be queried.
48  struct ucred cred;
49  socklen_t size = sizeof (cred);
50  if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
51  std::ostringstream buf;
52  buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
53  peer_address += buf.str ();
54  }
55  }
56 #elif defined ZMQ_HAVE_LOCAL_PEERCRED
57  else if (family == PF_UNIX) {
    // Append ":uid:gid:" where gid is the first group (left empty if there
    // are no groups); no pid is appended in this variant.
58  struct xucred cred;
59  socklen_t size = sizeof (cred);
60  if (!getsockopt (s_, 0, LOCAL_PEERCRED, &cred, &size)
61  && cred.cr_version == XUCRED_VERSION) {
62  std::ostringstream buf;
63  buf << ":" << cred.cr_uid << ":";
64  if (cred.cr_ngroups > 0)
65  buf << cred.cr_groups[0];
66  buf << ":";
67  peer_address += buf.str ();
68  }
69  }
70 #endif
71 
72  return peer_address;
73 }
74 
// Parameter list, initializer list and body of the stream_engine_base_t
// constructor; the line that opens the definition is not present in this
// listing.
//
// fd_                  - socket the engine takes over (stored in _s).
// options_             - copied into _options.
// endpoint_uri_pair_   - copied into _endpoint_uri_pair.
// has_handshake_stage_ - stored in _has_handshake_stage.
//
// All pointer members start as NULL, all timer/state flags start false
// except _handshaking, which starts true.
76  fd_t fd_,
77  const options_t &options_,
78  const endpoint_uri_pair_t &endpoint_uri_pair_,
79  bool has_handshake_stage_) :
80  _options (options_),
81  _inpos (NULL),
82  _insize (0),
83  _decoder (NULL),
84  _outpos (NULL),
85  _outsize (0),
86  _encoder (NULL),
87  _mechanism (NULL),
88  _next_msg (NULL),
89  _process_msg (NULL),
90  _metadata (NULL),
91  _input_stopped (false),
92  _output_stopped (false),
93  _endpoint_uri_pair (endpoint_uri_pair_),
94  _has_handshake_timer (false),
95  _has_ttl_timer (false),
96  _has_timeout_timer (false),
97  _has_heartbeat_timer (false),
    // The peer address is computed once, from the fd_ parameter.
98  _peer_address (get_peer_address (fd_)),
99  _s (fd_),
100  _handle (static_cast<handle_t> (NULL)),
101  _plugged (false),
102  _handshaking (true),
103  _io_error (false),
104  _session (NULL),
105  _socket (NULL),
106  _has_handshake_stage (has_handshake_stage_)
107 {
    // Initialise the message object used by out_event () for outgoing data.
108  const int rc = _tx_msg.init ();
109  errno_assert (rc == 0);
110 
111  // Put the socket into non-blocking mode.
112  unblock_socket (_s);
113 }
114 
// Body of the stream_engine_base_t destructor; the signature line is not
// present in this listing.
//
// Requires the engine to be unplugged. Closes the socket unless it is
// already retired, closes _tx_msg, releases the metadata reference and
// deletes the encoder, decoder and mechanism.
116 {
117  zmq_assert (!_plugged);
118 
119  if (_s != retired_fd) {
120 #ifdef ZMQ_HAVE_WINDOWS
121  const int rc = closesocket (_s);
122  wsa_assert (rc != SOCKET_ERROR);
123 #else
124  int rc = close (_s);
125 #if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
126  // FreeBSD may return ECONNRESET on close() under load but this is not
127  // an error.
128  if (rc == -1 && errno == ECONNRESET)
129  rc = 0;
130 #endif
131  errno_assert (rc == 0);
132 #endif
    // Mark the descriptor as released so it cannot be closed twice.
133  _s = retired_fd;
134  }
135 
136  const int rc = _tx_msg.close ();
137  errno_assert (rc == 0);
138 
139  // Drop reference to metadata and destroy it if we are
140  // the only user.
141  if (_metadata != NULL) {
142  if (_metadata->drop_ref ()) {
143  LIBZMQ_DELETE (_metadata);
144  }
145  }
146 
147  LIBZMQ_DELETE (_encoder);
148  LIBZMQ_DELETE (_decoder);
149  LIBZMQ_DELETE (_mechanism);
150 }
151 
// Attaches the engine to a session and to an I/O thread's poller.
//
// io_thread_ - I/O thread passed on to io_object_t::plug ().
// session_   - session to bind to; must be non-null, and the engine must
//              not already be plugged or bound to a session.
//
// After registering the socket with the poller, hands over to
// plug_internal () for the rest of the setup.
152 void zmq::stream_engine_base_t::plug (io_thread_t *io_thread_,
153  session_base_t *session_)
154 {
155  zmq_assert (!_plugged);
156  _plugged = true;
157 
158  // Connect to session object.
159  zmq_assert (!_session);
160  zmq_assert (session_);
161  _session = session_;
162  _socket = _session->get_socket ();
163 
164  // Connect to I/O threads poller object.
165  io_object_t::plug (io_thread_);
166  _handle = add_fd (_s);
    // A freshly registered fd has no recorded I/O error.
167  _io_error = false;
168 
169  plug_internal ();
170 }
171 
// Body of stream_engine_base_t::unplug (); the signature line is not
// present in this listing.
//
// Reverses plug (): cancels every timer that is currently armed, removes
// the fd from the poller (unless in_event_internal () already removed it
// and set _io_error) and forgets the session.
173 {
174  zmq_assert (_plugged);
175  _plugged = false;
176 
177  // Cancel all timers.
178  if (_has_handshake_timer) {
179  cancel_timer (handshake_timer_id);
180  _has_handshake_timer = false;
181  }
182 
183  if (_has_ttl_timer) {
184  cancel_timer (heartbeat_ttl_timer_id);
185  _has_ttl_timer = false;
186  }
187 
188  if (_has_timeout_timer) {
189  cancel_timer (heartbeat_timeout_timer_id);
190  _has_timeout_timer = false;
191  }
192 
193  if (_has_heartbeat_timer) {
194  cancel_timer (heartbeat_ivl_timer_id);
195  _has_heartbeat_timer = false;
196  }
197  // Cancel all fd subscriptions.
198  if (!_io_error)
199  rm_fd (_handle);
200 
201  // Disconnect from I/O threads poller object.
    // NOTE(review): listing line 202 is absent here, so the statement that
    // performs this disconnect is not visible -- presumably a call to
    // io_object_t::unplug (); verify against the real source.
203 
204  _session = NULL;
205 }
206 
// Body of stream_engine_base_t::terminate (); the signature line is not
// present in this listing.
//
// Unplugs the engine and destroys it. The object no longer exists once
// this returns.
208 {
209  unplug ();
210  delete this;
211 }
212 
// Body of stream_engine_base_t::in_event (); the signature line is not
// present in this listing.
//
// Thin wrapper around in_event_internal () that discards its boolean
// result.
214 {
215  // ignore errors
216  const bool res = in_event_internal ();
217  LIBZMQ_UNUSED (res);
218 }
219 
// Body of stream_engine_base_t::in_event_internal (); the signature line
// is not present in this listing.
//
// Handles readable data on the socket: finishes the handshake if it is
// still in progress, reads into the decoder's buffer when no buffered
// input is left, then decodes and dispatches messages through
// _process_msg.
//
// Returns false when handshake () has not succeeded yet, or after
// error () has been called (error () as defined in this file ends with
// `delete this`). Returns true otherwise.
221 {
222  zmq_assert (!_io_error);
223 
224  // If still handshaking, receive and process the greeting message.
225  if (unlikely (_handshaking)) {
226  if (handshake ()) {
227  // Handshaking was successful.
228  // Switch into the normal message flow.
229  _handshaking = false;
230 
    // With no security mechanism, the session is told here that the engine
    // is ready and the handshake timer is dropped.
231  if (_mechanism == NULL && _has_handshake_stage) {
232  _session->engine_ready ();
233 
234  if (_has_handshake_timer) {
235  cancel_timer (handshake_timer_id);
236  _has_handshake_timer = false;
237  }
238  }
239  } else
240  return false;
241  }
242 
243 
244  zmq_assert (_decoder);
245 
246  // If there has been an I/O error, stop polling.
247  if (_input_stopped) {
248  rm_fd (_handle);
249  _io_error = true;
250  return true; // TODO or return false in this case too?
251  }
252 
253  // If there's no data to process in the buffer...
254  if (!_insize) {
255  // Retrieve the buffer and read as much data as possible.
256  // Note that buffer can be arbitrarily large. However, we assume
257  // the underlying TCP layer has fixed buffer size and thus the
258  // number of bytes read will be always limited.
259  size_t bufsize = 0;
260  _decoder->get_buffer (&_inpos, &bufsize);
261 
262  const int rc = read (_inpos, bufsize);
263 
264  if (rc == -1) {
    // EAGAIN is not treated as an error; any other errno tears the
    // connection down.
265  if (errno != EAGAIN) {
266  error (connection_error);
267  return false;
268  }
269  return true;
270  }
271 
272  // Adjust input size
273  _insize = static_cast<size_t> (rc);
274  // Adjust buffer size to received bytes
275  _decoder->resize_buffer (_insize);
276  }
277 
278  int rc = 0;
279  size_t processed = 0;
280 
    // Decode until the buffer is drained, decode () returns 0 or -1, or
    // _process_msg fails.
281  while (_insize > 0) {
282  rc = _decoder->decode (_inpos, _insize, processed);
283  zmq_assert (processed <= _insize);
284  _inpos += processed;
285  _insize -= processed;
286  if (rc == 0 || rc == -1)
287  break;
288  rc = (this->*_process_msg) (_decoder->msg ());
289  if (rc == -1)
290  break;
291  }
292 
293  // Tear down the connection if we have failed to decode input data
294  // or the session has rejected the message.
295  if (rc == -1) {
296  if (errno != EAGAIN) {
297  // In cases where the src/dst have the same IP and the dst uses an ephemeral port, reconnection
298  // eventually results in the src and dest IP and port clashing (google tcp self connection)
299  // While this is a protocol_error (you have the single zmq socket handshaking with itself)
300  // we do not want to stop reconnection from happening
301  if (!_endpoint_uri_pair.clash ()) {
302  error (protocol_error);
303  return false;
304  }
305  }
    // Reached on EAGAIN, or on an endpoint clash: pause input until
    // restart_input () is called.
306  _input_stopped = true;
307  reset_pollin (_handle);
308  }
309 
310  _session->flush ();
311  return true;
312 }
313 
// Body of stream_engine_base_t::out_event (); the signature line is not
// present in this listing.
//
// Refills the output buffer from the encoder when it is empty, pulling
// messages through _next_msg until the buffered size reaches
// _options.out_batch_size or no further message is available, and then
// writes as much of the buffer as the socket accepts.
315 {
316  zmq_assert (!_io_error);
317 
318  // If write buffer is empty, try to read new data from the encoder.
319  if (!_outsize) {
320  // Even when we stop polling as soon as there is no
321  // data to send, the poller may invoke out_event one
322  // more time due to 'speculative write' optimisation.
323  if (unlikely (_encoder == NULL)) {
324  zmq_assert (_handshaking);
325  return;
326  }
327 
328  _outpos = NULL;
329  _outsize = _encoder->encode (&_outpos, 0);
330 
331  while (_outsize < static_cast<size_t> (_options.out_batch_size)) {
332  if ((this->*_next_msg) (&_tx_msg) == -1) {
333  // ws_engine can cause an engine error and delete it, so
334  // bail out immediately to avoid use-after-free
335  if (errno == ECONNRESET)
336  return;
337  else
338  break;
339  }
340  _encoder->load_msg (&_tx_msg);
    // Encode directly behind the data already batched.
341  unsigned char *bufptr = _outpos + _outsize;
342  const size_t n =
343  _encoder->encode (&bufptr, _options.out_batch_size - _outsize);
344  zmq_assert (n > 0);
    // If no buffer was set yet, adopt the pointer the encoder handed back.
345  if (_outpos == NULL)
346  _outpos = bufptr;
347  _outsize += n;
348  }
349 
350  // If there is no data to send, stop polling for output.
351  if (_outsize == 0) {
352  _output_stopped = true;
353  reset_pollout ();
354  return;
355  }
356  }
357 
358  // If there are any data to write in write buffer, write as much as
359  // possible to the socket. Note that amount of data to write can be
360  // arbitrarily large. However, we assume that underlying TCP layer has
361  // limited transmission buffer and thus the actual number of bytes
362  // written should be reasonably modest.
363  const int nbytes = write (_outpos, _outsize);
364 
365  // IO error has occurred. We stop waiting for output events.
366  // The engine is not terminated until we detect input error;
367  // this is necessary to prevent losing incoming messages.
368  if (nbytes == -1) {
369  reset_pollout ();
370  return;
371  }
372 
    // Advance past the bytes that were written; any remainder stays
    // buffered.
373  _outpos += nbytes;
374  _outsize -= nbytes;
375 
376  // If we are still handshaking and there are no data
377  // to send, stop polling for output.
378  if (unlikely (_handshaking))
379  if (_outsize == 0)
380  reset_pollout ();
381 }
382 
// Body of stream_engine_base_t::restart_output (); the signature line is
// not present in this listing.
//
// Re-enables output polling if it had been stopped and immediately tries
// to send. Does nothing once an I/O error has been recorded.
384 {
385  if (unlikely (_io_error))
386  return;
387 
388  if (likely (_output_stopped)) {
389  set_pollout ();
390  _output_stopped = false;
391  }
392 
393  // Speculative write: The assumption is that at the moment new message
394  // was sent by the user the socket is probably available for writing.
395  // Thus we try to write the data to socket avoiding polling for POLLOUT.
396  // Consequently, the latency should be better in request/reply scenarios.
397  out_event ();
398 }
399 
// Body of stream_engine_base_t::restart_input (); the signature line is
// not present in this listing.
//
// Resumes input after it was stopped. First re-submits the message still
// held by the decoder, then decodes whatever remains in the input buffer.
// If everything was consumed, input polling is switched back on and a
// speculative read is attempted.
//
// Returns false after error () has been called or when the speculative
// in_event_internal () returns false; true otherwise (including when the
// session still reports EAGAIN and input stays stopped).
401 {
402  zmq_assert (_input_stopped);
403  zmq_assert (_session != NULL);
404  zmq_assert (_decoder != NULL);
405 
    // Retry the message that could not be processed when input was stopped.
406  int rc = (this->*_process_msg) (_decoder->msg ());
407  if (rc == -1) {
408  if (errno == EAGAIN)
409  _session->flush ();
410  else {
411  error (protocol_error);
412  return false;
413  }
414  return true;
415  }
416 
417  while (_insize > 0) {
418  size_t processed = 0;
419  rc = _decoder->decode (_inpos, _insize, processed);
420  zmq_assert (processed <= _insize);
421  _inpos += processed;
422  _insize -= processed;
423  if (rc == 0 || rc == -1)
424  break;
425  rc = (this->*_process_msg) (_decoder->msg ());
426  if (rc == -1)
427  break;
428  }
429 
    // EAGAIN: still blocked, keep input stopped. A recorded I/O error is
    // reported as a connection error ahead of any protocol error.
430  if (rc == -1 && errno == EAGAIN)
431  _session->flush ();
432  else if (_io_error) {
433  error (connection_error);
434  return false;
435  } else if (rc == -1) {
436  error (protocol_error);
437  return false;
438  }
439 
440  else {
441  _input_stopped = false;
442  set_pollin ();
443  _session->flush ();
444 
445  // Speculative read.
446  if (!in_event_internal ())
447  return false;
448  }
449 
450  return true;
451 }
452 
// Body of stream_engine_base_t::next_handshake_command (msg_t *msg_); the
// signature line is not present in this listing.
//
// Produces the next outgoing message while the security handshake runs.
// Once the mechanism reports ready, calls mechanism_ready () and returns
// the result of pull_and_encode (msg_). When the mechanism reports an
// error, returns -1 with errno set to EPROTO. Otherwise returns the result
// of the mechanism's next_handshake_command (), flagging msg_ as a command
// on success.
454 {
455  zmq_assert (_mechanism != NULL);
456 
457  if (_mechanism->status () == mechanism_t::ready) {
458  mechanism_ready ();
459  return pull_and_encode (msg_);
460  }
461  if (_mechanism->status () == mechanism_t::error) {
462  errno = EPROTO;
463  return -1;
464  }
465  const int rc = _mechanism->next_handshake_command (msg_);
466 
467  if (rc == 0)
468  msg_->set_flags (msg_t::command);
469 
470  return rc;
471 }
472 
// Body of stream_engine_base_t::process_handshake_command (msg_t *msg_);
// the signature line is not present in this listing.
//
// Feeds an incoming handshake command to the mechanism. On success it
// calls mechanism_ready () if the mechanism became ready, returns -1 with
// errno EPROTO if the mechanism entered the error state, and restarts
// output if it had been stopped. Otherwise returns the mechanism's result
// code unchanged.
474 {
475  zmq_assert (_mechanism != NULL);
476  const int rc = _mechanism->process_handshake_command (msg_);
477  if (rc == 0) {
478  if (_mechanism->status () == mechanism_t::ready)
479  mechanism_ready ();
480  else if (_mechanism->status () == mechanism_t::error) {
481  errno = EPROTO;
482  return -1;
483  }
484  if (_output_stopped)
485  restart_output ();
486  }
487 
488  return rc;
489 }
490 
// Body of stream_engine_base_t::zap_msg_available (); the signature line
// is not present in this listing.
//
// Forwards the notification to the mechanism. A -1 result is raised as a
// protocol error; otherwise any stopped input and output are restarted.
492 {
493  zmq_assert (_mechanism != NULL);
494 
495  const int rc = _mechanism->zap_msg_available ();
496  if (rc == -1) {
497  error (protocol_error);
498  return;
499  }
    // Stop here if restarting input failed.
500  if (_input_stopped)
501  if (!restart_input ())
502  return;
503  if (_output_stopped)
504  restart_output ();
505 }
506 
// Body of stream_engine_base_t::get_endpoint () const; the signature line
// is not present in this listing.
//
// Returns the endpoint URI pair given to the constructor.
508 {
509  return _endpoint_uri_pair;
510 }
511 
// Body of stream_engine_base_t::mechanism_ready (); the signature line is
// not present in this listing.
//
// Runs once the security mechanism has completed its handshake: arms the
// heartbeat interval timer if configured, notifies the session, pushes the
// optional routing-id and connect-notification messages, builds the
// connection metadata, cancels the handshake timer and raises the
// "handshake succeeded" socket event.
513 {
514  if (_options.heartbeat_interval > 0 && !_has_heartbeat_timer) {
515  add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
516  _has_heartbeat_timer = true;
517  }
518 
519  if (_has_handshake_stage)
520  _session->engine_ready ();
521 
522  bool flush_session = false;
523 
524  if (_options.recv_routing_id) {
525  msg_t routing_id;
526  _mechanism->peer_routing_id (&routing_id);
527  const int rc = _session->push_msg (&routing_id);
528  if (rc == -1 && errno == EAGAIN) {
529  // If the write is failing at this stage with
530  // an EAGAIN the pipe must be being shut down,
531  // so we can just bail out of the routing id set.
532  return;
533  }
534  errno_assert (rc == 0);
535  flush_session = true;
536  }
537 
538  if (_options.router_notify & ZMQ_NOTIFY_CONNECT) {
539  msg_t connect_notification;
    // NOTE(review): the result of init () is not checked here.
540  connect_notification.init ();
541  const int rc = _session->push_msg (&connect_notification);
542  if (rc == -1 && errno == EAGAIN) {
543  // If the write is failing at this stage with
544  // an EAGAIN the pipe must be being shut down,
545  // so we can just bail out of the notification.
546  return;
547  }
548  errno_assert (rc == 0);
549  flush_session = true;
550  }
551 
552  if (flush_session)
553  _session->flush ();
554 
    // NOTE(review): listing lines 555-556 are absent here; presumably they
    // switch _next_msg/_process_msg to the post-handshake handlers --
    // verify against the real source.
557 
558  // Compile metadata.
559  properties_t properties;
560  init_properties (properties);
561 
562  // Add ZAP properties.
563  const properties_t &zap_properties = _mechanism->get_zap_properties ();
564  properties.insert (zap_properties.begin (), zap_properties.end ());
565 
566  // Add ZMTP properties.
567  const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
568  properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
569 
    // The metadata object is only allocated when there is something to
    // store.
570  zmq_assert (_metadata == NULL);
571  if (!properties.empty ()) {
572  _metadata = new (std::nothrow) metadata_t (properties);
573  alloc_assert (_metadata);
574  }
575 
576  if (_has_handshake_timer) {
577  cancel_timer (handshake_timer_id);
578  _has_handshake_timer = false;
579  }
580 
581  _socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
582 }
583 
// Body of stream_engine_base_t::write_credential (msg_t *msg_); the
// signature line is not present in this listing.
//
// If the mechanism reports a non-empty user id, copies it into a new
// message and pushes that to the session before handing msg_ to
// decode_and_push (). Returns -1 if pushing the credential message fails,
// otherwise the result of decode_and_push (msg_).
585 {
586  zmq_assert (_mechanism != NULL);
587  zmq_assert (_session != NULL);
588 
589  const blob_t &credential = _mechanism->get_user_id ();
590  if (credential.size () > 0) {
591  msg_t msg;
592  int rc = msg.init_size (credential.size ());
593  zmq_assert (rc == 0);
594  memcpy (msg.data (), credential.data (), credential.size ());
    // NOTE(review): listing line 595 is absent here; presumably it flags
    // the message as a credential -- verify against the real source.
596  rc = _session->push_msg (&msg);
597  if (rc == -1) {
    // The session did not take the message, so release it here.
598  rc = msg.close ();
599  errno_assert (rc == 0);
600  return -1;
601  }
602  }
    // NOTE(review): listing line 603 is absent here; presumably it changes
    // _process_msg so the credential is written only once -- verify against
    // the real source.
604  return decode_and_push (msg_);
605 }
606 
// Body of stream_engine_base_t::pull_and_encode (msg_t *msg_); the
// signature line is not present in this listing.
//
// Pulls the next message from the session into msg_ and passes it through
// the mechanism's encode (). Returns 0 on success and -1 if either step
// fails.
608 {
609  zmq_assert (_mechanism != NULL);
610 
611  if (_session->pull_msg (msg_) == -1)
612  return -1;
613  if (_mechanism->encode (msg_) == -1)
614  return -1;
615  return 0;
616 }
617 
// Body of stream_engine_base_t::decode_and_push (msg_t *msg_); the
// signature line is not present in this listing.
//
// Passes an incoming message through the mechanism's decode (), cancels
// the heartbeat timeout and TTL timers if armed, handles command messages,
// attaches the connection metadata and pushes the message to the session.
// Returns -1 when decoding fails or the push fails, 0 otherwise.
619 {
620  zmq_assert (_mechanism != NULL);
621 
622  if (_mechanism->decode (msg_) == -1)
623  return -1;
624 
    // A message arrived, so cancel the heartbeat timeout and TTL timers.
625  if (_has_timeout_timer) {
626  _has_timeout_timer = false;
627  cancel_timer (heartbeat_timeout_timer_id);
628  }
629 
630  if (_has_ttl_timer) {
631  _has_ttl_timer = false;
632  cancel_timer (heartbeat_ttl_timer_id);
633  }
634 
635  if (msg_->flags () & msg_t::command) {
636  process_command_message (msg_);
637  }
638 
639  if (_metadata)
640  msg_->set_metadata (_metadata);
641  if (_session->push_msg (msg_) == -1) {
    // NOTE(review): listing line 643 is absent, so the statement governed
    // by the EAGAIN check below is not visible; as printed, `return -1`
    // looks conditional, which is probably not the real control flow --
    // verify against the real source.
642  if (errno == EAGAIN)
644  return -1;
645  }
646  return 0;
647 }
648 
// Body of stream_engine_base_t::push_one_then_decode_and_push
// (msg_t *msg_); the signature line is not present in this listing.
//
// Pushes msg_ to the session and returns the session's result code.
650 {
651  const int rc = _session->push_msg (msg_);
    // NOTE(review): listing line 653 is absent, so the statement executed
    // on success is not visible; as printed, `return rc` looks conditional,
    // which is probably not the real control flow -- verify against the
    // real source.
652  if (rc == 0)
654  return rc;
655 }
656 
// Body of stream_engine_base_t::pull_msg_from_session (msg_t *msg_); the
// signature line is not present in this listing.
//
// Forwards to the session's pull_msg () and returns its result.
658 {
659  return _session->pull_msg (msg_);
660 }
661 
// Body of stream_engine_base_t::push_msg_to_session (msg_t *msg_); the
// signature line is not present in this listing.
//
// Forwards to the session's push_msg () and returns its result.
663 {
664  return _session->push_msg (msg_);
665 }
666 
// Body of stream_engine_base_t::error (error_reason_t reason_); the
// signature line is not present in this listing.
//
// Terminal error path: optionally pushes a disconnect notification,
// raises the handshake-failed and disconnected socket events, reports the
// error to the session, then unplugs and deletes the engine. The object
// no longer exists once this returns.
668 {
669  zmq_assert (_session);
670 
671  if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) {
672  // For router sockets with disconnect notification, rollback
673  // any incomplete message in the pipe, and push the disconnect
674  // notification message.
675  _session->rollback ();
676 
    // NOTE(review): the results of init () and push_msg () are not checked
    // here.
677  msg_t disconnect_notification;
678  disconnect_notification.init ();
679  _session->push_msg (&disconnect_notification);
680  }
681 
682  // protocol errors have been signaled already at the point where they occurred
683  if (reason_ != protocol_error
684  && (_mechanism == NULL
685  || _mechanism->status () == mechanism_t::handshaking)) {
    // Capture errno before further calls can overwrite it.
686  const int err = errno;
687  _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
688  // special case: connecting to non-ZMTP process which immediately drops connection,
689  // or which never responds with greeting, should be treated as a protocol error
690  // (i.e. stop reconnect)
691  if (((reason_ == connection_error) || (reason_ == timeout_error))
692  && (_options.reconnect_stop
    // NOTE(review): listing line 693 is absent, so the rest of this
    // condition is not visible; presumably it tests a reconnect-stop flag
    // bit -- verify against the real source.
694  reason_ = protocol_error;
695  }
696  }
697 
698  _socket->event_disconnected (_endpoint_uri_pair, _s);
699  _session->flush ();
    // The first argument is true only when the handshake had fully
    // completed.
700  _session->engine_error (
701  !_handshaking
702  && (_mechanism == NULL
703  || _mechanism->status () != mechanism_t::handshaking),
704  reason_);
705  unplug ();
706  delete this;
707 }
708 
// Body of stream_engine_base_t::set_handshake_timer (); the signature
// line is not present in this listing.
//
// Arms the handshake timer when _options.handshake_ivl is positive. Must
// not be called while the timer is already armed.
710 {
711  zmq_assert (!_has_handshake_timer);
712 
713  if (_options.handshake_ivl > 0) {
714  add_timer (_options.handshake_ivl, handshake_timer_id);
715  _has_handshake_timer = true;
716  }
717 }
718 
// Body of stream_engine_base_t::init_properties (properties_t
// &properties_); the signature line is not present in this listing.
//
// Adds the peer address and a private "__fd" entry to properties_.
// Returns false, adding nothing, when the peer address is empty; true
// otherwise.
720 {
721  if (_peer_address.empty ())
722  return false;
723  properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
724  std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address);
725 
726  // Private property to support deprecated SRCFD
727  std::ostringstream stream;
    // NOTE(review): listing line 728 is absent, so nothing visible writes
    // to `stream`; presumably the socket descriptor is streamed into it
    // there -- verify against the real source.
729  std::string fd_string = stream.str ();
730  properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
731  ZMQ_MOVE (fd_string));
732  return true;
733 }
734 
// Body of stream_engine_base_t::timer_event (int id_); the signature line
// is not present in this listing.
//
// Dispatches an expired timer by id. Expiry of the handshake, heartbeat
// TTL or heartbeat timeout timer raises a timeout error (error () as
// defined in this file ends with `delete this`). The heartbeat interval
// timer triggers an output pass and re-arms itself. Any other id is a
// programming error.
736 {
737  if (id_ == handshake_timer_id) {
738  _has_handshake_timer = false;
739  // handshake timer expired before handshake completed, so engine fail
740  error (timeout_error);
741  } else if (id_ == heartbeat_ivl_timer_id) {
    // NOTE(review): listing line 742 is absent here; presumably it selects
    // the ping producer as the next message source before out_event ()
    // runs -- verify against the real source.
743  out_event ();
744  add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
745  } else if (id_ == heartbeat_ttl_timer_id) {
746  _has_ttl_timer = false;
747  error (timeout_error);
748  } else if (id_ == heartbeat_timeout_timer_id) {
749  _has_timeout_timer = false;
750  error (timeout_error);
751  } else
752  // There are no other valid timer ids!
753  assert (false);
754 }
755 
// Reads up to size_ bytes from the engine's socket into data_ using
// zmq::tcp_read ().
//
// Returns the value reported by tcp_read (), except that a result of 0
// (peer closed the connection) is converted to -1 with errno set to EPIPE
// so callers see it as an error.
756 int zmq::stream_engine_base_t::read (void *data_, size_t size_)
757 {
758  const int rc = zmq::tcp_read (_s, data_, size_);
759 
760  if (rc == 0) {
761  // connection closed by peer
762  errno = EPIPE;
763  return -1;
764  }
765 
766  return rc;
767 }
768 
// Writes up to size_ bytes from data_ to the engine's socket and returns
// the result of zmq::tcp_write () unchanged.
769 int zmq::stream_engine_base_t::write (const void *data_, size_t size_)
770 {
771  return zmq::tcp_write (_s, data_, size_);
772 }
zmq::io_object_t::plug
void plug(zmq::io_thread_t *io_thread_)
Definition: io_object.cpp:18
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::stream_engine_base_t::push_msg_to_session
int push_msg_to_session(msg_t *msg_)
Definition: stream_engine_base.cpp:662
zmq::msg_t::command
@ command
Definition: msg.hpp:56
zmq::session_base_t
Definition: session_base.hpp:21
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
stream_engine_base.hpp
ip.hpp
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED
#define ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED
Definition: zmq_draft.h:67
get_peer_address
static std::string get_peer_address(zmq::fd_t s_)
Definition: stream_engine_base.cpp:39
zmq::stream_engine_base_t::read
virtual int read(void *data, size_t size_)
Definition: stream_engine_base.cpp:756
zmq::io_object_t::handle_t
poller_t::handle_t handle_t
Definition: io_object.hpp:32
zmq::stream_engine_base_t::init_properties
bool init_properties(properties_t &properties_)
Definition: stream_engine_base.cpp:719
stream
GLuint GLuint stream
Definition: glcorearb.h:3946
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t
Definition: options.hpp:34
zmq::mechanism_t::ready
@ ready
Definition: mechanism.hpp:25
config.hpp
zmq::mechanism_t::error
@ error
Definition: mechanism.hpp:26
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
zmq::stream_engine_base_t::in_event
void in_event() ZMQ_FINAL
Definition: stream_engine_base.cpp:213
raw_encoder.hpp
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
bufsize
GLenum GLuint GLsizei bufsize
Definition: glcorearb.h:3897
zmq::i_engine::error_reason_t
error_reason_t
Definition: i_engine.hpp:17
gssapi_server.hpp
zmq::stream_engine_base_t::restart_output
void restart_output() ZMQ_FINAL
Definition: stream_engine_base.cpp:383
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::stream_engine_base_t::produce_ping_message
virtual int produce_ping_message(msg_t *msg_)
Definition: stream_engine_base.hpp:79
null_mechanism.hpp
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
zmq::stream_engine_base_t::next_handshake_command
int next_handshake_command(msg_t *msg_)
Definition: stream_engine_base.cpp:453
zmq::stream_engine_base_t::in_event_internal
bool in_event_internal()
Definition: stream_engine_base.cpp:220
zmq::stream_engine_base_t::decode_and_push
virtual int decode_and_push(msg_t *msg_)
Definition: stream_engine_base.cpp:618
zmq::stream_engine_base_t::restart_input
bool restart_input() ZMQ_FINAL
Definition: stream_engine_base.cpp:400
error
Definition: cJSON.c:88
zmq::stream_engine_base_t::_tx_msg
msg_t _tx_msg
Definition: stream_engine_base.hpp:176
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
zmq::msg_t::init_size
int init_size(size_t size_)
Definition: msg.cpp:62
zmq::io_object_t::unplug
void unplug()
Definition: io_object.cpp:27
wire.hpp
zmq::stream_engine_base_t::pull_msg_from_session
int pull_msg_from_session(msg_t *msg_)
Definition: stream_engine_base.cpp:657
EPROTO
#define EPROTO
Definition: err.hpp:26
v2_encoder.hpp
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
likely
#define likely(x)
Definition: likely.hpp:10
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::session_base_t::get_socket
socket_base_t * get_socket() const
Definition: session_base.cpp:316
zmq::stream_engine_base_t::set_handshake_timer
void set_handshake_timer()
Definition: stream_engine_base.cpp:709
v1_encoder.hpp
v2_decoder.hpp
macros.hpp
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
v1_decoder.hpp
zmq::stream_engine_base_t::mechanism_ready
void mechanism_ready()
Definition: stream_engine_base.cpp:512
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::mechanism_t::handshaking
@ handshaking
Definition: mechanism.hpp:24
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::blob_t::data
const unsigned char * data() const
Definition: blob.hpp:86
zmq::stream_engine_base_t::out_event
void out_event() ZMQ_OVERRIDE
Definition: stream_engine_base.cpp:314
zmq::stream_engine_base_t::unplug
void unplug()
Definition: stream_engine_base.cpp:172
zmq::blob_t::size
size_t size() const
Definition: blob.hpp:83
s_
std::string s_
Definition: gmock-matchers_test.cc:4128
zmq::stream_engine_base_t::zap_msg_available
void zap_msg_available() ZMQ_FINAL
Definition: stream_engine_base.cpp:491
zmq::msg_t::set_metadata
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:448
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
zmq::stream_engine_base_t::pull_and_encode
int pull_and_encode(msg_t *msg_)
Definition: stream_engine_base.cpp:607
zmq::msg_t::credential
@ credential
Definition: msg.hpp:64
curve_server.hpp
gssapi_client.hpp
zmq::stream_engine_base_t::get_endpoint
const endpoint_uri_pair_t & get_endpoint() const ZMQ_FINAL
Definition: stream_engine_base.cpp:507
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
raw_decoder.hpp
buf
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition: glcorearb.h:4175
ZMQ_NOTIFY_DISCONNECT
#define ZMQ_NOTIFY_DISCONNECT
Definition: zmq_draft.h:102
zmq::stream_engine_base_t::stream_engine_base_t
stream_engine_base_t(fd_t fd_, const options_t &options_, const endpoint_uri_pair_t &endpoint_uri_pair_, bool has_handshake_stage_)
Definition: stream_engine_base.cpp:75
n
GLdouble n
Definition: glcorearb.h:4153
zmq::msg_t::init
int init()
Definition: msg.cpp:50
plain_server.hpp
zmq::blob_t
Definition: blob.hpp:46
ZMQ_NOTIFY_CONNECT
#define ZMQ_NOTIFY_CONNECT
Definition: zmq_draft.h:101
zmq::metadata_t
Definition: metadata.hpp:13
plain_client.hpp
io_thread.hpp
ZMQ_MSG_PROPERTY_PEER_ADDRESS
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS
Definition: zmq_draft.h:98
tcp.hpp
zmq::get_peer_ip_address
int get_peer_ip_address(fd_t sockfd_, std::string &ip_addr_)
Definition: ip.cpp:147
size
GLsizeiptr size
Definition: glcorearb.h:2943
ECONNRESET
#define ECONNRESET
Definition: zmq.h:143
zmq::tcp_write
int tcp_write(fd_t s_, const void *data_, size_t size_)
Definition: tcp.cpp:186
err.hpp
likely.hpp
zmq::stream_engine_base_t::_s
fd_t _s
Definition: stream_engine_base.hpp:165
zmq::stream_engine_base_t::push_one_then_decode_and_push
int push_one_then_decode_and_push(msg_t *msg_)
Definition: stream_engine_base.cpp:649
zmq::stream_engine_base_t::~stream_engine_base_t
~stream_engine_base_t() ZMQ_OVERRIDE
Definition: stream_engine_base.cpp:115
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
zmq::stream_engine_base_t::error
virtual void error(error_reason_t reason_)
Definition: stream_engine_base.cpp:667
zmq::tcp_read
int tcp_read(fd_t s_, void *data_, size_t size_)
Definition: tcp.cpp:245
true
#define true
Definition: cJSON.c:65
zmq::stream_engine_base_t::terminate
void terminate() ZMQ_FINAL
Definition: stream_engine_base.cpp:207
zmq::stream_engine_base_t::write_credential
int write_credential(msg_t *msg_)
Definition: stream_engine_base.cpp:584
zmq::stream_engine_base_t::process_handshake_command
int process_handshake_command(msg_t *msg_)
Definition: stream_engine_base.cpp:473
zmq::stream_engine_base_t::timer_event
void timer_event(int id_) ZMQ_FINAL
Definition: stream_engine_base.cpp:735
session_base.hpp
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
zmq::stream_engine_base_t::write
virtual int write(const void *data_, size_t size_)
Definition: stream_engine_base.cpp:769
zmq::stream_engine_base_t::properties_t
metadata_t::dict_t properties_t
Definition: stream_engine_base.hpp:53
false
#define false
Definition: cJSON.c:70
zmq::msg_t::set_flags
void set_flags(unsigned char flags_)
Definition: msg.cpp:433
curve_client.hpp
zmq::msg_t
Definition: msg.hpp:33
zmq::stream_engine_base_t::plug
void plug(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_) ZMQ_FINAL
Definition: stream_engine_base.cpp:152
unlikely
#define unlikely(x)
Definition: likely.hpp:11
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59