tcp_listener.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 
6 #include <string>
7 #include <stdio.h>
8 
9 #include "tcp_listener.hpp"
10 #include "io_thread.hpp"
11 #include "config.hpp"
12 #include "err.hpp"
13 #include "ip.hpp"
14 #include "tcp.hpp"
15 #include "socket_base.hpp"
16 #include "address.hpp"
17 
18 #ifndef ZMQ_HAVE_WINDOWS
19 #include <unistd.h>
20 #include <sys/socket.h>
21 #include <arpa/inet.h>
22 #include <netinet/tcp.h>
23 #include <netinet/in.h>
24 #include <netdb.h>
25 #include <fcntl.h>
26 #ifdef ZMQ_HAVE_VXWORKS
27 #include <sockLib.h>
28 #endif
29 #endif
30 
31 #ifdef ZMQ_HAVE_OPENVMS
32 #include <ioctl.h>
33 #endif
34 
35 zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
36  socket_base_t *socket_,
37  const options_t &options_) :
38  stream_listener_base_t (io_thread_, socket_, options_)
39 {
40 }
41 
42 void zmq::tcp_listener_t::in_event ()
43 {
44  const fd_t fd = accept ();
45 
46  // If connection was reset by the peer in the meantime, just ignore it.
47  // TODO: Handle specific errors like ENFILE/EMFILE etc.
48  if (fd == retired_fd) {
49  _socket->event_accept_failed (
51  return;
52  }
53 
54  int rc = tune_tcp_socket (fd);
55  rc = rc
57  fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
58  options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
59  rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
60  if (rc != 0) {
61  _socket->event_accept_failed (
63  return;
64  }
65 
66  // Create the engine object for this connection.
67  create_engine (fd);
68 }
69 
72  socket_end_t socket_end_) const
73 {
74  return zmq::get_socket_name<tcp_address_t> (fd_, socket_end_);
75 }
76 
77 int zmq::tcp_listener_t::create_socket (const char *addr_)
78 {
79  _s = tcp_open_socket (addr_, options, true, true, &_address);
80  if (_s == retired_fd) {
81  return -1;
82  }
83 
84  // TODO why is this only done for the listener?
86 
87  // Allow reusing of the address.
88  int flag = 1;
89  int rc;
90 #ifdef ZMQ_HAVE_WINDOWS
91  // TODO this was changed for Windows from SO_REUSEADDRE to
92  // SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c,
93  // so the comment above is no longer correct; also, now the settings are
94  // different between listener and connecter with a src address.
95  // is this intentional?
96  rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
97  reinterpret_cast<const char *> (&flag), sizeof (int));
98  wsa_assert (rc != SOCKET_ERROR);
99 #elif defined ZMQ_HAVE_VXWORKS
100  rc =
101  setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof (int));
102  errno_assert (rc == 0);
103 #else
104  rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
105  errno_assert (rc == 0);
106 #endif
107 
108  // Bind the socket to the network interface and port.
109 #if defined ZMQ_HAVE_VXWORKS
110  rc = bind (_s, (sockaddr *) _address.addr (), _address.addrlen ());
111 #else
112  rc = bind (_s, _address.addr (), _address.addrlen ());
113 #endif
114 #ifdef ZMQ_HAVE_WINDOWS
115  if (rc == SOCKET_ERROR) {
116  errno = wsa_error_to_errno (WSAGetLastError ());
117  goto error;
118  }
119 #else
120  if (rc != 0)
121  goto error;
122 #endif
123 
124  // Listen for incoming connections.
125  rc = listen (_s, options.backlog);
126 #ifdef ZMQ_HAVE_WINDOWS
127  if (rc == SOCKET_ERROR) {
128  errno = wsa_error_to_errno (WSAGetLastError ());
129  goto error;
130  }
131 #else
132  if (rc != 0)
133  goto error;
134 #endif
135 
136  return 0;
137 
138 error:
139  const int err = errno;
140  close ();
141  errno = err;
142  return -1;
143 }
144 
145 int zmq::tcp_listener_t::set_local_address (const char *addr_)
146 {
147  if (options.use_fd != -1) {
148  // in this case, the addr_ passed is not used and ignored, since the
149  // socket was already created by the application
150  _s = options.use_fd;
151  } else {
152  if (create_socket (addr_) == -1)
153  return -1;
154  }
155 
156  _endpoint = get_socket_name (_s, socket_end_local);
157 
158  _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
159  _s);
160  return 0;
161 }
162 
163 zmq::fd_t zmq::tcp_listener_t::accept ()
164 {
165  // The situation where connection cannot be accepted due to insufficient
166  // resources is considered valid and treated by ignoring the connection.
167  // Accept one connection and deal with different failure modes.
168  zmq_assert (_s != retired_fd);
169 
170  struct sockaddr_storage ss;
171  memset (&ss, 0, sizeof (ss));
172 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
173  int ss_len = sizeof (ss);
174 #else
175  socklen_t ss_len = sizeof (ss);
176 #endif
177 #if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
178  fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
179  &ss_len, SOCK_CLOEXEC);
180 #else
181  const fd_t sock =
182  ::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
183 #endif
184 
185  if (sock == retired_fd) {
186 #if defined ZMQ_HAVE_WINDOWS
187  const int last_error = WSAGetLastError ();
188  wsa_assert (last_error == WSAEWOULDBLOCK || last_error == WSAECONNRESET
189  || last_error == WSAEMFILE || last_error == WSAENOBUFS);
190 #elif defined ZMQ_HAVE_ANDROID
191  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
192  || errno == ECONNABORTED || errno == EPROTO
193  || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
194  || errno == ENFILE || errno == EINVAL);
195 #else
196  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
197  || errno == ECONNABORTED || errno == EPROTO
198  || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
199  || errno == ENFILE);
200 #endif
201  return retired_fd;
202  }
203 
205 
206  if (!options.tcp_accept_filters.empty ()) {
207  bool matched = false;
208  for (options_t::tcp_accept_filters_t::size_type
209  i = 0,
210  size = options.tcp_accept_filters.size ();
211  i != size; ++i) {
212  if (options.tcp_accept_filters[i].match_address (
213  reinterpret_cast<struct sockaddr *> (&ss), ss_len)) {
214  matched = true;
215  break;
216  }
217  }
218  if (!matched) {
219 #ifdef ZMQ_HAVE_WINDOWS
220  const int rc = closesocket (sock);
221  wsa_assert (rc != SOCKET_ERROR);
222 #else
223  int rc = ::close (sock);
224  errno_assert (rc == 0);
225 #endif
226  return retired_fd;
227  }
228  }
229 
230  if (zmq::set_nosigpipe (sock)) {
231 #ifdef ZMQ_HAVE_WINDOWS
232  const int rc = closesocket (sock);
233  wsa_assert (rc != SOCKET_ERROR);
234 #else
235  int rc = ::close (sock);
236  errno_assert (rc == 0);
237 #endif
238  return retired_fd;
239  }
240 
241  // Set the IP Type-Of-Service priority for this client socket
242  if (options.tos != 0)
244 
245  // Set the protocol-defined priority for this client socket
246  if (options.priority != 0)
247  set_socket_priority (sock, options.priority);
248 
249  return sock;
250 }
zmq::get_socket_name
std::string get_socket_name(fd_t fd_, socket_end_t socket_end_)
Definition: address.hpp:120
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
EINVAL
#define EINVAL
Definition: errno.hpp:25
sock
void * sock
Definition: test_connect_resolve.cpp:9
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::set_nosigpipe
int set_nosigpipe(fd_t s_)
Definition: ip.cpp:224
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
error
Definition: cJSON.c:88
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
EPROTO
#define EPROTO
Definition: err.hpp:26
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
ECONNABORTED
#define ECONNABORTED
Definition: zmq.h:140
zmq::set_socket_priority
void set_socket_priority(fd_t s_, int priority_)
Definition: ip.cpp:211
zmq::make_unconnected_bind_endpoint_pair
endpoint_uri_pair_t make_unconnected_bind_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:14
zmq::tune_tcp_maxrt
int tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:160
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
address.hpp
zmq::tune_tcp_keepalives
int tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:71
i
int i
Definition: gmock-matchers_test.cc:764
io_thread.hpp
ENOBUFS
#define ENOBUFS
Definition: zmq.h:110
socket_base.hpp
tcp.hpp
zmq::tune_tcp_socket
int tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:30
size
GLsizeiptr size
Definition: glcorearb.h:2943
zmq::tcp_open_socket
fd_t tcp_open_socket(const char *address_, const options_t &options_, bool local_, bool fallback_to_ipv4_, tcp_address_t *out_tcp_addr_)
Definition: tcp.cpp:332
err.hpp
zmq::set_ip_type_of_service
void set_ip_type_of_service(fd_t s_, int iptos_)
Definition: ip.cpp:187
tcp_listener.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
zmq::socket_end_t
socket_end_t
Definition: address.hpp:110
zmq::make_socket_noninheritable
void make_socket_noninheritable(fd_t sock_)
Definition: ip.cpp:810
EMFILE
#define EMFILE
Definition: errno.hpp:27
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59