tcp.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "ip.hpp"
6 #include "tcp.hpp"
7 #include "err.hpp"
8 #include "options.hpp"
9 
10 #if !defined ZMQ_HAVE_WINDOWS
11 #include <fcntl.h>
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <netinet/tcp.h>
16 #include <unistd.h>
17 #ifdef ZMQ_HAVE_VXWORKS
18 #include <sockLib.h>
19 #endif
20 #endif
21 
22 #if defined ZMQ_HAVE_OPENVMS
23 #include <ioctl.h>
24 #endif
25 
26 #ifdef __APPLE__
27 #include <TargetConditionals.h>
28 #endif
29 
31 {
32  // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
33  // so using Nagle wouldn't improve throughput in anyway, but it would
34  // hurt latency.
35  int nodelay = 1;
36  const int rc =
37  setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
38  reinterpret_cast<char *> (&nodelay), sizeof (int));
40  if (rc != 0)
41  return rc;
42 
43 #ifdef ZMQ_HAVE_OPENVMS
44  // Disable delayed acknowledgements as they hurt latency significantly.
45  int nodelack = 1;
46  rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack,
47  sizeof (int));
49 #endif
50  return rc;
51 }
52 
53 int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
54 {
55  const int rc =
56  setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
57  reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
58  assert_success_or_recoverable (sockfd_, rc);
59  return rc;
60 }
61 
62 int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
63 {
64  const int rc =
65  setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
66  reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
67  assert_success_or_recoverable (sockfd_, rc);
68  return rc;
69 }
70 
72  int keepalive_,
73  int keepalive_cnt_,
74  int keepalive_idle_,
75  int keepalive_intvl_)
76 {
77  // These options are used only under certain #ifdefs below.
78  LIBZMQ_UNUSED (keepalive_);
79  LIBZMQ_UNUSED (keepalive_cnt_);
80  LIBZMQ_UNUSED (keepalive_idle_);
81  LIBZMQ_UNUSED (keepalive_intvl_);
82 
83  // If none of the #ifdefs apply, then s_ is unused.
84  LIBZMQ_UNUSED (s_);
85 
86  // Tuning TCP keep-alives if platform allows it
87  // All values = -1 means skip and leave it for OS
88 #ifdef ZMQ_HAVE_WINDOWS
89  if (keepalive_ != -1) {
90  tcp_keepalive keepalive_opts;
91  keepalive_opts.onoff = keepalive_;
92  keepalive_opts.keepalivetime =
93  keepalive_idle_ != -1 ? keepalive_idle_ * 1000 : 7200000;
94  keepalive_opts.keepaliveinterval =
95  keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000;
96  DWORD num_bytes_returned;
97  const int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
98  sizeof (keepalive_opts), NULL, 0,
99  &num_bytes_returned, NULL, NULL);
101  if (rc == SOCKET_ERROR)
102  return rc;
103  }
104 #else
105 #ifdef ZMQ_HAVE_SO_KEEPALIVE
106  if (keepalive_ != -1) {
107  int rc =
108  setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
109  reinterpret_cast<char *> (&keepalive_), sizeof (int));
111  if (rc != 0)
112  return rc;
113 
114 #ifdef ZMQ_HAVE_TCP_KEEPCNT
115  if (keepalive_cnt_ != -1) {
116  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
117  sizeof (int));
119  if (rc != 0)
120  return rc;
121  }
122 #endif // ZMQ_HAVE_TCP_KEEPCNT
123 
124 #ifdef ZMQ_HAVE_TCP_KEEPIDLE
125  if (keepalive_idle_ != -1) {
126  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
127  &keepalive_idle_, sizeof (int));
129  if (rc != 0)
130  return rc;
131  }
132 #else // ZMQ_HAVE_TCP_KEEPIDLE
133 #ifdef ZMQ_HAVE_TCP_KEEPALIVE
134  if (keepalive_idle_ != -1) {
135  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
136  &keepalive_idle_, sizeof (int));
138  if (rc != 0)
139  return rc;
140  }
141 #endif // ZMQ_HAVE_TCP_KEEPALIVE
142 #endif // ZMQ_HAVE_TCP_KEEPIDLE
143 
144 #ifdef ZMQ_HAVE_TCP_KEEPINTVL
145  if (keepalive_intvl_ != -1) {
146  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
147  &keepalive_intvl_, sizeof (int));
149  if (rc != 0)
150  return rc;
151  }
152 #endif // ZMQ_HAVE_TCP_KEEPINTVL
153  }
154 #endif // ZMQ_HAVE_SO_KEEPALIVE
155 #endif // ZMQ_HAVE_WINDOWS
156 
157  return 0;
158 }
159 
160 int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
161 {
162  if (timeout_ <= 0)
163  return 0;
164 
165  LIBZMQ_UNUSED (sockfd_);
166 
167 #if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT)
168  // msdn says it's supported in >= Vista, >= Windows Server 2003
169  timeout_ /= 1000; // in seconds
170  const int rc =
171  setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
172  reinterpret_cast<char *> (&timeout_), sizeof (timeout_));
173  assert_success_or_recoverable (sockfd_, rc);
174  return rc;
175 // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
176 #elif defined(TCP_USER_TIMEOUT)
177  int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
178  sizeof (timeout_));
179  assert_success_or_recoverable (sockfd_, rc);
180  return rc;
181 #else
182  return 0;
183 #endif
184 }
185 
186 int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
187 {
188 #ifdef ZMQ_HAVE_WINDOWS
189 
190  const int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);
191 
192  // If not a single byte can be written to the socket in non-blocking mode
193  // we'll get an error (this may happen during the speculative write).
194  const int last_error = WSAGetLastError ();
195  if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
196  return 0;
197 
198  // Signalise peer failure.
199  if (nbytes == SOCKET_ERROR
200  && (last_error == WSAENETDOWN || last_error == WSAENETRESET
201  || last_error == WSAEHOSTUNREACH || last_error == WSAECONNABORTED
202  || last_error == WSAETIMEDOUT || last_error == WSAECONNRESET))
203  return -1;
204 
205  // Circumvent a Windows bug:
206  // See https://support.microsoft.com/en-us/kb/201213
207  // See https://zeromq.jira.com/browse/LIBZMQ-195
208  if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
209  return 0;
210 
211  wsa_assert (nbytes != SOCKET_ERROR);
212  return nbytes;
213 
214 #else
215  ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0);
216 
217  // Several errors are OK. When speculative write is being done we may not
218  // be able to write a single byte from the socket. Also, SIGSTOP issued
219  // by a debugging tool can result in EINTR error.
220  if (nbytes == -1
221  && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
222  return 0;
223 
224  // Signalise peer failure.
225  if (nbytes == -1) {
226 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
227  errno_assert (errno != EACCES && errno != EBADF && errno != EDESTADDRREQ
228  && errno != EFAULT && errno != EISCONN
229  && errno != EMSGSIZE && errno != ENOMEM
230  && errno != ENOTSOCK && errno != EOPNOTSUPP);
231 #else
232  errno_assert (errno != EACCES && errno != EDESTADDRREQ
233  && errno != EFAULT && errno != EISCONN
234  && errno != EMSGSIZE && errno != ENOMEM
235  && errno != ENOTSOCK && errno != EOPNOTSUPP);
236 #endif
237  return -1;
238  }
239 
240  return static_cast<int> (nbytes);
241 
242 #endif
243 }
244 
245 int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
246 {
247 #ifdef ZMQ_HAVE_WINDOWS
248 
249  const int rc =
250  recv (s_, static_cast<char *> (data_), static_cast<int> (size_), 0);
251 
252  // If not a single byte can be read from the socket in non-blocking mode
253  // we'll get an error (this may happen during the speculative read).
254  if (rc == SOCKET_ERROR) {
255  const int last_error = WSAGetLastError ();
256  if (last_error == WSAEWOULDBLOCK) {
257  errno = EAGAIN;
258  } else {
259  wsa_assert (
260  last_error == WSAENETDOWN || last_error == WSAENETRESET
261  || last_error == WSAECONNABORTED || last_error == WSAETIMEDOUT
262  || last_error == WSAECONNRESET || last_error == WSAECONNREFUSED
263  || last_error == WSAENOTCONN || last_error == WSAENOBUFS);
264  errno = wsa_error_to_errno (last_error);
265  }
266  }
267 
268  return rc == SOCKET_ERROR ? -1 : rc;
269 
270 #else
271 
272  const ssize_t rc = recv (s_, static_cast<char *> (data_), size_, 0);
273 
274  // Several errors are OK. When speculative read is being done we may not
275  // be able to read a single byte from the socket. Also, SIGSTOP issued
276  // by a debugging tool can result in EINTR error.
277  if (rc == -1) {
278 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
279  errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
280  && errno != ENOTSOCK);
281 #else
282  errno_assert (errno != EFAULT && errno != ENOMEM && errno != ENOTSOCK);
283 #endif
284  if (errno == EWOULDBLOCK || errno == EINTR)
285  errno = EAGAIN;
286  }
287 
288  return static_cast<int> (rc);
289 
290 #endif
291 }
292 
294 {
295 #if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH
296  int sio_loopback_fastpath = 1;
297  DWORD number_of_bytes_returned = 0;
298 
299  const int rc = WSAIoctl (
300  socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
301  sizeof sio_loopback_fastpath, NULL, 0, &number_of_bytes_returned, 0, 0);
302 
303  if (SOCKET_ERROR == rc) {
304  const DWORD last_error = ::WSAGetLastError ();
305 
306  if (WSAEOPNOTSUPP == last_error) {
307  // This system is not Windows 8 or Server 2012, and the call is not supported.
308  } else {
309  wsa_assert (false);
310  }
311  }
312 #else
313  LIBZMQ_UNUSED (socket_);
314 #endif
315 }
316 
317 void zmq::tune_tcp_busy_poll (fd_t socket_, int busy_poll_)
318 {
319 #if defined(ZMQ_HAVE_BUSY_POLL)
320  if (busy_poll_ > 0) {
321  const int rc =
322  setsockopt (socket_, SOL_SOCKET, SO_BUSY_POLL,
323  reinterpret_cast<char *> (&busy_poll_), sizeof (int));
324  assert_success_or_recoverable (socket_, rc);
325  }
326 #else
327  LIBZMQ_UNUSED (socket_);
328  LIBZMQ_UNUSED (busy_poll_);
329 #endif
330 }
331 
332 zmq::fd_t zmq::tcp_open_socket (const char *address_,
333  const zmq::options_t &options_,
334  bool local_,
335  bool fallback_to_ipv4_,
336  zmq::tcp_address_t *out_tcp_addr_)
337 {
338  // Convert the textual address into address structure.
339  int rc = out_tcp_addr_->resolve (address_, local_, options_.ipv6);
340  if (rc != 0)
341  return retired_fd;
342 
343  // Create the socket.
344  fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP);
345 
346  // IPv6 address family not supported, try automatic downgrade to IPv4.
347  if (s == retired_fd && fallback_to_ipv4_
348  && out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT
349  && options_.ipv6) {
350  rc = out_tcp_addr_->resolve (address_, local_, false);
351  if (rc != 0) {
352  return retired_fd;
353  }
354  s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
355  }
356 
357  if (s == retired_fd) {
358  return retired_fd;
359  }
360 
361  // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
362  // Switch it on in such cases.
363  if (out_tcp_addr_->family () == AF_INET6)
365 
366  // Set the IP Type-Of-Service priority for this socket
367  if (options_.tos != 0)
369 
370  // Set the protocol-defined priority for this socket
371  if (options_.priority != 0)
372  set_socket_priority (s, options_.priority);
373 
374  // Set the socket to loopback fastpath if configured.
375  if (options_.loopback_fastpath)
377 
378  // Bind the socket to a device if applicable
379  if (!options_.bound_device.empty ())
380  if (bind_to_device (s, options_.bound_device) == -1)
381  goto setsockopt_error;
382 
383  // Set the socket buffer limits for the underlying socket.
384  if (options_.sndbuf >= 0)
385  set_tcp_send_buffer (s, options_.sndbuf);
386  if (options_.rcvbuf >= 0)
388 
389  // This option removes several delays caused by scheduling, interrupts and context switching.
390  if (options_.busy_poll)
391  tune_tcp_busy_poll (s, options_.busy_poll);
392  return s;
393 
394 setsockopt_error:
395 #ifdef ZMQ_HAVE_WINDOWS
396  rc = closesocket (s);
397  wsa_assert (rc != SOCKET_ERROR);
398 #else
399  rc = ::close (s);
400  errno_assert (rc == 0);
401 #endif
402  return retired_fd;
403 }
zmq::bind_to_device
int bind_to_device(fd_t s_, const std::string &bound_device_)
Definition: ip.cpp:244
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
ip.hpp
EAFNOSUPPORT
#define EAFNOSUPPORT
Definition: zmq.h:134
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq::set_tcp_receive_buffer
int set_tcp_receive_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:62
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t
Definition: options.hpp:34
EINTR
#define EINTR
Definition: errno.hpp:7
s
XmlRpcServer s
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
errno
int errno
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
zmq::assert_success_or_recoverable
void assert_success_or_recoverable(fd_t s_, int rc_)
Definition: ip.cpp:830
zmq::tcp_address_t
Definition: tcp_address.hpp:15
zmq::set_tcp_send_buffer
int set_tcp_send_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:53
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::tcp_address_t::family
sa_family_t family() const
Definition: tcp_address.cpp:170
zmq::set_socket_priority
void set_socket_priority(fd_t s_, int priority_)
Definition: ip.cpp:211
zmq::tcp_tune_loopback_fast_path
void tcp_tune_loopback_fast_path(fd_t socket_)
Definition: tcp.cpp:293
zmq::tcp_address_t::resolve
int resolve(const char *name_, bool local_, bool ipv6_)
Definition: tcp_address.cpp:46
s_
std::string s_
Definition: gmock-matchers_test.cc:4128
zmq::tune_tcp_maxrt
int tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:160
EBADF
#define EBADF
Definition: errno.hpp:12
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
zmq::open_socket
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:73
EACCES
#define EACCES
Definition: errno.hpp:16
zmq::tune_tcp_keepalives
int tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:71
EMSGSIZE
#define EMSGSIZE
Definition: zmq.h:131
options.hpp
tcp.hpp
zmq::tune_tcp_socket
int tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:30
zmq::tcp_open_socket
fd_t tcp_open_socket(const char *address_, const options_t &options_, bool local_, bool fallback_to_ipv4_, tcp_address_t *out_tcp_addr_)
Definition: tcp.cpp:332
zmq::tcp_write
int tcp_write(fd_t s_, const void *data_, size_t size_)
Definition: tcp.cpp:186
err.hpp
zmq::set_ip_type_of_service
void set_ip_type_of_service(fd_t s_, int iptos_)
Definition: ip.cpp:187
zmq::tune_tcp_busy_poll
void tune_tcp_busy_poll(fd_t socket_, int busy_poll_)
Definition: tcp.cpp:317
zmq::tcp_read
int tcp_read(fd_t s_, void *data_, size_t size_)
Definition: tcp.cpp:245
zmq::enable_ipv4_mapping
void enable_ipv4_mapping(fd_t s_)
Definition: ip.cpp:126
EFAULT
#define EFAULT
Definition: errno.hpp:17
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59