signaler.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "poller.hpp"
5 #include "polling_util.hpp"
6 
7 #if defined ZMQ_POLL_BASED_ON_POLL
8 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
9 #include <poll.h>
10 #endif
11 #elif defined ZMQ_POLL_BASED_ON_SELECT
12 #if defined ZMQ_HAVE_WINDOWS
13 #elif defined ZMQ_HAVE_HPUX
14 #include <sys/param.h>
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #elif defined ZMQ_HAVE_OPENVMS
18 #include <sys/types.h>
19 #include <sys/time.h>
20 #elif defined ZMQ_HAVE_VXWORKS
21 #include <sys/types.h>
22 #include <sys/time.h>
23 #include <sockLib.h>
24 #include <strings.h>
25 #else
26 #include <sys/select.h>
27 #endif
28 #endif
29 
30 #include "signaler.hpp"
31 #include "likely.hpp"
32 #include "stdint.hpp"
33 #include "config.hpp"
34 #include "err.hpp"
35 #include "fd.hpp"
36 #include "ip.hpp"
37 #include "tcp.hpp"
38 
39 #if !defined ZMQ_HAVE_WINDOWS
40 #include <unistd.h>
41 #include <netinet/tcp.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #endif
45 
46 #if !defined(ZMQ_HAVE_WINDOWS)
47 // Helper to sleep for specific number of milliseconds (or until signal)
48 //
49 static int sleep_ms (unsigned int ms_)
50 {
51  if (ms_ == 0)
52  return 0;
53 #if defined ZMQ_HAVE_ANDROID
54  usleep (ms_ * 1000);
55  return 0;
56 #elif defined ZMQ_HAVE_VXWORKS
57  struct timespec ns_;
58  ns_.tv_sec = ms_ / 1000;
59  ns_.tv_nsec = ms_ % 1000 * 1000000;
60  return nanosleep (&ns_, 0);
61 #else
62  return usleep (ms_ * 1000);
63 #endif
64 }
65 
66 // Helper to wait on close(), for non-blocking sockets, until it completes
67 // If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
68 // the overall timeout is reached.
69 //
70 static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
71 {
72  unsigned int ms_so_far = 0;
73  const unsigned int min_step_ms = 1;
74  const unsigned int max_step_ms = 100;
75  const unsigned int step_ms =
76  std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
77 
78  int rc = 0; // do not sleep on first attempt
79  do {
80  if (rc == -1 && errno == EAGAIN) {
81  sleep_ms (step_ms);
82  ms_so_far += step_ms;
83  }
84  rc = close (fd_);
85  } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
86 
87  return rc;
88 }
89 #endif
90 
92 {
93  // Create the socketpair for signaling.
94  if (make_fdpair (&_r, &_w) == 0) {
97  }
98 #ifdef HAVE_FORK
99  pid = getpid ();
100 #endif
101 }
102 
103 // This might get run after some part of construction failed, leaving one or
104 // both of _r and _w retired_fd.
106 {
107 #if defined ZMQ_HAVE_EVENTFD
108  if (_r == retired_fd)
109  return;
110  int rc = close_wait_ms (_r);
111  errno_assert (rc == 0);
112 #elif defined ZMQ_HAVE_WINDOWS
113  if (_w != retired_fd) {
114  const struct linger so_linger = {1, 0};
115  int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
116  reinterpret_cast<const char *> (&so_linger),
117  sizeof so_linger);
118  // Only check shutdown if WSASTARTUP was previously done
119  if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
120  wsa_assert (rc != SOCKET_ERROR);
121  rc = closesocket (_w);
122  wsa_assert (rc != SOCKET_ERROR);
123  if (_r == retired_fd)
124  return;
125  rc = closesocket (_r);
126  wsa_assert (rc != SOCKET_ERROR);
127  }
128  }
129 #else
130  if (_w != retired_fd) {
131  int rc = close_wait_ms (_w);
132  errno_assert (rc == 0);
133  }
134  if (_r != retired_fd) {
135  int rc = close_wait_ms (_r);
136  errno_assert (rc == 0);
137  }
138 #endif
139 }
140 
142 {
143  return _r;
144 }
145 
147 {
148 #if defined HAVE_FORK
149  if (unlikely (pid != getpid ())) {
150  //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
151  return; // do not send anything in forked child context
152  }
153 #endif
154 #if defined ZMQ_HAVE_EVENTFD
155  const uint64_t inc = 1;
156  ssize_t sz = write (_w, &inc, sizeof (inc));
157  errno_assert (sz == sizeof (inc));
158 #elif defined ZMQ_HAVE_WINDOWS
159  const char dummy = 0;
160  int nbytes;
161  do {
162  nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
163  wsa_assert (nbytes != SOCKET_ERROR);
164  // wsa_assert does not abort on WSAEWOULDBLOCK. If we get this, we retry.
165  } while (nbytes == SOCKET_ERROR);
166  // Given the small size of dummy (should be 1) expect that send was able to send everything.
167  zmq_assert (nbytes == sizeof (dummy));
168 #elif defined ZMQ_HAVE_VXWORKS
169  unsigned char dummy = 0;
170  while (true) {
171  ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
172  if (unlikely (nbytes == -1 && errno == EINTR))
173  continue;
174 #if defined(HAVE_FORK)
175  if (unlikely (pid != getpid ())) {
176  //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
177  errno = EINTR;
178  break;
179  }
180 #endif
181  zmq_assert (nbytes == sizeof dummy);
182  break;
183  }
184 #else
185  unsigned char dummy = 0;
186  while (true) {
187  ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
188  if (unlikely (nbytes == -1 && errno == EINTR))
189  continue;
190 #if defined(HAVE_FORK)
191  if (unlikely (pid != getpid ())) {
192  //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
193  errno = EINTR;
194  break;
195  }
196 #endif
197  zmq_assert (nbytes == sizeof dummy);
198  break;
199  }
200 #endif
201 }
202 
203 int zmq::signaler_t::wait (int timeout_) const
204 {
205 #ifdef HAVE_FORK
206  if (unlikely (pid != getpid ())) {
207  // we have forked and the file descriptor is closed. Emulate an interrupt
208  // response.
209  //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
210  errno = EINTR;
211  return -1;
212  }
213 #endif
214 
215 #ifdef ZMQ_POLL_BASED_ON_POLL
216  struct pollfd pfd;
217  pfd.fd = _r;
218  pfd.events = POLLIN;
219  const int rc = poll (&pfd, 1, timeout_);
220  if (unlikely (rc < 0)) {
221  errno_assert (errno == EINTR);
222  return -1;
223  }
224  if (unlikely (rc == 0)) {
225  errno = EAGAIN;
226  return -1;
227  }
228 #ifdef HAVE_FORK
229  if (unlikely (pid != getpid ())) {
230  // we have forked and the file descriptor is closed. Emulate an interrupt
231  // response.
232  //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
233  errno = EINTR;
234  return -1;
235  }
236 #endif
237  zmq_assert (rc == 1);
238  zmq_assert (pfd.revents & POLLIN);
239  return 0;
240 
241 #elif defined ZMQ_POLL_BASED_ON_SELECT
242 
243  optimized_fd_set_t fds (1);
244  FD_ZERO (fds.get ());
245  FD_SET (_r, fds.get ());
246  struct timeval timeout;
247  if (timeout_ >= 0) {
248  timeout.tv_sec = timeout_ / 1000;
249  timeout.tv_usec = timeout_ % 1000 * 1000;
250  }
251 #ifdef ZMQ_HAVE_WINDOWS
252  int rc =
253  select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
254  wsa_assert (rc != SOCKET_ERROR);
255 #else
256  int rc =
257  select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
258  if (unlikely (rc < 0)) {
259  errno_assert (errno == EINTR);
260  return -1;
261  }
262 #endif
263  if (unlikely (rc == 0)) {
264  errno = EAGAIN;
265  return -1;
266  }
267  zmq_assert (rc == 1);
268  return 0;
269 
270 #else
271 #error
272 #endif
273 }
274 
276 {
277 // Attempt to read a signal.
278 #if defined ZMQ_HAVE_EVENTFD
279  uint64_t dummy;
280  ssize_t sz = read (_r, &dummy, sizeof (dummy));
281  errno_assert (sz == sizeof (dummy));
282 
283  // If we accidentally grabbed the next signal(s) along with the current
284  // one, return it back to the eventfd object.
285  if (unlikely (dummy > 1)) {
286  const uint64_t inc = dummy - 1;
287  ssize_t sz2 = write (_w, &inc, sizeof (inc));
288  errno_assert (sz2 == sizeof (inc));
289  return;
290  }
291 
292  zmq_assert (dummy == 1);
293 #else
294  unsigned char dummy;
295 #if defined ZMQ_HAVE_WINDOWS
296  const int nbytes =
297  ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
298  wsa_assert (nbytes != SOCKET_ERROR);
299 #elif defined ZMQ_HAVE_VXWORKS
300  ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
301  errno_assert (nbytes >= 0);
302 #else
303  ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
304  errno_assert (nbytes >= 0);
305 #endif
306  zmq_assert (nbytes == sizeof (dummy));
307  zmq_assert (dummy == 0);
308 #endif
309 }
310 
312 {
313 // Attempt to read a signal.
314 #if defined ZMQ_HAVE_EVENTFD
315  uint64_t dummy;
316  ssize_t sz = read (_r, &dummy, sizeof (dummy));
317  if (sz == -1) {
319  return -1;
320  }
321  errno_assert (sz == sizeof (dummy));
322 
323  // If we accidentally grabbed the next signal(s) along with the current
324  // one, return it back to the eventfd object.
325  if (unlikely (dummy > 1)) {
326  const uint64_t inc = dummy - 1;
327  ssize_t sz2 = write (_w, &inc, sizeof (inc));
328  errno_assert (sz2 == sizeof (inc));
329  return 0;
330  }
331 
332  zmq_assert (dummy == 1);
333 
334 #else
335  unsigned char dummy;
336 #if defined ZMQ_HAVE_WINDOWS
337  const int nbytes =
338  ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
339  if (nbytes == SOCKET_ERROR) {
340  const int last_error = WSAGetLastError ();
341  if (last_error == WSAEWOULDBLOCK) {
342  errno = EAGAIN;
343  return -1;
344  }
345  wsa_assert (last_error == WSAEWOULDBLOCK);
346  }
347 #elif defined ZMQ_HAVE_VXWORKS
348  ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
349  if (nbytes == -1) {
350  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
351  errno = EAGAIN;
352  return -1;
353  }
354  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
355  || errno == EINTR);
356  }
357 #else
358  ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
359  if (nbytes == -1) {
360  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
361  errno = EAGAIN;
362  return -1;
363  }
364  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
365  || errno == EINTR);
366  }
367 #endif
368  zmq_assert (nbytes == sizeof (dummy));
369  zmq_assert (dummy == 0);
370 #endif
371  return 0;
372 }
373 
375 {
376  return _w != retired_fd;
377 }
378 
379 #ifdef HAVE_FORK
380 void zmq::signaler_t::forked ()
381 {
382  // Close file descriptors created in the parent and create new pair
383  close (_r);
384  close (_w);
385  make_fdpair (&_r, &_w);
386 }
387 #endif
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
ip.hpp
zmq::poll
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:319
zmq::signaler_t::signaler_t
signaler_t()
Definition: signaler.cpp:91
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
polling_util.hpp
errno
int errno
dummy
ReturnVal dummy
Definition: register_benchmark_test.cc:68
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
close_wait_ms
static int close_wait_ms(int fd_, unsigned int max_ms_=2000)
Definition: signaler.cpp:70
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
stdint.hpp
poller.hpp
zmq::signaler_t::send
void send()
Definition: signaler.cpp:146
zmq::make_fdpair
int make_fdpair(fd_t *r_, fd_t *w_)
Definition: ip.cpp:532
zmq::signaler_t::_r
fd_t _r
Definition: signaler.hpp:47
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
fd.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::signaler_t::_w
fd_t _w
Definition: signaler.hpp:46
zmq::signaler_t::get_fd
fd_t get_fd() const
Definition: signaler.cpp:141
zmq::signaler_t::~signaler_t
~signaler_t()
Definition: signaler.cpp:105
zmq::signaler_t::recv
void recv()
Definition: signaler.cpp:275
sleep_ms
static int sleep_ms(unsigned int ms_)
Definition: signaler.cpp:49
tcp.hpp
signaler.hpp
err.hpp
likely.hpp
zmq::signaler_t::recv_failable
int recv_failable()
Definition: signaler.cpp:311
zmq::signaler_t::valid
bool valid() const
Definition: signaler.cpp:374
zmq::signaler_t::wait
int wait(int timeout_) const
Definition: signaler.cpp:203
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58