ev_epoll1_linux.cc
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include <grpc/support/log.h>

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll
   (i.e. epoll_create() or epoll_create1()) */
#ifdef GRPC_LINUX_EPOLL
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"

static grpc_wakeup_fd global_wakeup_fd;

/*******************************************************************************
 * Singleton epoll set related fields
 */

#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1

/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - num_events and cursor fields have to be of atomic type to provide memory
 *   visibility guarantees only. i.e. in case of multiple pollers, the
 *   designated polling thread keeps changing; the thread that wrote these
 *   values may be different from the thread reading them. */
typedef struct epoll_set {
  int epfd;

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;

/* The global singleton epoll set */
static epoll_set g_epoll_set;
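
/* A minimal standalone sketch (not from this file) of the release/acquire
   handoff that makes num_events and cursor safe to read from whichever thread
   becomes the next designated poller. It mirrors the
   gpr_atm_rel_store()/gpr_atm_acq_load() pairs used further down, expressed
   with std::atomic; all names here are illustrative:

     #include <atomic>

     static std::atomic<long> ex_num_events{0};
     static std::atomic<long> ex_cursor{0};

     // Designated poller thread A, after epoll_wait() fills the buffer:
     void ex_publish(long n) {
       ex_cursor.store(0, std::memory_order_release);
       ex_num_events.store(n, std::memory_order_release);  // publish batch
     }

     // A later designated poller thread B:
     bool ex_has_unprocessed() {
       // acquire loads make A's buffer writes visible before these compare
       return ex_cursor.load(std::memory_order_acquire) !=
              ex_num_events.load(std::memory_order_acquire);
     }
*/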

static int epoll_create_and_cloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 unavailable");
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create unavailable");
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
    return -1;
  }
#endif
  return fd;
}

/* Must be called *only* once */
static bool epoll_set_init() {
  g_epoll_set.epfd = epoll_create_and_cloexec();
  if (g_epoll_set.epfd < 0) {
    return false;
  }

  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  return true;
}

/* epoll_set_init() MUST be called before calling this. */
static void epoll_set_shutdown() {
  if (g_epoll_set.epfd >= 0) {
    close(g_epoll_set.epfd);
    g_epoll_set.epfd = -1;
  }
}

/*******************************************************************************
 * Fd Declarations
 */

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
struct grpc_fork_fd_list {
  grpc_fd* fd;
  grpc_fd* next;
  grpc_fd* prev;
};

struct grpc_fd {
  int fd;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

static const char* kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

struct grpc_pollset_worker {
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)

#define MAX_NEIGHBORHOODS 1024u

typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;
      grpc_pollset* active_root;
    };
  };
} pollset_neighborhood;
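
/* An illustrative sketch (not from this file) of the same false-sharing guard
   in portable C++: each shard is aligned to a cache line so two neighborhood
   mutexes never share one. The 64-byte figure and all names are assumptions
   for the example; the code above uses GPR_CACHELINE_SIZE instead:

     #include <mutex>

     struct alignas(64) ExamplePaddedShard {
       std::mutex mu;                 // contended by workers of this shard only
       void* active_root = nullptr;   // plays the role of active_root above
     };
     static ExamplePaddedShard example_shards[8];  // count is illustrative
*/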

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighborhood* neighborhood;
  bool reassigning_neighborhood;
  grpc_pollset_worker* root_worker;
  bool kicked_without_poller;

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down ? */
  grpc_closure* shutdown_closure; /* Called after shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  grpc_pollset* next;
  grpc_pollset* prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {
  char unused;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error_handle* composite, grpc_error_handle error,
                         const char* desc) {
  if (GRPC_ERROR_IS_NONE(error)) return true;
  if (GRPC_ERROR_IS_NONE(*composite)) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
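
/* The reuse pattern described above, reduced to a minimal hypothetical sketch
   (the example_* names are illustrative, not gRPC API; example_mu is assumed
   to have been set up with gpr_mu_init()). It is the same pop-or-malloc shape
   that fd_create() uses below:

     static grpc_fd* example_freelist = nullptr;  // guarded by example_mu
     static gpr_mu example_mu;

     grpc_fd* example_alloc() {
       grpc_fd* fd = nullptr;
       gpr_mu_lock(&example_mu);
       if (example_freelist != nullptr) {       // reuse: object memory is
         fd = example_freelist;                 // never handed back to malloc,
         example_freelist = fd->freelist_next;  // so a late poller dereference
       }                                        // is at worst a stale event
       gpr_mu_unlock(&example_mu);
       if (fd == nullptr) fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(*fd)));
       return fd;
     }
*/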

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
static grpc_fd* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation about this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point. Otherwise, there is still a possibility of
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->next = fork_fd_list_head;
    fd->fork_fd_list->prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->fork_fd_list->prev = fd;
    }
    fork_fd_list_head = fd;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == fd) {
      fork_fd_list_head = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->prev != nullptr) {
      fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->next != nullptr) {
      fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
    }
    gpr_free(fd->fork_fd_list);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;

  std::string fd_name = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name.c_str());
  }
#endif

  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  /* Use the least significant bit of ev.data.ptr to store track_err. We expect
   * the addresses to be word aligned. We need to store track_err to avoid
   * synchronization issues when accessing it after receiving an event.
   * Accessing fd would be a data race there because the fd might have been
   * returned to the free list at that point. */
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}
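
/* A small sketch (not itself part of the flow here) of how the LSB tag stored
   in ev.data.ptr above is unpacked on the reader side; this is exactly what
   process_epoll_events() does further down:

     void* data_ptr = ev.data.ptr;
     grpc_fd* fd = reinterpret_cast<grpc_fd*>(
         reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
     bool track_err =
         (reinterpret_cast<intptr_t>(data_ptr) & 1) != 0;  // low bit = flag

   The trick is sound only because heap-allocated grpc_fd objects are at least
   word aligned, so the low bit of the raw pointer is always zero. */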

static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }

/* if 'releasing_fd' is true, it means that we are going to detach the internal
 * fd from the grpc_fd structure (which means we should not be calling the
 * shutdown() syscall on that fd) */
static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,
                                 bool releasing_fd) {
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    } else {
      /* we need a phony event for earlier linux versions. */
      epoll_event phony_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &phony_event) !=
          0) {
        gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
      }
    }
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  fd_shutdown_internal(fd, why, false);
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error_handle error = GRPC_ERROR_NONE;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
                         is_release_fd);
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}
439 
440 static bool fd_is_shutdown(grpc_fd* fd) {
441  return fd->read_closure->IsShutdown();
442 }
443 
444 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
445  fd->read_closure->NotifyOn(closure);
446 }
447 
448 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
449  fd->write_closure->NotifyOn(closure);
450 }
451 
452 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
453  fd->error_closure->NotifyOn(closure);
454 }
455 
456 static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
457 
458 static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
459 
460 static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
461 
462 /*******************************************************************************
463  * Pollset Definitions
464  */
465 
static GPR_THREAD_LOCAL(grpc_pollset*) g_current_thread_pollset;
static GPR_THREAD_LOCAL(grpc_pollset_worker*) g_current_thread_worker;

/* The designated poller */
static gpr_atm g_active_poller;

static pollset_neighborhood* g_neighborhoods;
static size_t g_num_neighborhoods;

/* Return true if first in list */
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  if (pollset->root_worker == nullptr) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Return true if last in list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = nullptr;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}

static grpc_error_handle pollset_global_init(void) {
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error_handle err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (!GRPC_ERROR_IS_NONE(err)) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods =
      grpc_core::Clamp(gpr_cpu_num_cores(), 1u, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_destroy(&g_neighborhoods[i].mu);
  }
  gpr_free(g_neighborhoods);
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}

static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) {
  GPR_TIMER_SCOPE("pollset_kick_all", 0);
  grpc_error_handle error = GRPC_ERROR_NONE;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      GRPC_STATS_INC_POLLSET_KICK();
      switch (worker->state) {
        case KICKED:
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
      pollset->begin_refs == 0) {
    GPR_TIMER_MARK("pollset_finish_shutdown", 0);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
                            GRPC_ERROR_NONE);
    pollset->shutdown_closure = nullptr;
  }
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_TIMER_SCOPE("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == nullptr);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) {
  if (millis == grpc_core::Timestamp::InfFuture()) return -1;
  int64_t delta = (millis - grpc_core::ExecCtx::Get()->Now()).millis();
  if (delta > INT_MAX) {
    return INT_MAX;
  } else if (delta < 0) {
    return 0;
  } else {
    return static_cast<int>(delta);
  }
}
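
/* Worked examples of the clamping above (values illustrative):
   - deadline == InfFuture()              -> -1 (block in epoll_wait forever)
   - deadline 2500 ms after Now()         -> 2500
   - deadline already elapsed (delta < 0) -> 0 (poll without blocking)
   - delta > INT_MAX ms                   -> INT_MAX (epoll_wait takes an int
     timeout, so the 64-bit delta must be narrowed safely) */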

/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
     events and updates g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by the g_active_poller thread. So there is no need for
   synchronization when accessing fields in g_epoll_set */
static grpc_error_handle process_epoll_events(grpc_pollset* /*pollset*/) {
  GPR_TIMER_SCOPE("process_epoll_events", 0);

  static const char* err_desc = "process_events";
  grpc_error_handle error = GRPC_ERROR_NONE;
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event* ev = &g_epoll_set.events[c];
    void* data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
      bool track_err =
          reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool error = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }

      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }

      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  return error;
}

/* Do epoll_wait and store the events in the g_epoll_set.events field. This
   does not "process" any of the events yet; that is done in
   process_epoll_events(). See process_epoll_events() for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e. the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
                                       grpc_core::Timestamp deadline) {
  GPR_TIMER_SCOPE("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL();
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  }

  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return GRPC_ERROR_NONE;
}

static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_core::Timestamp deadline) {
  GPR_TIMER_SCOPE("begin_worker", 0);
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                     reinterpret_cast<gpr_atm>(worker))) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      deadline.as_timespec(GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e. a timeout), pretend that the
           worker received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   * 1. Briefly when assigning pollset to a neighborhood
   * 2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
   * case; especially when the worker is the DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
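
/* The designated-poller election above hinges on a single CAS on
   g_active_poller. A standalone sketch of the same idea (illustrative only,
   using std::atomic rather than gpr_atm; all names are assumptions):

     #include <atomic>

     static std::atomic<intptr_t> example_active_poller{0};

     // Each arriving worker tries to claim the slot; exactly one wins and
     // goes on to call epoll_wait(), the rest block on their condition vars.
     bool example_try_become_designated_poller(intptr_t self) {
       intptr_t expected = 0;  // slot must be empty to claim it
       return example_active_poller.compare_exchange_strong(expected, self);
     }
*/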

static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(
                    &g_active_poller, 0,
                    reinterpret_cast<gpr_atm>(inspect_worker))) {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  GPR_TIMER_SCOPE("end_worker", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) ==
      reinterpret_cast<gpr_atm>(worker)) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error_handle pollset_work(grpc_pollset* ps,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  grpc_pollset_worker worker;
  grpc_error_handle error = GRPC_ERROR_NONE;
  static const char* err_desc = "pollset_work";
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }

  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    g_current_thread_pollset = ps;
    g_current_thread_worker = &worker;
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally
       do polling. However, if there are unprocessed events left from a
       previous call to do_epoll_wait(), skip calling epoll_wait() in this
       iteration and process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e. handling epoll events) across multiple
       threads.

       process_epoll_events() returns very quickly: it just queues the work on
       the exec_ctx but does not execute it (the actual execution or, more
       accurately, grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
       AFTER selecting a designated poller). So we are not waiting long periods
       without a designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    g_current_thread_worker = nullptr;
  } else {
    g_current_thread_pollset = ps;
  }
  end_worker(ps, &worker, worker_hdl);

  g_current_thread_pollset = nullptr;
  return error;
}

static grpc_error_handle pollset_kick(grpc_pollset* pollset,
                                      grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  GRPC_STATS_INC_POLLSET_KICK();
  grpc_error_handle ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        static_cast<void*>(g_current_thread_pollset),
        static_cast<void*>(g_current_thread_worker), pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
  }

  if (specific_worker == nullptr) {
    if (g_current_thread_pollset != pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try and wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  if (specific_worker->state == KICKED) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (g_current_thread_worker == specific_worker) {
    GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}

static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set* pollset_set_create(void) {
  return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
}

static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}

static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

/*******************************************************************************
 * Event engine binding
 */

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
}

static bool init_epoll1_linux();

const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    /* name = */ "epoll1",
    /* check_engine_available = */ [](bool) { return init_epoll1_linux(); },
    /* init_engine = */ []() {},
    shutdown_background_closure,
    /* shutdown_engine = */ []() {},
    add_closure_to_background_poller,
};

/* Called by the child process's post-fork handler to close open fds, including
 * the global epoll fd. This allows gRPC to shutdown in the child process
 * without interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  shutdown_engine();
  init_epoll1_linux();
}

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
 * support is available */
static bool init_epoll1_linux() {
  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
    return false;
  }

  if (!epoll_set_init()) {
    return false;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return false;
  }

  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(
        reset_event_manager_on_fork);
  }
  return true;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    1,
    true,
    false,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    /* name = */ "epoll1",
    /* check_engine_available = */ [](bool) { return false; },
    nullptr,
    nullptr,
    nullptr,
    nullptr,
};
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */