ws_connecter.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 #include <string>
6 
7 #include "macros.hpp"
8 #include "ws_connecter.hpp"
9 #include "io_thread.hpp"
10 #include "err.hpp"
11 #include "ip.hpp"
12 #include "tcp.hpp"
13 #include "address.hpp"
14 #include "ws_address.hpp"
15 #include "ws_engine.hpp"
16 #include "session_base.hpp"
17 
18 #ifdef ZMQ_HAVE_WSS
19 #include "wss_engine.hpp"
20 #include "wss_address.hpp"
21 #endif
22 
23 #if !defined ZMQ_HAVE_WINDOWS
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
28 #include <netinet/tcp.h>
29 #include <netinet/in.h>
30 #include <netdb.h>
31 #include <fcntl.h>
32 #ifdef ZMQ_HAVE_VXWORKS
33 #include <sockLib.h>
34 #endif
35 #ifdef ZMQ_HAVE_OPENVMS
36 #include <ioctl.h>
37 #endif
38 #endif
39 
40 #ifdef __APPLE__
41 #include <TargetConditionals.h>
42 #endif
43 
44 zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
45  class session_base_t *session_,
46  const options_t &options_,
47  address_t *addr_,
48  bool delayed_start_,
49  bool wss_,
50  const std::string &tls_hostname_) :
51  stream_connecter_base_t (
52  io_thread_, session_, options_, addr_, delayed_start_),
53  _connect_timer_started (false),
54  _wss (wss_),
55  _hostname (tls_hostname_)
56 {
57 }
58 
59 zmq::ws_connecter_t::~ws_connecter_t ()
60 {
61  zmq_assert (!_connect_timer_started);
62 }
63 
64 void zmq::ws_connecter_t::process_term (int linger_)
65 {
66  if (_connect_timer_started) {
67  cancel_timer (connect_timer_id);
68  _connect_timer_started = false;
69  }
70 
72 }
73 
74 void zmq::ws_connecter_t::out_event ()
75 {
76  if (_connect_timer_started) {
77  cancel_timer (connect_timer_id);
78  _connect_timer_started = false;
79  }
80 
81  // TODO this is still very similar to (t)ipc_connecter_t, maybe the
82  // differences can be factored out
83 
84  rm_handle ();
85 
86  const fd_t fd = connect ();
87 
88  // Handle the error condition by attempt to reconnect.
89  if (fd == retired_fd || !tune_socket (fd)) {
90  close ();
91  add_reconnect_timer ();
92  return;
93  }
94 
95  if (_wss)
96 #ifdef ZMQ_HAVE_WSS
97  create_engine (fd,
98  get_socket_name<wss_address_t> (fd, socket_end_local));
99 #else
100  assert (false);
101 #endif
102  else
103  create_engine (fd,
104  get_socket_name<ws_address_t> (fd, socket_end_local));
105 }
106 
107 void zmq::ws_connecter_t::timer_event (int id_)
108 {
109  if (id_ == connect_timer_id) {
110  _connect_timer_started = false;
111  rm_handle ();
112  close ();
113  add_reconnect_timer ();
114  } else
116 }
117 
118 void zmq::ws_connecter_t::start_connecting ()
119 {
120  // Open the connecting socket.
121  const int rc = open ();
122 
123  // Connect may succeed in synchronous manner.
124  if (rc == 0) {
125  _handle = add_fd (_s);
126  out_event ();
127  }
128 
129  // Connection establishment may be delayed. Poll for its completion.
130  else if (rc == -1 && errno == EINPROGRESS) {
131  _handle = add_fd (_s);
132  set_pollout (_handle);
133  _socket->event_connect_delayed (
135 
136  // add userspace connect timeout
137  add_connect_timer ();
138  }
139 
140  // Handle any other error condition by eventual reconnect.
141  else {
142  if (_s != retired_fd)
143  close ();
144  add_reconnect_timer ();
145  }
146 }
147 
148 void zmq::ws_connecter_t::add_connect_timer ()
149 {
150  if (options.connect_timeout > 0) {
151  add_timer (options.connect_timeout, connect_timer_id);
152  _connect_timer_started = true;
153  }
154 }
155 
156 int zmq::ws_connecter_t::open ()
157 {
158  zmq_assert (_s == retired_fd);
159 
160  tcp_address_t tcp_addr;
161  _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
162  &tcp_addr);
163  if (_s == retired_fd)
164  return -1;
165 
166  // Set the socket to non-blocking mode so that we get async connect().
167  unblock_socket (_s);
168 
169  // Connect to the remote peer.
170 #ifdef ZMQ_HAVE_VXWORKS
171  int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
172 #else
173  const int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
174 #endif
175  // Connect was successful immediately.
176  if (rc == 0) {
177  return 0;
178  }
179 
180  // Translate error codes indicating asynchronous connect has been
181  // launched to a uniform EINPROGRESS.
182 #ifdef ZMQ_HAVE_WINDOWS
183  const int last_error = WSAGetLastError ();
184  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
185  errno = EINPROGRESS;
186  else
187  errno = wsa_error_to_errno (last_error);
188 #else
189  if (errno == EINTR)
190  errno = EINPROGRESS;
191 #endif
192  return -1;
193 }
194 
195 zmq::fd_t zmq::ws_connecter_t::connect ()
196 {
197  // Async connect has finished. Check whether an error occurred
198  int err = 0;
199 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
200  int len = sizeof err;
201 #else
202  socklen_t len = sizeof err;
203 #endif
204 
205  const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
206  reinterpret_cast<char *> (&err), &len);
207 
208  // Assert if the error was caused by 0MQ bug.
209  // Networking problems are OK. No need to assert.
210 #ifdef ZMQ_HAVE_WINDOWS
211  zmq_assert (rc == 0);
212  if (err != 0) {
213  if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
214  || err == WSAENOBUFS) {
215  wsa_assert_no (err);
216  }
217  return retired_fd;
218  }
219 #else
220  // Following code should handle both Berkeley-derived socket
221  // implementations and Solaris.
222  if (rc == -1)
223  err = errno;
224  if (err != 0) {
225  errno = err;
226 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
227  errno_assert (errno != EBADF && errno != ENOPROTOOPT
228  && errno != ENOTSOCK && errno != ENOBUFS);
229 #else
230  errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
231  && errno != ENOBUFS);
232 #endif
233  return retired_fd;
234  }
235 #endif
236 
237  // Return the newly connected socket.
238  const fd_t result = _s;
239  _s = retired_fd;
240  return result;
241 }
242 
243 bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
244 {
245  const int rc =
246  tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
247  return rc == 0;
248 }
249 
250 void zmq::ws_connecter_t::create_engine (fd_t fd_,
251  const std::string &local_address_)
252 {
253  const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
255 
256  // Create the engine object for this connection.
257  i_engine *engine = NULL;
258  if (_wss) {
259 #ifdef ZMQ_HAVE_WSS
260  engine = new (std::nothrow)
261  wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr,
262  true, NULL, _hostname);
263 #else
264  LIBZMQ_UNUSED (_hostname);
265  assert (false);
266 #endif
267  } else
268  engine = new (std::nothrow) ws_engine_t (
269  fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true);
270  alloc_assert (engine);
271 
272  // Attach the engine to the corresponding session object.
273  send_attach (_session, engine);
274 
275  // Shut the connecter down.
276  terminate ();
277 
278  _socket->event_connected (endpoint_pair, fd_);
279 }
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
ws_connecter.hpp
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
wss_engine.hpp
errno
int errno
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
zmq::endpoint_type_connect
@ endpoint_type_connect
Definition: endpoint.hpp:14
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::tune_tcp_maxrt
int tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:160
EBADF
#define EBADF
Definition: errno.hpp:12
ws_engine.hpp
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
EINPROGRESS
#define EINPROGRESS
Definition: zmq.h:125
address.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::stream_connecter_base_t::timer_event
void timer_event(int id_) ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:170
io_thread.hpp
len
int len
Definition: php/ext/google/protobuf/map.c:206
ENOBUFS
#define ENOBUFS
Definition: zmq.h:110
tcp.hpp
zmq::tune_tcp_socket
int tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:30
ws_address.hpp
zmq::tcp_open_socket
fd_t tcp_open_socket(const char *address_, const options_t &options_, bool local_, bool fallback_to_ipv4_, tcp_address_t *out_tcp_addr_)
Definition: tcp.cpp:332
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
zmq::stream_connecter_base_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:58
wss_address.hpp
session_base.hpp
false
#define false
Definition: cJSON.c:70
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02