kqueue.cpp
/* SPDX-License-Identifier: MPL-2.0 */

#include "precompiled.hpp"
#include "kqueue.hpp"
#if defined ZMQ_IOTHREAD_POLLER_USE_KQUEUE

#include <sys/time.h>
#include <sys/types.h>
#include <sys/event.h>
#include <stdlib.h>
#include <unistd.h>
#include <algorithm>
#include <new>

#include "macros.hpp"
#include "kqueue.hpp"
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"
#include "likely.hpp"

// NetBSD up to version 9 defines (struct kevent).udata as intptr_t,
// everyone else as void *.
#if defined ZMQ_HAVE_NETBSD && defined(ZMQ_NETBSD_KEVENT_UDATA_INTPTR_T)
#define kevent_udata_t intptr_t
#else
#define kevent_udata_t void *
#endif

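// kqueue_t is the I/O-thread poller used when libzmq is built with
// ZMQ_IOTHREAD_POLLER_USE_KQUEUE. It multiplexes file descriptors with the
// BSD kqueue/kevent API and runs on its own worker thread, which is why the
// mutating methods below call check_thread ().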
zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) :
    worker_poller_base_t (ctx_)
{
    // Create event queue
    kqueue_fd = kqueue ();
    errno_assert (kqueue_fd != -1);
#ifdef HAVE_FORK
    pid = getpid ();
#endif
}

zmq::kqueue_t::~kqueue_t ()
{
    stop_worker ();
    close (kqueue_fd);
}

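// Helper that registers a single (fd, filter) pair with the kernel queue
// using EV_ADD. The poll_entry_t pointer is stashed in the kevent's udata
// field so that loop () can map a returned event back to its reactor.
// kevent_delete () below is the symmetric EV_DELETE operation.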
void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
{
    check_thread ();
    struct kevent ev;

    EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t) udata_);
    int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
    errno_assert (rc != -1);
}

void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
{
    struct kevent ev;

    EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
    int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
    errno_assert (rc != -1);
}

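// Adds a file descriptor to the poller. No kernel filter is installed yet;
// EVFILT_READ/EVFILT_WRITE registration is deferred until set_pollin () or
// set_pollout () is called on the returned handle.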
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
                                               i_poll_events *reactor_)
{
    check_thread ();
    poll_entry_t *pe = new (std::nothrow) poll_entry_t;
    alloc_assert (pe);

    pe->fd = fd_;
    pe->flag_pollin = 0;
    pe->flag_pollout = 0;
    pe->reactor = reactor_;

    adjust_load (1);

    return pe;
}

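// Removes a file descriptor. The kernel filters are deleted immediately, but
// the poll_entry_t itself is only marked with retired_fd and queued on the
// retired list; it is freed at the end of the current loop () iteration so
// that events already fetched for it can be skipped safely.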
void zmq::kqueue_t::rm_fd (handle_t handle_)
{
    check_thread ();
    poll_entry_t *pe = (poll_entry_t *) handle_;
    if (pe->flag_pollin)
        kevent_delete (pe->fd, EVFILT_READ);
    if (pe->flag_pollout)
        kevent_delete (pe->fd, EVFILT_WRITE);
    pe->fd = retired_fd;
    retired.push_back (pe);

    adjust_load (-1);
}

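// The four methods below toggle interest in readability/writability. Each is
// guarded by the corresponding flag so a kevent () syscall is only issued
// when the registration actually changes.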
void zmq::kqueue_t::set_pollin (handle_t handle_)
{
    check_thread ();
    poll_entry_t *pe = (poll_entry_t *) handle_;
    if (likely (!pe->flag_pollin)) {
        pe->flag_pollin = true;
        kevent_add (pe->fd, EVFILT_READ, pe);
    }
}

void zmq::kqueue_t::reset_pollin (handle_t handle_)
{
    check_thread ();
    poll_entry_t *pe = (poll_entry_t *) handle_;
    if (likely (pe->flag_pollin)) {
        pe->flag_pollin = false;
        kevent_delete (pe->fd, EVFILT_READ);
    }
}

void zmq::kqueue_t::set_pollout (handle_t handle_)
{
    check_thread ();
    poll_entry_t *pe = (poll_entry_t *) handle_;
    if (likely (!pe->flag_pollout)) {
        pe->flag_pollout = true;
        kevent_add (pe->fd, EVFILT_WRITE, pe);
    }
}

void zmq::kqueue_t::reset_pollout (handle_t handle_)
{
    check_thread ();
    poll_entry_t *pe = (poll_entry_t *) handle_;
    if (likely (pe->flag_pollout)) {
        pe->flag_pollout = false;
        kevent_delete (pe->fd, EVFILT_WRITE);
    }
}

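// stop () is intentionally a no-op here: the worker thread is shut down via
// stop_worker () in the destructor instead.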
void zmq::kqueue_t::stop ()
{
}

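// max_fds () reports the maximum number of descriptors this poller can
// handle; returning -1 here presumably signals that kqueue imposes no fixed
// limit (unlike, say, a select-based poller bounded by FD_SETSIZE).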
int zmq::kqueue_t::max_fds ()
{
    return -1;
}

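// Main event loop, executed on the worker thread: run any due timers, block
// in kevent () for at most the next timer deadline, dispatch in/out events
// to the registered reactors, then reclaim retired entries.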
void zmq::kqueue_t::loop ()
{
    while (true) {
        // Execute any due timers.
        int timeout = (int) execute_timers ();

        if (get_load () == 0) {
            if (timeout == 0)
                break;

            // TODO sleep for timeout
            continue;
        }

        // Wait for events.
        struct kevent ev_buf[max_io_events];
        timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
        int n = kevent (kqueue_fd, NULL, 0, &ev_buf[0], max_io_events,
                        timeout ? &ts : NULL);
#ifdef HAVE_FORK
        if (unlikely (pid != getpid ())) {
            //printf("zmq::kqueue_t::loop aborting on forked child %d\n", (int)getpid());
            // simply exit the loop in a forked process.
            return;
        }
#endif
        if (n == -1) {
            errno_assert (errno == EINTR);
            continue;
        }

        for (int i = 0; i < n; i++) {
            poll_entry_t *pe = (poll_entry_t *) ev_buf[i].udata;

            if (pe->fd == retired_fd)
                continue;
            if (ev_buf[i].flags & EV_EOF)
                pe->reactor->in_event ();
            if (pe->fd == retired_fd)
                continue;
            if (ev_buf[i].filter == EVFILT_WRITE)
                pe->reactor->out_event ();
            if (pe->fd == retired_fd)
                continue;
            if (ev_buf[i].filter == EVFILT_READ)
                pe->reactor->in_event ();
        }

        // Destroy retired event sources.
        for (retired_t::iterator it = retired.begin (); it != retired.end ();
             ++it) {
            LIBZMQ_DELETE (*it);
        }
        retired.clear ();
    }
}

#endif
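
For reference, the following standalone sketch (not part of libzmq) shows the bare register-then-wait pattern the poller above is built on. It assumes a BSD-style system providing <sys/event.h>; the choice of stdin as the watched descriptor and the one-second timeout are illustrative only.

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <cstdio>

int main ()
{
    // Create the kernel event queue, as kqueue_t's constructor does.
    int kq = kqueue ();
    if (kq == -1)
        return 1;

    // Register stdin for readability. The udata field (left 0 here) is where
    // kqueue_t stores its poll_entry_t pointer.
    struct kevent change;
    EV_SET (&change, STDIN_FILENO, EVFILT_READ, EV_ADD, 0, 0, 0);
    if (kevent (kq, &change, 1, NULL, 0, NULL) == -1)
        return 1;

    // Block for up to one second, mirroring the timeout handling in loop ().
    struct kevent event;
    timespec ts = {1, 0};
    int n = kevent (kq, NULL, 0, &event, 1, &ts);
    if (n > 0)
        std::printf ("fd %d is readable\n", (int) event.ident);

    close (kq);
    return 0;
}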