vmci_connecter.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 
5 #include "vmci_connecter.hpp"
6 
7 #if defined ZMQ_HAVE_VMCI
8 
9 #include <new>
10 
11 #include "io_thread.hpp"
12 #include "platform.hpp"
13 #include "random.hpp"
14 #include "err.hpp"
15 #include "ip.hpp"
16 #include "address.hpp"
17 #include "vmci_address.hpp"
18 #include "vmci.hpp"
19 #include "session_base.hpp"
20 
21 zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
22  class session_base_t *session_,
23  const options_t &options_,
24  address_t *addr_,
25  bool delayed_start_) :
26  stream_connecter_base_t (
27  io_thread_, session_, options_, addr_, delayed_start_),
28  _connect_timer_started (false)
29 {
30  zmq_assert (_addr->protocol == protocol_name::vmci);
31 }
32 
33 zmq::vmci_connecter_t::~vmci_connecter_t ()
34 {
35  zmq_assert (!_connect_timer_started);
36 }
37 
38 void zmq::vmci_connecter_t::process_term (int linger_)
39 {
40  if (_connect_timer_started) {
41  cancel_timer (connect_timer_id);
42  _connect_timer_started = false;
43  }
44 
45  stream_connecter_base_t::process_term (linger_);
46 }
47 
48 void zmq::vmci_connecter_t::in_event ()
49 {
50  // We are not polling for incoming data, so we are actually called
51  // because of error here. However, we can get error on out event as well
52  // on some platforms, so we'll simply handle both events in the same way.
53  out_event ();
54 }
55 
56 void zmq::vmci_connecter_t::out_event ()
57 {
58  if (_connect_timer_started) {
59  cancel_timer (connect_timer_id);
60  _connect_timer_started = false;
61  }
62 
63  // TODO this is still very similar to (t)ipc_connecter_t, maybe the
64  // differences can be factored out
65 
66  rm_handle ();
67 
68  const fd_t fd = connect ();
69 
70  if (fd == retired_fd
71  && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
72  && errno == ECONNREFUSED)) {
73  send_conn_failed (_session);
74  close ();
75  terminate ();
76  return;
77  }
78 
79  // Handle the error condition by attempt to reconnect.
80  if (fd == retired_fd) {
81  close ();
82  add_reconnect_timer ();
83  return;
84  }
85 
86  tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size,
87  options.vmci_buffer_min_size,
88  options.vmci_buffer_max_size);
89 
90  if (options.vmci_connect_timeout > 0) {
91 #if defined ZMQ_HAVE_WINDOWS
92  tune_vmci_connect_timeout (this->get_ctx (), fd,
93  options.vmci_connect_timeout);
94 #else
95  struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
96  tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
97 #endif
98  }
99 
100  create_engine (
102 }
103 
106  socket_end_t socket_end_) const
107 {
108  struct sockaddr_storage ss;
109  const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
110  if (sl == 0) {
111  return std::string ();
112  }
113 
114  const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
115  this->get_ctx ());
116  std::string address_string;
117  addr.to_string (address_string);
118  return address_string;
119 }
120 
121 void zmq::vmci_connecter_t::timer_event (int id_)
122 {
123  if (id_ == connect_timer_id) {
124  _connect_timer_started = false;
125  rm_handle ();
126  close ();
127  add_reconnect_timer ();
128  } else
129  stream_connecter_base_t::timer_event (id_);
130 }
131 
132 void zmq::vmci_connecter_t::start_connecting ()
133 {
134  // Open the connecting socket.
135  const int rc = open ();
136 
137  // Connect may succeed in synchronous manner.
138  if (rc == 0) {
139  _handle = add_fd (_s);
140  out_event ();
141  }
142 
143  // Connection establishment may be delayed. Poll for its completion.
144  else if (rc == -1 && errno == EINPROGRESS) {
145  _handle = add_fd (_s);
146  set_pollout (_handle);
147  _socket->event_connect_delayed (
149 
150  // add userspace connect timeout
151  add_connect_timer ();
152  }
153 
154  // Handle any other error condition by eventual reconnect.
155  else {
156  if (_s != retired_fd)
157  close ();
158  add_reconnect_timer ();
159  }
160 }
161 
162 void zmq::vmci_connecter_t::add_connect_timer ()
163 {
164  if (options.connect_timeout > 0) {
165  add_timer (options.connect_timeout, connect_timer_id);
166  _connect_timer_started = true;
167  }
168 }
169 
170 int zmq::vmci_connecter_t::open ()
171 {
172  zmq_assert (_s == retired_fd);
173 
174  // Resolve the address
175  if (_addr->resolved.vmci_addr != NULL) {
176  LIBZMQ_DELETE (_addr->resolved.vmci_addr);
177  }
178 
179  _addr->resolved.vmci_addr =
180  new (std::nothrow) vmci_address_t (this->get_ctx ());
181  alloc_assert (_addr->resolved.vmci_addr);
182  _s = vmci_open_socket (_addr->address.c_str (), options,
183  _addr->resolved.vmci_addr);
184  if (_s == retired_fd) {
185  // TODO we should emit some event in this case!
186 
187  LIBZMQ_DELETE (_addr->resolved.vmci_addr);
188  return -1;
189  }
190  zmq_assert (_addr->resolved.vmci_addr != NULL);
191 
192  // Set the socket to non-blocking mode so that we get async connect().
193  unblock_socket (_s);
194 
195  const vmci_address_t *const vmci_addr = _addr->resolved.vmci_addr;
196 
197  int rc;
198 
199  // Connect to the remote peer.
200 #if defined ZMQ_HAVE_VXWORKS
201  rc = ::connect (_s, (sockaddr *) vmci_addr->addr (), vmci_addr->addrlen ());
202 #else
203  rc = ::connect (_s, vmci_addr->addr (), vmci_addr->addrlen ());
204 #endif
205  // Connect was successful immediately.
206  if (rc == 0) {
207  return 0;
208  }
209 
210  // Translate error codes indicating asynchronous connect has been
211  // launched to a uniform EINPROGRESS.
212 #ifdef ZMQ_HAVE_WINDOWS
213  const int last_error = WSAGetLastError ();
214  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
215  errno = EINPROGRESS;
216  else
217  errno = wsa_error_to_errno (last_error);
218 #else
219  if (errno == EINTR)
220  errno = EINPROGRESS;
221 #endif
222  return -1;
223 }
224 
225 zmq::fd_t zmq::vmci_connecter_t::connect ()
226 {
227  // Async connect has finished. Check whether an error occurred
228  int err = 0;
229 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
230  int len = sizeof err;
231 #else
232  socklen_t len = sizeof err;
233 #endif
234 
235  const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
236  reinterpret_cast<char *> (&err), &len);
237 
238  // Assert if the error was caused by 0MQ bug.
239  // Networking problems are OK. No need to assert.
240 #ifdef ZMQ_HAVE_WINDOWS
241  zmq_assert (rc == 0);
242  if (err != 0) {
243  if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
244  || err == WSAENOBUFS) {
245  wsa_assert_no (err);
246  }
247  errno = wsa_error_to_errno (err);
248  return retired_fd;
249  }
250 #else
251  // Following code should handle both Berkeley-derived socket
252  // implementations and Solaris.
253  if (rc == -1)
254  err = errno;
255  if (err != 0) {
256  errno = err;
257 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
258  errno_assert (errno != EBADF && errno != ENOPROTOOPT
259  && errno != ENOTSOCK && errno != ENOBUFS);
260 #else
261  errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
262  && errno != ENOBUFS);
263 #endif
264  return retired_fd;
265  }
266 #endif
267 
268  // Return the newly connected socket.
269  const fd_t result = _s;
270  _s = retired_fd;
271  return result;
272 }
273 
274 #endif
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::get_socket_name
std::string get_socket_name(fd_t fd_, socket_end_t socket_end_)
Definition: address.hpp:120
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
vmci_connecter.hpp
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::get_socket_address
zmq_socklen_t get_socket_address(fd_t fd_, socket_end_t socket_end_, sockaddr_storage *ss_)
Definition: address.cpp:102
EBADF
#define EBADF
Definition: errno.hpp:12
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
EINPROGRESS
#define EINPROGRESS
Definition: zmq.h:125
address.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
io_thread.hpp
len
int len
Definition: php/ext/google/protobuf/map.c:206
ENOBUFS
#define ENOBUFS
Definition: zmq.h:110
vmci.hpp
ZMQ_RECONNECT_STOP_CONN_REFUSED
#define ZMQ_RECONNECT_STOP_CONN_REFUSED
Definition: zmq_draft.h:66
zmq::zmq_socklen_t
socklen_t zmq_socklen_t
Definition: address.hpp:107
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
vmci_address.hpp
zmq::socket_end_t
socket_end_t
Definition: address.hpp:110
session_base.hpp
false
#define false
Definition: cJSON.c:70
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:01