ip.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "ip.hpp"
5 #include "err.hpp"
6 #include "macros.hpp"
7 #include "config.hpp"
8 #include "address.hpp"
9 
10 #if !defined ZMQ_HAVE_WINDOWS
11 #include <fcntl.h>
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <sys/stat.h>
15 #include <netdb.h>
16 #include <netinet/in.h>
17 #include <netinet/tcp.h>
18 #include <stdlib.h>
19 #include <unistd.h>
20 
21 #include <vector>
22 #else
23 #include "tcp.hpp"
24 #ifdef ZMQ_HAVE_IPC
25 #include "ipc_address.hpp"
26 // Don't try ipc if it fails once
27 namespace zmq
28 {
29 static bool try_ipc_first = true;
30 }
31 #endif
32 
33 #include <direct.h>
34 
35 #define rmdir rmdir_utf8
36 #define unlink unlink_utf8
37 #endif
38 
39 #if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
40 #include <ioctl.h>
41 #endif
42 
43 #if defined ZMQ_HAVE_VXWORKS
44 #include <unistd.h>
45 #include <sockLib.h>
46 #include <ioLib.h>
47 #endif
48 
49 #if defined ZMQ_HAVE_EVENTFD
50 #include <sys/eventfd.h>
51 #endif
52 
53 #if defined ZMQ_HAVE_OPENPGM
54 #ifdef ZMQ_HAVE_WINDOWS
55 #define __PGM_WININT_H__
56 #endif
57 
58 #include <pgm/pgm.h>
59 #endif
60 
61 #ifdef __APPLE__
62 #include <TargetConditionals.h>
63 #endif
64 
65 #ifndef ZMQ_HAVE_WINDOWS
//  Names of the environment variables that may designate a temporary
//  directory, listed in the order in which they are consulted. The list
//  is terminated by a null entry.
static const char *tmp_env_vars[] = {
    "TMPDIR",
    "TEMPDIR",
    "TMP",
    0 //  Sentinel
};
71 #endif
72 
73 zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
74 {
75  int rc;
76 
77  // Setting this option result in sane behaviour when exec() functions
78  // are used. Old sockets are closed and don't block TCP ports etc.
79 #if defined ZMQ_HAVE_SOCK_CLOEXEC
80  type_ |= SOCK_CLOEXEC;
81 #endif
82 
83 #if defined ZMQ_HAVE_WINDOWS && defined WSA_FLAG_NO_HANDLE_INHERIT
84  // if supported, create socket with WSA_FLAG_NO_HANDLE_INHERIT, such that
85  // the race condition in making it non-inheritable later is avoided
86  const fd_t s = WSASocket (domain_, type_, protocol_, NULL, 0,
87  WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
88 #else
89  const fd_t s = socket (domain_, type_, protocol_);
90 #endif
91  if (s == retired_fd) {
92 #ifdef ZMQ_HAVE_WINDOWS
93  errno = wsa_error_to_errno (WSAGetLastError ());
94 #endif
95  return retired_fd;
96  }
97 
99 
100  // Socket is not yet connected so EINVAL is not a valid networking error
101  rc = zmq::set_nosigpipe (s);
102  errno_assert (rc == 0);
103 
104  return s;
105 }
106 
108 {
109 #if defined ZMQ_HAVE_WINDOWS
110  u_long nonblock = 1;
111  const int rc = ioctlsocket (s_, FIONBIO, &nonblock);
112  wsa_assert (rc != SOCKET_ERROR);
113 #elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
114  int nonblock = 1;
115  int rc = ioctl (s_, FIONBIO, &nonblock);
116  errno_assert (rc != -1);
117 #else
118  int flags = fcntl (s_, F_GETFL, 0);
119  if (flags == -1)
120  flags = 0;
121  int rc = fcntl (s_, F_SETFL, flags | O_NONBLOCK);
122  errno_assert (rc != -1);
123 #endif
124 }
125 
127 {
128  LIBZMQ_UNUSED (s_);
129 
130 #if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD \
131  && !defined ZMQ_HAVE_DRAGONFLY
132 #ifdef ZMQ_HAVE_WINDOWS
133  DWORD flag = 0;
134 #else
135  int flag = 0;
136 #endif
137  const int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
138  reinterpret_cast<char *> (&flag), sizeof (flag));
139 #ifdef ZMQ_HAVE_WINDOWS
140  wsa_assert (rc != SOCKET_ERROR);
141 #else
142  errno_assert (rc == 0);
143 #endif
144 #endif
145 }
146 
147 int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
148 {
149  struct sockaddr_storage ss;
150 
151  const zmq_socklen_t addrlen =
152  get_socket_address (sockfd_, socket_end_remote, &ss);
153 
154  if (addrlen == 0) {
155 #ifdef ZMQ_HAVE_WINDOWS
156  const int last_error = WSAGetLastError ();
157  wsa_assert (last_error != WSANOTINITIALISED && last_error != WSAEFAULT
158  && last_error != WSAEINPROGRESS
159  && last_error != WSAENOTSOCK);
160 #elif !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
161  errno_assert (errno != EBADF && errno != EFAULT && errno != ENOTSOCK);
162 #else
164 #endif
165  return 0;
166  }
167 
168  char host[NI_MAXHOST];
169  const int rc =
170  getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen, host,
171  sizeof host, NULL, 0, NI_NUMERICHOST);
172  if (rc != 0)
173  return 0;
174 
175  ip_addr_ = host;
176 
177  union
178  {
179  struct sockaddr sa;
180  struct sockaddr_storage sa_stor;
181  } u;
182 
183  u.sa_stor = ss;
184  return static_cast<int> (u.sa.sa_family);
185 }
186 
188 {
189  int rc = setsockopt (s_, IPPROTO_IP, IP_TOS,
190  reinterpret_cast<char *> (&iptos_), sizeof (iptos_));
191 
192 #ifdef ZMQ_HAVE_WINDOWS
193  wsa_assert (rc != SOCKET_ERROR);
194 #else
195  errno_assert (rc == 0);
196 #endif
197 
198  // Windows and Hurd do not support IPV6_TCLASS
199 #if !defined(ZMQ_HAVE_WINDOWS) && defined(IPV6_TCLASS)
200  rc = setsockopt (s_, IPPROTO_IPV6, IPV6_TCLASS,
201  reinterpret_cast<char *> (&iptos_), sizeof (iptos_));
202 
203  // If IPv6 is not enabled ENOPROTOOPT will be returned on Linux and
204  // EINVAL on OSX
205  if (rc == -1) {
206  errno_assert (errno == ENOPROTOOPT || errno == EINVAL);
207  }
208 #endif
209 }
210 
211 void zmq::set_socket_priority (fd_t s_, int priority_)
212 {
213 #ifdef ZMQ_HAVE_SO_PRIORITY
214  int rc =
215  setsockopt (s_, SOL_SOCKET, SO_PRIORITY,
216  reinterpret_cast<char *> (&priority_), sizeof (priority_));
217  errno_assert (rc == 0);
218 #else
219  LIBZMQ_UNUSED (s_);
220  LIBZMQ_UNUSED (priority_);
221 #endif
222 }
223 
225 {
226 #ifdef SO_NOSIGPIPE
227  // Make sure that SIGPIPE signal is not generated when writing to a
228  // connection that was already closed by the peer.
229  // As per POSIX spec, EINVAL will be returned if the socket was valid but
230  // the connection has been reset by the peer. Return an error so that the
231  // socket can be closed and the connection retried if necessary.
232  int set = 1;
233  int rc = setsockopt (s_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
234  if (rc != 0 && errno == EINVAL)
235  return -1;
236  errno_assert (rc == 0);
237 #else
238  LIBZMQ_UNUSED (s_);
239 #endif
240 
241  return 0;
242 }
243 
244 int zmq::bind_to_device (fd_t s_, const std::string &bound_device_)
245 {
246 #ifdef ZMQ_HAVE_SO_BINDTODEVICE
247  int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE,
248  bound_device_.c_str (), bound_device_.length ());
249  if (rc != 0) {
251  return -1;
252  }
253  return 0;
254 
255 #else
256  LIBZMQ_UNUSED (s_);
257  LIBZMQ_UNUSED (bound_device_);
258 
259  errno = ENOTSUP;
260  return -1;
261 #endif
262 }
263 
265 {
266 #if defined ZMQ_HAVE_OPENPGM
267 
268  // Init PGM transport. Ensure threading and timer are enabled. Find PGM
269  // protocol ID. Note that if you want to use gettimeofday and sleep for
270  // openPGM timing, set environment variables PGM_TIMER to "GTOD" and
271  // PGM_SLEEP to "USLEEP".
272  pgm_error_t *pgm_error = NULL;
273  const bool ok = pgm_init (&pgm_error);
274  if (ok != TRUE) {
275  // Invalid parameters don't set pgm_error_t
276  zmq_assert (pgm_error != NULL);
277  if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME
278  && (pgm_error->code == PGM_ERROR_FAILED)) {
279  // Failed to access RTC or HPET device.
280  pgm_error_free (pgm_error);
281  errno = EINVAL;
282  return false;
283  }
284 
285  // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
286  zmq_assert (false);
287  }
288 #endif
289 
290 #ifdef ZMQ_HAVE_WINDOWS
291  // Initialise Windows sockets. Note that WSAStartup can be called multiple
292  // times given that WSACleanup will be called for each WSAStartup.
293 
294  const WORD version_requested = MAKEWORD (2, 2);
295  WSADATA wsa_data;
296  const int rc = WSAStartup (version_requested, &wsa_data);
297  zmq_assert (rc == 0);
298  zmq_assert (LOBYTE (wsa_data.wVersion) == 2
299  && HIBYTE (wsa_data.wVersion) == 2);
300 #endif
301 
302  return true;
303 }
304 
306 {
307 #ifdef ZMQ_HAVE_WINDOWS
308  // On Windows, uninitialise socket layer.
309  const int rc = WSACleanup ();
310  wsa_assert (rc != SOCKET_ERROR);
311 #endif
312 
313 #if defined ZMQ_HAVE_OPENPGM
314  // Shut down the OpenPGM library.
315  if (pgm_shutdown () != TRUE)
316  zmq_assert (false);
317 #endif
318 }
319 
320 #if defined ZMQ_HAVE_WINDOWS
321 static void tune_socket (const SOCKET socket_)
322 {
323  BOOL tcp_nodelay = 1;
324  const int rc =
325  setsockopt (socket_, IPPROTO_TCP, TCP_NODELAY,
326  reinterpret_cast<char *> (&tcp_nodelay), sizeof tcp_nodelay);
327  wsa_assert (rc != SOCKET_ERROR);
328 
330 }
331 
332 static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
333 {
334 #if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
335  // Windows CE does not manage security attributes
336  SECURITY_DESCRIPTOR sd;
337  SECURITY_ATTRIBUTES sa;
338  memset (&sd, 0, sizeof sd);
339  memset (&sa, 0, sizeof sa);
340 
341  InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
342  SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
343 
344  sa.nLength = sizeof (SECURITY_ATTRIBUTES);
345  sa.lpSecurityDescriptor = &sd;
346 #endif
347 
348  // This function has to be in a system-wide critical section so that
349  // two instances of the library don't accidentally create signaler
350  // crossing the process boundary.
351  // We'll use named event object to implement the critical section.
352  // Note that if the event object already exists, the CreateEvent requests
353  // EVENT_ALL_ACCESS access right. If this fails, we try to open
354  // the event object asking for SYNCHRONIZE access only.
355  HANDLE sync = NULL;
356 
357  // Create critical section only if using fixed signaler port
358  // Use problematic Event implementation for compatibility if using old port 5905.
359  // Otherwise use Mutex implementation.
360  const int event_signaler_port = 5905;
361 
362  if (zmq::signaler_port == event_signaler_port) {
363 #if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
364  sync =
365  CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
366 #else
367  sync =
368  CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
369 #endif
370  if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
371  sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE,
372  L"Global\\zmq-signaler-port-sync");
373 
374  win_assert (sync != NULL);
375  } else if (zmq::signaler_port != 0) {
376  wchar_t mutex_name[MAX_PATH];
377 #ifdef __MINGW32__
378  _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
380 #else
381  swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
383 #endif
384 
385 #if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
386  sync = CreateMutexW (&sa, FALSE, mutex_name);
387 #else
388  sync = CreateMutexW (NULL, FALSE, mutex_name);
389 #endif
390  if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
391  sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
392 
393  win_assert (sync != NULL);
394  }
395 
396  // Windows has no 'socketpair' function. CreatePipe is no good as pipe
397  // handles cannot be polled on. Here we create the socketpair by hand.
398  *w_ = INVALID_SOCKET;
399  *r_ = INVALID_SOCKET;
400 
401  // Create listening socket.
402  SOCKET listener;
403  listener = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
404  wsa_assert (listener != INVALID_SOCKET);
405 
406  // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
407  BOOL so_reuseaddr = 1;
408  int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
409  reinterpret_cast<char *> (&so_reuseaddr),
410  sizeof so_reuseaddr);
411  wsa_assert (rc != SOCKET_ERROR);
412 
413  tune_socket (listener);
414 
415  // Init sockaddr to signaler port.
416  struct sockaddr_in addr;
417  memset (&addr, 0, sizeof addr);
418  addr.sin_family = AF_INET;
419  addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
420  addr.sin_port = htons (zmq::signaler_port);
421 
422  // Create the writer socket.
423  *w_ = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
424  wsa_assert (*w_ != INVALID_SOCKET);
425 
426  if (sync != NULL) {
427  // Enter the critical section.
428  const DWORD dwrc = WaitForSingleObject (sync, INFINITE);
429  zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
430  }
431 
432  // Bind listening socket to signaler port.
433  rc = bind (listener, reinterpret_cast<const struct sockaddr *> (&addr),
434  sizeof addr);
435 
436  if (rc != SOCKET_ERROR && zmq::signaler_port == 0) {
437  // Retrieve ephemeral port number
438  int addrlen = sizeof addr;
439  rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&addr),
440  &addrlen);
441  }
442 
443  // Listen for incoming connections.
444  if (rc != SOCKET_ERROR) {
445  rc = listen (listener, 1);
446  }
447 
448  // Connect writer to the listener.
449  if (rc != SOCKET_ERROR) {
450  rc = connect (*w_, reinterpret_cast<struct sockaddr *> (&addr),
451  sizeof addr);
452  }
453 
454  // Accept connection from writer.
455  if (rc != SOCKET_ERROR) {
456  // Set TCP_NODELAY on writer socket.
457  tune_socket (*w_);
458 
459  *r_ = accept (listener, NULL, NULL);
460  }
461 
462  // Send/receive large chunk to work around TCP slow start
463  // This code is a workaround for #1608
464  if (*r_ != INVALID_SOCKET) {
465  const size_t dummy_size =
466  1024 * 1024; // 1M to overload default receive buffer
467  unsigned char *dummy =
468  static_cast<unsigned char *> (malloc (dummy_size));
469  wsa_assert (dummy);
470 
471  int still_to_send = static_cast<int> (dummy_size);
472  int still_to_recv = static_cast<int> (dummy_size);
473  while (still_to_send || still_to_recv) {
474  int nbytes;
475  if (still_to_send > 0) {
476  nbytes = ::send (
477  *w_,
478  reinterpret_cast<char *> (dummy + dummy_size - still_to_send),
479  still_to_send, 0);
480  wsa_assert (nbytes != SOCKET_ERROR);
481  still_to_send -= nbytes;
482  }
483  nbytes = ::recv (
484  *r_,
485  reinterpret_cast<char *> (dummy + dummy_size - still_to_recv),
486  still_to_recv, 0);
487  wsa_assert (nbytes != SOCKET_ERROR);
488  still_to_recv -= nbytes;
489  }
490  free (dummy);
491  }
492 
493  // Save errno if error occurred in bind/listen/connect/accept.
494  int saved_errno = 0;
495  if (*r_ == INVALID_SOCKET)
496  saved_errno = WSAGetLastError ();
497 
498  // We don't need the listening socket anymore. Close it.
499  rc = closesocket (listener);
500  wsa_assert (rc != SOCKET_ERROR);
501 
502  if (sync != NULL) {
503  // Exit the critical section.
504  BOOL brc;
505  if (zmq::signaler_port == event_signaler_port)
506  brc = SetEvent (sync);
507  else
508  brc = ReleaseMutex (sync);
509  win_assert (brc != 0);
510 
511  // Release the kernel object
512  brc = CloseHandle (sync);
513  win_assert (brc != 0);
514  }
515 
516  if (*r_ != INVALID_SOCKET) {
518  return 0;
519  }
520  // Cleanup writer if connection failed
521  if (*w_ != INVALID_SOCKET) {
522  rc = closesocket (*w_);
523  wsa_assert (rc != SOCKET_ERROR);
524  *w_ = INVALID_SOCKET;
525  }
526  // Set errno from saved value
527  errno = zmq::wsa_error_to_errno (saved_errno);
528  return -1;
529 }
530 #endif
531 
532 int zmq::make_fdpair (fd_t *r_, fd_t *w_)
533 {
534 #if defined ZMQ_HAVE_EVENTFD
535  int flags = 0;
536 #if defined ZMQ_HAVE_EVENTFD_CLOEXEC
537  // Setting this option result in sane behaviour when exec() functions
538  // are used. Old sockets are closed and don't block TCP ports, avoid
539  // leaks, etc.
540  flags |= EFD_CLOEXEC;
541 #endif
542  fd_t fd = eventfd (0, flags);
543  if (fd == -1) {
544  errno_assert (errno == ENFILE || errno == EMFILE);
545  *w_ = *r_ = -1;
546  return -1;
547  }
548  *w_ = *r_ = fd;
549  return 0;
550 
551 
552 #elif defined ZMQ_HAVE_WINDOWS
553 #ifdef ZMQ_HAVE_IPC
554  ipc_address_t address;
555  std::string dirname, filename;
556  sockaddr_un lcladdr;
557  socklen_t lcladdr_len = sizeof lcladdr;
558  int rc = 0;
559  int saved_errno = 0;
560  SOCKET listener = INVALID_SOCKET;
561 
562  // It appears that a lack of runtime AF_UNIX support
563  // can fail in more than one way.
564  // At least: open_socket can fail or later in bind or even in connect after bind
565  bool ipc_fallback_on_tcpip = true;
566 
567  if (!zmq::try_ipc_first) {
568  // a past ipc attempt failed, skip straight to try_tcpip in the future;
569  goto try_tcpip;
570  }
571 
572  // Create a listening socket.
573  listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
574  if (listener == retired_fd) {
575  // This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
576  goto try_tcpip;
577  }
578 
579  rc = create_ipc_wildcard_address (dirname, filename);
580  if (rc != 0) {
581  // This may happen if tmpfile creation fails
582  goto error_closelistener;
583  }
584 
585  // Initialise the address structure.
586  rc = address.resolve (filename.c_str ());
587  if (rc != 0) {
588  goto error_closelistener;
589  }
590 
591  // Bind the socket to the file path.
592  rc = bind (listener, const_cast<sockaddr *> (address.addr ()),
593  address.addrlen ());
594  if (rc != 0) {
595  errno = wsa_error_to_errno (WSAGetLastError ());
596  goto error_closelistener;
597  }
598  // if we got here, ipc should be working,
599  // but there are at least some cases where connect can still fail
600 
601  // Listen for incoming connections.
602  rc = listen (listener, 1);
603  if (rc != 0) {
604  errno = wsa_error_to_errno (WSAGetLastError ());
605  goto error_closelistener;
606  }
607 
608  rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&lcladdr),
609  &lcladdr_len);
610  wsa_assert (rc == 0);
611 
612  // Create the client socket.
613  *w_ = open_socket (AF_UNIX, SOCK_STREAM, 0);
614  if (*w_ == retired_fd) {
615  errno = wsa_error_to_errno (WSAGetLastError ());
616  goto error_closelistener;
617  }
618 
619  // Connect to the remote peer.
620  rc = ::connect (*w_, reinterpret_cast<const struct sockaddr *> (&lcladdr),
621  lcladdr_len);
622  if (rc != 0) {
623  errno = wsa_error_to_errno (WSAGetLastError ());
624  goto error_closeclient;
625  }
626  // if we got here, ipc should be working,
627  // so raise any remaining errors
628  ipc_fallback_on_tcpip = false;
629 
630  *r_ = accept (listener, NULL, NULL);
631  wsa_assert (*r_ != retired_fd);
632 
633  // Close the listener socket, we don't need it anymore.
634  rc = closesocket (listener);
635  wsa_assert (rc == 0);
636 
637  // Cleanup temporary socket file descriptor
638  if (!filename.empty ()) {
639  rc = ::unlink (filename.c_str ());
640  if ((rc == 0) && !dirname.empty ()) {
641  rc = ::rmdir (dirname.c_str ());
642  dirname.clear ();
643  }
644  filename.clear ();
645  }
646 
647  return 0;
648 
649 error_closeclient:
650  saved_errno = errno;
651  rc = closesocket (*w_);
652  wsa_assert (rc == 0);
653  *w_ = retired_fd;
654  errno = saved_errno;
655 
656 error_closelistener:
657  saved_errno = errno;
658  rc = closesocket (listener);
659  wsa_assert (rc == 0);
660 
661  // Cleanup temporary socket file descriptor
662  if (!filename.empty ()) {
663  rc = ::unlink (filename.c_str ());
664  if ((rc == 0) && !dirname.empty ()) {
665  rc = ::rmdir (dirname.c_str ());
666  dirname.clear ();
667  }
668  filename.clear ();
669  }
670 
671  // ipc failed due to lack of AF_UNIX support, fallback on tcpip
672  if (ipc_fallback_on_tcpip) {
673  goto try_tcpip;
674  }
675 
676  errno = saved_errno;
677  return -1;
678 
679 try_tcpip:
680  // try to fallback to TCP/IP
681  rc = make_fdpair_tcpip (r_, w_);
682  if (rc == 0 && zmq::try_ipc_first) {
683  // ipc didn't work but tcp/ip did; skip ipc in the future
684  zmq::try_ipc_first = false;
685  }
686  return rc;
687 #endif // ZMQ_HAVE_IPC
688  return make_fdpair_tcpip (r_, w_);
689 #elif defined ZMQ_HAVE_OPENVMS
690 
691  // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
692  // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
693  // can lead to performance problems.
694  //
695  // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
696  // create the socket pair manually.
697  struct sockaddr_in lcladdr;
698  memset (&lcladdr, 0, sizeof lcladdr);
699  lcladdr.sin_family = AF_INET;
700  lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
701  lcladdr.sin_port = 0;
702 
703  int listener = open_socket (AF_INET, SOCK_STREAM, 0);
704  errno_assert (listener != -1);
705 
706  int on = 1;
707  int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
708  errno_assert (rc != -1);
709 
710  rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
711  errno_assert (rc != -1);
712 
713  rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
714  errno_assert (rc != -1);
715 
716  socklen_t lcladdr_len = sizeof lcladdr;
717 
718  rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
719  errno_assert (rc != -1);
720 
721  rc = listen (listener, 1);
722  errno_assert (rc != -1);
723 
724  *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
725  errno_assert (*w_ != -1);
726 
727  rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
728  errno_assert (rc != -1);
729 
730  rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
731  errno_assert (rc != -1);
732 
733  rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
734  errno_assert (rc != -1);
735 
736  *r_ = accept (listener, NULL, NULL);
737  errno_assert (*r_ != -1);
738 
739  close (listener);
740 
741  return 0;
742 #elif defined ZMQ_HAVE_VXWORKS
743  struct sockaddr_in lcladdr;
744  memset (&lcladdr, 0, sizeof lcladdr);
745  lcladdr.sin_family = AF_INET;
746  lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
747  lcladdr.sin_port = 0;
748 
749  int listener = open_socket (AF_INET, SOCK_STREAM, 0);
750  errno_assert (listener != -1);
751 
752  int on = 1;
753  int rc =
754  setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
755  errno_assert (rc != -1);
756 
757  rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
758  errno_assert (rc != -1);
759 
760  socklen_t lcladdr_len = sizeof lcladdr;
761 
762  rc = getsockname (listener, (struct sockaddr *) &lcladdr,
763  (int *) &lcladdr_len);
764  errno_assert (rc != -1);
765 
766  rc = listen (listener, 1);
767  errno_assert (rc != -1);
768 
769  *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
770  errno_assert (*w_ != -1);
771 
772  rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
773  errno_assert (rc != -1);
774 
775  rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
776  errno_assert (rc != -1);
777 
778  *r_ = accept (listener, NULL, NULL);
779  errno_assert (*r_ != -1);
780 
781  close (listener);
782 
783  return 0;
784 #else
785  // All other implementations support socketpair()
786  int sv[2];
787  int type = SOCK_STREAM;
788  // Setting this option result in sane behaviour when exec() functions
789  // are used. Old sockets are closed and don't block TCP ports, avoid
790  // leaks, etc.
791 #if defined ZMQ_HAVE_SOCK_CLOEXEC
792  type |= SOCK_CLOEXEC;
793 #endif
794  int rc = socketpair (AF_UNIX, type, 0, sv);
795  if (rc == -1) {
796  errno_assert (errno == ENFILE || errno == EMFILE);
797  *w_ = *r_ = -1;
798  return -1;
799  } else {
802 
803  *w_ = sv[0];
804  *r_ = sv[1];
805  return 0;
806  }
807 #endif
808 }
809 
811 {
812 #if defined ZMQ_HAVE_WINDOWS && !defined _WIN32_WCE \
813  && !defined ZMQ_HAVE_WINDOWS_UWP
814  // On Windows, preventing sockets to be inherited by child processes.
815  const BOOL brc = SetHandleInformation (reinterpret_cast<HANDLE> (sock_),
816  HANDLE_FLAG_INHERIT, 0);
817  win_assert (brc);
818 #elif (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4) \
819  && defined FD_CLOEXEC
820  // If there 's no SOCK_CLOEXEC, let's try the second best option.
821  // Race condition can cause socket not to be closed (if fork happens
822  // between accept and this point).
823  const int rc = fcntl (sock_, F_SETFD, FD_CLOEXEC);
824  errno_assert (rc != -1);
825 #else
826  LIBZMQ_UNUSED (sock_);
827 #endif
828 }
829 
831 {
832 #ifdef ZMQ_HAVE_WINDOWS
833  if (rc_ != SOCKET_ERROR) {
834  return;
835  }
836 #else
837  if (rc_ != -1) {
838  return;
839  }
840 #endif
841 
842  // Check whether an error occurred
843  int err = 0;
844 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
845  int len = sizeof err;
846 #else
847  socklen_t len = sizeof err;
848 #endif
849 
850  const int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
851  reinterpret_cast<char *> (&err), &len);
852 
853  // Assert if the error was caused by 0MQ bug.
854  // Networking problems are OK. No need to assert.
855 #ifdef ZMQ_HAVE_WINDOWS
856  zmq_assert (rc == 0);
857  if (err != 0) {
858  wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
859  || err == WSAECONNABORTED || err == WSAEINTR
860  || err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
861  || err == WSAENETUNREACH || err == WSAENETDOWN
862  || err == WSAENETRESET || err == WSAEACCES
863  || err == WSAEINVAL || err == WSAEADDRINUSE);
864  }
865 #else
866  // Following code should handle both Berkeley-derived socket
867  // implementations and Solaris.
868  if (rc == -1)
869  err = errno;
870  if (err != 0) {
871  errno = err;
873  || errno == ECONNABORTED || errno == EINTR
874  || errno == ETIMEDOUT || errno == EHOSTUNREACH
875  || errno == ENETUNREACH || errno == ENETDOWN
876  || errno == ENETRESET || errno == EINVAL);
877  }
878 #endif
879 }
880 
881 #ifdef ZMQ_HAVE_IPC
882 
883 #if defined ZMQ_HAVE_WINDOWS
884 char *widechar_to_utf8 (const wchar_t *widestring)
885 {
886  int nch, n;
887  char *utf8 = 0;
888  nch = WideCharToMultiByte (CP_UTF8, 0, widestring, -1, 0, 0, NULL, NULL);
889  if (nch > 0) {
890  utf8 = (char *) malloc ((nch + 1) * sizeof (char));
891  n = WideCharToMultiByte (CP_UTF8, 0, widestring, -1, utf8, nch, NULL,
892  NULL);
893  utf8[nch] = 0;
894  }
895  return utf8;
896 }
897 #endif
898 
899 int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
900 {
901 #if defined ZMQ_HAVE_WINDOWS
902  wchar_t buffer[MAX_PATH];
903 
904  {
905  const errno_t rc = _wtmpnam_s (buffer);
906  errno_assert (rc == 0);
907  }
908 
909  // TODO or use CreateDirectoryA and specify permissions?
910  const int rc = _wmkdir (buffer);
911  if (rc != 0) {
912  return -1;
913  }
914 
915  char *tmp = widechar_to_utf8 (buffer);
916  if (tmp == 0) {
917  return -1;
918  }
919 
920  path_.assign (tmp);
921  file_ = path_ + "/socket";
922 
923  free (tmp);
924 #else
925  std::string tmp_path;
926 
927  // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
928  // the socket directory there.
929  const char **tmp_env = tmp_env_vars;
930  while (tmp_path.empty () && *tmp_env != 0) {
931  const char *const tmpdir = getenv (*tmp_env);
932  struct stat statbuf;
933 
934  // Confirm it is actually a directory before trying to use
935  if (tmpdir != 0 && ::stat (tmpdir, &statbuf) == 0
936  && S_ISDIR (statbuf.st_mode)) {
937  tmp_path.assign (tmpdir);
938  if (*(tmp_path.rbegin ()) != '/') {
939  tmp_path.push_back ('/');
940  }
941  }
942 
943  // Try the next environment variable
944  ++tmp_env;
945  }
946 
947  // Append a directory name
948  tmp_path.append ("tmpXXXXXX");
949 
950  // We need room for tmp_path + trailing NUL
951  std::vector<char> buffer (tmp_path.length () + 1);
952  memcpy (&buffer[0], tmp_path.c_str (), tmp_path.length () + 1);
953 
954 #if defined HAVE_MKDTEMP
955  // Create the directory. POSIX requires that mkdtemp() creates the
956  // directory with 0700 permissions, meaning the only possible race
957  // with socket creation could be the same user. However, since
958  // each socket is created in a directory created by mkdtemp(), and
959  // mkdtemp() guarantees a unique directory name, there will be no
960  // collision.
961  if (mkdtemp (&buffer[0]) == 0) {
962  return -1;
963  }
964 
965  path_.assign (&buffer[0]);
966  file_ = path_ + "/socket";
967 #else
968  LIBZMQ_UNUSED (path_);
969  int fd = mkstemp (&buffer[0]);
970  if (fd == -1)
971  return -1;
972  ::close (fd);
973 
974  file_.assign (&buffer[0]);
975 #endif
976 #endif
977 
978  return 0;
979 }
980 #endif
zmq::bind_to_device
int bind_to_device(fd_t s_, const std::string &bound_device_)
Definition: ip.cpp:244
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
ip.hpp
ENOTSUP
#define ENOTSUP
Definition: zmq.h:104
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
EINVAL
#define EINVAL
Definition: errno.hpp:25
ipc_address.hpp
s
XmlRpcServer s
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::set_nosigpipe
int set_nosigpipe(fd_t s_)
Definition: ip.cpp:224
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
address
const char * address
Definition: builds/zos/test_fork.cpp:6
dummy
ReturnVal dummy
Definition: register_benchmark_test.cc:68
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
zmq::socket_end_remote
@ socket_end_remote
Definition: address.hpp:113
flags
GLbitfield flags
Definition: glcorearb.h:3585
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
ok
ROSCPP_DECL bool ok()
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
zmq::assert_success_or_recoverable
void assert_success_or_recoverable(fd_t s_, int rc_)
Definition: ip.cpp:830
zmq
Definition: zmq.hpp:229
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
NI_MAXHOST
#define NI_MAXHOST
Definition: vxworks/platform.hpp:305
ECONNABORTED
#define ECONNABORTED
Definition: zmq.h:140
zmq::shutdown_network
void shutdown_network()
Definition: ip.cpp:305
macros.hpp
buffer
GLuint buffer
Definition: glcorearb.h:2939
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::set_socket_priority
void set_socket_priority(fd_t s_, int priority_)
Definition: ip.cpp:211
zmq::tcp_tune_loopback_fast_path
void tcp_tune_loopback_fast_path(fd_t socket_)
Definition: tcp.cpp:293
zmq::get_socket_address
zmq_socklen_t get_socket_address(fd_t fd_, socket_end_t socket_end_, sockaddr_storage *ss_)
Definition: address.cpp:102
s_
std::string s_
Definition: gmock-matchers_test.cc:4128
zmq::make_fdpair
int make_fdpair(fd_t *r_, fd_t *w_)
Definition: ip.cpp:532
EBADF
#define EBADF
Definition: errno.hpp:12
buffer::length
size_t length
Definition: buffer_processor.h:45
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
ENETRESET
#define ENETRESET
Definition: zmq.h:155
ENETDOWN
#define ENETDOWN
Definition: zmq.h:113
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
tmp_env_vars
static const char * tmp_env_vars[]
Definition: ip.cpp:67
buffer
Definition: buffer_processor.h:43
address.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::open_socket
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:73
S_ISDIR
#define S_ISDIR(mode)
Definition: dirent.h:190
n
GLdouble n
Definition: glcorearb.h:4153
type
GLenum type
Definition: glcorearb.h:2695
len
int len
Definition: php/ext/google/protobuf/map.c:206
zmq::initialize_network
bool initialize_network()
Definition: ip.cpp:264
tcp.hpp
zmq::get_peer_ip_address
int get_peer_ip_address(fd_t sockfd_, std::string &ip_addr_)
Definition: ip.cpp:147
ECONNRESET
#define ECONNRESET
Definition: zmq.h:143
cpp.gmock_class.set
set
Definition: gmock_class.py:44
zmq::zmq_socklen_t
socklen_t zmq_socklen_t
Definition: address.hpp:107
err.hpp
zmq::set_ip_type_of_service
void set_ip_type_of_service(fd_t s_, int iptos_)
Definition: ip.cpp:187
HANDLE
void * HANDLE
Definition: wepoll.c:70
EHOSTUNREACH
#define EHOSTUNREACH
Definition: zmq.h:152
ENETUNREACH
#define ENETUNREACH
Definition: zmq.h:137
ETIMEDOUT
#define ETIMEDOUT
Definition: zmq.h:149
zmq::enable_ipv4_mapping
void enable_ipv4_mapping(fd_t s_)
Definition: ip.cpp:126
EFAULT
#define EFAULT
Definition: errno.hpp:17
zmq::make_socket_noninheritable
void make_socket_noninheritable(fd_t sock_)
Definition: ip.cpp:810
SOCKET
uintptr_t SOCKET
Definition: wepoll.c:71
EMFILE
#define EMFILE
Definition: errno.hpp:27
file_
FileDescriptorProto * file_
Definition: annotation_test_util.cc:68
zmq::signaler_port
@ signaler_port
Definition: config.hpp:58


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:54