completion_queue.cc
/*
 *
 * Copyright 2015-2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/completion_queue.h"

#include <inttypes.h>
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <new>
#include <string>
#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/event_string.h"

grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");

namespace {

// Specifies a cq thread-local cache.
// The first event that occurs on a thread with a cq cache will go into that
// cache, and will only be returned on the thread that initialized the cache.
// NOTE: Only one event will ever be cached.
GPR_THREAD_LOCAL(grpc_cq_completion*) g_cached_event;
GPR_THREAD_LOCAL(grpc_completion_queue*) g_cached_cq;

struct plucker {
  grpc_pollset_worker** worker;
  void* tag;
};
struct cq_poller_vtable {
  bool can_get_pollset;
  bool can_listen;
  size_t (*size)(void);
  void (*init)(grpc_pollset* pollset, gpr_mu** mu);
  grpc_error_handle (*kick)(grpc_pollset* pollset,
                            grpc_pollset_worker* specific_worker);
  grpc_error_handle (*work)(grpc_pollset* pollset,
                            grpc_pollset_worker** worker,
                            grpc_core::Timestamp deadline);
  void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
  void (*destroy)(grpc_pollset* pollset);
};
typedef struct non_polling_worker {
  gpr_cv cv;
  bool kicked;
  struct non_polling_worker* next;
  struct non_polling_worker* prev;
} non_polling_worker;

struct non_polling_poller {
  gpr_mu mu;
  bool kicked_without_poller;
  non_polling_worker* root;
  grpc_closure* shutdown;
};
size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }

void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
  non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
  gpr_mu_init(&npp->mu);
  *mu = &npp->mu;
}

void non_polling_poller_destroy(grpc_pollset* pollset) {
  non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
  gpr_mu_destroy(&npp->mu);
}
grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
                                          grpc_pollset_worker** worker,
                                          grpc_core::Timestamp deadline) {
  non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
  if (npp->shutdown) return GRPC_ERROR_NONE;
  if (npp->kicked_without_poller) {
    npp->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  non_polling_worker w;
  gpr_cv_init(&w.cv);
  if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
  if (npp->root == nullptr) {
    npp->root = w.next = w.prev = &w;
  } else {
    w.next = npp->root;
    w.prev = w.next->prev;
    w.next->prev = w.prev->next = &w;
  }
  w.kicked = false;
  gpr_timespec deadline_ts = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
  while (!npp->shutdown && !w.kicked &&
         !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) {
  }
  grpc_core::ExecCtx::Get()->InvalidateNow();
  if (&w == npp->root) {
    npp->root = w.next;
    if (&w == npp->root) {
      if (npp->shutdown) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE);
      }
      npp->root = nullptr;
    }
  }
  w.next->prev = w.prev;
  w.prev->next = w.next;
  gpr_cv_destroy(&w.cv);
  if (worker != nullptr) *worker = nullptr;
  return GRPC_ERROR_NONE;
}

grpc_error_handle non_polling_poller_kick(
    grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
  non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
  if (specific_worker == nullptr) {
    specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
  }
  if (specific_worker != nullptr) {
    non_polling_worker* w =
        reinterpret_cast<non_polling_worker*>(specific_worker);
    if (!w->kicked) {
      w->kicked = true;
      gpr_cv_signal(&w->cv);
    }
  } else {
    p->kicked_without_poller = true;
  }
  return GRPC_ERROR_NONE;
}

void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
  GPR_ASSERT(closure != nullptr);
  p->shutdown = closure;
  if (p->root == nullptr) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
  } else {
    non_polling_worker* w = p->root;
    do {
      gpr_cv_signal(&w->cv);
      w = w->next;
    } while (w != p->root);
  }
}

const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
    /* GRPC_CQ_DEFAULT_POLLING */
    {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    /* GRPC_CQ_NON_LISTENING */
    {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    /* GRPC_CQ_NON_POLLING */
    {false, false, non_polling_poller_size, non_polling_poller_init,
     non_polling_poller_kick, non_polling_poller_work,
     non_polling_poller_shutdown, non_polling_poller_destroy},
};

}  // namespace

struct cq_vtable {
  grpc_cq_completion_type cq_completion_type;
  size_t data_size;
  void (*init)(void* data, grpc_completion_queue_functor* shutdown_callback);
  void (*shutdown)(grpc_completion_queue* cq);
  void (*destroy)(void* data);
  bool (*begin_op)(grpc_completion_queue* cq, void* tag);
  void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error,
                 void (*done)(void* done_arg, grpc_cq_completion* storage),
                 void* done_arg, grpc_cq_completion* storage, bool internal);
  grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
                     void* reserved);
  grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
                      gpr_timespec deadline, void* reserved);
};

namespace {

/* Queue that holds the cq_completion_events. Internally uses
 * MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
 * queue). It uses a queue_lock to support multiple consumers.
 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
class CqEventQueue {
 public:
  CqEventQueue() = default;
  ~CqEventQueue() = default;

  /* Note: The counter is not incremented/decremented atomically with push/pop.
   * The count is only eventually consistent */
  intptr_t num_items() const {
    return num_queue_items_.load(std::memory_order_relaxed);
  }

  bool Push(grpc_cq_completion* c);
  grpc_cq_completion* Pop();

 private:
  /* Spinlock to serialize consumers i.e pop() operations */
  gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;

  grpc_core::MultiProducerSingleConsumerQueue queue_;

  /* A lazy counter of number of items in the queue. This is NOT atomically
     incremented/decremented along with push/pop operations and hence is only
     eventually consistent */
  std::atomic<intptr_t> num_queue_items_{0};
};
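
// Illustrative sketch (not part of the original source): the pattern above --
// a lock-free MPSC queue guarded by a consumer-side trylock -- can be read as
// the following simplified pseudo-C++:
//
//   grpc_cq_completion* TryPop(CqEventQueue& q) {
//     grpc_cq_completion* c = nullptr;
//     if (gpr_spinlock_trylock(&q.queue_lock_)) {  // at most one consumer
//       c = /* pop from the lock-free queue */;    // safe: we hold the lock
//       gpr_spinlock_unlock(&q.queue_lock_);
//     }
//     return c;  // nullptr means "empty or contended"; callers must retry
//   }
//
// A failed trylock is treated like an empty queue, which is why callers such
// as cq_next() re-check num_items() before concluding the queue is drained.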

struct cq_next_data {
  ~cq_next_data() {
    GPR_ASSERT(queue.num_items() == 0);
#ifndef NDEBUG
    if (pending_events.load(std::memory_order_acquire) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /** Completed events for completion queues of type GRPC_CQ_NEXT */
  CqEventQueue queue;

  /** Counter of how many things have ever been queued on this completion
      queue; useful for avoiding locks to check the queue */
  std::atomic<intptr_t> things_queued_ever{0};

  /** Number of outstanding events (+1 if not shut down); the initial count is
      dropped by grpc_completion_queue_shutdown */
  std::atomic<intptr_t> pending_events{1};

  /** false initially, true once shutdown has been initiated */
  bool shutdown_called = false;
};

struct cq_pluck_data {
  cq_pluck_data() {
    completed_tail = &completed_head;
    completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
  }

  ~cq_pluck_data() {
    GPR_ASSERT(completed_head.next ==
               reinterpret_cast<uintptr_t>(&completed_head));
#ifndef NDEBUG
    if (pending_events.load(std::memory_order_acquire) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /** Completed events: a circular list anchored at completed_head */
  grpc_cq_completion completed_head;
  grpc_cq_completion* completed_tail;

  /** Number of pending events (+1 if we're not shutdown); the initial count
      is dropped by grpc_completion_queue_shutdown */
  std::atomic<intptr_t> pending_events{1};

  /** Counter of how many things have ever been queued on this completion
      queue; useful for avoiding locks to check the queue */
  std::atomic<intptr_t> things_queued_ever{0};

  /** false initially, true once shutdown has completed */
  /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
   * (pending_events == 0). So consider removing this in future and use
   * pending_events */
  std::atomic<bool> shutdown{false};

  /** false initially, true once shutdown has been initiated */
  bool shutdown_called = false;

  int num_pluckers = 0;
  plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};

struct cq_callback_data {
  explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
      : shutdown_callback(shutdown_callback) {}

  ~cq_callback_data() {
#ifndef NDEBUG
    if (pending_events.load(std::memory_order_acquire) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /** Number of pending events (+1 if we're not shutdown); the initial count
      is dropped by grpc_completion_queue_shutdown */
  std::atomic<intptr_t> pending_events{1};

  /** false initially, true once shutdown has been initiated */
  bool shutdown_called = false;

  /** A callback that gets invoked when the CQ completes shutdown */
  grpc_completion_queue_functor* shutdown_callback;
};

}  // namespace

/* Completion queue structure */
struct grpc_completion_queue {
  /** Once owning_refs drops to zero, we will destroy the cq */
  grpc_core::RefCount owning_refs;

  gpr_mu* mu;

  const cq_vtable* vtable;
  const cq_poller_vtable* poller_vtable;

#ifndef NDEBUG
  void** outstanding_tags;
  size_t outstanding_tag_count;
  size_t outstanding_tag_capacity;
#endif

  grpc_closure pollset_shutdown_done;
  int num_polls;
};

/* Forward declarations */
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq);

static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);

// A cq_end_op function is called when an operation on a given CQ with
// a given tag has completed. The storage argument is a reference to the
// space reserved for this completion as it is placed into the corresponding
// queue. The done argument is a callback that will be invoked when it is
// safe to free up that storage. The storage MUST NOT be freed until the
// done callback is invoked.
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);
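
// Illustrative sketch (not part of the original source): a typical producer
// honors the storage/done contract described above roughly like this. The
// completion storage stays owned by the caller until `done` fires, so a
// free-list or arena slot is recycled only inside the done callback:
//
//   static void finish_batch(void* arg, grpc_cq_completion* storage) {
//     // Safe to recycle `storage` now; the event was consumed from the cq.
//     release_completion_slot(static_cast<call_data*>(arg), storage);
//   }
//
//   // ... when the op completes:
//   grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, finish_batch, calld, storage,
//                  /*internal=*/false);
//
// `finish_batch`, `release_completion_slot` and `call_data` are hypothetical
// names used only for illustration.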

static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved);

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved);

// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next(void* data,
                         grpc_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(void* data,
                          grpc_completion_queue_functor* shutdown_callback);
static void cq_init_callback(void* data,
                             grpc_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);

/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
    /* GRPC_CQ_NEXT */
    {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
     cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
     nullptr},
    /* GRPC_CQ_PLUCK */
    {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
     cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
     cq_pluck},
    /* GRPC_CQ_CALLBACK */
    {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
     cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
     cq_end_op_for_callback, nullptr, nullptr},
};

#define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
#define POLLSET_FROM_CQ(cq) \
  ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))

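// Illustrative note (not part of the original source): a completion queue is
// one gpr_zalloc'd block laid out as
//
//   [grpc_completion_queue][type-specific data][pollset]
//    ^cq                    ^DATA_FROM_CQ(cq)   ^POLLSET_FROM_CQ(cq)
//
// so DATA_FROM_CQ is just "one past the header" and POLLSET_FROM_CQ skips a
// further vtable->data_size bytes. A minimal sketch of the same idiom, using
// hypothetical Header/Vtable types:
//
//   struct Vtable { size_t data_size; };
//   struct Header { const Vtable* vtable; };
//   void* data_of(Header* h) { return h + 1; }
//   char* pollset_of(Header* h) {
//     return static_cast<char*>(data_of(h)) + h->vtable->data_size;
//   }
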
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");

#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)     \
  do {                                                   \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) &&       \
        (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
         (event)->type != GRPC_QUEUE_TIMEOUT)) {         \
      gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq,      \
              grpc_event_string(event).c_str());         \
    }                                                    \
  } while (0)

static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);

void grpc_cq_global_init() {}

void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
  if (g_cached_cq == nullptr) {
    g_cached_event = nullptr;
    g_cached_cq = cq;
  }
}

int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
                                                   void** tag, int* ok) {
  grpc_cq_completion* storage = g_cached_event;
  int ret = 0;
  if (storage != nullptr && g_cached_cq == cq) {
    *tag = storage->tag;
    grpc_core::ExecCtx exec_ctx;
    *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
    storage->done(storage->done_arg, storage);
    ret = 1;
    cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    }
  }
  g_cached_event = nullptr;
  g_cached_cq = nullptr;

  return ret;
}
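
// Illustrative usage (not part of the original source): the thread-local
// cache lets a thread that both starts and completes an op skip the event
// queue entirely. A consumer thread would use it roughly like this:
//
//   grpc_completion_queue_thread_local_cache_init(cq);
//   // ... start an op whose completion may land in this thread's cache ...
//   void* tag;
//   int ok;
//   if (grpc_completion_queue_thread_local_cache_flush(cq, &tag, &ok)) {
//     handle(tag, ok);  // `handle` is a hypothetical application callback
//   } else {
//     // Fall back to the normal queue.
//     grpc_event ev = grpc_completion_queue_next(cq, deadline, nullptr);
//   }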

bool CqEventQueue::Push(grpc_cq_completion* c) {
  queue_.Push(
      reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
  return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
}

grpc_cq_completion* CqEventQueue::Pop() {
  grpc_cq_completion* c = nullptr;

  if (gpr_spinlock_trylock(&queue_lock_)) {
    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();

    bool is_empty = false;
    c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
    gpr_spinlock_unlock(&queue_lock_);

    if (c == nullptr && !is_empty) {
      GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
    }
  } else {
    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
  }

  if (c) {
    num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
  }

  return c;
}

grpc_completion_queue* grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
    grpc_completion_queue_functor* shutdown_callback) {
  GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);

  grpc_completion_queue* cq;

  GRPC_API_TRACE(
      "grpc_completion_queue_create_internal(completion_type=%d, "
      "polling_type=%d)",
      2, (completion_type, polling_type));

  const cq_vtable* vtable = &g_cq_vtable[completion_type];
  const cq_poller_vtable* poller_vtable =
      &g_poller_vtable_by_poller_type[polling_type];

  grpc_core::ExecCtx exec_ctx;
  GRPC_STATS_INC_CQS_CREATED();

  cq = static_cast<grpc_completion_queue*>(
      gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
                 poller_vtable->size()));

  cq->vtable = vtable;
  cq->poller_vtable = poller_vtable;

  /* One for destroy(), one for pollset_shutdown */
  new (&cq->owning_refs) grpc_core::RefCount(2);

  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
  vtable->init(DATA_FROM_CQ(cq), shutdown_callback);

  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
                    grpc_schedule_on_exec_ctx);
  return cq;
}
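
// Illustrative note (not part of the original source): this internal factory
// backs the public creation APIs, which choose the (completion_type,
// polling_type) pair. For example, a GRPC_CQ_NEXT queue with default polling
// amounts to:
//
//   grpc_completion_queue* cq = grpc_completion_queue_create_internal(
//       GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, /*shutdown_callback=*/nullptr);
//
// while a callback CQ passes GRPC_CQ_CALLBACK plus a shutdown functor. The
// refcount of 2 above is dropped once by grpc_completion_queue_destroy() and
// once by on_pollset_shutdown_done().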

static void cq_init_next(
    void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
  new (data) cq_next_data();
}

static void cq_destroy_next(void* data) {
  cq_next_data* cqd = static_cast<cq_next_data*>(data);
  cqd->~cq_next_data();
}

static void cq_init_pluck(
    void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
  new (data) cq_pluck_data();
}

static void cq_destroy_pluck(void* data) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
  cqd->~cq_pluck_data();
}

static void cq_init_callback(void* data,
                             grpc_completion_queue_functor* shutdown_callback) {
  new (data) cq_callback_data(shutdown_callback);
}

static void cq_destroy_callback(void* data) {
  cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
  cqd->~cq_callback_data();
}


grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
  return cq->vtable->cq_completion_type;
}

int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
  int cur_num_polls;
  gpr_mu_lock(cq->mu);
  cur_num_polls = cq->num_polls;
  gpr_mu_unlock(cq->mu);
  return cur_num_polls;
}


#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
                          const char* file, int line) {
  grpc_core::DebugLocation debug_location(file, line);
#else
void grpc_cq_internal_ref(grpc_completion_queue* cq) {
  grpc_core::DebugLocation debug_location;
  const char* reason = nullptr;
#endif
  cq->owning_refs.Ref(debug_location, reason);
}

static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {
  grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
  GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}

#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
                            const char* file, int line) {
  grpc_core::DebugLocation debug_location(file, line);
#else
void grpc_cq_internal_unref(grpc_completion_queue* cq) {
  grpc_core::DebugLocation debug_location;
  const char* reason = nullptr;
#endif
  if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
    cq->vtable->destroy(DATA_FROM_CQ(cq));
    cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
    gpr_free(cq->outstanding_tags);
#endif
    gpr_free(cq);
  }
}

#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
  int found = 0;
  if (lock_cq) {
    gpr_mu_lock(cq->mu);
  }

  for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
    if (cq->outstanding_tags[i] == tag) {
      cq->outstanding_tag_count--;
      std::swap(cq->outstanding_tags[i],
                cq->outstanding_tags[cq->outstanding_tag_count]);
      found = 1;
      break;
    }
  }

  if (lock_cq) {
    gpr_mu_unlock(cq->mu);
  }

  GPR_ASSERT(found);
}
#else
static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
                         bool /*lock_cq*/) {}
#endif

static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
  return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}

static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}

static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
  return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}

bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
  gpr_mu_lock(cq->mu);
  if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
    cq->outstanding_tag_capacity =
        std::max(size_t(4), 2 * cq->outstanding_tag_capacity);
    cq->outstanding_tags = static_cast<void**>(gpr_realloc(
        cq->outstanding_tags,
        sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
  }
  cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
  gpr_mu_unlock(cq->mu);
#endif
  return cq->vtable->begin_op(cq, tag);
}

/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
 * completion type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {
  GPR_TIMER_SCOPE("cq_end_op_for_next", 0);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
       !GRPC_ERROR_IS_NONE(error))) {
    std::string errmsg = grpc_error_std_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
        !GRPC_ERROR_IS_NONE(error)) {
      gpr_log(GPR_INFO, "Operation failed: tag=%p, error=%s", tag,
              errmsg.c_str());
    }
  }
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
  int is_success = (GRPC_ERROR_IS_NONE(error));

  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  storage->next = static_cast<uintptr_t>(is_success);

  cq_check_tag(cq, tag, true); /* Used in debug builds only */

  if (g_cached_cq == cq && g_cached_event == nullptr) {
    g_cached_event = storage;
  } else {
    /* Add the completion to the queue */
    bool is_first = cqd->queue.Push(storage);
    cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
    /* Since we do not hold the cq lock here, it is important to do an
       'acquire' load here (instead of a 'no_barrier' load) to match the
       release store (done via pending_events.fetch_sub(1, ACQ_REL)) in
       cq_shutdown_next */
    if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
      /* Only kick if this is the first item queued */
      if (is_first) {
        gpr_mu_lock(cq->mu);
        grpc_error_handle kick_error =
            cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
        gpr_mu_unlock(cq->mu);

        if (!GRPC_ERROR_IS_NONE(kick_error)) {
          gpr_log(GPR_ERROR, "Kick failed: %s",
                  grpc_error_std_string(kick_error).c_str());
          GRPC_ERROR_UNREF(kick_error);
        }
      }
      if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
        gpr_mu_lock(cq->mu);
        cq_finish_shutdown_next(cq);
        gpr_mu_unlock(cq->mu);
        GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
      }
    } else {
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      cqd->pending_events.store(0, std::memory_order_release);
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    }
  }

  GRPC_ERROR_UNREF(error);
}

/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
 * completion type of GRPC_CQ_PLUCK) */
static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {
  GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);

  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  int is_success = (GRPC_ERROR_IS_NONE(error));

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
       !GRPC_ERROR_IS_NONE(error))) {
    std::string errmsg = grpc_error_std_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
        !GRPC_ERROR_IS_NONE(error)) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
              errmsg.c_str());
    }
  }

  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  storage->next = reinterpret_cast<uintptr_t>(&cqd->completed_head) |
                  static_cast<uintptr_t>(is_success);

  gpr_mu_lock(cq->mu);
  cq_check_tag(cq, tag, false); /* Used in debug builds only */

  /* Add to the list of completions */
  cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
  cqd->completed_tail->next =
      reinterpret_cast<uintptr_t>(storage) | (1u & cqd->completed_tail->next);
  cqd->completed_tail = storage;

  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_pluck(cq);
    gpr_mu_unlock(cq->mu);
  } else {
    grpc_pollset_worker* pluck_worker = nullptr;
    for (int i = 0; i < cqd->num_pluckers; i++) {
      if (cqd->pluckers[i].tag == tag) {
        pluck_worker = *cqd->pluckers[i].worker;
        break;
      }
    }

    grpc_error_handle kick_error =
        cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
    gpr_mu_unlock(cq->mu);
    if (!GRPC_ERROR_IS_NONE(kick_error)) {
      gpr_log(GPR_ERROR, "Kick failed: %s",
              grpc_error_std_string(kick_error).c_str());
      GRPC_ERROR_UNREF(kick_error);
    }
  }

  GRPC_ERROR_UNREF(error);
}

static void functor_callback(void* arg, grpc_error_handle error) {
  auto* functor = static_cast<grpc_completion_queue_functor*>(arg);
  functor->functor_run(functor, GRPC_ERROR_IS_NONE(error));
}

/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal) {
  GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);

  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
       !GRPC_ERROR_IS_NONE(error))) {
    std::string errmsg = grpc_error_std_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
        !GRPC_ERROR_IS_NONE(error)) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
              errmsg.c_str());
    }
  }

  // The callback-based CQ isn't really a queue at all and thus has no need
  // for reserved storage. Invoke the done callback right away to release it.
  done(done_arg, storage);

  cq_check_tag(cq, tag, true); /* Used in debug builds only */

  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_callback(cq);
  }

  // If possible, schedule the callback onto an existing thread-local
  // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
  // 1. The callback is internally-generated and there is an ACEC available
  // 2. The callback is marked inlineable and there is an ACEC available
  // 3. We are already running in a background poller thread (which always has
  //    an ACEC available at the base of the stack).
  auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
  if (((internal || functor->inlineable) &&
       grpc_core::ApplicationCallbackExecCtx::Available()) ||
      grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
                                                   GRPC_ERROR_IS_NONE(error));
    return;
  }

  // Schedule the callback on a closure if not internal or triggered
  // from a background poller thread.
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
}
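
// Illustrative note (not part of the original source): for a callback CQ the
// `tag` is itself a grpc_completion_queue_functor, so "completing" an op just
// means running it. A minimal functor looks roughly like:
//
//   struct MyFunctor : grpc_completion_queue_functor {  // hypothetical
//     MyFunctor() {
//       functor_run = [](grpc_completion_queue_functor* f, int ok) {
//         // ok != 0 on success; dispatch into application code here.
//       };
//       inlineable = false;  // set true only if the body is non-blocking
//     }
//   };
//
// Marking a functor inlineable lets the branch above run it on the current
// thread's ApplicationCallbackExecCtx instead of hopping to the Executor.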

void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
                    grpc_error_handle error,
                    void (*done)(void* done_arg, grpc_cq_completion* storage),
                    void* done_arg, grpc_cq_completion* storage,
                    bool internal) {
  cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
}

struct cq_is_finished_arg {
  gpr_atm last_seen_things_queued_ever;
  grpc_completion_queue* cq;
  grpc_core::Timestamp deadline;
  grpc_cq_completion* stolen_completion;
  void* tag; /* for pluck */
  bool first_loop;
};
class ExecCtxNext : public grpc_core::ExecCtx {
 public:
  explicit ExecCtxNext(void* arg)
      : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

  bool CheckReadyToFinish() override {
    cq_is_finished_arg* a =
        static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    grpc_completion_queue* cq = a->cq;
    cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    GPR_ASSERT(a->stolen_completion == nullptr);

    intptr_t current_last_seen_things_queued_ever =
        cqd->things_queued_ever.load(std::memory_order_relaxed);

    if (current_last_seen_things_queued_ever !=
        a->last_seen_things_queued_ever) {
      a->last_seen_things_queued_ever =
          cqd->things_queued_ever.load(std::memory_order_relaxed);

      /* Pop a cq_completion from the queue. Returns NULL if the queue is
         empty, and might return NULL in some cases even if the queue is not
         empty; but that is ok and doesn't affect correctness. It might affect
         the tail latencies a bit. */
      a->stolen_completion = cqd->queue.Pop();
      if (a->stolen_completion != nullptr) {
        return true;
      }
    }
    return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
  }

 private:
  void* check_ready_to_finish_arg_;
};

#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue* cq) {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
  std::vector<std::string> parts;
  parts.push_back("PENDING TAGS:");
  gpr_mu_lock(cq->mu);
  for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
    parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
  }
  gpr_mu_unlock(cq->mu);
  gpr_log(GPR_DEBUG, "%s", absl::StrJoin(parts, "").c_str());
}
#else
static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
#endif

static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved) {
  GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);

  grpc_event ret;
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  GRPC_API_TRACE(
      "grpc_completion_queue_next("
      "cq=%p, "
      "deadline=gpr_timespec { tv_sec: %" PRId64
      ", tv_nsec: %d, clock_type: %d }, "
      "reserved=%p)",
      5,
      (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
       reserved));
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  GRPC_CQ_INTERNAL_REF(cq, "next");

  grpc_core::Timestamp deadline_millis =
      grpc_core::Timestamp::FromTimespecRoundUp(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.load(std::memory_order_relaxed),
      cq,
      deadline_millis,
      nullptr,
      nullptr,
      true};
  ExecCtxNext exec_ctx(&is_finished_arg);
  for (;;) {
    grpc_core::Timestamp iteration_deadline = deadline_millis;

    if (is_finished_arg.stolen_completion != nullptr) {
      grpc_cq_completion* c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }

    grpc_cq_completion* c = cqd->queue.Pop();

    if (c != nullptr) {
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    } else {
      /* If c == NULL it means either the queue is empty OR in a transient
         inconsistent state. If it is the latter, we should do a 0-timeout poll
         so that the thread comes back quickly from poll to make a second
         attempt at popping. Not doing this can potentially deadlock this
         thread forever (if the deadline is infinity) */
      if (cqd->queue.num_items() > 0) {
        iteration_deadline = grpc_core::Timestamp::ProcessEpoch();
      }
    }

    if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
      /* Before returning, check if the queue has any items left over (since
         MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
         even if the queue is not empty). If so, keep retrying but do not
         return GRPC_QUEUE_SHUTDOWN */
      if (cqd->queue.num_items() > 0) {
        /* Go to the beginning of the loop. No point doing a poll because
           (cq->shutdown == true) is only possible when there is no pending
           work (i.e cq->pending_events == 0) and any outstanding completion
           events should have already been queued on this cq */
        continue;
      }

      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }

    if (!is_finished_arg.first_loop &&
        grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }

    /* The main polling work happens in grpc_pollset_work */
    gpr_mu_lock(cq->mu);
    cq->num_polls++;
    grpc_error_handle err = cq->poller_vtable->work(
        POLLSET_FROM_CQ(cq), nullptr, iteration_deadline);
    gpr_mu_unlock(cq->mu);

    if (!GRPC_ERROR_IS_NONE(err)) {
      gpr_log(GPR_ERROR, "Completion queue next failed: %s",
              grpc_error_std_string(err).c_str());
      GRPC_ERROR_UNREF(err);
      if (err == GRPC_ERROR_CANCELLED) {
        ret.type = GRPC_QUEUE_SHUTDOWN;
      } else {
        ret.type = GRPC_QUEUE_TIMEOUT;
      }
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
  }

  if (cqd->queue.num_items() > 0 &&
      cqd->pending_events.load(std::memory_order_acquire) > 0) {
    gpr_mu_lock(cq->mu);
    (void)cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
    gpr_mu_unlock(cq->mu);
  }

  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "next");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
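
// Illustrative usage (not part of the original source): a typical GRPC_CQ_NEXT
// consumer drives this loop from its own thread until shutdown is observed:
//
//   void DrainCompletionQueue(grpc_completion_queue* cq) {
//     for (;;) {
//       grpc_event ev = grpc_completion_queue_next(
//           cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//       switch (ev.type) {
//         case GRPC_OP_COMPLETE:
//           handle(ev.tag, ev.success);  // `handle` is hypothetical
//           break;
//         case GRPC_QUEUE_SHUTDOWN:
//           return;  // only returned after grpc_completion_queue_shutdown()
//         case GRPC_QUEUE_TIMEOUT:
//           break;  // cannot happen with an infinite deadline
//       }
//     }
//   }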

/* Finishes the completion queue shutdown. This means that there are no more
   completion events / tags expected from the completion queue
   - Must be called under completion queue lock
   - Must be called only once in completion queue's lifetime
   - grpc_completion_queue_shutdown() MUST have been called before calling
     this function */
static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(cqd->pending_events.load(std::memory_order_relaxed) == 0);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}


static void cq_shutdown_next(grpc_completion_queue* cq) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_next() below, which would call pollset
   * shutdown. Pollset shutdown decrements the cq ref count which can
   * potentially destroy the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    return;
  }
  cqd->shutdown_called = true;
  /* Doing acq/release fetch_sub here to match with
   * cq_begin_op_for_next and cq_end_op_for_next functions which read/write
   * on this counter without necessarily holding a lock on cq */
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_next(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}

grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
                                      gpr_timespec deadline, void* reserved) {
  return cq->vtable->next(cq, deadline, reserved);
}

static int add_plucker(grpc_completion_queue* cq, void* tag,
                       grpc_pollset_worker** worker) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
    return 0;
  }
  cqd->pluckers[cqd->num_pluckers].tag = tag;
  cqd->pluckers[cqd->num_pluckers].worker = worker;
  cqd->num_pluckers++;
  return 1;
}

static void del_plucker(grpc_completion_queue* cq, void* tag,
                        grpc_pollset_worker** worker) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  for (int i = 0; i < cqd->num_pluckers; i++) {
    if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
      cqd->num_pluckers--;
      std::swap(cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
      return;
    }
  }
  GPR_UNREACHABLE_CODE(return);
}

class ExecCtxPluck : public grpc_core::ExecCtx {
 public:
  explicit ExecCtxPluck(void* arg)
      : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

  bool CheckReadyToFinish() override {
    cq_is_finished_arg* a =
        static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    grpc_completion_queue* cq = a->cq;
    cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

    GPR_ASSERT(a->stolen_completion == nullptr);
    gpr_atm current_last_seen_things_queued_ever =
        cqd->things_queued_ever.load(std::memory_order_relaxed);
    if (current_last_seen_things_queued_ever !=
        a->last_seen_things_queued_ever) {
      gpr_mu_lock(cq->mu);
      a->last_seen_things_queued_ever =
          cqd->things_queued_ever.load(std::memory_order_relaxed);
      grpc_cq_completion* c;
      grpc_cq_completion* prev = &cqd->completed_head;
      while ((c = reinterpret_cast<grpc_cq_completion*>(
                  prev->next & ~static_cast<uintptr_t>(1))) !=
             &cqd->completed_head) {
        if (c->tag == a->tag) {
          prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                       (c->next & ~static_cast<uintptr_t>(1));
          if (c == cqd->completed_tail) {
            cqd->completed_tail = prev;
          }
          gpr_mu_unlock(cq->mu);
          a->stolen_completion = c;
          return true;
        }
        prev = c;
      }
      gpr_mu_unlock(cq->mu);
    }
    return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
  }

 private:
  void* check_ready_to_finish_arg_;
};

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved) {
  GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);

  grpc_event ret;
  grpc_cq_completion* c;
  grpc_cq_completion* prev;
  grpc_pollset_worker* worker = nullptr;
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
        "cq=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
        6,
        (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
         reserved));
  }
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  GRPC_CQ_INTERNAL_REF(cq, "pluck");
  gpr_mu_lock(cq->mu);
  grpc_core::Timestamp deadline_millis =
      grpc_core::Timestamp::FromTimespecRoundUp(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.load(std::memory_order_relaxed),
      cq,
      deadline_millis,
      nullptr,
      tag,
      true};
  ExecCtxPluck exec_ctx(&is_finished_arg);
  for (;;) {
    if (is_finished_arg.stolen_completion != nullptr) {
      gpr_mu_unlock(cq->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }
    prev = &cqd->completed_head;
    while ((c = reinterpret_cast<grpc_cq_completion*>(
                prev->next & ~static_cast<uintptr_t>(1))) !=
           &cqd->completed_head) {
      if (c->tag == tag) {
        prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                     (c->next & ~static_cast<uintptr_t>(1));
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cq->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
        c->done(c->done_arg, c);
        goto done;
      }
      prev = c;
    }
    if (cqd->shutdown.load(std::memory_order_relaxed)) {
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }
    if (!add_plucker(cq, tag, &worker)) {
      gpr_log(GPR_DEBUG,
              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
      gpr_mu_unlock(cq->mu);
      /* TODO(ctiller): should we use a different result here */
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    if (!is_finished_arg.first_loop &&
        grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    cq->num_polls++;
    grpc_error_handle err =
        cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
    if (!GRPC_ERROR_IS_NONE(err)) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s",
              grpc_error_std_string(err).c_str());
      GRPC_ERROR_UNREF(err);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
    del_plucker(cq, tag, &worker);
  }
done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "pluck");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
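
// Illustrative usage (not part of the original source): pluck waits for one
// specific tag rather than the next available event, e.g. in a synchronous
// call path:
//
//   grpc_event ev = grpc_completion_queue_pluck(
//       cq, /*tag=*/&my_tag, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE && ev.tag == &my_tag);
//
// At most GRPC_MAX_COMPLETION_QUEUE_PLUCKERS threads may pluck on one cq
// concurrently; beyond that, the call fails fast with GRPC_QUEUE_TIMEOUT as
// seen in the add_plucker() branch above.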

grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
                                       gpr_timespec deadline, void* reserved) {
  return cq->vtable->pluck(cq, tag, deadline, reserved);
}

static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(!cqd->shutdown.load(std::memory_order_relaxed));
  cqd->shutdown.store(true, std::memory_order_relaxed);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}

/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
 * merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_pluck() below, which would call pollset
   * shutdown. Pollset shutdown decrements the cq ref count which can
   * potentially destroy the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
    return;
  }
  cqd->shutdown_called = true;
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_pluck(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}

static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
  auto* callback = cqd->shutdown_callback;

  GPR_ASSERT(cqd->shutdown_called);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
  if (grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
    return;
  }

  // Schedule the callback on a closure if not internal or triggered
  // from a background poller thread.
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
      GRPC_ERROR_NONE);
}

static void cq_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
   * Pollset shutdown decrements the cq ref count which can potentially destroy
   * the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
    return;
  }
  cqd->shutdown_called = true;
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    gpr_mu_unlock(cq->mu);
    cq_finish_shutdown_callback(cq);
  } else {
    gpr_mu_unlock(cq->mu);
  }
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}

/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
  GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
  cq->vtable->shutdown(cq);
}

void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
  GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
  GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
  grpc_completion_queue_shutdown(cq);

  grpc_core::ExecCtx exec_ctx;
  GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
}

grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
  return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
}

bool grpc_cq_can_listen(grpc_completion_queue* cq) {
  return cq->poller_vtable->can_listen;
}