kqueue.c
Go to the documentation of this file.
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  * Permission is hereby granted, free of charge, to any person obtaining a copy
3  * of this software and associated documentation files (the "Software"), to
4  * deal in the Software without restriction, including without limitation the
5  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
6  * sell copies of the Software, and to permit persons to whom the Software is
7  * furnished to do so, subject to the following conditions:
8  *
9  * The above copyright notice and this permission notice shall be included in
10  * all copies or substantial portions of the Software.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
15  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
17  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
18  * IN THE SOFTWARE.
19  */
20 
21 #include "uv.h"
22 #include "internal.h"
23 
24 #include <assert.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <errno.h>
28 
29 #include <sys/sysctl.h>
30 #include <sys/types.h>
31 #include <sys/event.h>
32 #include <sys/time.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35 #include <time.h>
36 
37 /*
38  * Required on
39  * - Until at least FreeBSD 11.0
40  * - Older versions of Mac OS X
41  *
42  * http://www.boost.org/doc/libs/1_61_0/boost/asio/detail/kqueue_reactor.hpp
43  */
44 #ifndef EV_OOBAND
45 #define EV_OOBAND EV_FLAG1
46 #endif
47 
48 static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags);
49 
50 
52  loop->backend_fd = kqueue();
53  if (loop->backend_fd == -1)
54  return UV__ERR(errno);
55 
56  uv__cloexec(loop->backend_fd, 1);
57 
58  return 0;
59 }
60 
61 
62 #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070
63 static int uv__has_forked_with_cfrunloop;
64 #endif
65 
67  int err;
68  loop->backend_fd = -1;
70  if (err)
71  return err;
72 
73 #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070
74  if (loop->cf_state != NULL) {
75  /* We cannot start another CFRunloop and/or thread in the child
76  process; CF aborts if you try or if you try to touch the thread
77  at all to kill it. So the best we can do is ignore it from now
78  on. This means we can't watch directories in the same way
79  anymore (like other BSDs). It also means we cannot properly
80  clean up the allocated resources; calling
81  uv__fsevents_loop_delete from uv_loop_close will crash the
82  process. So we sidestep the issue by pretending like we never
83  started it in the first place.
84  */
85  uv__has_forked_with_cfrunloop = 1;
86  uv__free(loop->cf_state);
87  loop->cf_state = NULL;
88  }
89 #endif /* #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070 */
90  return err;
91 }
92 
93 
95  struct kevent ev;
96  int rc;
97 
98  rc = 0;
99  EV_SET(&ev, fd, EVFILT_READ, EV_ADD, 0, 0, 0);
100  if (kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL))
101  rc = UV__ERR(errno);
102 
103  EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
104  if (rc == 0)
105  if (kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL))
106  abort();
107 
108  return rc;
109 }
110 
111 
113  struct kevent events[1024];
114  struct kevent* ev;
115  struct timespec spec;
116  unsigned int nevents;
117  unsigned int revents;
118  QUEUE* q;
119  uv__io_t* w;
120  sigset_t* pset;
121  sigset_t set;
122  uint64_t base;
123  uint64_t diff;
124  int have_signals;
125  int filter;
126  int fflags;
127  int count;
128  int nfds;
129  int fd;
130  int op;
131  int i;
132 
133  if (loop->nfds == 0) {
134  assert(QUEUE_EMPTY(&loop->watcher_queue));
135  return;
136  }
137 
138  nevents = 0;
139 
140  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
141  q = QUEUE_HEAD(&loop->watcher_queue);
142  QUEUE_REMOVE(q);
143  QUEUE_INIT(q);
144 
145  w = QUEUE_DATA(q, uv__io_t, watcher_queue);
146  assert(w->pevents != 0);
147  assert(w->fd >= 0);
148  assert(w->fd < (int) loop->nwatchers);
149 
150  if ((w->events & POLLIN) == 0 && (w->pevents & POLLIN) != 0) {
151  filter = EVFILT_READ;
152  fflags = 0;
153  op = EV_ADD;
154 
155  if (w->cb == uv__fs_event) {
156  filter = EVFILT_VNODE;
157  fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME
158  | NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE;
159  op = EV_ADD | EV_ONESHOT; /* Stop the event from firing repeatedly. */
160  }
161 
162  EV_SET(events + nevents, w->fd, filter, op, fflags, 0, 0);
163 
164  if (++nevents == ARRAY_SIZE(events)) {
165  if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
166  abort();
167  nevents = 0;
168  }
169  }
170 
171  if ((w->events & POLLOUT) == 0 && (w->pevents & POLLOUT) != 0) {
172  EV_SET(events + nevents, w->fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
173 
174  if (++nevents == ARRAY_SIZE(events)) {
175  if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
176  abort();
177  nevents = 0;
178  }
179  }
180 
181  if ((w->events & UV__POLLPRI) == 0 && (w->pevents & UV__POLLPRI) != 0) {
182  EV_SET(events + nevents, w->fd, EV_OOBAND, EV_ADD, 0, 0, 0);
183 
184  if (++nevents == ARRAY_SIZE(events)) {
185  if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
186  abort();
187  nevents = 0;
188  }
189  }
190 
191  w->events = w->pevents;
192  }
193 
194  pset = NULL;
195  if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
196  pset = &set;
197  sigemptyset(pset);
198  sigaddset(pset, SIGPROF);
199  }
200 
201  assert(timeout >= -1);
202  base = loop->time;
203  count = 48; /* Benchmarks suggest this gives the best throughput. */
204 
205  for (;; nevents = 0) {
206  if (timeout != -1) {
207  spec.tv_sec = timeout / 1000;
208  spec.tv_nsec = (timeout % 1000) * 1000000;
209  }
210 
211  if (pset != NULL)
212  pthread_sigmask(SIG_BLOCK, pset, NULL);
213 
214  nfds = kevent(loop->backend_fd,
215  events,
216  nevents,
217  events,
218  ARRAY_SIZE(events),
219  timeout == -1 ? NULL : &spec);
220 
221  if (pset != NULL)
222  pthread_sigmask(SIG_UNBLOCK, pset, NULL);
223 
224  /* Update loop->time unconditionally. It's tempting to skip the update when
225  * timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
226  * operating system didn't reschedule our process while in the syscall.
227  */
228  SAVE_ERRNO(uv__update_time(loop));
229 
230  if (nfds == 0) {
231  assert(timeout != -1);
232  return;
233  }
234 
235  if (nfds == -1) {
236  if (errno != EINTR)
237  abort();
238 
239  if (timeout == 0)
240  return;
241 
242  if (timeout == -1)
243  continue;
244 
245  /* Interrupted by a signal. Update timeout and poll again. */
246  goto update_timeout;
247  }
248 
249  have_signals = 0;
250  nevents = 0;
251 
252  assert(loop->watchers != NULL);
253  loop->watchers[loop->nwatchers] = (void*) events;
254  loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
255  for (i = 0; i < nfds; i++) {
256  ev = events + i;
257  fd = ev->ident;
258  /* Skip invalidated events, see uv__platform_invalidate_fd */
259  if (fd == -1)
260  continue;
261  w = loop->watchers[fd];
262 
263  if (w == NULL) {
264  /* File descriptor that we've stopped watching, disarm it.
265  * TODO: batch up. */
266  struct kevent events[1];
267 
268  EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
269  if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL))
270  if (errno != EBADF && errno != ENOENT)
271  abort();
272 
273  continue;
274  }
275 
276  if (ev->filter == EVFILT_VNODE) {
277  assert(w->events == POLLIN);
278  assert(w->pevents == POLLIN);
279  w->cb(loop, w, ev->fflags); /* XXX always uv__fs_event() */
280  nevents++;
281  continue;
282  }
283 
284  revents = 0;
285 
286  if (ev->filter == EVFILT_READ) {
287  if (w->pevents & POLLIN) {
288  revents |= POLLIN;
289  w->rcount = ev->data;
290  } else {
291  /* TODO batch up */
292  struct kevent events[1];
293  EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
294  if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL))
295  if (errno != ENOENT)
296  abort();
297  }
298  }
299 
300  if (ev->filter == EV_OOBAND) {
301  if (w->pevents & UV__POLLPRI) {
302  revents |= UV__POLLPRI;
303  w->rcount = ev->data;
304  } else {
305  /* TODO batch up */
306  struct kevent events[1];
307  EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
308  if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL))
309  if (errno != ENOENT)
310  abort();
311  }
312  }
313 
314  if (ev->filter == EVFILT_WRITE) {
315  if (w->pevents & POLLOUT) {
316  revents |= POLLOUT;
317  w->wcount = ev->data;
318  } else {
319  /* TODO batch up */
320  struct kevent events[1];
321  EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
322  if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL))
323  if (errno != ENOENT)
324  abort();
325  }
326  }
327 
328  if (ev->flags & EV_ERROR)
329  revents |= POLLERR;
330 
331  if ((ev->flags & EV_EOF) && (w->pevents & UV__POLLRDHUP))
332  revents |= UV__POLLRDHUP;
333 
334  if (revents == 0)
335  continue;
336 
337  /* Run signal watchers last. This also affects child process watchers
338  * because those are implemented in terms of signal watchers.
339  */
340  if (w == &loop->signal_io_watcher)
341  have_signals = 1;
342  else
343  w->cb(loop, w, revents);
344 
345  nevents++;
346  }
347 
348  if (have_signals != 0)
349  loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);
350 
351  loop->watchers[loop->nwatchers] = NULL;
352  loop->watchers[loop->nwatchers + 1] = NULL;
353 
354  if (have_signals != 0)
355  return; /* Event loop should cycle now so don't poll again. */
356 
357  if (nevents != 0) {
358  if (nfds == ARRAY_SIZE(events) && --count != 0) {
359  /* Poll for more events but don't block this time. */
360  timeout = 0;
361  continue;
362  }
363  return;
364  }
365 
366  if (timeout == 0)
367  return;
368 
369  if (timeout == -1)
370  continue;
371 
372 update_timeout:
373  assert(timeout > 0);
374 
375  diff = loop->time - base;
376  if (diff >= (uint64_t) timeout)
377  return;
378 
379  timeout -= diff;
380  }
381 }
382 
383 
385  struct kevent* events;
386  uintptr_t i;
387  uintptr_t nfds;
388 
389  assert(loop->watchers != NULL);
390  assert(fd >= 0);
391 
392  events = (struct kevent*) loop->watchers[loop->nwatchers];
393  nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
394  if (events == NULL)
395  return;
396 
397  /* Invalidate events with same file descriptor */
398  for (i = 0; i < nfds; i++)
399  if ((int) events[i].ident == fd)
400  events[i].ident = -1;
401 }
402 
403 
404 static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags) {
406  struct kevent ev;
407  int events;
408  const char* path;
409 #if defined(F_GETPATH)
410  /* MAXPATHLEN == PATH_MAX but the former is what XNU calls it internally. */
411  char pathbuf[MAXPATHLEN];
412 #endif
413 
414  handle = container_of(w, uv_fs_event_t, event_watcher);
415 
416  if (fflags & (NOTE_ATTRIB | NOTE_EXTEND))
417  events = UV_CHANGE;
418  else
419  events = UV_RENAME;
420 
421  path = NULL;
422 #if defined(F_GETPATH)
423  /* Also works when the file has been unlinked from the file system. Passing
424  * in the path when the file has been deleted is arguably a little strange
425  * but it's consistent with what the inotify backend does.
426  */
427  if (fcntl(handle->event_watcher.fd, F_GETPATH, pathbuf) == 0)
428  path = uv__basename_r(pathbuf);
429 #endif
430  handle->cb(handle, path, events, 0);
431 
432  if (handle->event_watcher.fd == -1)
433  return;
434 
435  /* Watcher operates in one-shot mode, re-arm it. */
436  fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME
437  | NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE;
438 
439  EV_SET(&ev, w->fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, fflags, 0, 0);
440 
441  if (kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL))
442  abort();
443 }
444 
445 
447  uv__handle_init(loop, (uv_handle_t*)handle, UV_FS_EVENT);
448  return 0;
449 }
450 
451 
454  const char* path,
455  unsigned int flags) {
456  int fd;
457 #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070
458  struct stat statbuf;
459 #endif
460 
461  if (uv__is_active(handle))
462  return UV_EINVAL;
463 
464  handle->cb = cb;
465  handle->path = uv__strdup(path);
466  if (handle->path == NULL)
467  return UV_ENOMEM;
468 
469  /* TODO open asynchronously - but how do we report back errors? */
470  fd = open(handle->path, O_RDONLY);
471  if (fd == -1) {
472  uv__free(handle->path);
473  handle->path = NULL;
474  return UV__ERR(errno);
475  }
476 
477 #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070
478  /* Nullify field to perform checks later */
479  handle->cf_cb = NULL;
480  handle->realpath = NULL;
481  handle->realpath_len = 0;
482  handle->cf_flags = flags;
483 
484  if (fstat(fd, &statbuf))
485  goto fallback;
486  /* FSEvents works only with directories */
487  if (!(statbuf.st_mode & S_IFDIR))
488  goto fallback;
489 
490  if (!uv__has_forked_with_cfrunloop) {
491  int r;
492  /* The fallback fd is no longer needed */
494  handle->event_watcher.fd = -1;
496  if (r == 0) {
498  } else {
499  uv__free(handle->path);
500  handle->path = NULL;
501  }
502  return r;
503  }
504 fallback:
505 #endif /* #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070 */
506 
508  uv__io_init(&handle->event_watcher, uv__fs_event, fd);
509  uv__io_start(handle->loop, &handle->event_watcher, POLLIN);
510 
511  return 0;
512 }
513 
514 
516  int r;
517  r = 0;
518 
519  if (!uv__is_active(handle))
520  return 0;
521 
523 
524 #if defined(__APPLE__) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1070
525  if (!uv__has_forked_with_cfrunloop && handle->cf_cb != NULL)
527 #endif
528 
529  if (handle->event_watcher.fd != -1) {
530  uv__io_close(handle->loop, &handle->event_watcher);
531  uv__close(handle->event_watcher.fd);
532  handle->event_watcher.fd = -1;
533  }
534 
535  uv__free(handle->path);
536  handle->path = NULL;
537 
538  return r;
539 }
540 
541 
544 }
uv__is_active
#define uv__is_active(h)
Definition: uv-common.h:235
async_greeter_server_with_graceful_shutdown.loop
loop
Definition: async_greeter_server_with_graceful_shutdown.py:59
uv_fs_event_s
Definition: uv.h:1533
EV_OOBAND
#define EV_OOBAND
Definition: kqueue.c:45
uv__fsevents_init
int uv__fsevents_init(uv_fs_event_t *handle)
Definition: fsevents.c:29
ARRAY_SIZE
#define ARRAY_SIZE(array)
Definition: bloaty.cc:101
uv__close_nocheckstdio
int uv__close_nocheckstdio(int fd)
Definition: unix/core.c:538
string.h
error_ref_leak.err
err
Definition: error_ref_leak.py:35
uv_fs_event_init
int uv_fs_event_init(uv_loop_t *loop, uv_fs_event_t *handle)
Definition: kqueue.c:446
QUEUE_HEAD
#define QUEUE_HEAD(q)
Definition: queue.h:42
check_documentation.path
path
Definition: check_documentation.py:57
UV__POLLPRI
#define UV__POLLPRI
Definition: third_party/libuv/src/unix/internal.h:126
uv__io_init
void uv__io_init(uv__io_t *w, uv__io_cb cb, int fd)
Definition: unix/core.c:853
QUEUE_DATA
#define QUEUE_DATA(ptr, type, field)
Definition: queue.h:30
container_of
#define container_of(ptr, type, member)
Definition: uv-common.h:57
run_interop_tests.spec
def spec
Definition: run_interop_tests.py:1394
uv__io_s::fd
int fd
Definition: unix.h:100
uv__io_s::events
unsigned int events
Definition: unix.h:99
uv__platform_invalidate_fd
void uv__platform_invalidate_fd(uv_loop_t *loop, int fd)
Definition: kqueue.c:384
QUEUE_INIT
#define QUEUE_INIT(q)
Definition: queue.h:45
uv__cloexec
#define uv__cloexec
Definition: third_party/libuv/src/unix/internal.h:173
uv__io_poll
void uv__io_poll(uv_loop_t *loop, int timeout)
Definition: kqueue.c:112
uv__strdup
char * uv__strdup(const char *s)
Definition: uv-common.c:55
uv__io_start
void uv__io_start(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: unix/core.c:870
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
bm_diff.diff
diff
Definition: bm_diff.py:274
UV_RENAME
@ UV_RENAME
Definition: uv.h:1528
uv__free
void uv__free(void *ptr)
Definition: uv-common.c:81
UV__ERR
#define UV__ERR(x)
Definition: errno.h:29
uv__fs_event
static void uv__fs_event(uv_loop_t *loop, uv__io_t *w, unsigned int fflags)
Definition: kqueue.c:404
gen_synthetic_protos.base
base
Definition: gen_synthetic_protos.py:31
uintptr_t
_W64 unsigned int uintptr_t
Definition: stdint-msvc2008.h:119
uv__io_check_fd
int uv__io_check_fd(uv_loop_t *loop, int fd)
Definition: kqueue.c:94
uv_fs_event_stop
int uv_fs_event_stop(uv_fs_event_t *handle)
Definition: kqueue.c:515
QUEUE_REMOVE
#define QUEUE_REMOVE(q)
Definition: queue.h:101
uv__handle_init
#define uv__handle_init(loop_, h, type_)
Definition: uv-common.h:284
QUEUE_EMPTY
#define QUEUE_EMPTY(q)
Definition: queue.h:39
UV_CHANGE
@ UV_CHANGE
Definition: uv.h:1529
uv__io_fork
int uv__io_fork(uv_loop_t *loop)
Definition: kqueue.c:66
uv.h
internal.h
UV__POLLRDHUP
#define UV__POLLRDHUP
Definition: third_party/libuv/src/unix/internal.h:120
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
uv__io_s::cb
uv__io_cb cb
Definition: unix.h:95
uv_fs_event_start
int uv_fs_event_start(uv_fs_event_t *handle, uv_fs_event_cb cb, const char *path, unsigned int flags)
Definition: kqueue.c:452
uv__fsevents_close
int uv__fsevents_close(uv_fs_event_t *handle)
Definition: fsevents.c:34
uv__io_close
void uv__io_close(uv_loop_t *loop, uv__io_t *w)
Definition: unix/core.c:930
UV_LOOP_BLOCK_SIGPROF
@ UV_LOOP_BLOCK_SIGPROF
Definition: third_party/libuv/src/unix/internal.h:141
uv_fs_event_cb
void(* uv_fs_event_cb)(uv_fs_event_t *handle, const char *filename, int events, int status)
Definition: uv.h:366
fix_build_deps.r
r
Definition: fix_build_deps.py:491
SAVE_ERRNO
#define SAVE_ERRNO(block)
Definition: third_party/libuv/src/unix/internal.h:94
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
uv__io_s::pevents
unsigned int pevents
Definition: unix.h:98
open
#define open
Definition: test-fs.c:46
stat
#define stat
Definition: test-fs.c:50
handle
static csh handle
Definition: test_arm_regression.c:16
uv_handle_s
Definition: uv.h:441
uv_loop_s
Definition: uv.h:1767
flags
uint32_t flags
Definition: retry_filter.cc:632
uv__kqueue_init
int uv__kqueue_init(uv_loop_t *loop)
Definition: kqueue.c:51
uv__handle_start
#define uv__handle_start(h)
Definition: uv-common.h:241
uv__io_s
Definition: unix.h:94
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
uv__handle_stop
#define uv__handle_stop(h)
Definition: uv-common.h:249
uv__fs_event_close
void uv__fs_event_close(uv_fs_event_t *handle)
Definition: kqueue.c:542
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
errno.h
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
QUEUE
void * QUEUE[2]
Definition: queue.h:21
uv__close
int uv__close(int fd)
Definition: unix/core.c:557


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:27