ws_listener.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 
6 #include <string>
7 #include <stdio.h>
8 
9 #include "ws_listener.hpp"
10 #include "io_thread.hpp"
11 #include "config.hpp"
12 #include "err.hpp"
13 #include "ip.hpp"
14 #include "tcp.hpp"
15 #include "socket_base.hpp"
16 #include "address.hpp"
17 #include "ws_engine.hpp"
18 #include "session_base.hpp"
19 
20 #ifdef ZMQ_HAVE_WSS
21 #include "wss_engine.hpp"
22 #include "wss_address.hpp"
23 #endif
24 
25 #ifndef ZMQ_HAVE_WINDOWS
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/tcp.h>
30 #include <netinet/in.h>
31 #include <netdb.h>
32 #include <fcntl.h>
33 #ifdef ZMQ_HAVE_VXWORKS
34 #include <sockLib.h>
35 #endif
36 #endif
37 
38 #ifdef ZMQ_HAVE_OPENVMS
39 #include <ioctl.h>
40 #endif
41 
42 zmq::ws_listener_t::ws_listener_t (io_thread_t *io_thread_,
43  socket_base_t *socket_,
44  const options_t &options_,
45  bool wss_) :
46  stream_listener_base_t (io_thread_, socket_, options_), _wss (wss_)
47 {
48 #ifdef ZMQ_HAVE_WSS
49  if (_wss) {
50  int rc = gnutls_certificate_allocate_credentials (&_tls_cred);
51  zmq_assert (rc == GNUTLS_E_SUCCESS);
52 
53  gnutls_datum_t cert = {(unsigned char *) options_.wss_cert_pem.c_str (),
54  (unsigned int) options_.wss_cert_pem.length ()};
55  gnutls_datum_t key = {(unsigned char *) options_.wss_key_pem.c_str (),
56  (unsigned int) options_.wss_key_pem.length ()};
57  rc = gnutls_certificate_set_x509_key_mem (_tls_cred, &cert, &key,
58  GNUTLS_X509_FMT_PEM);
59  zmq_assert (rc == GNUTLS_E_SUCCESS);
60  }
61 #endif
62 }
63 
64 zmq::ws_listener_t::~ws_listener_t ()
65 {
66 #ifdef ZMQ_HAVE_WSS
67  if (_wss)
68  gnutls_certificate_free_credentials (_tls_cred);
69 #endif
70 }
71 
72 void zmq::ws_listener_t::in_event ()
73 {
74  const fd_t fd = accept ();
75 
76  // If connection was reset by the peer in the meantime, just ignore it.
77  // TODO: Handle specific errors like ENFILE/EMFILE etc.
78  if (fd == retired_fd) {
79  _socket->event_accept_failed (
81  return;
82  }
83 
84  int rc = tune_tcp_socket (fd);
85  rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
86  if (rc != 0) {
87  _socket->event_accept_failed (
89  return;
90  }
91 
92  // Create the engine object for this connection.
93  create_engine (fd);
94 }
95 
97  socket_end_t socket_end_) const
98 {
99  std::string socket_name;
100 
101 #ifdef ZMQ_HAVE_WSS
102  if (_wss)
103  socket_name = zmq::get_socket_name<wss_address_t> (fd_, socket_end_);
104  else
105 #endif
106  socket_name = zmq::get_socket_name<ws_address_t> (fd_, socket_end_);
107 
108  return socket_name + _address.path ();
109 }
110 
111 int zmq::ws_listener_t::create_socket (const char *addr_)
112 {
113  tcp_address_t address;
114  _s = tcp_open_socket (addr_, options, true, true, &address);
115  if (_s == retired_fd) {
116  return -1;
117  }
118 
119  // TODO why is this only done for the listener?
121 
122  // Allow reusing of the address.
123  int flag = 1;
124  int rc;
125 #ifdef ZMQ_HAVE_WINDOWS
126  // TODO this was changed for Windows from SO_REUSEADDRE to
127  // SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c,
128  // so the comment above is no longer correct; also, now the settings are
129  // different between listener and connecter with a src address.
130  // is this intentional?
131  rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
132  reinterpret_cast<const char *> (&flag), sizeof (int));
133  wsa_assert (rc != SOCKET_ERROR);
134 #elif defined ZMQ_HAVE_VXWORKS
135  rc =
136  setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof (int));
137  errno_assert (rc == 0);
138 #else
139  rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
140  errno_assert (rc == 0);
141 #endif
142 
143  // Bind the socket to the network interface and port.
144 #if defined ZMQ_HAVE_VXWORKS
145  rc = bind (_s, (sockaddr *) _address.addr (), _address.addrlen ());
146 #else
147  rc = bind (_s, address.addr (), address.addrlen ());
148 #endif
149 #ifdef ZMQ_HAVE_WINDOWS
150  if (rc == SOCKET_ERROR) {
151  errno = wsa_error_to_errno (WSAGetLastError ());
152  goto error;
153  }
154 #else
155  if (rc != 0)
156  goto error;
157 #endif
158 
159  // Listen for incoming connections.
160  rc = listen (_s, options.backlog);
161 #ifdef ZMQ_HAVE_WINDOWS
162  if (rc == SOCKET_ERROR) {
163  errno = wsa_error_to_errno (WSAGetLastError ());
164  goto error;
165  }
166 #else
167  if (rc != 0)
168  goto error;
169 #endif
170 
171  return 0;
172 
173 error:
174  const int err = errno;
175  close ();
176  errno = err;
177  return -1;
178 }
179 
180 int zmq::ws_listener_t::set_local_address (const char *addr_)
181 {
182  if (options.use_fd != -1) {
183  // in this case, the addr_ passed is not used and ignored, since the
184  // socket was already created by the application
185  _s = options.use_fd;
186  } else {
187  const int rc = _address.resolve (addr_, true, options.ipv6);
188  if (rc != 0)
189  return -1;
190 
191  // remove the path, otherwise resolving the port will fail with wildcard
192  const char *delim = strrchr (addr_, '/');
193  std::string host_address;
194  if (delim) {
195  host_address = std::string (addr_, delim - addr_);
196  } else {
197  host_address = addr_;
198  }
199 
200  if (create_socket (host_address.c_str ()) == -1)
201  return -1;
202  }
203 
204  _endpoint = get_socket_name (_s, socket_end_local);
205 
206  _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
207  _s);
208  return 0;
209 }
210 
211 zmq::fd_t zmq::ws_listener_t::accept ()
212 {
213  // The situation where connection cannot be accepted due to insufficient
214  // resources is considered valid and treated by ignoring the connection.
215  // Accept one connection and deal with different failure modes.
216  zmq_assert (_s != retired_fd);
217 
218  struct sockaddr_storage ss;
219  memset (&ss, 0, sizeof (ss));
220 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
221  int ss_len = sizeof (ss);
222 #else
223  socklen_t ss_len = sizeof (ss);
224 #endif
225 #if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
226  fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
227  &ss_len, SOCK_CLOEXEC);
228 #else
229  const fd_t sock =
230  ::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
231 #endif
232 
233  if (sock == retired_fd) {
234 #if defined ZMQ_HAVE_WINDOWS
235  const int last_error = WSAGetLastError ();
236  wsa_assert (last_error == WSAEWOULDBLOCK || last_error == WSAECONNRESET
237  || last_error == WSAEMFILE || last_error == WSAENOBUFS);
238 #elif defined ZMQ_HAVE_ANDROID
239  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
240  || errno == ECONNABORTED || errno == EPROTO
241  || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
242  || errno == ENFILE || errno == EINVAL);
243 #else
244  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
245  || errno == ECONNABORTED || errno == EPROTO
246  || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
247  || errno == ENFILE);
248 #endif
249  return retired_fd;
250  }
251 
253 
254  if (zmq::set_nosigpipe (sock)) {
255 #ifdef ZMQ_HAVE_WINDOWS
256  const int rc = closesocket (sock);
257  wsa_assert (rc != SOCKET_ERROR);
258 #else
259  int rc = ::close (sock);
260  errno_assert (rc == 0);
261 #endif
262  return retired_fd;
263  }
264 
265  // Set the IP Type-Of-Service priority for this client socket
266  if (options.tos != 0)
268 
269  // Set the protocol-defined priority for this client socket
270  if (options.priority != 0)
271  set_socket_priority (sock, options.priority);
272 
273  return sock;
274 }
275 
276 void zmq::ws_listener_t::create_engine (fd_t fd_)
277 {
278  const endpoint_uri_pair_t endpoint_pair (
281 
282  i_engine *engine = NULL;
283  if (_wss)
284 #ifdef ZMQ_HAVE_WSS
285  engine = new (std::nothrow)
286  wss_engine_t (fd_, options, endpoint_pair, _address, false, _tls_cred,
287  std::string ());
288 #else
289  zmq_assert (false);
290 #endif
291  else
292  engine = new (std::nothrow)
293  ws_engine_t (fd_, options, endpoint_pair, _address, false);
294 
295  alloc_assert (engine);
296 
297  // Choose I/O thread to run connecter in. Given that we are already
298  // running in an I/O thread, there must be at least one available.
299  io_thread_t *io_thread = choose_io_thread (options.affinity);
300  zmq_assert (io_thread);
301 
302  // Create and launch a session object.
303  session_base_t *session =
304  session_base_t::create (io_thread, false, _socket, options, NULL);
305  errno_assert (session);
306  session->inc_seqnum ();
307  launch_child (session);
308  send_attach (session, engine, false);
309 
310  _socket->event_accepted (endpoint_pair, fd_);
311 }
zmq::get_socket_name
std::string get_socket_name(fd_t fd_, socket_end_t socket_end_)
Definition: address.hpp:120
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
EINVAL
#define EINVAL
Definition: errno.hpp:25
sock
void * sock
Definition: test_connect_resolve.cpp:9
zmq::session_base_t::create
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: session_base.cpp:27
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::set_nosigpipe
int set_nosigpipe(fd_t s_)
Definition: ip.cpp:224
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
wss_engine.hpp
errno
int errno
address
const char * address
Definition: builds/zos/test_fork.cpp:6
error
Definition: cJSON.c:88
zmq::socket_end_remote
@ socket_end_remote
Definition: address.hpp:113
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
EPROTO
#define EPROTO
Definition: err.hpp:26
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
ECONNABORTED
#define ECONNABORTED
Definition: zmq.h:140
zmq::set_socket_priority
void set_socket_priority(fd_t s_, int priority_)
Definition: ip.cpp:211
zmq::make_unconnected_bind_endpoint_pair
endpoint_uri_pair_t make_unconnected_bind_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:14
zmq::tune_tcp_maxrt
int tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:160
ws_engine.hpp
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
cert
const char * cert
Definition: test_wss_transport.cpp:64
address.hpp
key
const SETUP_TEARDOWN_TESTCONTEXT char * key
Definition: test_wss_transport.cpp:10
zmq::endpoint_type_bind
@ endpoint_type_bind
Definition: endpoint.hpp:13
io_thread.hpp
ENOBUFS
#define ENOBUFS
Definition: zmq.h:110
socket_base.hpp
tcp.hpp
zmq::tune_tcp_socket
int tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:30
zmq::tcp_open_socket
fd_t tcp_open_socket(const char *address_, const options_t &options_, bool local_, bool fallback_to_ipv4_, tcp_address_t *out_tcp_addr_)
Definition: tcp.cpp:332
err.hpp
zmq::set_ip_type_of_service
void set_ip_type_of_service(fd_t s_, int iptos_)
Definition: ip.cpp:187
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
wss_address.hpp
zmq::socket_end_t
socket_end_t
Definition: address.hpp:110
session_base.hpp
zmq::make_socket_noninheritable
void make_socket_noninheritable(fd_t sock_)
Definition: ip.cpp:810
ws_listener.hpp
EMFILE
#define EMFILE
Definition: errno.hpp:27
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02