unix/async.c
Go to the documentation of this file.
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  * Permission is hereby granted, free of charge, to any person obtaining a copy
3  * of this software and associated documentation files (the "Software"), to
4  * deal in the Software without restriction, including without limitation the
5  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
6  * sell copies of the Software, and to permit persons to whom the Software is
7  * furnished to do so, subject to the following conditions:
8  *
9  * The above copyright notice and this permission notice shall be included in
10  * all copies or substantial portions of the Software.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
15  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
17  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
18  * IN THE SOFTWARE.
19  */
20 
21 /* This file contains both the uv__async internal infrastructure and the
22  * user-facing uv_async_t functions.
23  */
24 
25 #include "uv.h"
26 #include "internal.h"
27 #include "atomic-ops.h"
28 
29 #include <errno.h>
30 #include <stdio.h> /* snprintf() */
31 #include <assert.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <unistd.h>
35 
36 #ifdef __linux__
37 #include <sys/eventfd.h>
38 #endif
39 
40 static void uv__async_send(uv_loop_t* loop);
41 static int uv__async_start(uv_loop_t* loop);
42 
43 
45  int err;
46 
48  if (err)
49  return err;
50 
51  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
52  handle->async_cb = async_cb;
53  handle->pending = 0;
54 
55  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
57 
58  return 0;
59 }
60 
61 
63  /* Do a cheap read first. */
64  if (ACCESS_ONCE(int, handle->pending) != 0)
65  return 0;
66 
67  /* Tell the other thread we're busy with the handle. */
68  if (cmpxchgi(&handle->pending, 0, 1) != 0)
69  return 0;
70 
71  /* Wake up the other thread's event loop. */
72  uv__async_send(handle->loop);
73 
74  /* Tell the other thread we're done. */
75  if (cmpxchgi(&handle->pending, 1, 2) != 1)
76  abort();
77 
78  return 0;
79 }
80 
81 
82 /* Only call this from the event loop thread. */
84  int rc;
85 
86  for (;;) {
87  /* rc=0 -- handle is not pending.
88  * rc=1 -- handle is pending, other thread is still working with it.
89  * rc=2 -- handle is pending, other thread is done.
90  */
91  rc = cmpxchgi(&handle->pending, 2, 0);
92 
93  if (rc != 1)
94  return rc;
95 
96  /* Other thread is busy with this handle, spin until it's done. */
97  cpu_relax();
98  }
99 }
100 
101 
104  QUEUE_REMOVE(&handle->queue);
106 }
107 
108 
109 static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
110  char buf[1024];
111  ssize_t r;
112  QUEUE queue;
113  QUEUE* q;
114  uv_async_t* h;
115 
116  assert(w == &loop->async_io_watcher);
117 
118  for (;;) {
119  r = read(w->fd, buf, sizeof(buf));
120 
121  if (r == sizeof(buf))
122  continue;
123 
124  if (r != -1)
125  break;
126 
127  if (errno == EAGAIN || errno == EWOULDBLOCK)
128  break;
129 
130  if (errno == EINTR)
131  continue;
132 
133  abort();
134  }
135 
136  QUEUE_MOVE(&loop->async_handles, &queue);
137  while (!QUEUE_EMPTY(&queue)) {
138  q = QUEUE_HEAD(&queue);
139  h = QUEUE_DATA(q, uv_async_t, queue);
140 
141  QUEUE_REMOVE(q);
142  QUEUE_INSERT_TAIL(&loop->async_handles, q);
143 
144  if (0 == uv__async_spin(h))
145  continue; /* Not pending. */
146 
147  if (h->async_cb == NULL)
148  continue;
149 
150  h->async_cb(h);
151  }
152 }
153 
154 
156  const void* buf;
157  ssize_t len;
158  int fd;
159  int r;
160 
161  buf = "";
162  len = 1;
163  fd = loop->async_wfd;
164 
165 #if defined(__linux__)
166  if (fd == -1) {
167  static const uint64_t val = 1;
168  buf = &val;
169  len = sizeof(val);
170  fd = loop->async_io_watcher.fd; /* eventfd */
171  }
172 #endif
173 
174  do
175  r = write(fd, buf, len);
176  while (r == -1 && errno == EINTR);
177 
178  if (r == len)
179  return;
180 
181  if (r == -1)
182  if (errno == EAGAIN || errno == EWOULDBLOCK)
183  return;
184 
185  abort();
186 }
187 
188 
190  int pipefd[2];
191  int err;
192 
193  if (loop->async_io_watcher.fd != -1)
194  return 0;
195 
196 #ifdef __linux__
197  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
198  if (err < 0)
199  return UV__ERR(errno);
200 
201  pipefd[0] = err;
202  pipefd[1] = -1;
203 #else
204  err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
205  if (err < 0)
206  return err;
207 #endif
208 
209  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
210  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
211  loop->async_wfd = pipefd[1];
212 
213  return 0;
214 }
215 
216 
218  if (loop->async_io_watcher.fd == -1) /* never started */
219  return 0;
220 
222 
223  return uv__async_start(loop);
224 }
225 
226 
228  if (loop->async_io_watcher.fd == -1)
229  return;
230 
231  if (loop->async_wfd != -1) {
232  if (loop->async_wfd != loop->async_io_watcher.fd)
233  uv__close(loop->async_wfd);
234  loop->async_wfd = -1;
235  }
236 
237  uv__io_stop(loop, &loop->async_io_watcher, POLLIN);
238  uv__close(loop->async_io_watcher.fd);
239  loop->async_io_watcher.fd = -1;
240 }
atomic-ops.h
async_greeter_server_with_graceful_shutdown.loop
loop
Definition: async_greeter_server_with_graceful_shutdown.py:59
write
#define write
Definition: test-fs.c:47
string.h
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
error_ref_leak.err
err
Definition: error_ref_leak.py:35
QUEUE_HEAD
#define QUEUE_HEAD(q)
Definition: queue.h:42
uv__async_spin
static int uv__async_spin(uv_async_t *handle)
Definition: unix/async.c:83
uv__io_init
void uv__io_init(uv__io_t *w, uv__io_cb cb, int fd)
Definition: unix/core.c:853
QUEUE_DATA
#define QUEUE_DATA(ptr, type, field)
Definition: queue.h:30
uv__io_s::fd
int fd
Definition: unix.h:100
QUEUE_MOVE
#define QUEUE_MOVE(h, n)
Definition: queue.h:72
UV__F_NONBLOCK
#define UV__F_NONBLOCK
Definition: third_party/libuv/src/unix/internal.h:291
uv__async_stop
void uv__async_stop(uv_loop_t *loop)
Definition: unix/async.c:227
uv__async_close
void uv__async_close(uv_async_t *handle)
Definition: unix/async.c:102
uv_async_s
Definition: uv.h:834
ACCESS_ONCE
#define ACCESS_ONCE(type, var)
Definition: third_party/libuv/src/unix/internal.h:81
ssize_t
intptr_t ssize_t
Definition: win.h:27
queue
Definition: sync_test.cc:39
uv__async_send
static void uv__async_send(uv_loop_t *loop)
Definition: unix/async.c:155
uv__io_start
void uv__io_start(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: unix/core.c:870
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
uv__make_pipe
int uv__make_pipe(int fds[2], int flags)
Definition: unix/process.c:142
uv__async_start
static int uv__async_start(uv_loop_t *loop)
Definition: unix/async.c:189
UV__ERR
#define UV__ERR(x)
Definition: errno.h:29
uv__async_io
static void uv__async_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: unix/async.c:109
QUEUE_REMOVE
#define QUEUE_REMOVE(q)
Definition: queue.h:101
uv__handle_init
#define uv__handle_init(loop_, h, type_)
Definition: uv-common.h:284
QUEUE_EMPTY
#define QUEUE_EMPTY(q)
Definition: queue.h:39
uv_async_send
int uv_async_send(uv_async_t *handle)
Definition: unix/async.c:62
uv.h
async_cb
static void async_cb(uv_async_t *handle)
Definition: benchmark-async-pummel.c:39
queue
struct queue queue
internal.h
read
int read(izstream &zs, T *x, Items items)
Definition: bloaty/third_party/zlib/contrib/iostream2/zstream.h:115
uv_async_cb
void(* uv_async_cb)(uv_async_t *handle)
Definition: uv.h:319
fix_build_deps.r
r
Definition: fix_build_deps.py:491
uv__async_fork
int uv__async_fork(uv_loop_t *loop)
Definition: unix/async.c:217
uv__io_stop
void uv__io_stop(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: unix/core.c:898
handle
static csh handle
Definition: test_arm_regression.c:16
uv_handle_s
Definition: uv.h:441
uv_loop_s
Definition: uv.h:1767
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
uv__handle_start
#define uv__handle_start(h)
Definition: uv-common.h:241
uv_async_init
int uv_async_init(uv_loop_t *loop, uv_async_t *handle, uv_async_cb async_cb)
Definition: unix/async.c:44
uv__io_s
Definition: unix.h:94
uv__handle_stop
#define uv__handle_stop(h)
Definition: uv-common.h:249
QUEUE_INSERT_TAIL
#define QUEUE_INSERT_TAIL(h, q)
Definition: queue.h:92
errno.h
QUEUE
void * QUEUE[2]
Definition: queue.h:21
uv__close
int uv__close(int fd)
Definition: unix/core.c:557


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:35