/* ev_poll_posix.cc — poll(2)-based polling engine (Doxygen page header) */
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24 
25 #include <assert.h>
26 #include <errno.h>
27 #include <limits.h>
28 #include <poll.h>
29 #include <string.h>
30 #include <sys/socket.h>
31 #include <unistd.h>
32 
33 #include <string>
34 
35 #include "absl/strings/str_cat.h"
36 
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 
42 #include "src/core/lib/gpr/tls.h"
44 #include "src/core/lib/gprpp/thd.h"
50 
51 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
52 
53 /*******************************************************************************
54  * FD declarations
55  */
56 typedef struct grpc_fd_watcher {
57  struct grpc_fd_watcher* next;
58  struct grpc_fd_watcher* prev;
59  grpc_pollset* pollset;
61  grpc_fd* fd;
62 } grpc_fd_watcher;
63 
64 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
65 
/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
/* Node of the global doubly-linked list of fds/wakeup-fds that must be
   re-created after a fork(). */
struct grpc_fork_fd_list {
  /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
  set to nullptr. */
  grpc_fd* fd;
  grpc_cached_wakeup_fd* cached_wakeup_fd;

  /* Intrusive links; protected by fork_fd_list_mu. */
  grpc_fork_fd_list* next;
  grpc_fork_fd_list* prev;
};
76 
struct grpc_fd {
  /* The underlying OS file descriptor. */
  int fd;
  /* refst format:
  bit0: 1=active/0=orphaned
  bit1-n: refcount
  meaning that mostly we ref by two to avoid altering the orphaned bit,
  and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;

  /* Guards the watcher lists and the shutdown/closure state below. */
  gpr_mu mu;
  int shutdown;  /* 1 once fd_shutdown has run */
  int closed;    /* 1 once close_fd_locked has run */
  int released;  /* 1 when the raw fd was handed back to the caller
                    (fd_orphan with release_fd), so we must not close() it */
  /* Set when poll() reported POLLHUP; checked without the lock. */
  gpr_atm pollhup;
  grpc_error_handle shutdown_error;

  /* The watcher list.

  The following watcher related fields are protected by watcher_mu.

  An fd_watcher is an ephemeral object created when an fd wants to
  begin polling, and destroyed after the poll.

  It denotes the fd's interest in whether to read poll or write poll
  or both or neither on this fd.

  If a watcher is asked to poll for reads or writes, the read_watcher
  or write_watcher fields are set respectively. A watcher may be asked
  to poll for both, in which case both fields will be set.

  read_watcher and write_watcher may be NULL if no watcher has been
  asked to poll for reads or writes.

  If an fd_watcher is not asked to poll for reads or writes, it's added
  to a linked list of inactive watchers, rooted at inactive_watcher_root.
  If at a later time there becomes need of a poller to poll, one of
  the inactive pollers may be kicked out of their poll loops to take
  that responsibility. */
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher* read_watcher;
  grpc_fd_watcher* write_watcher;

  /* Pending notification closures; may hold the CLOSURE_NOT_READY /
     CLOSURE_READY sentinels rather than a real closure. */
  grpc_closure* read_closure;
  grpc_closure* write_closure;

  /* Scheduled once the fd is fully closed (see close_fd_locked). */
  grpc_closure* on_done_closure;

  grpc_iomgr_object iomgr_object;

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
};
129 
/* True when GRPC_ENABLE_FORK_SUPPORT=1. */
static bool track_fds_for_fork = false;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
/* Head of the global fork-tracking list and the mutex protecting it. */
static grpc_fork_fd_list* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;
136 
137 /* Begin polling on an fd.
138  Registers that the given pollset is interested in this fd - so that if read
139  or writability interest changes, the pollset can be kicked to pick up that
140  new interest.
141  Return value is:
142  (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
143  i.e. a combination of read_mask and write_mask determined by the fd's current
144  interest in said events.
145  Polling strategies that do not need to alter their behavior depending on the
146  fd's current interest (such as epoll) do not need to call this function.
147  MUST NOT be called with a pollset lock taken */
148 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
149  grpc_pollset_worker* worker, uint32_t read_mask,
150  uint32_t write_mask, grpc_fd_watcher* watcher);
151 /* Complete polling previously started with fd_begin_poll
152  MUST NOT be called with a pollset lock taken
153  if got_read or got_write are 1, also does the become_{readable,writable} as
154  appropriate. */
155 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);
156 
157 /* Return 1 if this fd is orphaned, 0 otherwise */
158 static bool fd_is_orphaned(grpc_fd* fd);
159 
160 #ifndef NDEBUG
161 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
162 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
163  int line);
164 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
165 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
166 #else
167 static void fd_ref(grpc_fd* fd);
168 static void fd_unref(grpc_fd* fd);
169 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
170 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
171 #endif
172 
173 #define CLOSURE_NOT_READY ((grpc_closure*)0)
174 #define CLOSURE_READY ((grpc_closure*)1)
175 
176 /*******************************************************************************
177  * pollset declarations
178  */
179 
/* A wakeup fd cached per-pollset so workers can reuse them instead of
   creating a fresh eventfd/pipe for every pollset_work call. */
typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  /* Next entry in the pollset's local_wakeup_cache free list. */
  struct grpc_cached_wakeup_fd* next;

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
187 
/* Per-thread state for one call to pollset_work; lives on that thread's
   stack and is linked into the pollset's worker ring. */
struct grpc_pollset_worker {
  grpc_cached_wakeup_fd* wakeup_fd;
  /* When kicked with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP, re-run the
     poll loop instead of returning. */
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  /* Ring links; the pollset's root_worker is the sentinel. */
  struct grpc_pollset_worker* next;
  struct grpc_pollset_worker* prev;
};
195 
struct grpc_pollset {
  /* Guards all fields below. */
  gpr_mu mu;
  /* Sentinel of the ring of active workers (threads inside pollset_work). */
  grpc_pollset_worker root_worker;
  int shutting_down;
  int called_shutdown;
  /* Set when a kick arrived while no worker was polling; consumed by the
     next pollset_work call. */
  int kicked_without_pollers;
  grpc_closure* shutdown_done;
  int pollset_set_count;
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd* local_wakeup_cache;
};
211 
212 /* Add an fd to a pollset */
213 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
214 
215 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
216 
217 /* Convert a timespec to milliseconds:
218  - very small or negative poll times are clamped to zero to do a
219  non-blocking poll (which becomes spin polling)
220  - other small values are rounded up to one millisecond
221  - longer than a millisecond polls are rounded up to the next nearest
222  millisecond to avoid spinning
223  - infinite timeouts are converted to -1 */
224 static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline);
225 
226 /* Allow kick to wakeup the currently polling worker */
227 #define GRPC_POLLSET_CAN_KICK_SELF 1
228 /* Force the wakee to repoll when awoken */
229 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
230 /* As per pollset_kick, with an extended set of flags (defined above)
231  -- mostly for fd_posix's use. */
232 static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
233  grpc_pollset_worker* specific_worker,
235 
236 /* Return 1 if the pollset has active threads in pollset_work (pollset must
237  * be locked) */
238 static bool pollset_has_workers(grpc_pollset* pollset);
239 
240 /*******************************************************************************
241  * pollset_set definitions
242  */
243 
/* A set of pollsets (and nested pollset_sets) that an fd can be added to as
   a group; three parallel dynamic arrays, all guarded by mu. */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset** pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set** pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
};
259 
260 /*******************************************************************************
261  * functions to track opened fds. No-ops unless track_fds_for_fork is true.
262  */
263 
264 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
265  if (track_fds_for_fork) {
266  gpr_mu_lock(&fork_fd_list_mu);
267  if (fork_fd_list_head == node) {
268  fork_fd_list_head = node->next;
269  }
270  if (node->prev != nullptr) {
271  node->prev->next = node->next;
272  }
273  if (node->next != nullptr) {
274  node->next->prev = node->prev;
275  }
276  gpr_free(node);
277  gpr_mu_unlock(&fork_fd_list_mu);
278  }
279 }
280 
281 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
282  gpr_mu_lock(&fork_fd_list_mu);
283  node->next = fork_fd_list_head;
284  node->prev = nullptr;
285  if (fork_fd_list_head != nullptr) {
286  fork_fd_list_head->prev = node;
287  }
288  fork_fd_list_head = node;
289  gpr_mu_unlock(&fork_fd_list_mu);
290 }
291 
292 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
293  if (track_fds_for_fork) {
294  fd->fork_fd_list =
295  static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
296  fd->fork_fd_list->fd = fd;
297  fd->fork_fd_list->cached_wakeup_fd = nullptr;
298  fork_fd_list_add_node(fd->fork_fd_list);
299  }
300 }
301 
302 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
303  if (track_fds_for_fork) {
304  fd->fork_fd_list =
305  static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
306  fd->fork_fd_list->cached_wakeup_fd = fd;
307  fd->fork_fd_list->fd = nullptr;
308  fork_fd_list_add_node(fd->fork_fd_list);
309  }
310 }
311 
312 /*******************************************************************************
313  * fd_posix.c
314  */
315 
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
/* Add `n` to the fd's refcount word. Debug builds log each transition. */
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {
/* NOTE(review): the guard + gpr_log( opener of the refcount-trace statement
   was lost during extraction; the lines below are the tail of that
   statement — restore from upstream before compiling. */
        "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
        fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
        gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
#define REF_BY(fd, n, reason) \
  do { \
  ref_by(fd, n); \
  (void)(reason); \
  } while (0)
#define UNREF_BY(fd, n, reason) \
  do { \
  unref_by(fd, n); \
  (void)(reason); \
  } while (0)
static void ref_by(grpc_fd* fd, int n) {
#endif
  /* refst must already be > 0: you may only ref a live object. */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
342 
#ifndef NDEBUG
/* Subtract `n` from the fd's refcount word; destroy the fd when it hits
   zero. Debug builds log each transition. */
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {
/* NOTE(review): the guard + gpr_log( opener of the refcount-trace statement
   was lost during extraction; the lines below are the tail of that
   statement — restore from upstream before compiling. */
        "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
        fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
        gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_fd* fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* last reference: tear everything down */
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    fork_fd_list_remove_node(fd->fork_fd_list);
    if (fd->shutdown) {
      GRPC_ERROR_UNREF(fd->shutdown_error);
    }
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
    /* shutdown_error was placement-new'd in fd_create; destroy manually */
    fd->shutdown_error.~Status();
#endif
    gpr_free(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}
371 
372 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
373  // Avoid unused-parameter warning for debug-only parameter
374  (void)track_err;
375  GPR_DEBUG_ASSERT(track_err == false);
376  grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
377  gpr_mu_init(&r->mu);
378  gpr_atm_rel_store(&r->refst, 1);
379  r->shutdown = 0;
380 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
381  new (&r->shutdown_error) absl::Status();
382 #endif
383  r->read_closure = CLOSURE_NOT_READY;
384  r->write_closure = CLOSURE_NOT_READY;
385  r->fd = fd;
386  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
387  &r->inactive_watcher_root;
388  r->read_watcher = r->write_watcher = nullptr;
389  r->on_done_closure = nullptr;
390  r->closed = 0;
391  r->released = 0;
392  gpr_atm_no_barrier_store(&r->pollhup, 0);
393 
394  std::string name2 = absl::StrCat(name, " fd=", fd);
395  grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
396  fork_fd_list_add_grpc_fd(r);
397  return r;
398 }
399 
400 static bool fd_is_orphaned(grpc_fd* fd) {
401  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
402 }
403 
404 static grpc_error_handle pollset_kick_locked(grpc_fd_watcher* watcher) {
405  gpr_mu_lock(&watcher->pollset->mu);
406  GPR_ASSERT(watcher->worker);
408  pollset_kick_ext(watcher->pollset, watcher->worker,
409  GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
410  gpr_mu_unlock(&watcher->pollset->mu);
411  return err;
412 }
413 
414 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
415  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
416  (void)pollset_kick_locked(fd->inactive_watcher_root.next);
417  } else if (fd->read_watcher) {
418  (void)pollset_kick_locked(fd->read_watcher);
419  } else if (fd->write_watcher) {
420  (void)pollset_kick_locked(fd->write_watcher);
421  }
422 }
423 
424 static void wake_all_watchers_locked(grpc_fd* fd) {
425  grpc_fd_watcher* watcher;
426  for (watcher = fd->inactive_watcher_root.next;
427  watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
428  (void)pollset_kick_locked(watcher);
429  }
430  if (fd->read_watcher) {
431  (void)pollset_kick_locked(fd->read_watcher);
432  }
433  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
434  (void)pollset_kick_locked(fd->write_watcher);
435  }
436 }
437 
438 static int has_watchers(grpc_fd* fd) {
439  return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
440  fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
441 }
442 
443 static void close_fd_locked(grpc_fd* fd) {
444  fd->closed = 1;
445  if (!fd->released) {
446  close(fd->fd);
447  }
449 }
450 
451 static int fd_wrapped_fd(grpc_fd* fd) {
452  if (fd->released || fd->closed) {
453  return -1;
454  } else {
455  return fd->fd;
456  }
457 }
458 
/* Orphan `fd`: clear its active bit and arrange for it to be closed once all
   watchers have finished with it. `on_done` is scheduled when the close
   completes. If `release_fd` is non-null, ownership of the raw descriptor is
   transferred to the caller via *release_fd and close() is skipped. */
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != nullptr;
  if (release_fd != nullptr) {
    *release_fd = fd->fd;
    /* redundant with the assignment above, but harmless */
    fd->released = true;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  if (!has_watchers(fd)) {
    /* nobody polling: safe to close immediately */
    close_fd_locked(fd);
  } else {
    /* kick all pollers out; the last fd_end_poll will close it */
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  /* net effect of +1 then -2 is -1, which clears bit0 (the active bit) per
     the refst scheme documented on grpc_fd */
  UNREF_BY(fd, 2, reason); /* drop the reference */
}
477 
/* increment refcount by two to avoid changing the orphan bit */
#ifndef NDEBUG
/* Debug variants carry a reason string and call site for ref tracing. */
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif
494 
495 static grpc_error_handle fd_shutdown_error(grpc_fd* fd) {
496  if (!fd->shutdown) {
497  return GRPC_ERROR_NONE;
498  } else {
500  "FD shutdown", &fd->shutdown_error, 1),
503  }
504 }
505 
506 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
508  if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
514  } else if (*st == CLOSURE_NOT_READY) {
515  /* not ready ==> switch to a waiting state by setting the closure */
516  *st = closure;
517  } else if (*st == CLOSURE_READY) {
518  /* already ready ==> queue the closure to run immediately */
519  *st = CLOSURE_NOT_READY;
520  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
521  maybe_wake_one_watcher_locked(fd);
522  } else {
523  /* upcallptr was set to a different closure. This is an error! */
525  "User called a notify_on function with a previous callback still "
526  "pending");
527  abort();
528  }
529 }
530 
531 /* returns 1 if state becomes not ready */
532 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
533  if (*st == CLOSURE_READY) {
534  /* duplicate ready ==> ignore */
535  return 0;
536  } else if (*st == CLOSURE_NOT_READY) {
537  /* not ready, and not waiting ==> flag ready */
538  *st = CLOSURE_READY;
539  return 0;
540  } else {
541  /* waiting ==> queue closure */
542  grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
543  *st = CLOSURE_NOT_READY;
544  return 1;
545  }
546 }
547 
548 static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
549  gpr_mu_lock(&fd->mu);
550  /* only shutdown once */
551  if (!fd->shutdown) {
552  fd->shutdown = 1;
553  fd->shutdown_error = why;
554  /* signal read/write closed to OS so that future operations fail */
555  shutdown(fd->fd, SHUT_RDWR);
556  set_ready_locked(fd, &fd->read_closure);
557  set_ready_locked(fd, &fd->write_closure);
558  } else {
559  GRPC_ERROR_UNREF(why);
560  }
561  gpr_mu_unlock(&fd->mu);
562 }
563 
564 static bool fd_is_shutdown(grpc_fd* fd) {
565  gpr_mu_lock(&fd->mu);
566  bool r = fd->shutdown;
567  gpr_mu_unlock(&fd->mu);
568  return r;
569 }
570 
/* Arrange for `closure` to run when `fd` becomes readable (or immediately
   if it already is / is shut down). See notify_on_locked for the protocol. */
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
576 
/* Arrange for `closure` to run when `fd` becomes writable (or immediately
   if it already is / is shut down). See notify_on_locked for the protocol. */
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
582 
583 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
585  gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
586  }
588 }
589 
/* Force the read slot ready (used to inject readiness from outside poll). */
static void fd_set_readable(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  set_ready_locked(fd, &fd->read_closure);
  gpr_mu_unlock(&fd->mu);
}
595 
/* Force the write slot ready (used to inject writability from outside poll). */
static void fd_set_writable(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  set_ready_locked(fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
601 
602 static void fd_set_error(grpc_fd* /*fd*/) {
604  gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
605  }
606 }
607 
/* Register `watcher` as a poller of `fd` and report which poll(2) events the
   caller should request: returns (read_mask and/or write_mask) for the events
   this watcher should poll, or 0 if the fd is shut down or another watcher
   already covers the needed events. Takes a "poll" ref on the fd that is
   released in fd_end_poll (or immediately on the shutdown path). */
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher) {
  uint32_t mask = 0;
  grpc_closure* cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
  */
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    /* null fd marks this watcher as a no-op for fd_end_poll */
    watcher->fd = nullptr;
    watcher->pollset = nullptr;
    watcher->worker = nullptr;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  /* only poll if a closure is actually waiting (slot not READY) */
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == nullptr && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
  */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == nullptr && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != nullptr) {
    /* splice onto the tail of the inactive ring */
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}
658 
/* Finish a poll started with fd_begin_poll: deregister the watcher, publish
   observed readiness (got_read/got_write), kick another watcher if events we
   were responsible for remain wanted, and close the fd if it was orphaned and
   we were the last watcher. Releases the "poll" ref taken by fd_begin_poll. */
static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd* fd = watcher->fd;

  /* fd == nullptr means fd_begin_poll bailed out (fd already shut down) */
  if (fd == nullptr) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = nullptr;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = nullptr;
  }
  if (!was_polling && watcher->worker != nullptr) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(fd, &fd->read_closure)) {
      kick = 1;
    }
  }
  if (got_write) {
    if (set_ready_locked(fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  /* last watcher of an orphaned fd performs the deferred close */
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}
711 
712 /*******************************************************************************
713  * pollset_posix.c
714  */
715 
716 static GPR_THREAD_LOCAL(grpc_pollset*) g_current_thread_poller;
717 static GPR_THREAD_LOCAL(grpc_pollset_worker*) g_current_thread_worker;
718 
719 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
720  worker->prev->next = worker->next;
721  worker->next->prev = worker->prev;
722 }
723 
724 static bool pollset_has_workers(grpc_pollset* p) {
725  return p->root_worker.next != &p->root_worker;
726 }
727 
728 static bool pollset_in_pollset_sets(grpc_pollset* p) {
729  return p->pollset_set_count;
730 }
731 
732 static bool pollset_has_observers(grpc_pollset* p) {
733  return pollset_has_workers(p) || pollset_in_pollset_sets(p);
734 }
735 
736 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
737  if (pollset_has_workers(p)) {
738  grpc_pollset_worker* w = p->root_worker.next;
739  remove_worker(p, w);
740  return w;
741  } else {
742  return nullptr;
743  }
744 }
745 
/* Insert `worker` at the tail of the ring (just before the sentinel). */
static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  /* splice: old tail -> worker -> sentinel */
  worker->prev->next = worker->next->prev = worker;
}
751 
/* Insert `worker` at the head of the ring (just after the sentinel). */
static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  /* splice: sentinel -> worker -> old head */
  worker->prev->next = worker->next->prev = worker;
}
757 
758 static void kick_append_error(grpc_error_handle* composite,
760  if (GRPC_ERROR_IS_NONE(error)) return;
761  if (GRPC_ERROR_IS_NONE(*composite)) {
762  *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
763  }
764  *composite = grpc_error_add_child(*composite, error);
765 }
766 
767 static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
768  grpc_pollset_worker* specific_worker,
769  uint32_t flags) {
770  GPR_TIMER_SCOPE("pollset_kick_ext", 0);
773 
774  /* pollset->mu already held */
775  if (specific_worker != nullptr) {
776  if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
777  GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
778  GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
779  for (specific_worker = p->root_worker.next;
780  specific_worker != &p->root_worker;
781  specific_worker = specific_worker->next) {
782  kick_append_error(
783  &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
784  }
785  p->kicked_without_pollers = true;
786  } else if (g_current_thread_worker != specific_worker) {
787  GPR_TIMER_MARK("different_thread_worker", 0);
788  if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
789  specific_worker->reevaluate_polling_on_wakeup = true;
790  }
791  specific_worker->kicked_specifically = true;
792  kick_append_error(&error,
793  grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
794  } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
795  GPR_TIMER_MARK("kick_yoself", 0);
796  if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
797  specific_worker->reevaluate_polling_on_wakeup = true;
798  }
799  specific_worker->kicked_specifically = true;
800  kick_append_error(&error,
801  grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
802  }
803  } else if (g_current_thread_poller != p) {
804  GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
805  GPR_TIMER_MARK("kick_anonymous", 0);
806  specific_worker = pop_front_worker(p);
807  if (specific_worker != nullptr) {
808  if (g_current_thread_worker == specific_worker) {
809  GPR_TIMER_MARK("kick_anonymous_not_self", 0);
810  push_back_worker(p, specific_worker);
811  specific_worker = pop_front_worker(p);
812  if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
813  g_current_thread_worker == specific_worker) {
814  push_back_worker(p, specific_worker);
815  specific_worker = nullptr;
816  }
817  }
818  if (specific_worker != nullptr) {
819  GPR_TIMER_MARK("finally_kick", 0);
820  push_back_worker(p, specific_worker);
821  kick_append_error(
822  &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
823  }
824  } else {
825  GPR_TIMER_MARK("kicked_no_pollers", 0);
826  p->kicked_without_pollers = true;
827  }
828  }
829 
830  GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
831  return error;
832 }
833 
835  grpc_pollset_worker* specific_worker) {
836  return pollset_kick_ext(p, specific_worker, 0);
837 }
838 
839 /* global state management */
840 
/* One-time global initialization for this engine; nothing to set up. */
static grpc_error_handle pollset_global_init(void) { return GRPC_ERROR_NONE; }
842 
843 /* main interface */
844 
845 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
846  gpr_mu_init(&pollset->mu);
847  *mu = &pollset->mu;
848  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
849  pollset->shutting_down = 0;
850  pollset->called_shutdown = 0;
851  pollset->kicked_without_pollers = 0;
852  pollset->local_wakeup_cache = nullptr;
853  pollset->kicked_without_pollers = 0;
854  pollset->fd_count = 0;
855  pollset->fd_capacity = 0;
856  pollset->fds = nullptr;
857  pollset->pollset_set_count = 0;
858 }
859 
860 static void pollset_destroy(grpc_pollset* pollset) {
861  GPR_ASSERT(!pollset_has_workers(pollset));
862  while (pollset->local_wakeup_cache) {
863  grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
864  fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
865  grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
866  gpr_free(pollset->local_wakeup_cache);
867  pollset->local_wakeup_cache = next;
868  }
869  gpr_free(pollset->fds);
870  gpr_mu_destroy(&pollset->mu);
871 }
872 
873 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
874  gpr_mu_lock(&pollset->mu);
875  size_t i;
876  /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
877  for (i = 0; i < pollset->fd_count; i++) {
878  if (pollset->fds[i] == fd) goto exit;
879  }
880  if (pollset->fd_count == pollset->fd_capacity) {
881  pollset->fd_capacity =
882  std::max(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
883  pollset->fds = static_cast<grpc_fd**>(
884  gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
885  }
886  pollset->fds[pollset->fd_count++] = fd;
887  GRPC_FD_REF(fd, "multipoller");
888  (void)pollset_kick(pollset, nullptr);
889 exit:
890  gpr_mu_unlock(&pollset->mu);
891 }
892 
893 static void finish_shutdown(grpc_pollset* pollset) {
894  size_t i;
895  for (i = 0; i < pollset->fd_count; i++) {
896  GRPC_FD_UNREF(pollset->fds[i], "multipoller");
897  }
898  pollset->fd_count = 0;
899  grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
901 }
902 
903 static void work_combine_error(grpc_error_handle* composite,
905  if (GRPC_ERROR_IS_NONE(error)) return;
906  if (GRPC_ERROR_IS_NONE(*composite)) {
907  *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
908  }
909  *composite = grpc_error_add_child(*composite, error);
910 }
911 
913  grpc_pollset_worker** worker_hdl,
914  grpc_core::Timestamp deadline) {
915  GPR_TIMER_SCOPE("pollset_work", 0);
917  if (worker_hdl) *worker_hdl = &worker;
919 
920  /* Avoid malloc for small number of elements. */
921  enum { inline_elements = 96 };
922  struct pollfd pollfd_space[inline_elements];
923  struct grpc_fd_watcher watcher_space[inline_elements];
924 
925  /* pollset->mu already held */
926  int added_worker = 0;
927  int locked = 1;
928  int queued_work = 0;
929  int keep_polling = 0;
930  /* this must happen before we (potentially) drop pollset->mu */
931  worker.next = worker.prev = nullptr;
932  worker.reevaluate_polling_on_wakeup = 0;
933  if (pollset->local_wakeup_cache != nullptr) {
934  worker.wakeup_fd = pollset->local_wakeup_cache;
935  pollset->local_wakeup_cache = worker.wakeup_fd->next;
936  } else {
937  worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
938  gpr_malloc(sizeof(*worker.wakeup_fd)));
939  error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
940  fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
941  if (!GRPC_ERROR_IS_NONE(error)) {
942  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
943  return error;
944  }
945  }
946  worker.kicked_specifically = 0;
947  /* If we're shutting down then we don't execute any extended work */
948  if (pollset->shutting_down) {
949  GPR_TIMER_MARK("pollset_work.shutting_down", 0);
950  goto done;
951  }
952  /* Start polling, and keep doing so while we're being asked to
953  re-evaluate our pollers (this allows poll() based pollers to
954  ensure they don't miss wakeups) */
955  keep_polling = 1;
956  g_current_thread_poller = pollset;
957  while (keep_polling) {
958  keep_polling = 0;
959  if (!pollset->kicked_without_pollers ||
960  deadline <= grpc_core::ExecCtx::Get()->Now()) {
961  if (!added_worker) {
962  push_front_worker(pollset, &worker);
963  added_worker = 1;
964  g_current_thread_worker = &worker;
965  }
966  GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
967 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
968 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
969 
970  int timeout;
971  int r;
972  size_t i, fd_count;
973  nfds_t pfd_count;
974  grpc_fd_watcher* watchers;
975  struct pollfd* pfds;
976 
977  timeout = poll_deadline_to_millis_timeout(deadline);
978 
979  if (pollset->fd_count + 2 <= inline_elements) {
980  pfds = pollfd_space;
981  watchers = watcher_space;
982  } else {
983  /* Allocate one buffer to hold both pfds and watchers arrays */
984  const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
985  const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
986  void* buf = gpr_malloc(pfd_size + watch_size);
987  pfds = static_cast<struct pollfd*>(buf);
988  watchers = static_cast<grpc_fd_watcher*>(
989  static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
990  }
991 
992  fd_count = 0;
993  pfd_count = 1;
994  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
995  pfds[0].events = POLLIN;
996  pfds[0].revents = 0;
997  for (i = 0; i < pollset->fd_count; i++) {
998  if (fd_is_orphaned(pollset->fds[i]) ||
999  gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
1000  GRPC_FD_UNREF(pollset->fds[i], "multipoller");
1001  } else {
1002  pollset->fds[fd_count++] = pollset->fds[i];
1003  watchers[pfd_count].fd = pollset->fds[i];
1004  GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
1005  pfds[pfd_count].fd = pollset->fds[i]->fd;
1006  pfds[pfd_count].revents = 0;
1007  pfd_count++;
1008  }
1009  }
1010  pollset->fd_count = fd_count;
1011  gpr_mu_unlock(&pollset->mu);
1012 
1013  for (i = 1; i < pfd_count; i++) {
1014  grpc_fd* fd = watchers[i].fd;
1015  pfds[i].events = static_cast<short>(
1016  fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1017  GRPC_FD_UNREF(fd, "multipoller_start");
1018  }
1019 
1020  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1021  even going into the blocking annotation if possible */
1024  r = grpc_poll_function(pfds, pfd_count, timeout);
1026 
1028  gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1029  }
1030 
1031  if (r < 0) {
1032  if (errno != EINTR) {
1033  work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1034  }
1035 
1036  for (i = 1; i < pfd_count; i++) {
1037  if (watchers[i].fd == nullptr) {
1038  fd_end_poll(&watchers[i], 0, 0);
1039  } else {
1040  // Wake up all the file descriptors, if we have an invalid one
1041  // we can identify it on the next pollset_work()
1042  fd_end_poll(&watchers[i], 1, 1);
1043  }
1044  }
1045  } else if (r == 0) {
1046  for (i = 1; i < pfd_count; i++) {
1047  fd_end_poll(&watchers[i], 0, 0);
1048  }
1049  } else {
1050  if (pfds[0].revents & POLLIN_CHECK) {
1052  gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1053  }
1054  work_combine_error(
1055  &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1056  }
1057  for (i = 1; i < pfd_count; i++) {
1058  if (watchers[i].fd == nullptr) {
1059  fd_end_poll(&watchers[i], 0, 0);
1060  } else {
1062  gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1063  pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1064  (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1065  }
1066  /* This is a mitigation to prevent poll() from spinning on a
1067  ** POLLHUP https://github.com/grpc/grpc/pull/13665
1068  */
1069  if (pfds[i].revents & POLLHUP) {
1070  gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1071  }
1072  fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1073  pfds[i].revents & POLLOUT_CHECK);
1074  }
1075  }
1076  }
1077 
1078  if (pfds != pollfd_space) {
1079  /* pfds and watchers are in the same memory block pointed to by pfds */
1080  gpr_free(pfds);
1081  }
1082 
1083  locked = 0;
1084  } else {
1085  GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1086  pollset->kicked_without_pollers = 0;
1087  }
1088  /* Finished execution - start cleaning up.
1089  Note that we may arrive here from outside the enclosing while() loop.
1090  In that case we won't loop though as we haven't added worker to the
1091  worker list, which means nobody could ask us to re-evaluate polling). */
1092  done:
1093  if (!locked) {
1094  queued_work |= grpc_core::ExecCtx::Get()->Flush();
1095  gpr_mu_lock(&pollset->mu);
1096  locked = 1;
1097  }
1098  /* If we're forced to re-evaluate polling (via pollset_kick with
1099  GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1100  a loop */
1101  if (worker.reevaluate_polling_on_wakeup && GRPC_ERROR_IS_NONE(error)) {
1102  worker.reevaluate_polling_on_wakeup = 0;
1103  pollset->kicked_without_pollers = 0;
1104  if (queued_work || worker.kicked_specifically) {
1105  /* If there's queued work on the list, then set the deadline to be
1106  immediate so we get back out of the polling loop quickly */
1107  deadline = grpc_core::Timestamp();
1108  }
1109  keep_polling = 1;
1110  }
1111  }
1112  g_current_thread_poller = nullptr;
1113  if (added_worker) {
1114  remove_worker(pollset, &worker);
1115  g_current_thread_worker = nullptr;
1116  }
1117  /* release wakeup fd to the local pool */
1118  worker.wakeup_fd->next = pollset->local_wakeup_cache;
1119  pollset->local_wakeup_cache = worker.wakeup_fd;
1120  /* check shutdown conditions */
1121  if (pollset->shutting_down) {
1122  if (pollset_has_workers(pollset)) {
1123  (void)pollset_kick(pollset, nullptr);
1124  } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1125  pollset->called_shutdown = 1;
1126  gpr_mu_unlock(&pollset->mu);
1127  finish_shutdown(pollset);
1129  /* Continuing to access pollset here is safe -- it is the caller's
1130  * responsibility to not destroy when it has outstanding calls to
1131  * pollset_work.
1132  * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1133  gpr_mu_lock(&pollset->mu);
1134  }
1135  }
1136  if (worker_hdl) *worker_hdl = nullptr;
1137  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1138  return error;
1139 }
1140 
1141 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1142  GPR_ASSERT(!pollset->shutting_down);
1143  pollset->shutting_down = 1;
1144  pollset->shutdown_done = closure;
1145  (void)pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1146  if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1147  pollset->called_shutdown = 1;
1148  finish_shutdown(pollset);
1149  }
1150 }
1151 
1152 static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
1153  if (deadline == grpc_core::Timestamp::InfFuture()) return -1;
1154  if (deadline.is_process_epoch()) return 0;
1155  int64_t n = (deadline - grpc_core::ExecCtx::Get()->Now()).millis();
1156  if (n < 0) return 0;
1157  if (n > INT_MAX) return -1;
1158  return static_cast<int>(n);
1159 }
1160 
1161 /*******************************************************************************
1162  * pollset_set_posix.c
1163  */
1164 
1165 static grpc_pollset_set* pollset_set_create(void) {
1166  grpc_pollset_set* pollset_set =
1167  static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1168  gpr_mu_init(&pollset_set->mu);
1169  return pollset_set;
1170 }
1171 
1172 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1173  size_t i;
1174  gpr_mu_destroy(&pollset_set->mu);
1175  for (i = 0; i < pollset_set->fd_count; i++) {
1176  GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1177  }
1178  for (i = 0; i < pollset_set->pollset_count; i++) {
1179  grpc_pollset* pollset = pollset_set->pollsets[i];
1180  gpr_mu_lock(&pollset->mu);
1181  pollset->pollset_set_count--;
1182  /* check shutdown */
1183  if (pollset->shutting_down && !pollset->called_shutdown &&
1184  !pollset_has_observers(pollset)) {
1185  pollset->called_shutdown = 1;
1186  gpr_mu_unlock(&pollset->mu);
1187  finish_shutdown(pollset);
1188  } else {
1189  gpr_mu_unlock(&pollset->mu);
1190  }
1191  }
1192  gpr_free(pollset_set->pollsets);
1193  gpr_free(pollset_set->pollset_sets);
1194  gpr_free(pollset_set->fds);
1195  gpr_free(pollset_set);
1196 }
1197 
1198 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1199  grpc_pollset* pollset) {
1200  size_t i, j;
1201  gpr_mu_lock(&pollset->mu);
1202  pollset->pollset_set_count++;
1203  gpr_mu_unlock(&pollset->mu);
1204  gpr_mu_lock(&pollset_set->mu);
1205  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1206  pollset_set->pollset_capacity =
1207  std::max(size_t(8), 2 * pollset_set->pollset_capacity);
1208  pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1209  pollset_set->pollsets,
1210  pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1211  }
1212  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1213  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1214  if (fd_is_orphaned(pollset_set->fds[i])) {
1215  GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1216  } else {
1217  pollset_add_fd(pollset, pollset_set->fds[i]);
1218  pollset_set->fds[j++] = pollset_set->fds[i];
1219  }
1220  }
1221  pollset_set->fd_count = j;
1222  gpr_mu_unlock(&pollset_set->mu);
1223 }
1224 
1225 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1226  grpc_pollset* pollset) {
1227  size_t i;
1228  gpr_mu_lock(&pollset_set->mu);
1229  for (i = 0; i < pollset_set->pollset_count; i++) {
1230  if (pollset_set->pollsets[i] == pollset) {
1231  pollset_set->pollset_count--;
1232  std::swap(pollset_set->pollsets[i],
1233  pollset_set->pollsets[pollset_set->pollset_count]);
1234  break;
1235  }
1236  }
1237  gpr_mu_unlock(&pollset_set->mu);
1238  gpr_mu_lock(&pollset->mu);
1239  pollset->pollset_set_count--;
1240  /* check shutdown */
1241  if (pollset->shutting_down && !pollset->called_shutdown &&
1242  !pollset_has_observers(pollset)) {
1243  pollset->called_shutdown = 1;
1244  gpr_mu_unlock(&pollset->mu);
1245  finish_shutdown(pollset);
1246  } else {
1247  gpr_mu_unlock(&pollset->mu);
1248  }
1249 }
1250 
1251 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1252  grpc_pollset_set* item) {
1253  size_t i, j;
1254  gpr_mu_lock(&bag->mu);
1255  if (bag->pollset_set_count == bag->pollset_set_capacity) {
1256  bag->pollset_set_capacity =
1257  std::max(size_t(8), 2 * bag->pollset_set_capacity);
1258  bag->pollset_sets = static_cast<grpc_pollset_set**>(
1259  gpr_realloc(bag->pollset_sets,
1260  bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1261  }
1262  bag->pollset_sets[bag->pollset_set_count++] = item;
1263  for (i = 0, j = 0; i < bag->fd_count; i++) {
1264  if (fd_is_orphaned(bag->fds[i])) {
1265  GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1266  } else {
1267  pollset_set_add_fd(item, bag->fds[i]);
1268  bag->fds[j++] = bag->fds[i];
1269  }
1270  }
1271  bag->fd_count = j;
1272  gpr_mu_unlock(&bag->mu);
1273 }
1274 
1275 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1276  grpc_pollset_set* item) {
1277  size_t i;
1278  gpr_mu_lock(&bag->mu);
1279  for (i = 0; i < bag->pollset_set_count; i++) {
1280  if (bag->pollset_sets[i] == item) {
1281  bag->pollset_set_count--;
1282  std::swap(bag->pollset_sets[i],
1283  bag->pollset_sets[bag->pollset_set_count]);
1284  break;
1285  }
1286  }
1287  gpr_mu_unlock(&bag->mu);
1288 }
1289 
1290 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1291  size_t i;
1292  gpr_mu_lock(&pollset_set->mu);
1293  if (pollset_set->fd_count == pollset_set->fd_capacity) {
1294  pollset_set->fd_capacity =
1295  std::max(size_t(8), 2 * pollset_set->fd_capacity);
1296  pollset_set->fds = static_cast<grpc_fd**>(
1297  gpr_realloc(pollset_set->fds,
1298  pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1299  }
1300  GRPC_FD_REF(fd, "pollset_set");
1301  pollset_set->fds[pollset_set->fd_count++] = fd;
1302  for (i = 0; i < pollset_set->pollset_count; i++) {
1303  pollset_add_fd(pollset_set->pollsets[i], fd);
1304  }
1305  for (i = 0; i < pollset_set->pollset_set_count; i++) {
1306  pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1307  }
1308  gpr_mu_unlock(&pollset_set->mu);
1309 }
1310 
1311 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1312  size_t i;
1313  gpr_mu_lock(&pollset_set->mu);
1314  for (i = 0; i < pollset_set->fd_count; i++) {
1315  if (pollset_set->fds[i] == fd) {
1316  pollset_set->fd_count--;
1317  std::swap(pollset_set->fds[i], pollset_set->fds[pollset_set->fd_count]);
1318  GRPC_FD_UNREF(fd, "pollset_set");
1319  break;
1320  }
1321  }
1322  for (i = 0; i < pollset_set->pollset_set_count; i++) {
1323  pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1324  }
1325  gpr_mu_unlock(&pollset_set->mu);
1326 }
1327 
1328 /*******************************************************************************
1329  * event engine binding
1330  */
1331 
/* The poll engine runs no background poller threads, so the calling thread
   can never be one. */
static bool is_any_background_poller_thread(void) { return false; }
1333 
/* No-op: the poll engine has no background poller, hence no background
   closures to shut down. */
static void shutdown_background_closure(void) {}
1335 
/* Always declines: there is no background poller in this engine, so the
   caller must schedule the closure elsewhere. */
static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}
1340 
1341 /* Called by the child process's post-fork handler to close open fds, including
1342  * worker wakeup fds. This allows gRPC to shutdown in the child process without
1343  * interfering with connections or RPCs ongoing in the parent. */
1344 static void reset_event_manager_on_fork() {
1345  gpr_mu_lock(&fork_fd_list_mu);
1346  while (fork_fd_list_head != nullptr) {
1347  if (fork_fd_list_head->fd != nullptr) {
1348  if (!fork_fd_list_head->fd->closed) {
1349  close(fork_fd_list_head->fd->fd);
1350  }
1351  fork_fd_list_head->fd->fd = -1;
1352  } else {
1353  close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1354  fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1355  close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1356  fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1357  }
1358  fork_fd_list_head = fork_fd_list_head->next;
1359  }
1360  gpr_mu_unlock(&fork_fd_list_mu);
1361 }
1362 
1364  sizeof(grpc_pollset),
1365  false,
1366  false,
1367 
1368  fd_create,
1369  fd_wrapped_fd,
1370  fd_orphan,
1371  fd_shutdown,
1372  fd_notify_on_read,
1373  fd_notify_on_write,
1374  fd_notify_on_error,
1375  fd_set_readable,
1376  fd_set_writable,
1377  fd_set_error,
1378  fd_is_shutdown,
1379 
1380  pollset_init,
1383  pollset_work,
1384  pollset_kick,
1385  pollset_add_fd,
1386 
1387  pollset_set_create,
1388  pollset_set_destroy,
1389  pollset_set_add_pollset,
1390  pollset_set_del_pollset,
1391  pollset_set_add_pollset_set,
1392  pollset_set_del_pollset_set,
1393  pollset_set_add_fd,
1394  pollset_set_del_fd,
1395 
1396  is_any_background_poller_thread,
1397  /* name = */ "poll",
1398  /* check_engine_available = */
1399  [](bool) {
1400  if (!grpc_has_wakeup_fd()) {
1401  gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1402  return false;
1403  }
1404  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1405  return false;
1406  }
1407  if (grpc_core::Fork::Enabled()) {
1408  track_fds_for_fork = true;
1409  gpr_mu_init(&fork_fd_list_mu);
1411  reset_event_manager_on_fork);
1412  }
1413  return true;
1414  },
1415  /* init_engine = */ []() {},
1416  /* shutdown_engine = */ shutdown_background_closure,
1417  []() {},
1418  add_closure_to_background_poller,
1419 };
1420 
1421 namespace {
1422 
1423 grpc_poll_function_type real_poll_function;
1424 
1425 int phony_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
1426  if (timeout == 0) {
1427  return real_poll_function(fds, nfds, 0);
1428  } else {
1429  gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling.");
1430  GPR_ASSERT(false);
1431  return -1;
1432  }
1433 }
1434 
1435 } // namespace
1436 
1439  v.check_engine_available = [](bool explicit_request) {
1440  if (!explicit_request) return false;
1441  // return the simplest engine as a phony but also override the poller
1442  if (!grpc_ev_poll_posix.check_engine_available(explicit_request)) {
1443  return false;
1444  }
1445  real_poll_function = grpc_poll_function;
1446  grpc_poll_function = phony_poll;
1447  return true;
1448  };
1449  v.name = "none";
1450  v.init_engine = []() {};
1451  v.shutdown_engine = []() {};
1452  return v;
1453 }();
1454 
1455 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
grpc_pollset_worker
struct grpc_pollset_worker grpc_pollset_worker
Definition: pollset.h:39
grpc_wakeup_fd_destroy
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info)
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
block_annotate.h
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
gpr_atm_no_barrier_load
#define gpr_atm_no_barrier_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:53
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc::testing::pollset_work
static grpc_error_handle pollset_work(grpc_pollset *ps, grpc_pollset_worker **, grpc_core::Timestamp deadline)
Definition: bm_cq_multiple_threads.cc:73
gpr_atm_full_fetch_add
#define gpr_atm_full_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:62
log.h
gpr_atm_no_barrier_store
#define gpr_atm_no_barrier_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:56
grpc_pollset::mu
gpr_mu mu
Definition: bm_cq_multiple_threads.cc:38
absl::str_format_internal::LengthMod::j
@ j
grpc_trace_fd_refcount
grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount
GRPC_STATUS_UNAVAILABLE
@ GRPC_STATUS_UNAVAILABLE
Definition: include/grpc/impl/codegen/status.h:143
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
wakeup_fd_posix.h
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
timers.h
iomgr_internal.h
grpc_wakeup_fd
Definition: wakeup_fd_posix.h:74
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
worker
static void worker(void *arg)
Definition: threadpool.c:57
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_poll_function_type
int(* grpc_poll_function_type)(struct pollfd *, nfds_t, int)
Definition: ev_posix.h:206
string.h
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
grpc_event_engine_vtable
Definition: ev_posix.h:46
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
error_ref_leak.err
err
Definition: error_ref_leak.py:35
file
Definition: bloaty/third_party/zlib/examples/gzappend.c:170
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
Definition: error.h:307
setup.name
name
Definition: setup.py:542
GRPC_LOG_IF_ERROR
#define GRPC_LOG_IF_ERROR(what, error)
Definition: error.h:398
xds_manager.p
p
Definition: xds_manager.py:60
GRPC_ERROR_CANCELLED
#define GRPC_ERROR_CANCELLED
Definition: error.h:238
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
stats.h
gen_build_yaml.struct
def struct(**kwargs)
Definition: test/core/end2end/gen_build_yaml.py:30
grpc_iomgr_unregister_object
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj)
Definition: iomgr.cc:193
gpr_zalloc
GPRAPI void * gpr_zalloc(size_t size)
Definition: alloc.cc:40
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_iomgr_object
Definition: iomgr_internal.h:29
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_error_add_child
grpc_error_handle grpc_error_add_child(grpc_error_handle src, grpc_error_handle child)
Definition: error.cc:678
GRPC_OS_ERROR
#define GRPC_OS_ERROR(err, call_name)
create an error associated with errno!=0 (an 'operating system' error)
Definition: error.h:352
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gpr_realloc
GPRAPI void * gpr_realloc(void *p, size_t size)
Definition: alloc.cc:56
grpc_wakeup_fd_wakeup
grpc_error_handle grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) GRPC_MUST_USE_RESULT
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
grpc_fd
struct grpc_fd grpc_fd
Definition: ev_posix.h:44
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
Timestamp
struct Timestamp Timestamp
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:672
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
grpc_wakeup_fd_init
grpc_error_handle grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) GRPC_MUST_USE_RESULT
worker
Definition: worker.py:1
setup.v
v
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:42
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
gpr_atm_acq_load
#define gpr_atm_acq_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:52
GRPC_WAKEUP_FD_GET_READ_FD
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info)
Definition: wakeup_fd_posix.h:82
grpc_wakeup_fd_consume_wakeup
grpc_error_handle grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) GRPC_MUST_USE_RESULT
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
GRPC_STATS_INC_SYSCALL_POLL
#define GRPC_STATS_INC_SYSCALL_POLL()
Definition: stats_data.h:188
GRPC_SCHEDULING_END_BLOCKING_REGION
#define GRPC_SCHEDULING_END_BLOCKING_REGION
Definition: block_annotate.h:48
std::swap
void swap(Json::Value &a, Json::Value &b)
Specialize std::swap() for Json::Value.
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:1226
gpr_atm_rel_store
#define gpr_atm_rel_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:54
gpr_atm_no_barrier_fetch_add
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:59
close
#define close
Definition: test-fs.c:48
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_pollset
struct grpc_pollset grpc_pollset
Definition: pollset.h:38
GPR_THREAD_LOCAL
#define GPR_THREAD_LOCAL(type)
Definition: tls.h:151
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
memory_diff.cur
def cur
Definition: memory_diff.py:83
grpc_iomgr_register_object
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name)
Definition: iomgr.cc:184
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
GRPC_MUST_USE_RESULT
#define GRPC_MUST_USE_RESULT
Definition: impl/codegen/port_platform.h:584
grpc_ev_none_posix
const grpc_event_engine_vtable grpc_ev_none_posix
grpc_polling_trace
grpc_core::DebugOnlyTraceFlag grpc_polling_trace
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
port.h
poll.h
grpc_event_engine_vtable::check_engine_available
bool(* check_engine_available)(bool explicit_request)
Definition: ev_posix.h:89
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
ev_poll_posix.h
alloc.h
grpc::testing::pollset_destroy
static void pollset_destroy(grpc_pollset *ps)
Definition: bm_cq_multiple_threads.cc:59
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
fix_build_deps.r
r
Definition: fix_build_deps.py:491
thd.h
regen-readme.line
line
Definition: regen-readme.py:30
closure
Definition: proxy.cc:59
tls.h
grpc::testing::pollset_shutdown
static void pollset_shutdown(grpc_pollset *, grpc_closure *closure)
Definition: bm_cq_multiple_threads.cc:50
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
grpc_has_wakeup_fd
int grpc_has_wakeup_fd(void)
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
grpc_core::Fork::SetResetChildPollingEngineFunc
static void SetResetChildPollingEngineFunc(child_postfork_func reset_child_polling_engine)
Definition: fork.cc:198
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
murmur_hash.h
GRPC_STATS_INC_POLLSET_KICK
#define GRPC_STATS_INC_POLLSET_KICK()
Definition: stats_data.h:192
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
grpc_core::Timestamp::is_process_epoch
bool is_process_epoch() const
Definition: src/core/lib/gprpp/time.h:107
grpc_error
Definition: error_internal.h:42
grpc::testing::pollset_kick
static grpc_error_handle pollset_kick(grpc_pollset *, grpc_pollset_worker *)
Definition: bm_cq_multiple_threads.cc:61
grpc_poll_function
grpc_poll_function_type grpc_poll_function
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_closure
Definition: closure.h:56
GPR_TIMER_MARK
#define GPR_TIMER_MARK(tag, important)
Definition: src/core/lib/profiling/timers.h:39
binary_size.old
string old
Definition: binary_size.py:128
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
errno.h
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
GRPC_SCHEDULING_START_BLOCKING_REGION
#define GRPC_SCHEDULING_START_BLOCKING_REGION
Definition: block_annotate.h:45
grpc_core::Fork::Enabled
static bool Enabled()
Definition: fork.cc:184
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h
grpc_ev_poll_posix
const grpc_event_engine_vtable grpc_ev_poll_posix
grpc::testing::pollset_init
static void pollset_init(grpc_pollset *ps, gpr_mu **mu)
Definition: bm_cq_multiple_threads.cc:54


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:19