27 #ifdef GRPC_LINUX_EPOLL
35 #include <sys/epoll.h>
36 #include <sys/socket.h>
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_format.h"
44 #include "absl/strings/str_join.h"
// Capacity constant for the epoll set: passed as the size argument to
// epoll_create() and as the maxevents argument to epoll_wait() in this file.
68 #define MAX_EPOLL_EVENTS 100
// Bound applied to the event index in the event-processing loop condition,
// i.e. the number of ready events handled per processing pass.
69 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
79 typedef struct epoll_set {
94 static epoll_set g_epoll_set;
96 static int epoll_create_and_cloexec() {
97 #ifdef GRPC_LINUX_EPOLL_CREATE1
103 int fd = epoll_create(MAX_EPOLL_EVENTS);
106 }
else if (fcntl(
fd, F_SETFD, FD_CLOEXEC) != 0) {
115 static bool epoll_set_init() {
116 g_epoll_set.epfd = epoll_create_and_cloexec();
117 if (g_epoll_set.epfd < 0) {
128 static void epoll_set_shutdown() {
129 if (g_epoll_set.epfd >= 0) {
130 close(g_epoll_set.epfd);
131 g_epoll_set.epfd = -1;
140 struct grpc_fork_fd_list {
158 grpc_fork_fd_list* fork_fd_list;
// Forward declarations; both functions are defined further down in this file,
// after the fd freelist globals.
161 static void fd_global_init(
void);
162 static void fd_global_shutdown(
void);
// Kick state of a pollset worker, stored in the worker's `state` field (see
// SET_KICK_STATE). kick_state_string() maps a value to its name for log output.
168 typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
170 static const char* kick_state_string(kick_state st) {
176 case DESIGNATED_POLLER:
177 return "DESIGNATED_POLLER";
184 int kick_state_mutator;
192 #define SET_KICK_STATE(worker, kick_state) \
194 (worker)->state = (kick_state); \
195 (worker)->kick_state_mutator = __LINE__; \
// Size of the local scan_state array used while searching neighborhoods for an
// available poller.
// NOTE(review): presumably also caps g_num_neighborhoods -- the assignment is
// not fully visible here, confirm.
198 #define MAX_NEIGHBORHOODS 1024u
200 typedef struct pollset_neighborhood {
208 } pollset_neighborhood;
212 pollset_neighborhood* neighborhood;
213 bool reassigning_neighborhood;
215 bool kicked_without_poller;
// Head of a singly linked list of recycled grpc_fd objects, chained through
// freelist_next. fd_create() takes its head when non-null and
// fd_global_shutdown() walks it until it is empty.
275 static grpc_fd* fd_freelist =
nullptr;
// Mutex initialised by fd_global_init().
// NOTE(review): presumably guards fd_freelist -- the lock/unlock calls are not
// visible in this excerpt, confirm.
276 static gpr_mu fd_freelist_mu;
// Head of a doubly linked list of grpc_fd objects, linked through each fd's
// fork_fd_list->next / fork_fd_list->prev. New fds are inserted at the head;
// reset_event_manager_on_fork() walks the list closing each wrapped fd.
279 static grpc_fd* fork_fd_list_head =
nullptr;
// NOTE(review): presumably guards fork_fd_list_head -- confirm at the call
// sites, which are not visible in this excerpt.
280 static gpr_mu fork_fd_list_mu;
// One-time setup for the fd subsystem: initialises fd_freelist_mu.
// Takes no arguments and returns nothing.
282 static void fd_global_init(
void) {
gpr_mu_init(&fd_freelist_mu); }
284 static void fd_global_shutdown(
void) {
291 while (fd_freelist !=
nullptr) {
293 fd_freelist = fd_freelist->freelist_next;
299 static void fork_fd_list_add_grpc_fd(
grpc_fd* fd) {
303 static_cast<grpc_fork_fd_list*
>(
gpr_malloc(
sizeof(grpc_fork_fd_list)));
304 fd->fork_fd_list->next = fork_fd_list_head;
305 fd->fork_fd_list->prev =
nullptr;
306 if (fork_fd_list_head !=
nullptr) {
307 fork_fd_list_head->fork_fd_list->prev = fd;
309 fork_fd_list_head = fd;
314 static void fork_fd_list_remove_grpc_fd(
grpc_fd* fd) {
317 if (fork_fd_list_head == fd) {
318 fork_fd_list_head = fd->fork_fd_list->next;
320 if (fd->fork_fd_list->prev !=
nullptr) {
321 fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
323 if (fd->fork_fd_list->next !=
nullptr) {
324 fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
331 static grpc_fd* fd_create(
int fd,
const char*
name,
bool track_err) {
335 if (fd_freelist !=
nullptr) {
336 new_fd = fd_freelist;
337 fd_freelist = fd_freelist->freelist_next;
341 if (new_fd ==
nullptr) {
343 new_fd->read_closure.Init();
344 new_fd->write_closure.Init();
345 new_fd->error_closure.Init();
348 new_fd->read_closure->InitEvent();
349 new_fd->write_closure->InitEvent();
350 new_fd->error_closure->InitEvent();
352 new_fd->freelist_next =
nullptr;
356 fork_fd_list_add_grpc_fd(new_fd);
370 ev.data.ptr =
reinterpret_cast<void*
>(
reinterpret_cast<intptr_t>(new_fd) |
371 (track_err ? 1 : 0));
// Returns the raw descriptor stored in `fd->fd`.
// `fd` is dereferenced unconditionally, so it must be non-null. The stored
// value is set to -1 by reset_event_manager_on_fork() after it closes the fd.
379 static int fd_wrapped_fd(
grpc_fd*
fd) {
return fd->fd; }
388 shutdown(
fd->fd, SHUT_RDWR);
405 fd_shutdown_internal(
fd, why,
false);
409 const char* reason) {
411 bool is_release_fd = (release_fd !=
nullptr);
413 if (!
fd->read_closure->IsShutdown()) {
421 *release_fd =
fd->fd;
429 fork_fd_list_remove_grpc_fd(
fd);
430 fd->read_closure->DestroyEvent();
431 fd->write_closure->DestroyEvent();
432 fd->error_closure->DestroyEvent();
435 fd->freelist_next = fd_freelist;
440 static bool fd_is_shutdown(
grpc_fd*
fd) {
441 return fd->read_closure->IsShutdown();
// Signals readability on `fd` by calling SetReady() on its read closure.
// Invoked from epoll event processing when the event carries a read flag
// (EPOLLIN/EPOLLPRI), or on cancel / error fallback.
456 static void fd_become_readable(
grpc_fd*
fd) {
fd->read_closure->SetReady(); }
// Signals writability on `fd` by calling SetReady() on its write closure.
// Invoked from epoll event processing when the event carries EPOLLOUT, or on
// cancel / error fallback.
458 static void fd_become_writable(
grpc_fd*
fd) {
fd->write_closure->SetReady(); }
// Signals an error condition on `fd` by calling SetReady() on its error
// closure.
// NOTE(review): presumably called from event processing when an error is
// reported and error tracking is enabled for the fd -- the call site is not
// visible in this excerpt, confirm.
460 static void fd_has_errors(
grpc_fd*
fd) {
fd->error_closure->SetReady(); }
// NOTE(review): presumably identifies the worker currently acting as the
// poller (a worker pointer is cast to gpr_atm elsewhere in this file) --
// confirm against the code that reads/writes it.
470 static gpr_atm g_active_poller;
// Array of g_num_neighborhoods pollset_neighborhood entries, allocated zeroed
// with gpr_zalloc. Pollsets pick an entry via choose_neighborhood().
472 static pollset_neighborhood* g_neighborhoods;
// Number of entries in g_neighborhoods.
473 static size_t g_num_neighborhoods;
477 if (pollset->root_worker ==
nullptr) {
478 pollset->root_worker =
worker;
482 worker->next = pollset->root_worker;
// Return type of worker_remove(). A result of EMPTIED is what triggers the
// call to pollset_maybe_finish_shutdown() after a worker is removed.
// NOTE(review): the exact meaning of NEW_ROOT and REMOVED is decided inside
// worker_remove(), most of which is not visible here -- confirm there.
491 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
493 static worker_remove_result worker_remove(
grpc_pollset* pollset,
495 if (
worker == pollset->root_worker) {
497 pollset->root_worker =
nullptr;
500 pollset->root_worker =
worker->next;
512 static size_t choose_neighborhood(
void) {
523 ev.data.ptr = &global_wakeup_fd;
528 g_num_neighborhoods =
530 g_neighborhoods =
static_cast<pollset_neighborhood*
>(
531 gpr_zalloc(
sizeof(*g_neighborhoods) * g_num_neighborhoods));
532 for (
size_t i = 0;
i < g_num_neighborhoods;
i++) {
538 static void pollset_global_shutdown(
void) {
540 for (
size_t i = 0;
i < g_num_neighborhoods;
i++) {
549 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
550 pollset->reassigning_neighborhood =
false;
551 pollset->root_worker =
nullptr;
552 pollset->kicked_without_poller =
false;
553 pollset->seen_inactive =
true;
554 pollset->shutting_down =
false;
555 pollset->shutdown_closure =
nullptr;
556 pollset->begin_refs = 0;
557 pollset->next = pollset->prev =
nullptr;
562 if (!pollset->seen_inactive) {
563 pollset_neighborhood* neighborhood = pollset->neighborhood;
565 retry_lock_neighborhood:
568 if (!pollset->seen_inactive) {
569 if (pollset->neighborhood != neighborhood) {
571 neighborhood = pollset->neighborhood;
573 goto retry_lock_neighborhood;
575 pollset->prev->next = pollset->next;
576 pollset->next->prev = pollset->prev;
577 if (pollset == pollset->neighborhood->active_root) {
578 pollset->neighborhood->active_root =
579 pollset->next == pollset ? nullptr : pollset->next;
591 if (pollset->root_worker !=
nullptr) {
600 SET_KICK_STATE(
worker, KICKED);
601 if (
worker->initialized_cv) {
606 case DESIGNATED_POLLER:
608 SET_KICK_STATE(
worker, KICKED);
615 }
while (
worker != pollset->root_worker);
622 static void pollset_maybe_finish_shutdown(
grpc_pollset* pollset) {
623 if (pollset->shutdown_closure !=
nullptr && pollset->root_worker ==
nullptr &&
624 pollset->begin_refs == 0) {
628 pollset->shutdown_closure =
nullptr;
634 GPR_ASSERT(pollset->shutdown_closure ==
nullptr);
636 pollset->shutdown_closure =
closure;
637 pollset->shutting_down =
true;
639 pollset_maybe_finish_shutdown(pollset);
645 if (delta > INT_MAX) {
647 }
else if (delta < 0) {
650 return static_cast<int>(delta);
665 static const char* err_desc =
"process_events";
670 (
idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
674 void* data_ptr = ev->data.ptr;
676 if (data_ptr == &global_wakeup_fd) {
686 bool read_ev = (ev->
events & (EPOLLIN | EPOLLPRI)) != 0;
687 bool write_ev = (ev->
events & EPOLLOUT) != 0;
688 bool err_fallback =
error && !track_err;
690 if (
error && !err_fallback) {
694 if (read_ev ||
cancel || err_fallback) {
695 fd_become_readable(
fd);
698 if (write_ev ||
cancel || err_fallback) {
699 fd_become_writable(
fd);
719 int timeout = poll_deadline_to_millis_timeout(deadline);
725 r =
epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
727 }
while (
r < 0 && errno == EINTR);
750 if (worker_hdl !=
nullptr) *worker_hdl =
worker;
751 worker->initialized_cv =
false;
752 SET_KICK_STATE(
worker, UNKICKED);
754 pollset->begin_refs++;
760 if (pollset->seen_inactive) {
763 bool is_reassigning =
false;
764 if (!pollset->reassigning_neighborhood) {
765 is_reassigning =
true;
766 pollset->reassigning_neighborhood =
true;
767 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
769 pollset_neighborhood* neighborhood = pollset->neighborhood;
772 retry_lock_neighborhood:
776 gpr_log(
GPR_INFO,
"PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
780 if (pollset->seen_inactive) {
781 if (neighborhood != pollset->neighborhood) {
783 neighborhood = pollset->neighborhood;
785 goto retry_lock_neighborhood;
797 if (
worker->state == UNKICKED) {
798 pollset->seen_inactive =
false;
799 if (neighborhood->active_root ==
nullptr) {
800 neighborhood->active_root = pollset->next = pollset->prev = pollset;
802 if (
worker->state == UNKICKED &&
805 SET_KICK_STATE(
worker, DESIGNATED_POLLER);
808 pollset->next = neighborhood->active_root;
809 pollset->prev = pollset->next->prev;
810 pollset->next->prev = pollset->prev->next = pollset;
814 if (is_reassigning) {
815 GPR_ASSERT(pollset->reassigning_neighborhood);
816 pollset->reassigning_neighborhood =
false;
821 worker_insert(pollset,
worker);
822 pollset->begin_refs--;
823 if (
worker->state == UNKICKED && !pollset->kicked_without_poller) {
825 worker->initialized_cv =
true;
827 while (
worker->state == UNKICKED && !pollset->shutting_down) {
831 pollset->shutting_down);
836 worker->state == UNKICKED) {
839 SET_KICK_STATE(
worker, KICKED);
847 "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
848 "kicked_without_poller: %d",
850 pollset->shutting_down, pollset->kicked_without_poller);
862 if (pollset->kicked_without_poller) {
863 pollset->kicked_without_poller =
false;
867 return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
870 static bool check_neighborhood_for_available_poller(
871 pollset_neighborhood* neighborhood) {
873 bool found_worker =
false;
876 if (inspect ==
nullptr) {
882 if (inspect_worker !=
nullptr) {
884 switch (inspect_worker->state) {
888 reinterpret_cast<gpr_atm>(inspect_worker))) {
893 SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
894 if (inspect_worker->initialized_cv) {
909 case DESIGNATED_POLLER:
914 inspect_worker = inspect_worker->next;
915 }
while (!found_worker && inspect_worker != inspect->root_worker);
921 inspect->seen_inactive =
true;
922 if (inspect == neighborhood->active_root) {
923 neighborhood->active_root =
924 inspect->next == inspect ? nullptr : inspect->next;
926 inspect->next->prev = inspect->prev;
927 inspect->prev->next = inspect->next;
928 inspect->next = inspect->prev =
nullptr;
931 }
while (!found_worker);
941 if (worker_hdl !=
nullptr) *worker_hdl =
nullptr;
943 SET_KICK_STATE(
worker, KICKED);
954 SET_KICK_STATE(
worker->next, DESIGNATED_POLLER);
964 size_t poller_neighborhood_idx =
965 static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
967 bool found_worker =
false;
968 bool scan_state[MAX_NEIGHBORHOODS];
969 for (
size_t i = 0; !found_worker &&
i < g_num_neighborhoods;
i++) {
970 pollset_neighborhood* neighborhood =
971 &g_neighborhoods[(poller_neighborhood_idx +
i) %
972 g_num_neighborhoods];
974 found_worker = check_neighborhood_for_available_poller(neighborhood);
976 scan_state[
i] =
true;
978 scan_state[
i] =
false;
981 for (
size_t i = 0; !found_worker &&
i < g_num_neighborhoods;
i++) {
982 if (scan_state[
i])
continue;
983 pollset_neighborhood* neighborhood =
984 &g_neighborhoods[(poller_neighborhood_idx +
i) %
985 g_num_neighborhoods];
987 found_worker = check_neighborhood_for_available_poller(neighborhood);
998 if (
worker->initialized_cv) {
1004 if (EMPTIED == worker_remove(pollset,
worker)) {
1005 pollset_maybe_finish_shutdown(pollset);
1020 static const char* err_desc =
"pollset_work";
1021 if (ps->kicked_without_poller) {
1022 ps->kicked_without_poller =
false;
1026 if (begin_worker(ps, &
worker, worker_hdl, deadline)) {
1027 g_current_thread_pollset = ps;
1028 g_current_thread_worker = &
worker;
1049 append_error(&
error, do_epoll_wait(ps, deadline), err_desc);
1051 append_error(&
error, process_epoll_events(ps), err_desc);
1055 g_current_thread_worker =
nullptr;
1057 g_current_thread_pollset = ps;
1059 end_worker(ps, &
worker, worker_hdl);
1061 g_current_thread_pollset =
nullptr;
1071 std::vector<std::string>
log;
1073 "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
1074 static_cast<void*
>(g_current_thread_pollset),
1075 static_cast<void*
>(g_current_thread_worker), pollset->root_worker));
1076 if (pollset->root_worker !=
nullptr) {
1078 " {kick_state=%s next=%p {kick_state=%s}}",
1079 kick_state_string(pollset->root_worker->state),
1080 pollset->root_worker->next,
1081 kick_state_string(pollset->root_worker->next->state)));
1083 if (specific_worker !=
nullptr) {
1085 kick_state_string(specific_worker->state)));
1090 if (specific_worker ==
nullptr) {
1091 if (g_current_thread_pollset != pollset) {
1093 if (root_worker ==
nullptr) {
1095 pollset->kicked_without_poller =
true;
1102 if (root_worker->state == KICKED) {
1107 SET_KICK_STATE(root_worker, KICKED);
1109 }
else if (next_worker->state == KICKED) {
1114 SET_KICK_STATE(next_worker, KICKED);
1116 }
else if (root_worker == next_worker &&
1125 SET_KICK_STATE(root_worker, KICKED);
1128 }
else if (next_worker->state == UNKICKED) {
1134 SET_KICK_STATE(next_worker, KICKED);
1137 }
else if (next_worker->state == DESIGNATED_POLLER) {
1138 if (root_worker->state != DESIGNATED_POLLER) {
1142 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
1143 root_worker, root_worker->initialized_cv, next_worker);
1145 SET_KICK_STATE(root_worker, KICKED);
1146 if (root_worker->initialized_cv) {
1157 SET_KICK_STATE(next_worker, KICKED);
1164 SET_KICK_STATE(next_worker, KICKED);
1178 if (specific_worker->state == KICKED) {
1183 }
else if (g_current_thread_worker == specific_worker) {
1188 SET_KICK_STATE(specific_worker, KICKED);
1190 }
else if (specific_worker ==
1197 SET_KICK_STATE(specific_worker, KICKED);
1200 }
else if (specific_worker->initialized_cv) {
1205 SET_KICK_STATE(specific_worker, KICKED);
1213 SET_KICK_STATE(specific_worker, KICKED);
// Always returns false. Listed by name in the engine's function table
// initializer later in this file.
1252 static bool is_any_background_poller_thread(
void) {
return false; }
// Intentionally empty: performs no work. Listed by name in the engine's
// function table initializer later in this file.
1254 static void shutdown_background_closure(
void) {}
1256 static bool add_closure_to_background_poller(
grpc_closure* ,
1261 static void shutdown_engine(
void) {
1262 fd_global_shutdown();
1263 pollset_global_shutdown();
1264 epoll_set_shutdown();
// Forward declaration: init_epoll1_linux() is referenced by a lambda in the
// function table and by reset_event_manager_on_fork() before its definition
// further down.
1271 static bool init_epoll1_linux();
1298 pollset_set_destroy,
1299 pollset_set_add_pollset,
1300 pollset_set_del_pollset,
1301 pollset_set_add_pollset_set,
1302 pollset_set_del_pollset_set,
1306 is_any_background_poller_thread,
1308 [](
bool) {
return init_epoll1_linux(); },
1310 shutdown_background_closure,
1312 add_closure_to_background_poller,
1318 static void reset_event_manager_on_fork() {
1320 while (fork_fd_list_head !=
nullptr) {
1321 close(fork_fd_list_head->fd);
1322 fork_fd_list_head->fd = -1;
1323 fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
1327 init_epoll1_linux();
1333 static bool init_epoll1_linux() {
1339 if (!epoll_set_init()) {
1346 fd_global_shutdown();
1347 epoll_set_shutdown();
1354 reset_event_manager_on_fork);
1360 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1397 [](
bool) {
return false; },