Go to the documentation of this file.
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/str_join.h"
77 struct cq_poller_vtable {
89 typedef struct non_polling_worker {
92 struct non_polling_worker*
next;
93 struct non_polling_worker* prev;
96 struct non_polling_poller {
98 bool kicked_without_poller;
99 non_polling_worker*
root;
102 size_t non_polling_poller_size(
void) {
return sizeof(non_polling_poller); }
105 non_polling_poller* npp =
reinterpret_cast<non_polling_poller*
>(pollset);
110 void non_polling_poller_destroy(
grpc_pollset* pollset) {
111 non_polling_poller* npp =
reinterpret_cast<non_polling_poller*
>(pollset);
118 non_polling_poller* npp =
reinterpret_cast<non_polling_poller*
>(pollset);
120 if (npp->kicked_without_poller) {
121 npp->kicked_without_poller =
false;
124 non_polling_worker w;
127 if (npp->root ==
nullptr) {
128 npp->root = w.next = w.prev = &w;
131 w.prev = w.next->prev;
132 w.next->prev = w.prev->next = &w;
136 while (!npp->shutdown && !w.kicked &&
140 if (&w == npp->root) {
142 if (&w == npp->root) {
149 w.next->prev = w.prev;
150 w.prev->next = w.next;
158 non_polling_poller*
p =
reinterpret_cast<non_polling_poller*
>(pollset);
159 if (specific_worker ==
nullptr) {
162 if (specific_worker !=
nullptr) {
163 non_polling_worker* w =
164 reinterpret_cast<non_polling_worker*
>(specific_worker);
170 p->kicked_without_poller =
true;
176 non_polling_poller*
p =
reinterpret_cast<non_polling_poller*
>(pollset);
179 if (
p->root ==
nullptr) {
182 non_polling_worker* w =
p->root;
186 }
while (w !=
p->root);
190 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
198 {
false,
false, non_polling_poller_size, non_polling_poller_init,
199 non_polling_poller_kick, non_polling_poller_work,
200 non_polling_poller_shutdown, non_polling_poller_destroy},
229 CqEventQueue() =
default;
230 ~CqEventQueue() =
default;
235 return num_queue_items_.load(std::memory_order_relaxed);
250 std::atomic<intptr_t> num_queue_items_{0};
253 struct cq_next_data {
257 if (pending_events.load(std::memory_order_acquire) != 0) {
268 std::atomic<intptr_t> things_queued_ever{0};
272 std::atomic<intptr_t> pending_events{1};
275 bool shutdown_called =
false;
278 struct cq_pluck_data {
280 completed_tail = &completed_head;
281 completed_head.next =
reinterpret_cast<uintptr_t>(completed_tail);
286 reinterpret_cast<uintptr_t>(&completed_head));
288 if (pending_events.load(std::memory_order_acquire) != 0) {
300 std::atomic<intptr_t> pending_events{1};
304 std::atomic<intptr_t> things_queued_ever{0};
310 std::atomic<bool> shutdown{
false};
313 bool shutdown_called =
false;
315 int num_pluckers = 0;
319 struct cq_callback_data {
321 : shutdown_callback(shutdown_callback) {}
323 ~cq_callback_data() {
325 if (pending_events.load(std::memory_order_acquire) != 0) {
335 std::atomic<intptr_t> pending_events{1};
338 bool shutdown_called =
false;
432 #define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
433 #define POLLSET_FROM_CQ(cq) \
434 ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
438 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
440 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) && \
441 (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
442 (event)->type != GRPC_QUEUE_TIMEOUT)) { \
443 gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, \
444 grpc_event_string(event).c_str()); \
453 if (g_cached_cq ==
nullptr) {
454 g_cached_event =
nullptr;
460 void**
tag,
int*
ok) {
463 if (
storage !=
nullptr && g_cached_cq ==
cq) {
470 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
478 g_cached_event =
nullptr;
479 g_cached_cq =
nullptr;
487 return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
496 bool is_empty =
false;
500 if (
c ==
nullptr && !is_empty) {
508 num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
522 "grpc_completion_queue_create_internal(completion_type=%d, "
524 2, (completion_type, polling_type));
527 const cq_poller_vtable* poller_vtable =
528 &g_poller_vtable_by_poller_type[polling_type];
535 poller_vtable->size()));
547 grpc_schedule_on_exec_ctx);
553 new (
data) cq_next_data();
557 cq_next_data* cqd =
static_cast<cq_next_data*
>(
data);
558 cqd->~cq_next_data();
563 new (
data) cq_pluck_data();
567 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
>(
data);
568 cqd->~cq_pluck_data();
573 new (
data) cq_callback_data(shutdown_callback);
577 cq_callback_data* cqd =
static_cast<cq_callback_data*
>(
data);
578 cqd->~cq_callback_data();
590 return cur_num_polls;
600 const char* reason =
nullptr;
617 const char* reason =
nullptr;
663 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
668 cq_callback_data* cqd =
static_cast<cq_callback_data*
> DATA_FROM_CQ(
cq);
702 "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
703 "done=%p, done_arg=%p, storage=%p)",
721 if (g_cached_cq ==
cq && g_cached_event ==
nullptr) {
725 bool is_first = cqd->queue.Push(
storage);
726 cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
732 if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
746 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
755 cqd->pending_events.store(0, std::memory_order_release);
775 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
783 "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
784 "done=%p, done_arg=%p, storage=%p)",
803 cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
804 cqd->completed_tail->next =
808 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
813 for (
int i = 0;
i < cqd->num_pluckers;
i++) {
814 if (cqd->pluckers[
i].tag ==
tag) {
815 pluck_worker = *cqd->pluckers[
i].worker;
845 cq_callback_data* cqd =
static_cast<cq_callback_data*
> DATA_FROM_CQ(
cq);
852 "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
853 "done=%p, done_arg=%p, storage=%p)",
868 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
879 if (((
internal || functor->inlineable) &&
922 intptr_t current_last_seen_things_queued_ever =
923 cqd->things_queued_ever.load(std::memory_order_relaxed);
925 if (current_last_seen_things_queued_ever !=
926 a->last_seen_things_queued_ever) {
927 a->last_seen_things_queued_ever =
928 cqd->things_queued_ever.load(std::memory_order_relaxed);
935 a->stolen_completion = cqd->queue.Pop();
936 if (
a->stolen_completion !=
nullptr) {
950 std::vector<std::string> parts;
951 parts.push_back(
"PENDING TAGS:");
971 "grpc_completion_queue_next("
973 "deadline=gpr_timespec { tv_sec: %" PRId64
974 ", tv_nsec: %d, clock_type: %d }, "
988 cqd->things_queued_ever.load(std::memory_order_relaxed),
1002 ret.success =
c->next & 1
u;
1004 c->done(
c->done_arg,
c);
1012 ret.success =
c->next & 1
u;
1014 c->done(
c->done_arg,
c);
1022 if (cqd->queue.num_items() > 0) {
1027 if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
1032 if (cqd->queue.num_items() > 0) {
1076 if (cqd->queue.num_items() > 0 &&
1077 cqd->pending_events.load(std::memory_order_acquire) > 0) {
1101 GPR_ASSERT(cqd->pending_events.load(std::memory_order_relaxed) == 0);
1117 if (cqd->shutdown_called) {
1122 cqd->shutdown_called =
true;
1126 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1140 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
1144 cqd->pluckers[cqd->num_pluckers].tag =
tag;
1145 cqd->pluckers[cqd->num_pluckers].worker =
worker;
1146 cqd->num_pluckers++;
1152 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
1153 for (
int i = 0;
i < cqd->num_pluckers;
i++) {
1154 if (cqd->pluckers[
i].tag ==
tag && cqd->pluckers[
i].worker ==
worker) {
1155 cqd->num_pluckers--;
1156 std::swap(cqd->pluckers[
i], cqd->pluckers[cqd->num_pluckers]);
1172 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
1175 gpr_atm current_last_seen_things_queued_ever =
1176 cqd->things_queued_ever.load(std::memory_order_relaxed);
1177 if (current_last_seen_things_queued_ever !=
1178 a->last_seen_things_queued_ever) {
1180 a->last_seen_things_queued_ever =
1181 cqd->things_queued_ever.load(std::memory_order_relaxed);
1186 &cqd->completed_head) {
1187 if (
c->tag ==
a->tag) {
1189 (
c->next & ~static_cast<uintptr_t>(1));
1190 if (
c == cqd->completed_tail) {
1191 cqd->completed_tail = prev;
1194 a->stolen_completion =
c;
1216 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
1220 "grpc_completion_queue_pluck("
1222 "deadline=gpr_timespec { tv_sec: %" PRId64
1223 ", tv_nsec: %d, clock_type: %d }, "
1238 cqd->things_queued_ever.load(std::memory_order_relaxed),
1251 ret.success =
c->next & 1
u;
1253 c->done(
c->done_arg,
c);
1256 prev = &cqd->completed_head;
1259 &cqd->completed_head) {
1260 if (
c->tag ==
tag) {
1262 (
c->next & ~static_cast<uintptr_t>(1));
1263 if (
c == cqd->completed_tail) {
1264 cqd->completed_tail = prev;
1268 ret.success =
c->next & 1
u;
1270 c->done(
c->done_arg,
c);
1275 if (cqd->shutdown.load(std::memory_order_relaxed)) {
1283 "Too many outstanding grpc_completion_queue_pluck calls: maximum "
1334 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
1337 GPR_ASSERT(!cqd->shutdown.load(std::memory_order_relaxed));
1338 cqd->shutdown.store(
true, std::memory_order_relaxed);
1346 cq_pluck_data* cqd =
static_cast<cq_pluck_data*
> DATA_FROM_CQ(
cq);
1356 if (cqd->shutdown_called) {
1361 cqd->shutdown_called =
true;
1362 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1370 cq_callback_data* cqd =
static_cast<cq_callback_data*
> DATA_FROM_CQ(
cq);
1371 auto*
callback = cqd->shutdown_callback;
1389 cq_callback_data* cqd =
static_cast<cq_callback_data*
> DATA_FROM_CQ(
cq);
1399 if (cqd->shutdown_called) {
1404 cqd->shutdown_called =
true;
1405 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
struct grpc_pollset_worker grpc_pollset_worker
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags")
GPRAPI void gpr_cv_signal(gpr_cv *cv)
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
size_t outstanding_tag_capacity
#define GPR_SPINLOCK_INITIALIZER
size_t grpc_pollset_size(void)
static void cq_finish_shutdown_pluck(grpc_completion_queue *cq)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
static void Enqueue(grpc_completion_queue_functor *functor, int is_success)
static const grpc_transport_vtable vtable
grpc_core::Timestamp deadline
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount")
grpc_event(* next)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
static void functor_callback(void *arg, grpc_error_handle error)
static void dump_pending_tags(grpc_completion_queue *cq)
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
void(* end_op)(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
bool grpc_iomgr_is_any_background_poller_thread()
static void cq_end_op_for_callback(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
#define GPR_TIMER_SCOPE(tag, important)
static void worker(void *arg)
static void cq_init_callback(void *data, grpc_completion_queue_functor *shutdown_callback)
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, void **tag, int *ok)
GPRAPI void gpr_free(void *ptr)
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag)
static void cq_shutdown_callback(grpc_completion_queue *cq)
OPENSSL_EXPORT pem_password_cb void * u
static void cq_init_pluck(void *data, grpc_completion_queue_functor *shutdown_callback)
struct grpc_event grpc_event
static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
static void cq_destroy_callback(void *data)
grpc_error_handle grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker **worker, grpc_core::Timestamp deadline)
static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
bool CheckReadyToFinish() override
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
#define GRPC_ERROR_CANCELLED
#define GRPC_TRACE_FLAG_ENABLED(f)
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq)
static void Run(grpc_closure *closure, grpc_error_handle error, ExecutorType executor_type=ExecutorType::DEFAULT, ExecutorJobType job_type=ExecutorJobType::SHORT)
#define POLLSET_FROM_CQ(cq)
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu)
#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES()
#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES()
GPRAPI void * gpr_zalloc(size_t size)
static void on_pollset_shutdown_done(void *arg, grpc_error_handle error)
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
static void cq_shutdown_next(grpc_completion_queue *cq)
static void cq_shutdown_pluck(grpc_completion_queue *cq)
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck")
size_t outstanding_tag_count
void(* functor_run)(struct grpc_completion_queue_functor *, int)
void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
GPRAPI void * gpr_realloc(void *p, size_t size)
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
def c_str(s, encoding='ascii')
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
static void * tag(intptr_t t)
RefCountedPtr< grpc_tls_certificate_provider > root
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq)
static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
void grpc_cq_global_init()
grpc_cq_completion * stolen_completion
GPRAPI void gpr_mu_init(gpr_mu *mu)
const cq_poller_vtable * poller_vtable
static PyObject * Pop(PyObject *pself, PyObject *args)
grpc_error * grpc_error_handle
grpc_pollset * grpc_cq_pollset(grpc_completion_queue *cq)
grpc_completion_queue * cq
grpc_completion_queue * grpc_completion_queue_create_internal(grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_completion_queue_functor *shutdown_callback)
void(* init)(void *data, grpc_completion_queue_functor *shutdown_callback)
#define gpr_spinlock_unlock(lock)
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
void swap(Json::Value &a, Json::Value &b)
Specialize std::swap() for Json::Value.
GPRAPI void gpr_mu_lock(gpr_mu *mu)
static void callback(void *arg, int status, int timeouts, struct hostent *host)
_W64 unsigned int uintptr_t
#define GPR_THREAD_LOCAL(type)
bool grpc_cq_can_listen(grpc_completion_queue *cq)
grpc_error_handle grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker)
#define GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)
void(* destroy)(void *data)
static constexpr Timestamp ProcessEpoch()
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
bool(* begin_op)(grpc_completion_queue *cq, void *tag)
#define GRPC_STATS_INC_CQS_CREATED()
#define gpr_spinlock_trylock(lock)
gpr_clock_type clock_type
static bool cq_begin_op_for_callback(grpc_completion_queue *cq, void *tag)
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag)
void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason, const char *file, int line)
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag)
grpc_core::TraceFlag grpc_api_trace(false, "api")
#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES()
static void cq_finish_shutdown_callback(grpc_completion_queue *cq)
void(* shutdown)(grpc_completion_queue *cq)
gpr_atm last_seen_things_queued_ever
void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason, const char *file, int line)
static void cq_init_next(void *data, grpc_completion_queue_functor *shutdown_callback)
grpc_core::ExecCtx exec_ctx
std::string grpc_error_std_string(grpc_error_handle error)
void * check_ready_to_finish_arg_
void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure)
UniquePtr< SSL_SESSION > ret
void * check_ready_to_finish_arg_
AllocList * next[kMaxLevel]
static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker)
#define GRPC_CQ_INTERNAL_UNREF(cq, reason)
static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
#define GRPC_CQ_INTERNAL_REF(cq, reason)
static void cq_finish_shutdown_next(grpc_completion_queue *cq)
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq)
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
static const cq_vtable g_cq_vtable[]
grpc_closure pollset_shutdown_done
bool IncrementIfNonzero(std::atomic< T > *p)
void grpc_completion_queue_destroy(grpc_completion_queue *cq)
grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure")
static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
grpc_core::RefCount owning_refs
static Timestamp FromTimespecRoundUp(gpr_timespec t)
grpc_cq_completion_type cq_completion_type
void grpc_pollset_destroy(grpc_pollset *pollset)
int grpc_get_cq_poll_num(grpc_completion_queue *cq)
static void cq_destroy_next(void *data)
static ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES absl::base_internal::AtomicHook< StatusPayloadPrinter > storage
static void cq_destroy_pluck(void *data)
static grpc_completion_queue * cq
static std::function< void(void *, Slot *)> destroy
bool CheckReadyToFinish() override
gpr_timespec as_timespec(gpr_clock_type type) const
#define GRPC_API_TRACE(fmt, nargs, args)
GPRAPI void gpr_cv_init(gpr_cv *cv)
#define GRPC_ERROR_IS_NONE(err)
grpc_event(* pluck)(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52