ipc_connecter.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "ipc_connecter.hpp"
5 
6 #if defined ZMQ_HAVE_IPC
7 
8 #include <new>
9 #include <string>
10 
11 #include "io_thread.hpp"
12 #include "random.hpp"
13 #include "err.hpp"
14 #include "ip.hpp"
15 #include "address.hpp"
16 #include "ipc_address.hpp"
17 #include "session_base.hpp"
18 
19 #if defined ZMQ_HAVE_WINDOWS
20 #include <afunix.h>
21 #else
22 #include <unistd.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <sys/un.h>
26 #endif
27 
28 zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
29  class session_base_t *session_,
30  const options_t &options_,
31  address_t *addr_,
32  bool delayed_start_) :
33  stream_connecter_base_t (
34  io_thread_, session_, options_, addr_, delayed_start_)
35 {
36  zmq_assert (_addr->protocol == protocol_name::ipc);
37 }
38 
39 void zmq::ipc_connecter_t::out_event ()
40 {
41  const fd_t fd = connect ();
42  rm_handle ();
43 
44  // Handle the error condition by attempt to reconnect.
45  if (fd == retired_fd) {
46  close ();
47  add_reconnect_timer ();
48  return;
49  }
50 
51  create_engine (fd, get_socket_name<ipc_address_t> (fd, socket_end_local));
52 }
53 
54 void zmq::ipc_connecter_t::start_connecting ()
55 {
56  // Open the connecting socket.
57  const int rc = open ();
58 
59  // Connect may succeed in synchronous manner.
60  if (rc == 0) {
61  _handle = add_fd (_s);
62  out_event ();
63  }
64 
65  // Connection establishment may be delayed. Poll for its completion.
66  else if (rc == -1 && errno == EINPROGRESS) {
67  _handle = add_fd (_s);
68  set_pollout (_handle);
69  _socket->event_connect_delayed (
71 
72  // TODO, tcp_connecter_t adds a connect timer in this case; maybe this
73  // should be done here as well (and then this could be pulled up to
74  // stream_connecter_base_t).
75  }
76  //stop connecting after called zmq_disconnect
77  else if (rc == -1
78  && (options.reconnect_stop & ZMQ_RECONNECT_STOP_AFTER_DISCONNECT)
79  && errno == ECONNREFUSED && _socket->is_disconnected ()) {
80  if (_s != retired_fd)
81  close ();
82  }
83 
84  // Handle any other error condition by eventual reconnect.
85  else {
86  if (_s != retired_fd)
87  close ();
88  add_reconnect_timer ();
89  }
90 }
91 
92 int zmq::ipc_connecter_t::open ()
93 {
94  zmq_assert (_s == retired_fd);
95 
96  // Create the socket.
97  _s = open_socket (AF_UNIX, SOCK_STREAM, 0);
98  if (_s == retired_fd)
99  return -1;
100 
101  // Set the non-blocking flag.
102  unblock_socket (_s);
103 
104  // Connect to the remote peer.
105  const int rc = ::connect (_s, _addr->resolved.ipc_addr->addr (),
106  _addr->resolved.ipc_addr->addrlen ());
107 
108  // Connect was successful immediately.
109  if (rc == 0)
110  return 0;
111 
112  // Translate other error codes indicating asynchronous connect has been
113  // launched to a uniform EINPROGRESS.
114 #ifdef ZMQ_HAVE_WINDOWS
115  const int last_error = WSAGetLastError ();
116  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
117  errno = EINPROGRESS;
118  else
119  errno = wsa_error_to_errno (last_error);
120 #else
121  if (rc == -1 && errno == EINTR) {
122  errno = EINPROGRESS;
123  }
124 #endif
125 
126  // Forward the error.
127  return -1;
128 }
129 
130 zmq::fd_t zmq::ipc_connecter_t::connect ()
131 {
132  // Following code should handle both Berkeley-derived socket
133  // implementations and Solaris.
134  int err = 0;
135  zmq_socklen_t len = static_cast<zmq_socklen_t> (sizeof (err));
136  const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
137  reinterpret_cast<char *> (&err), &len);
138  if (rc == -1) {
139  if (errno == ENOPROTOOPT)
140  errno = 0;
141  err = errno;
142  }
143  if (err != 0) {
144  // Assert if the error was caused by 0MQ bug.
145  // Networking problems are OK. No need to assert.
146  errno = err;
148  || errno == ETIMEDOUT || errno == EHOSTUNREACH
149  || errno == ENETUNREACH || errno == ENETDOWN);
150 
151  return retired_fd;
152  }
153 
154  const fd_t result = _s;
155  _s = retired_fd;
156  return result;
157 }
158 
159 #endif
ZMQ_RECONNECT_STOP_AFTER_DISCONNECT
#define ZMQ_RECONNECT_STOP_AFTER_DISCONNECT
Definition: zmq_draft.h:68
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
EINTR
#define EINTR
Definition: errno.hpp:7
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
ipc_address.hpp
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
errno
int errno
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
ipc_connecter.hpp
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
ENETDOWN
#define ENETDOWN
Definition: zmq.h:113
EINPROGRESS
#define EINPROGRESS
Definition: zmq.h:125
address.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::open_socket
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:73
io_thread.hpp
len
int len
Definition: php/ext/google/protobuf/map.c:206
ECONNRESET
#define ECONNRESET
Definition: zmq.h:143
zmq::zmq_socklen_t
socklen_t zmq_socklen_t
Definition: address.hpp:107
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
EHOSTUNREACH
#define EHOSTUNREACH
Definition: zmq.h:152
ENETUNREACH
#define ENETUNREACH
Definition: zmq.h:137
ETIMEDOUT
#define ETIMEDOUT
Definition: zmq.h:149
session_base.hpp
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:54