threadpool.c
Go to the documentation of this file.
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv-common.h"
23 
24 #if !defined(_WIN32)
25 # include "unix/internal.h"
26 #endif
27 
28 #include <stdlib.h>
29 
30 #define MAX_THREADPOOL_SIZE 1024
31 
33 static uv_cond_t cond;
35 static unsigned int idle_threads;
36 static unsigned int slow_io_work_running;
37 static unsigned int nthreads;
41 static QUEUE wq;
44 
45 static unsigned int slow_work_thread_threshold(void) {
46  return (nthreads + 1) / 2;
47 }
48 
/* Marker work callback installed by uv__work_cancel(). It must never
 * actually run; reaching it means a cancelled request was executed. */
static void uv__cancelled(struct uv__work* w) {
  abort();
}
52 
53 
54 /* To avoid deadlock with uv_cancel() it's crucial that the worker
55  * never holds the global mutex and the loop-local mutex at the same time.
56  */
57 static void worker(void* arg) {
58  struct uv__work* w;
59  QUEUE* q;
60  int is_slow_work;
61 
63  arg = NULL;
64 
66  for (;;) {
67  /* `mutex` should always be locked at this point. */
68 
69  /* Keep waiting while either no work is present or only slow I/O
70  and we're at the threshold for that. */
71  while (QUEUE_EMPTY(&wq) ||
75  idle_threads += 1;
77  idle_threads -= 1;
78  }
79 
80  q = QUEUE_HEAD(&wq);
81  if (q == &exit_message) {
84  break;
85  }
86 
87  QUEUE_REMOVE(q);
88  QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
89 
90  is_slow_work = 0;
91  if (q == &run_slow_work_message) {
92  /* If we're at the slow I/O threshold, re-schedule until after all
93  other work in the queue is done. */
95  QUEUE_INSERT_TAIL(&wq, q);
96  continue;
97  }
98 
99  /* If we encountered a request to run slow I/O work but there is none
100  to run, that means it's cancelled => Start over. */
102  continue;
103 
104  is_slow_work = 1;
106 
108  QUEUE_REMOVE(q);
109  QUEUE_INIT(q);
110 
111  /* If there is more slow I/O work, schedule it to be run as well. */
114  if (idle_threads > 0)
116  }
117  }
118 
120 
121  w = QUEUE_DATA(q, struct uv__work, wq);
122  w->work(w);
123 
124  uv_mutex_lock(&w->loop->wq_mutex);
125  w->work = NULL; /* Signal uv_cancel() that the work req is done
126  executing. */
127  QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
128  uv_async_send(&w->loop->wq_async);
129  uv_mutex_unlock(&w->loop->wq_mutex);
130 
131  /* Lock `mutex` since that is expected at the start of the next
132  * iteration. */
134  if (is_slow_work) {
135  /* `slow_io_work_running` is protected by `mutex`. */
137  }
138  }
139 }
140 
141 
142 static void post(QUEUE* q, enum uv__work_kind kind) {
144  if (kind == UV__WORK_SLOW_IO) {
145  /* Insert into a separate queue. */
148  /* Running slow I/O tasks is already scheduled => Nothing to do here.
149  The worker that runs said other task will schedule this one as well. */
151  return;
152  }
154  }
155 
156  QUEUE_INSERT_TAIL(&wq, q);
157  if (idle_threads > 0)
160 }
161 
162 
163 #ifndef _WIN32
164 UV_DESTRUCTOR(static void cleanup(void)) {
165  unsigned int i;
166 
167  if (nthreads == 0)
168  return;
169 
171 
172  for (i = 0; i < nthreads; i++)
173  if (uv_thread_join(threads + i))
174  abort();
175 
176  if (threads != default_threads)
177  uv__free(threads);
178 
181 
182  threads = NULL;
183  nthreads = 0;
184 }
185 #endif
186 
187 
188 static void init_threads(void) {
189  unsigned int i;
190  const char* val;
191  uv_sem_t sem;
192 
194  val = getenv("UV_THREADPOOL_SIZE");
195  if (val != NULL)
196  nthreads = atoi(val);
197  if (nthreads == 0)
198  nthreads = 1;
201 
204  threads = uv__malloc(nthreads * sizeof(threads[0]));
205  if (threads == NULL) {
208  }
209  }
210 
211  if (uv_cond_init(&cond))
212  abort();
213 
214  if (uv_mutex_init(&mutex))
215  abort();
216 
217  QUEUE_INIT(&wq);
220 
221  if (uv_sem_init(&sem, 0))
222  abort();
223 
224  for (i = 0; i < nthreads; i++)
225  if (uv_thread_create(threads + i, worker, &sem))
226  abort();
227 
228  for (i = 0; i < nthreads; i++)
229  uv_sem_wait(&sem);
230 
232 }
233 
234 
235 #ifndef _WIN32
236 static void reset_once(void) {
237  uv_once_t child_once = UV_ONCE_INIT;
238  memcpy(&once, &child_once, sizeof(child_once));
239 }
240 #endif
241 
242 
243 static void init_once(void) {
244 #ifndef _WIN32
245  /* Re-initialize the threadpool after fork.
246  * Note that this discards the global mutex and condition as well
247  * as the work queue.
248  */
249  if (pthread_atfork(NULL, NULL, &reset_once))
250  abort();
251 #endif
252  init_threads();
253 }
254 
255 
257  struct uv__work* w,
258  enum uv__work_kind kind,
259  void (*work)(struct uv__work* w),
260  void (*done)(struct uv__work* w, int status)) {
262  w->loop = loop;
263  w->work = work;
264  w->done = done;
265  post(&w->wq, kind);
266 }
267 
268 
269 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
270  int cancelled;
271 
273  uv_mutex_lock(&w->loop->wq_mutex);
274 
275  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
276  if (cancelled)
277  QUEUE_REMOVE(&w->wq);
278 
279  uv_mutex_unlock(&w->loop->wq_mutex);
281 
282  if (!cancelled)
283  return UV_EBUSY;
284 
285  w->work = uv__cancelled;
286  uv_mutex_lock(&loop->wq_mutex);
287  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
288  uv_async_send(&loop->wq_async);
289  uv_mutex_unlock(&loop->wq_mutex);
290 
291  return 0;
292 }
293 
294 
296  struct uv__work* w;
297  uv_loop_t* loop;
298  QUEUE* q;
299  QUEUE wq;
300  int err;
301 
302  loop = container_of(handle, uv_loop_t, wq_async);
303  uv_mutex_lock(&loop->wq_mutex);
304  QUEUE_MOVE(&loop->wq, &wq);
305  uv_mutex_unlock(&loop->wq_mutex);
306 
307  while (!QUEUE_EMPTY(&wq)) {
308  q = QUEUE_HEAD(&wq);
309  QUEUE_REMOVE(q);
310 
311  w = container_of(q, struct uv__work, wq);
312  err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
313  w->done(w, err);
314  }
315 }
316 
317 
318 static void uv__queue_work(struct uv__work* w) {
320 
321  req->work_cb(req);
322 }
323 
324 
325 static void uv__queue_done(struct uv__work* w, int err) {
326  uv_work_t* req;
327 
329  uv__req_unregister(req->loop, req);
330 
331  if (req->after_work_cb == NULL)
332  return;
333 
334  req->after_work_cb(req, err);
335 }
336 
337 
339  uv_work_t* req,
342  if (work_cb == NULL)
343  return UV_EINVAL;
344 
345  uv__req_init(loop, req, UV_WORK);
346  req->loop = loop;
347  req->work_cb = work_cb;
348  req->after_work_cb = after_work_cb;
350  &req->work_req,
351  UV__WORK_CPU,
354  return 0;
355 }
356 
357 
359  struct uv__work* wreq;
360  uv_loop_t* loop;
361 
362  switch (req->type) {
363  case UV_FS:
364  loop = ((uv_fs_t*) req)->loop;
365  wreq = &((uv_fs_t*) req)->work_req;
366  break;
367  case UV_GETADDRINFO:
368  loop = ((uv_getaddrinfo_t*) req)->loop;
369  wreq = &((uv_getaddrinfo_t*) req)->work_req;
370  break;
371  case UV_GETNAMEINFO:
372  loop = ((uv_getnameinfo_t*) req)->loop;
373  wreq = &((uv_getnameinfo_t*) req)->work_req;
374  break;
375  case UV_RANDOM:
376  loop = ((uv_random_t*) req)->loop;
377  wreq = &((uv_random_t*) req)->work_req;
378  break;
379  case UV_WORK:
380  loop = ((uv_work_t*) req)->loop;
381  wreq = &((uv_work_t*) req)->work_req;
382  break;
383  default:
384  return UV_EINVAL;
385  }
386 
387  return uv__work_cancel(loop, req, wreq);
388 }
async_greeter_server_with_graceful_shutdown.loop
loop
Definition: async_greeter_server_with_graceful_shutdown.py:59
sem
static uv_sem_t sem
Definition: test-signal-multiple-loops.c:52
slow_io_work_running
static unsigned int slow_io_work_running
Definition: threadpool.c:36
uv__work
Definition: third_party/libuv/include/uv/threadpool.h:30
ARRAY_SIZE
#define ARRAY_SIZE(array)
Definition: bloaty.cc:101
cleanup
void cleanup(void)
Definition: bloaty/third_party/zlib/examples/enough.c:182
reset_once
static void reset_once(void)
Definition: threadpool.c:236
uv__work_done
void uv__work_done(uv_async_t *handle)
Definition: threadpool.c:295
post
static void post(QUEUE *q, enum uv__work_kind kind)
Definition: threadpool.c:142
uv__req_init
#define uv__req_init(loop, req, typ)
Definition: uv-common.h:312
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
uv__malloc
void * uv__malloc(size_t size)
Definition: uv-common.c:75
uv_mutex_init
UV_EXTERN int uv_mutex_init(uv_mutex_t *handle)
Definition: libuv/src/unix/thread.c:281
worker
static void worker(void *arg)
Definition: threadpool.c:57
uv_getaddrinfo_s
Definition: uv.h:871
uv-common.h
uv_mutex_destroy
UV_EXTERN void uv_mutex_destroy(uv_mutex_t *handle)
Definition: libuv/src/unix/thread.c:323
uv_cancel
int uv_cancel(uv_req_t *req)
Definition: threadpool.c:358
error_ref_leak.err
err
Definition: error_ref_leak.py:35
QUEUE_HEAD
#define QUEUE_HEAD(q)
Definition: queue.h:42
status
absl::Status status
Definition: rls.cc:251
uv__work::done
void(* done)(struct uv__work *w, int status)
Definition: third_party/libuv/include/uv/threadpool.h:32
uv_getnameinfo_s
Definition: uv.h:894
uv_thread_join
UV_EXTERN int uv_thread_join(uv_thread_t *tid)
Definition: libuv/src/unix/thread.c:271
QUEUE_DATA
#define QUEUE_DATA(ptr, type, field)
Definition: queue.h:30
container_of
#define container_of(ptr, type, member)
Definition: uv-common.h:57
once
static uv_once_t once
Definition: threadpool.c:32
uv_fs_s
Definition: uv.h:1294
uv_cond_wait
UV_EXTERN void uv_cond_wait(uv_cond_t *cond, uv_mutex_t *mutex)
Definition: libuv/src/unix/thread.c:780
threads
static uv_thread_t * threads
Definition: threadpool.c:38
QUEUE_MOVE
#define QUEUE_MOVE(h, n)
Definition: queue.h:72
uv__queue_done
static void uv__queue_done(struct uv__work *w, int err)
Definition: threadpool.c:325
uv_random_s
Definition: uv.h:1631
uv_cond_t
Definition: win.h:249
QUEUE_INIT
#define QUEUE_INIT(q)
Definition: queue.h:45
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
uv_async_s
Definition: uv.h:834
init_once
static void init_once(void)
Definition: threadpool.c:243
wq
static QUEUE wq
Definition: threadpool.c:41
uv_once
UV_EXTERN void uv_once(uv_once_t *guard, void(*callback)(void))
Definition: libuv/src/unix/thread.c:418
req
static uv_connect_t req
Definition: test-connection-fail.c:30
uv_cond_destroy
UV_EXTERN void uv_cond_destroy(uv_cond_t *cond)
Definition: libuv/src/unix/thread.c:736
work_cb
static void work_cb(uv_work_t *req)
Definition: test-fork.c:610
worker
Definition: worker.py:1
cond
static uv_cond_t cond
Definition: threadpool.c:33
slow_io_pending_wq
static QUEUE slow_io_pending_wq
Definition: threadpool.c:43
uv__queue_work
static void uv__queue_work(struct uv__work *w)
Definition: threadpool.c:318
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
slow_work_thread_threshold
static unsigned int slow_work_thread_threshold(void)
Definition: threadpool.c:45
arg
Definition: cmdline.cc:40
uv__work::loop
struct uv_loop_s * loop
Definition: third_party/libuv/include/uv/threadpool.h:33
uv_queue_work
int uv_queue_work(uv_loop_t *loop, uv_work_t *req, uv_work_cb work_cb, uv_after_work_cb after_work_cb)
Definition: threadpool.c:338
uv_once_t
pthread_once_t uv_once_t
Definition: unix.h:133
uv__free
void uv__free(void *ptr)
Definition: uv-common.c:81
uv_mutex_t
pthread_mutex_t uv_mutex_t
Definition: unix.h:135
nthreads
static unsigned int nthreads
Definition: threadpool.c:37
UV_DESTRUCTOR
UV_DESTRUCTOR(static void cleanup(void))
Definition: threadpool.c:164
uv_thread_create
UV_EXTERN int uv_thread_create(uv_thread_t *tid, uv_thread_cb entry, void *arg)
Definition: libuv/src/unix/thread.c:209
MAX_THREADPOOL_SIZE
#define MAX_THREADPOOL_SIZE
Definition: threadpool.c:30
UV__WORK_CPU
@ UV__WORK_CPU
Definition: uv-common.h:179
exit_message
static QUEUE exit_message
Definition: threadpool.c:40
uv_mutex_unlock
UV_EXTERN void uv_mutex_unlock(uv_mutex_t *handle)
Definition: libuv/src/unix/thread.c:349
uv_sem_t
UV_PLATFORM_SEM_T uv_sem_t
Definition: unix.h:137
uv_after_work_cb
void(* uv_after_work_cb)(uv_work_t *req, int status)
Definition: uv.h:327
QUEUE_REMOVE
#define QUEUE_REMOVE(q)
Definition: queue.h:101
uv__work::wq
void * wq[2]
Definition: third_party/libuv/include/uv/threadpool.h:34
uv__work::work
void(* work)(struct uv__work *w)
Definition: third_party/libuv/include/uv/threadpool.h:31
QUEUE_EMPTY
#define QUEUE_EMPTY(q)
Definition: queue.h:39
uv_sem_post
UV_EXTERN void uv_sem_post(uv_sem_t *sem)
Definition: libuv/src/unix/thread.c:669
uv_sem_init
UV_EXTERN int uv_sem_init(uv_sem_t *sem, unsigned int value)
Definition: libuv/src/unix/thread.c:649
idle_threads
static unsigned int idle_threads
Definition: threadpool.c:35
uv_work_cb
void(* uv_work_cb)(uv_work_t *req)
Definition: uv.h:326
run_slow_work_message
static QUEUE run_slow_work_message
Definition: threadpool.c:42
internal.h
init_threads
static void init_threads(void)
Definition: threadpool.c:188
UV__WORK_SLOW_IO
@ UV__WORK_SLOW_IO
Definition: uv-common.h:181
default_threads
static uv_thread_t default_threads[4]
Definition: threadpool.c:39
QUEUE_NEXT
#define QUEUE_NEXT(q)
Definition: queue.h:24
uv_sem_destroy
UV_EXTERN void uv_sem_destroy(uv_sem_t *sem)
Definition: libuv/src/unix/thread.c:661
uv_mutex_lock
UV_EXTERN void uv_mutex_lock(uv_mutex_t *handle)
Definition: libuv/src/unix/thread.c:329
uv__work_kind
uv__work_kind
Definition: uv-common.h:178
uv_cond_init
UV_EXTERN int uv_cond_init(uv_cond_t *cond)
Definition: libuv/src/unix/thread.c:703
uv_cond_signal
UV_EXTERN void uv_cond_signal(uv_cond_t *cond)
Definition: libuv/src/unix/thread.c:770
UV_ONCE_INIT
#define UV_ONCE_INIT
Definition: unix.h:131
handle
static csh handle
Definition: test_arm_regression.c:16
after_work_cb
static void after_work_cb(uv_work_t *req, int status)
Definition: test-fork.c:615
uv_thread_t
pthread_t uv_thread_t
Definition: unix.h:134
uv_loop_s
Definition: uv.h:1767
uv_sem_wait
UV_EXTERN void uv_sem_wait(uv_sem_t *sem)
Definition: libuv/src/unix/thread.c:677
work_req
static uv_work_t work_req
Definition: test-loop-alive.c:32
uv_work_s
Definition: uv.h:1055
uv__work_submit
void uv__work_submit(uv_loop_t *loop, struct uv__work *w, enum uv__work_kind kind, void(*work)(struct uv__work *w), void(*done)(struct uv__work *w, int status))
Definition: threadpool.c:256
getenv
#define getenv(ptr)
Definition: ares_private.h:106
uv__work_cancel
static int uv__work_cancel(uv_loop_t *loop, uv_req_t *req, struct uv__work *w)
Definition: threadpool.c:269
uv_req_s
Definition: uv.h:404
QUEUE_INSERT_TAIL
#define QUEUE_INSERT_TAIL(h, q)
Definition: queue.h:92
uv__cancelled
static void uv__cancelled(struct uv__work *w)
Definition: threadpool.c:49
uv__req_unregister
#define uv__req_unregister(loop, req)
Definition: uv-common.h:213
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
QUEUE
void * QUEUE[2]
Definition: queue.h:21
uv_async_send
UV_EXTERN int uv_async_send(uv_async_t *async)
Definition: unix/async.c:62


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:37