9 #ifndef ZMQ_HAVE_WINDOWS
45 peer_address.clear ();
46 #if defined ZMQ_HAVE_SO_PEERCRED
47 else if (family == PF_UNIX) {
49 socklen_t
size =
sizeof (cred);
50 if (!getsockopt (
s_, SOL_SOCKET, SO_PEERCRED, &cred, &
size)) {
51 std::ostringstream
buf;
52 buf <<
":" << cred.uid <<
":" << cred.gid <<
":" << cred.pid;
53 peer_address +=
buf.str ();
56 #elif defined ZMQ_HAVE_LOCAL_PEERCRED
57 else if (family == PF_UNIX) {
59 socklen_t
size =
sizeof (cred);
60 if (!getsockopt (
s_, 0, LOCAL_PEERCRED, &cred, &
size)
61 && cred.cr_version == XUCRED_VERSION) {
62 std::ostringstream
buf;
63 buf <<
":" << cred.cr_uid <<
":";
64 if (cred.cr_ngroups > 0)
65 buf << cred.cr_groups[0];
67 peer_address +=
buf.str ();
79 bool has_handshake_stage_) :
91 _input_stopped (
false),
92 _output_stopped (
false),
93 _endpoint_uri_pair (endpoint_uri_pair_),
94 _has_handshake_timer (
false),
95 _has_ttl_timer (
false),
96 _has_timeout_timer (
false),
97 _has_heartbeat_timer (
false),
106 _has_handshake_stage (has_handshake_stage_)
120 #ifdef ZMQ_HAVE_WINDOWS
122 wsa_assert (rc != SOCKET_ERROR);
125 #if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
136 const int rc = _tx_msg.close ();
141 if (_metadata !=
NULL) {
142 if (_metadata->drop_ref ()) {
166 _handle = add_fd (_s);
178 if (_has_handshake_timer) {
179 cancel_timer (handshake_timer_id);
180 _has_handshake_timer =
false;
183 if (_has_ttl_timer) {
184 cancel_timer (heartbeat_ttl_timer_id);
185 _has_ttl_timer =
false;
188 if (_has_timeout_timer) {
189 cancel_timer (heartbeat_timeout_timer_id);
190 _has_timeout_timer =
false;
193 if (_has_heartbeat_timer) {
194 cancel_timer (heartbeat_ivl_timer_id);
195 _has_heartbeat_timer =
false;
216 const bool res = in_event_internal ();
229 _handshaking =
false;
231 if (_mechanism ==
NULL && _has_handshake_stage) {
232 _session->engine_ready ();
234 if (_has_handshake_timer) {
235 cancel_timer (handshake_timer_id);
236 _has_handshake_timer =
false;
247 if (_input_stopped) {
260 _decoder->get_buffer (&_inpos, &
bufsize);
262 const int rc = read (_inpos,
bufsize);
266 error (connection_error);
273 _insize =
static_cast<size_t> (rc);
275 _decoder->resize_buffer (_insize);
279 size_t processed = 0;
281 while (_insize > 0) {
282 rc = _decoder->decode (_inpos, _insize, processed);
285 _insize -= processed;
286 if (rc == 0 || rc == -1)
288 rc = (this->*_process_msg) (_decoder->msg ());
301 if (!_endpoint_uri_pair.clash ()) {
302 error (protocol_error);
306 _input_stopped =
true;
307 reset_pollin (_handle);
329 _outsize = _encoder->encode (&_outpos, 0);
331 while (_outsize <
static_cast<size_t> (_options.out_batch_size)) {
332 if ((this->*_next_msg) (&_tx_msg) == -1) {
340 _encoder->load_msg (&_tx_msg);
341 unsigned char *bufptr = _outpos + _outsize;
343 _encoder->encode (&bufptr, _options.out_batch_size - _outsize);
352 _output_stopped =
true;
363 const int nbytes = write (_outpos, _outsize);
388 if (
likely (_output_stopped)) {
390 _output_stopped =
false;
406 int rc = (this->*_process_msg) (_decoder->msg ());
411 error (protocol_error);
417 while (_insize > 0) {
418 size_t processed = 0;
419 rc = _decoder->decode (_inpos, _insize, processed);
422 _insize -= processed;
423 if (rc == 0 || rc == -1)
425 rc = (this->*_process_msg) (_decoder->msg ());
432 else if (_io_error) {
433 error (connection_error);
435 }
else if (rc == -1) {
436 error (protocol_error);
441 _input_stopped =
false;
446 if (!in_event_internal ())
459 return pull_and_encode (msg_);
465 const int rc = _mechanism->next_handshake_command (msg_);
476 const int rc = _mechanism->process_handshake_command (msg_);
495 const int rc = _mechanism->zap_msg_available ();
497 error (protocol_error);
501 if (!restart_input ())
509 return _endpoint_uri_pair;
514 if (_options.heartbeat_interval > 0 && !_has_heartbeat_timer) {
515 add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
516 _has_heartbeat_timer =
true;
519 if (_has_handshake_stage)
520 _session->engine_ready ();
522 bool flush_session =
false;
524 if (_options.recv_routing_id) {
526 _mechanism->peer_routing_id (&routing_id);
527 const int rc = _session->push_msg (&routing_id);
535 flush_session =
true;
539 msg_t connect_notification;
540 connect_notification.
init ();
541 const int rc = _session->push_msg (&connect_notification);
549 flush_session =
true;
560 init_properties (properties);
563 const properties_t &zap_properties = _mechanism->get_zap_properties ();
564 properties.insert (zap_properties.begin (), zap_properties.end ());
567 const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
568 properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
571 if (!properties.empty ()) {
572 _metadata =
new (std::nothrow)
metadata_t (properties);
576 if (_has_handshake_timer) {
577 cancel_timer (handshake_timer_id);
578 _has_handshake_timer =
false;
581 _socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
589 const blob_t &credential = _mechanism->get_user_id ();
590 if (credential.
size () > 0) {
594 memcpy (msg.
data (), credential.
data (), credential.
size ());
596 rc = _session->push_msg (&msg);
604 return decode_and_push (msg_);
611 if (_session->pull_msg (msg_) == -1)
613 if (_mechanism->encode (msg_) == -1)
622 if (_mechanism->decode (msg_) == -1)
625 if (_has_timeout_timer) {
626 _has_timeout_timer =
false;
627 cancel_timer (heartbeat_timeout_timer_id);
630 if (_has_ttl_timer) {
631 _has_ttl_timer =
false;
632 cancel_timer (heartbeat_ttl_timer_id);
636 process_command_message (msg_);
641 if (_session->push_msg (msg_) == -1) {
651 const int rc = _session->push_msg (msg_);
659 return _session->pull_msg (msg_);
664 return _session->push_msg (msg_);
675 _session->rollback ();
677 msg_t disconnect_notification;
678 disconnect_notification.
init ();
679 _session->push_msg (&disconnect_notification);
683 if (reason_ != protocol_error
684 && (_mechanism ==
NULL
687 _socket->event_handshake_failed_no_detail (_endpoint_uri_pair,
err);
691 if (((reason_ == connection_error) || (reason_ == timeout_error))
692 && (_options.reconnect_stop
694 reason_ = protocol_error;
698 _socket->event_disconnected (_endpoint_uri_pair, _s);
700 _session->engine_error (
702 && (_mechanism ==
NULL
713 if (_options.handshake_ivl > 0) {
714 add_timer (_options.handshake_ivl, handshake_timer_id);
715 _has_handshake_timer =
true;
721 if (_peer_address.empty ())
723 properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
727 std::ostringstream
stream;
730 properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
std::string (
"__fd"),
737 if (id_ == handshake_timer_id) {
738 _has_handshake_timer =
false;
740 error (timeout_error);
741 }
else if (id_ == heartbeat_ivl_timer_id) {
744 add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
745 }
else if (id_ == heartbeat_ttl_timer_id) {
746 _has_ttl_timer =
false;
747 error (timeout_error);
748 }
else if (id_ == heartbeat_timeout_timer_id) {
749 _has_timeout_timer =
false;
750 error (timeout_error);