select.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "select.hpp"
5 #if defined ZMQ_IOTHREAD_POLLER_USE_SELECT
6 
7 #if defined ZMQ_HAVE_WINDOWS
8 #elif defined ZMQ_HAVE_HPUX
9 #include <sys/param.h>
10 #include <sys/types.h>
11 #include <sys/time.h>
12 #elif defined ZMQ_HAVE_OPENVMS
13 #include <sys/types.h>
14 #include <sys/time.h>
15 #elif defined ZMQ_HAVE_VXWORKS
16 #include <sys/types.h>
17 #include <sys/time.h>
18 #include <strings.h>
19 #else
20 #include <sys/select.h>
21 #endif
22 
23 #include "err.hpp"
24 #include "config.hpp"
25 #include "i_poll_events.hpp"
26 
27 #include <algorithm>
28 #include <limits>
29 #include <climits>
30 
31 zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
32  worker_poller_base_t (ctx_),
33 #if defined ZMQ_HAVE_WINDOWS
34  // Fine as long as map is not cleared.
35  _current_family_entry_it (_family_entries.end ())
36 #else
37  _max_fd (retired_fd)
38 #endif
39 {
40 #if defined ZMQ_HAVE_WINDOWS
41  for (size_t i = 0; i < fd_family_cache_size; ++i)
42  _fd_family_cache[i] = std::make_pair (retired_fd, 0);
43 #endif
44 }
45 
46 zmq::select_t::~select_t ()
47 {
48  stop_worker ();
49 }
50 
51 zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
52 {
53  check_thread ();
54  zmq_assert (fd_ != retired_fd);
55 
56  fd_entry_t fd_entry;
57  fd_entry.fd = fd_;
58  fd_entry.events = events_;
59 
60 #if defined ZMQ_HAVE_WINDOWS
61  u_short family = get_fd_family (fd_);
62  wsa_assert (family != AF_UNSPEC);
63  family_entry_t &family_entry = _family_entries[family];
64 #else
65  family_entry_t &family_entry = _family_entry;
66 #endif
67  family_entry.fd_entries.push_back (fd_entry);
68  FD_SET (fd_, &family_entry.fds_set.error);
69 
70 #if !defined ZMQ_HAVE_WINDOWS
71  if (fd_ > _max_fd)
72  _max_fd = fd_;
73 #endif
74 
75  adjust_load (1);
76 
77  return fd_;
78 }
79 
80 zmq::select_t::fd_entries_t::iterator
81 zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries_,
82  handle_t handle_)
83 {
84  fd_entries_t::iterator fd_entry_it;
85  for (fd_entry_it = fd_entries_.begin (); fd_entry_it != fd_entries_.end ();
86  ++fd_entry_it)
87  if (fd_entry_it->fd == handle_)
88  break;
89 
90  return fd_entry_it;
91 }
92 
93 void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
94  const fds_set_t &local_fds_set_,
95  int event_count_)
96 {
97  // Size is cached to avoid iteration through recently added descriptors.
98  for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
99  i < size && event_count_ > 0; ++i) {
100  // fd_entries_[i] may not be stored, since calls to
101  // in_event/out_event may reallocate the vector
102 
103  if (is_retired_fd (fd_entries_[i]))
104  continue;
105 
106  if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.read)) {
107  fd_entries_[i].events->in_event ();
108  --event_count_;
109  }
110 
111  // TODO: can the is_retired_fd be true at this point? if it
112  // was retired before, we would already have continued, and I
113  // don't see where it might have been modified
114  // And if rc == 0, we can break instead of continuing
115  if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
116  continue;
117 
118  if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.write)) {
119  fd_entries_[i].events->out_event ();
120  --event_count_;
121  }
122 
123  // TODO: same as above
124  if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
125  continue;
126 
127  if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.error)) {
128  fd_entries_[i].events->in_event ();
129  --event_count_;
130  }
131  }
132 }
133 
134 #if defined ZMQ_HAVE_WINDOWS
135 int zmq::select_t::try_retire_fd_entry (
136  family_entries_t::iterator family_entry_it_, zmq::fd_t &handle_)
137 {
138  family_entry_t &family_entry = family_entry_it_->second;
139 
140  fd_entries_t::iterator fd_entry_it =
141  find_fd_entry_by_handle (family_entry.fd_entries, handle_);
142 
143  if (fd_entry_it == family_entry.fd_entries.end ())
144  return 0;
145 
146  fd_entry_t &fd_entry = *fd_entry_it;
147  zmq_assert (fd_entry.fd != retired_fd);
148 
149  if (family_entry_it_ != _current_family_entry_it) {
150  // Family is not currently being iterated and can be safely
151  // modified in-place. So later it can be skipped without
152  // re-verifying its content.
153  family_entry.fd_entries.erase (fd_entry_it);
154  } else {
155  // Otherwise mark removed entries as retired. It will be cleaned up
156  // at the end of the iteration. See zmq::select_t::loop
157  fd_entry.fd = retired_fd;
158  family_entry.has_retired = true;
159  }
160  family_entry.fds_set.remove_fd (handle_);
161  return 1;
162 }
163 #endif
164 
165 void zmq::select_t::rm_fd (handle_t handle_)
166 {
167  check_thread ();
168  int retired = 0;
169 #if defined ZMQ_HAVE_WINDOWS
170  u_short family = get_fd_family (handle_);
171  if (family != AF_UNSPEC) {
172  family_entries_t::iterator family_entry_it =
173  _family_entries.find (family);
174 
175  retired += try_retire_fd_entry (family_entry_it, handle_);
176  } else {
177  // get_fd_family may fail and return AF_UNSPEC if the socket was not
178  // successfully connected. In that case, we need to look for the
179  // socket in all family_entries.
180  family_entries_t::iterator end = _family_entries.end ();
181  for (family_entries_t::iterator family_entry_it =
182  _family_entries.begin ();
183  family_entry_it != end; ++family_entry_it) {
184  if (retired += try_retire_fd_entry (family_entry_it, handle_)) {
185  break;
186  }
187  }
188  }
189 #else
190  fd_entries_t::iterator fd_entry_it =
191  find_fd_entry_by_handle (_family_entry.fd_entries, handle_);
192  assert (fd_entry_it != _family_entry.fd_entries.end ());
193 
194  zmq_assert (fd_entry_it->fd != retired_fd);
195  fd_entry_it->fd = retired_fd;
196  _family_entry.fds_set.remove_fd (handle_);
197 
198  ++retired;
199 
200  if (handle_ == _max_fd) {
201  _max_fd = retired_fd;
202  for (fd_entry_it = _family_entry.fd_entries.begin ();
203  fd_entry_it != _family_entry.fd_entries.end (); ++fd_entry_it)
204  if (fd_entry_it->fd > _max_fd)
205  _max_fd = fd_entry_it->fd;
206  }
207 
208  _family_entry.has_retired = true;
209 #endif
210  zmq_assert (retired == 1);
211  adjust_load (-1);
212 }
213 
214 void zmq::select_t::set_pollin (handle_t handle_)
215 {
216  check_thread ();
217 #if defined ZMQ_HAVE_WINDOWS
218  u_short family = get_fd_family (handle_);
219  wsa_assert (family != AF_UNSPEC);
220  family_entry_t &family_entry = _family_entries[family];
221 #else
222  family_entry_t &family_entry = _family_entry;
223 #endif
224  FD_SET (handle_, &family_entry.fds_set.read);
225 }
226 
227 void zmq::select_t::reset_pollin (handle_t handle_)
228 {
229  check_thread ();
230 #if defined ZMQ_HAVE_WINDOWS
231  u_short family = get_fd_family (handle_);
232  wsa_assert (family != AF_UNSPEC);
233  family_entry_t &family_entry = _family_entries[family];
234 #else
235  family_entry_t &family_entry = _family_entry;
236 #endif
237  FD_CLR (handle_, &family_entry.fds_set.read);
238 }
239 
240 void zmq::select_t::set_pollout (handle_t handle_)
241 {
242  check_thread ();
243 #if defined ZMQ_HAVE_WINDOWS
244  u_short family = get_fd_family (handle_);
245  wsa_assert (family != AF_UNSPEC);
246  family_entry_t &family_entry = _family_entries[family];
247 #else
248  family_entry_t &family_entry = _family_entry;
249 #endif
250  FD_SET (handle_, &family_entry.fds_set.write);
251 }
252 
253 void zmq::select_t::reset_pollout (handle_t handle_)
254 {
255  check_thread ();
256 #if defined ZMQ_HAVE_WINDOWS
257  u_short family = get_fd_family (handle_);
258  wsa_assert (family != AF_UNSPEC);
259  family_entry_t &family_entry = _family_entries[family];
260 #else
261  family_entry_t &family_entry = _family_entry;
262 #endif
263  FD_CLR (handle_, &family_entry.fds_set.write);
264 }
265 
266 void zmq::select_t::stop ()
267 {
268  check_thread ();
269  // no-op... thread is stopped when no more fds or timers are registered
270 }
271 
272 int zmq::select_t::max_fds ()
273 {
274  return FD_SETSIZE;
275 }
276 
277 void zmq::select_t::loop ()
278 {
279  while (true) {
280  // Execute any due timers.
281  int timeout = static_cast<int> (execute_timers ());
282 
283  cleanup_retired ();
284 
285 #ifdef _WIN32
286  if (_family_entries.empty ()) {
287 #else
288  if (_family_entry.fd_entries.empty ()) {
289 #endif
290  zmq_assert (get_load () == 0);
291 
292  if (timeout == 0)
293  break;
294 
295  // TODO sleep for timeout
296  continue;
297  }
298 
299 #if defined ZMQ_HAVE_OSX
300  struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
301 #else
302  struct timeval tv = {static_cast<long> (timeout / 1000),
303  static_cast<long> (timeout % 1000 * 1000)};
304 #endif
305 
306 #if defined ZMQ_HAVE_WINDOWS
307  /*
308  On Windows select does not allow to mix descriptors from different
309  service providers. It seems to work for AF_INET and AF_INET6,
310  but fails for AF_INET and VMCI. The workaround is to use
311  WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
312  select to find out what actually changed. WSAWaitForMultipleEvents
313  cannot be used alone, because it does not support more than 64 events
314  which is not enough.
315 
316  To reduce unnecessary overhead, WSA is only used when there are more
317  than one family. Moreover, AF_INET and AF_INET6 are considered the same
318  family because Windows seems to handle them properly.
319  See get_fd_family for details.
320  */
321 
322  // If there is just one family, there is no reason to use WSA events.
323  int rc = 0;
324  const bool use_wsa_events = _family_entries.size () > 1;
325  if (use_wsa_events) {
326  // TODO: I don't really understand why we are doing this. If any of
327  // the events was signaled, we will call select for each fd_family
328  // afterwards. The only benefit is if none of the events was
329  // signaled, then we continue early.
330  // IMHO, either WSAEventSelect/WSAWaitForMultipleEvents or select
331  // should be used, but not both
332 
333  wsa_events_t wsa_events;
334 
335  for (family_entries_t::iterator family_entry_it =
336  _family_entries.begin ();
337  family_entry_it != _family_entries.end (); ++family_entry_it) {
338  family_entry_t &family_entry = family_entry_it->second;
339 
340  for (fd_entries_t::iterator fd_entry_it =
341  family_entry.fd_entries.begin ();
342  fd_entry_it != family_entry.fd_entries.end ();
343  ++fd_entry_it) {
344  fd_t fd = fd_entry_it->fd;
345 
346  // http://stackoverflow.com/q/35043420/188530
347  if (FD_ISSET (fd, &family_entry.fds_set.read)
348  && FD_ISSET (fd, &family_entry.fds_set.write))
349  rc = WSAEventSelect (fd, wsa_events.events[3],
350  FD_READ | FD_ACCEPT | FD_CLOSE
351  | FD_WRITE | FD_CONNECT);
352  else if (FD_ISSET (fd, &family_entry.fds_set.read))
353  rc = WSAEventSelect (fd, wsa_events.events[0],
354  FD_READ | FD_ACCEPT | FD_CLOSE);
355  else if (FD_ISSET (fd, &family_entry.fds_set.write))
356  rc = WSAEventSelect (fd, wsa_events.events[1],
357  FD_WRITE | FD_CONNECT);
358  else
359  rc = 0;
360 
361  wsa_assert (rc != SOCKET_ERROR);
362  }
363  }
364 
365  rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
366  timeout ? timeout : INFINITE, FALSE);
367  wsa_assert (rc != (int) WSA_WAIT_FAILED);
368  zmq_assert (rc != WSA_WAIT_IO_COMPLETION);
369 
370  if (rc == WSA_WAIT_TIMEOUT)
371  continue;
372  }
373 
374  for (_current_family_entry_it = _family_entries.begin ();
375  _current_family_entry_it != _family_entries.end ();
376  ++_current_family_entry_it) {
377  family_entry_t &family_entry = _current_family_entry_it->second;
378 
379 
380  if (use_wsa_events) {
381  // There is no reason to wait again after WSAWaitForMultipleEvents.
382  // Simply collect what is ready.
383  struct timeval tv_nodelay = {0, 0};
384  select_family_entry (family_entry, 0, true, tv_nodelay);
385  } else {
386  select_family_entry (family_entry, 0, timeout > 0, tv);
387  }
388  }
389 #else
390  select_family_entry (_family_entry, _max_fd + 1, timeout > 0, tv);
391 #endif
392  }
393 }
394 
395 void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
396  const int max_fd_,
397  const bool use_timeout_,
398  struct timeval &tv_)
399 {
400  // select will fail when run with empty sets.
401  fd_entries_t &fd_entries = family_entry_.fd_entries;
402  if (fd_entries.empty ())
403  return;
404 
405  fds_set_t local_fds_set = family_entry_.fds_set;
406  int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
407  &local_fds_set.error, use_timeout_ ? &tv_ : NULL);
408 
409 #if defined ZMQ_HAVE_WINDOWS
410  wsa_assert (rc != SOCKET_ERROR);
411 #else
412  if (rc == -1) {
413  errno_assert (errno == EINTR);
414  return;
415  }
416 #endif
417 
418  trigger_events (fd_entries, local_fds_set, rc);
419 
420  cleanup_retired (family_entry_);
421 }
422 
423 zmq::select_t::fds_set_t::fds_set_t ()
424 {
425  FD_ZERO (&read);
426  FD_ZERO (&write);
427  FD_ZERO (&error);
428 }
429 
430 zmq::select_t::fds_set_t::fds_set_t (const fds_set_t &other_)
431 {
432 #if defined ZMQ_HAVE_WINDOWS
433  // On Windows we don't need to copy the whole fd_set.
434  // SOCKETS are continuous from the beginning of fd_array in fd_set.
435  // We just need to copy fd_count elements of fd_array.
436  // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
437  memcpy (&read, &other_.read,
438  (char *) (other_.read.fd_array + other_.read.fd_count)
439  - (char *) &other_.read);
440  memcpy (&write, &other_.write,
441  (char *) (other_.write.fd_array + other_.write.fd_count)
442  - (char *) &other_.write);
443  memcpy (&error, &other_.error,
444  (char *) (other_.error.fd_array + other_.error.fd_count)
445  - (char *) &other_.error);
446 #else
447  memcpy (&read, &other_.read, sizeof other_.read);
448  memcpy (&write, &other_.write, sizeof other_.write);
449  memcpy (&error, &other_.error, sizeof other_.error);
450 #endif
451 }
452 
453 zmq::select_t::fds_set_t &
454 zmq::select_t::fds_set_t::operator= (const fds_set_t &other_)
455 {
456 #if defined ZMQ_HAVE_WINDOWS
457  // On Windows we don't need to copy the whole fd_set.
458  // SOCKETS are continuous from the beginning of fd_array in fd_set.
459  // We just need to copy fd_count elements of fd_array.
460  // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
461  memcpy (&read, &other_.read,
462  (char *) (other_.read.fd_array + other_.read.fd_count)
463  - (char *) &other_.read);
464  memcpy (&write, &other_.write,
465  (char *) (other_.write.fd_array + other_.write.fd_count)
466  - (char *) &other_.write);
467  memcpy (&error, &other_.error,
468  (char *) (other_.error.fd_array + other_.error.fd_count)
469  - (char *) &other_.error);
470 #else
471  memcpy (&read, &other_.read, sizeof other_.read);
472  memcpy (&write, &other_.write, sizeof other_.write);
473  memcpy (&error, &other_.error, sizeof other_.error);
474 #endif
475  return *this;
476 }
477 
478 void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
479 {
480  FD_CLR (fd_, &read);
481  FD_CLR (fd_, &write);
482  FD_CLR (fd_, &error);
483 }
484 
485 bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
486 {
487  if (family_entry_.has_retired) {
488  family_entry_.has_retired = false;
489  family_entry_.fd_entries.erase (
490  std::remove_if (family_entry_.fd_entries.begin (),
491  family_entry_.fd_entries.end (), is_retired_fd),
492  family_entry_.fd_entries.end ());
493  }
494  return family_entry_.fd_entries.empty ();
495 }
496 
497 void zmq::select_t::cleanup_retired ()
498 {
499 #ifdef _WIN32
500  for (family_entries_t::iterator it = _family_entries.begin ();
501  it != _family_entries.end ();) {
502  if (cleanup_retired (it->second))
503  it = _family_entries.erase (it);
504  else
505  ++it;
506  }
507 #else
508  cleanup_retired (_family_entry);
509 #endif
510 }
511 
512 bool zmq::select_t::is_retired_fd (const fd_entry_t &entry_)
513 {
514  return entry_.fd == retired_fd;
515 }
516 
517 zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
518 {
519 }
520 
521 
522 #if defined ZMQ_HAVE_WINDOWS
523 u_short zmq::select_t::get_fd_family (fd_t fd_)
524 {
525  // cache the results of determine_fd_family, as this is frequently called
526  // for the same sockets, and determine_fd_family is expensive
527  size_t i;
528  for (i = 0; i < fd_family_cache_size; ++i) {
529  const std::pair<fd_t, u_short> &entry = _fd_family_cache[i];
530  if (entry.first == fd_) {
531  return entry.second;
532  }
533  if (entry.first == retired_fd)
534  break;
535  }
536 
537  std::pair<fd_t, u_short> res =
538  std::make_pair (fd_, determine_fd_family (fd_));
539  if (i < fd_family_cache_size) {
540  _fd_family_cache[i] = res;
541  } else {
542  // just overwrite a random entry
543  // could be optimized by some LRU strategy
544  _fd_family_cache[rand () % fd_family_cache_size] = res;
545  }
546 
547  return res.second;
548 }
549 
550 u_short zmq::select_t::determine_fd_family (fd_t fd_)
551 {
552  // Use sockaddr_storage instead of sockaddr to accommodate different structure sizes
553  sockaddr_storage addr = {0};
554  int addr_size = sizeof addr;
555 
556  int type;
557  int type_length = sizeof (int);
558 
559  int rc = getsockopt (fd_, SOL_SOCKET, SO_TYPE,
560  reinterpret_cast<char *> (&type), &type_length);
561 
562  if (rc == 0) {
563  if (type == SOCK_DGRAM)
564  return AF_INET;
565 
566  rc =
567  getsockname (fd_, reinterpret_cast<sockaddr *> (&addr), &addr_size);
568 
569  // AF_INET and AF_INET6 can be mixed in select
570  // TODO: If proven otherwise, should simply return addr.sa_family
571  if (rc != SOCKET_ERROR)
572  return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
573  }
574 
575  return AF_UNSPEC;
576 }
577 
578 zmq::select_t::wsa_events_t::wsa_events_t ()
579 {
580  events[0] = WSACreateEvent ();
581  wsa_assert (events[0] != WSA_INVALID_EVENT);
582  events[1] = WSACreateEvent ();
583  wsa_assert (events[1] != WSA_INVALID_EVENT);
584  events[2] = WSACreateEvent ();
585  wsa_assert (events[2] != WSA_INVALID_EVENT);
586  events[3] = WSACreateEvent ();
587  wsa_assert (events[3] != WSA_INVALID_EVENT);
588 }
589 
590 zmq::select_t::wsa_events_t::~wsa_events_t ()
591 {
592  wsa_assert (WSACloseEvent (events[0]));
593  wsa_assert (WSACloseEvent (events[1]));
594  wsa_assert (WSACloseEvent (events[2]));
595  wsa_assert (WSACloseEvent (events[3]));
596 }
597 #endif
598 
599 #endif
i_poll_events.hpp
end
GLuint GLuint end
Definition: glcorearb.h:2858
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
select.hpp
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
if
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END if(!upb_strtable_init(&intern->table, UPB_CTYPE_UINT64))
Definition: php/ext/google/protobuf/map.c:232
errno
int errno
error
Definition: cJSON.c:88
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
FD_SETSIZE
#define FD_SETSIZE
Definition: deprecated-msvc/vs2015_xp/platform.hpp:10
i
int i
Definition: gmock-matchers_test.cc:764
zmq::thread_ctx_t
Definition: ctx.hpp:37
type
GLenum type
Definition: glcorearb.h:2695
size
GLsizeiptr size
Definition: glcorearb.h:2943
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
false
#define false
Definition: cJSON.c:70
ZMQ_HAVE_WINDOWS
#define ZMQ_HAVE_WINDOWS
Definition: deprecated-msvc/platform.hpp:4
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
google::protobuf.internal.decoder.long
long
Definition: decoder.py:89


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58