tcp_connecter.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 #include <string>
6 
7 #include "macros.hpp"
8 #include "tcp_connecter.hpp"
9 #include "io_thread.hpp"
10 #include "err.hpp"
11 #include "ip.hpp"
12 #include "tcp.hpp"
13 #include "address.hpp"
14 #include "tcp_address.hpp"
15 #include "session_base.hpp"
16 
17 #if !defined ZMQ_HAVE_WINDOWS
18 #include <unistd.h>
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #include <arpa/inet.h>
22 #include <netinet/tcp.h>
23 #include <netinet/in.h>
24 #include <netdb.h>
25 #include <fcntl.h>
26 #ifdef ZMQ_HAVE_VXWORKS
27 #include <sockLib.h>
28 #endif
29 #ifdef ZMQ_HAVE_OPENVMS
30 #include <ioctl.h>
31 #endif
32 #endif
33 
34 #ifdef __APPLE__
35 #include <TargetConditionals.h>
36 #endif
37 
38 zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
39  class session_base_t *session_,
40  const options_t &options_,
41  address_t *addr_,
42  bool delayed_start_) :
43  stream_connecter_base_t (
44  io_thread_, session_, options_, addr_, delayed_start_),
45  _connect_timer_started (false)
46 {
47  zmq_assert (_addr->protocol == protocol_name::tcp);
48 }
49 
50 zmq::tcp_connecter_t::~tcp_connecter_t ()
51 {
52  zmq_assert (!_connect_timer_started);
53 }
54 
55 void zmq::tcp_connecter_t::process_term (int linger_)
56 {
57  if (_connect_timer_started) {
58  cancel_timer (connect_timer_id);
59  _connect_timer_started = false;
60  }
61 
63 }
64 
65 void zmq::tcp_connecter_t::out_event ()
66 {
67  if (_connect_timer_started) {
68  cancel_timer (connect_timer_id);
69  _connect_timer_started = false;
70  }
71 
72  // TODO this is still very similar to (t)ipc_connecter_t, maybe the
73  // differences can be factored out
74 
75  rm_handle ();
76 
77  const fd_t fd = connect ();
78 
79  if (fd == retired_fd
80  && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
81  && errno == ECONNREFUSED)) {
82  send_conn_failed (_session);
83  close ();
84  terminate ();
85  return;
86  }
87 
88  // Handle the error condition by attempt to reconnect.
89  if (fd == retired_fd || !tune_socket (fd)) {
90  close ();
91  add_reconnect_timer ();
92  return;
93  }
94 
95  create_engine (fd, get_socket_name<tcp_address_t> (fd, socket_end_local));
96 }
97 
98 void zmq::tcp_connecter_t::timer_event (int id_)
99 {
100  if (id_ == connect_timer_id) {
101  _connect_timer_started = false;
102  rm_handle ();
103  close ();
104  add_reconnect_timer ();
105  } else
107 }
108 
109 void zmq::tcp_connecter_t::start_connecting ()
110 {
111  // Open the connecting socket.
112  const int rc = open ();
113 
114  // Connect may succeed in synchronous manner.
115  if (rc == 0) {
116  _handle = add_fd (_s);
117  out_event ();
118  }
119 
120  // Connection establishment may be delayed. Poll for its completion.
121  else if (rc == -1 && errno == EINPROGRESS) {
122  _handle = add_fd (_s);
123  set_pollout (_handle);
124  _socket->event_connect_delayed (
126 
127  // add userspace connect timeout
128  add_connect_timer ();
129  }
130 
131  // Handle any other error condition by eventual reconnect.
132  else {
133  if (_s != retired_fd)
134  close ();
135  add_reconnect_timer ();
136  }
137 }
138 
139 void zmq::tcp_connecter_t::add_connect_timer ()
140 {
141  if (options.connect_timeout > 0) {
142  add_timer (options.connect_timeout, connect_timer_id);
143  _connect_timer_started = true;
144  }
145 }
146 
147 int zmq::tcp_connecter_t::open ()
148 {
149  zmq_assert (_s == retired_fd);
150 
151  // Resolve the address
152  if (_addr->resolved.tcp_addr != NULL) {
153  LIBZMQ_DELETE (_addr->resolved.tcp_addr);
154  }
155 
156  _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
157  alloc_assert (_addr->resolved.tcp_addr);
158  _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
159  _addr->resolved.tcp_addr);
160  if (_s == retired_fd) {
161  // TODO we should emit some event in this case!
162 
163  LIBZMQ_DELETE (_addr->resolved.tcp_addr);
164  return -1;
165  }
166  zmq_assert (_addr->resolved.tcp_addr != NULL);
167 
168  // Set the socket to non-blocking mode so that we get async connect().
169  unblock_socket (_s);
170 
171  const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
172 
173  int rc;
174 
175  // Set a source address for conversations
176  if (tcp_addr->has_src_addr ()) {
177  // Allow reusing of the address, to connect to different servers
178  // using the same source port on the client.
179  int flag = 1;
180 #ifdef ZMQ_HAVE_WINDOWS
181  rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
182  reinterpret_cast<const char *> (&flag), sizeof (int));
183  wsa_assert (rc != SOCKET_ERROR);
184 #elif defined ZMQ_HAVE_VXWORKS
185  rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
186  sizeof (int));
187  errno_assert (rc == 0);
188 #else
189  rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
190  errno_assert (rc == 0);
191 #endif
192 
193 #if defined ZMQ_HAVE_VXWORKS
194  rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
195  tcp_addr->src_addrlen ());
196 #else
197  rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
198 #endif
199  if (rc == -1)
200  return -1;
201  }
202 
203  // Connect to the remote peer.
204 #if defined ZMQ_HAVE_VXWORKS
205  rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
206 #else
207  rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
208 #endif
209  // Connect was successful immediately.
210  if (rc == 0) {
211  return 0;
212  }
213 
214  // Translate error codes indicating asynchronous connect has been
215  // launched to a uniform EINPROGRESS.
216 #ifdef ZMQ_HAVE_WINDOWS
217  const int last_error = WSAGetLastError ();
218  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
219  errno = EINPROGRESS;
220  else
221  errno = wsa_error_to_errno (last_error);
222 #else
223  if (errno == EINTR)
224  errno = EINPROGRESS;
225 #endif
226  return -1;
227 }
228 
229 zmq::fd_t zmq::tcp_connecter_t::connect ()
230 {
231  // Async connect has finished. Check whether an error occurred
232  int err = 0;
233 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
234  int len = sizeof err;
235 #else
236  socklen_t len = sizeof err;
237 #endif
238 
239  const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
240  reinterpret_cast<char *> (&err), &len);
241 
242  // Assert if the error was caused by 0MQ bug.
243  // Networking problems are OK. No need to assert.
244 #ifdef ZMQ_HAVE_WINDOWS
245  zmq_assert (rc == 0);
246  if (err != 0) {
247  if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
248  || err == WSAENOBUFS) {
249  wsa_assert_no (err);
250  }
251  errno = wsa_error_to_errno (err);
252  return retired_fd;
253  }
254 #else
255  // Following code should handle both Berkeley-derived socket
256  // implementations and Solaris.
257  if (rc == -1)
258  err = errno;
259  if (err != 0) {
260  errno = err;
261 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
262  errno_assert (errno != EBADF && errno != ENOPROTOOPT
263  && errno != ENOTSOCK && errno != ENOBUFS);
264 #else
265  errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
266  && errno != ENOBUFS);
267 #endif
268  return retired_fd;
269  }
270 #endif
271 
272  // Return the newly connected socket.
273  const fd_t result = _s;
274  _s = retired_fd;
275  return result;
276 }
277 
278 bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
279 {
280  const int rc = tune_tcp_socket (fd_)
282  fd_, options.tcp_keepalive, options.tcp_keepalive_cnt,
283  options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
284  | tune_tcp_maxrt (fd_, options.tcp_maxrt);
285  return rc == 0;
286 }
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
tcp_address.hpp
zmq::tune_tcp_maxrt
int tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:160
EBADF
#define EBADF
Definition: errno.hpp:12
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
tcp_connecter.hpp
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
EINPROGRESS
#define EINPROGRESS
Definition: zmq.h:125
address.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::tune_tcp_keepalives
int tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:71
zmq::stream_connecter_base_t::timer_event
void timer_event(int id_) ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:170
io_thread.hpp
len
int len
Definition: php/ext/google/protobuf/map.c:206
ENOBUFS
#define ENOBUFS
Definition: zmq.h:110
tcp.hpp
ZMQ_RECONNECT_STOP_CONN_REFUSED
#define ZMQ_RECONNECT_STOP_CONN_REFUSED
Definition: zmq_draft.h:66
zmq::tune_tcp_socket
int tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:30
zmq::tcp_open_socket
fd_t tcp_open_socket(const char *address_, const options_t &options_, bool local_, bool fallback_to_ipv4_, tcp_address_t *out_tcp_addr_)
Definition: tcp.cpp:332
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
zmq::stream_connecter_base_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:58
session_base.hpp
false
#define false
Definition: cJSON.c:70
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59