Classes | Macros | Functions | Variables
completion_queue.cc File Reference
#include <grpc/support/port_platform.h>
#include "src/core/lib/surface/completion_queue.h"
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
#include <atomic>
#include <new>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include <grpc/grpc.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/event_string.h"
Include dependency graph for completion_queue.cc:

Go to the source code of this file.

Classes

struct  cq_is_finished_arg
 
struct  cq_vtable
 
class  ExecCtxNext
 
class  ExecCtxPluck
 
struct  grpc_completion_queue
 

Macros

#define DATA_FROM_CQ(cq)   ((void*)((cq) + 1))
 
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)
 
#define POLLSET_FROM_CQ(cq)   ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
 

Functions

static int add_plucker (grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker)
 
static bool cq_begin_op_for_callback (grpc_completion_queue *cq, void *tag)
 
static bool cq_begin_op_for_next (grpc_completion_queue *cq, void *tag)
 
static bool cq_begin_op_for_pluck (grpc_completion_queue *cq, void *tag)
 
static void cq_check_tag (grpc_completion_queue *cq, void *tag, bool lock_cq)
 
static void cq_destroy_callback (void *data)
 
static void cq_destroy_next (void *data)
 
static void cq_destroy_pluck (void *data)
 
static void cq_end_op_for_callback (grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
 
static void cq_end_op_for_next (grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
 
static void cq_end_op_for_pluck (grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
 
static void cq_finish_shutdown_callback (grpc_completion_queue *cq)
 
static void cq_finish_shutdown_next (grpc_completion_queue *cq)
 
static void cq_finish_shutdown_pluck (grpc_completion_queue *cq)
 
static void cq_init_callback (void *data, grpc_completion_queue_functor *shutdown_callback)
 
static void cq_init_next (void *data, grpc_completion_queue_functor *shutdown_callback)
 
static void cq_init_pluck (void *data, grpc_completion_queue_functor *shutdown_callback)
 
static grpc_event cq_next (grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
 
static grpc_event cq_pluck (grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
 
static void cq_shutdown_callback (grpc_completion_queue *cq)
 
static void cq_shutdown_next (grpc_completion_queue *cq)
 
static void cq_shutdown_pluck (grpc_completion_queue *cq)
 
static void del_plucker (grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker)
 
static void dump_pending_tags (grpc_completion_queue *cq)
 
static void functor_callback (void *arg, grpc_error_handle error)
 
grpc_completion_queue * grpc_completion_queue_create_internal (grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_completion_queue_functor *shutdown_callback)
 
void grpc_completion_queue_destroy (grpc_completion_queue *cq)
 
grpc_event grpc_completion_queue_next (grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
 
grpc_event grpc_completion_queue_pluck (grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
 
void grpc_completion_queue_shutdown (grpc_completion_queue *cq)
 
int grpc_completion_queue_thread_local_cache_flush (grpc_completion_queue *cq, void **tag, int *ok)
 
void grpc_completion_queue_thread_local_cache_init (grpc_completion_queue *cq)
 
bool grpc_cq_begin_op (grpc_completion_queue *cq, void *tag)
 
bool grpc_cq_can_listen (grpc_completion_queue *cq)
 
void grpc_cq_end_op (grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
 
void grpc_cq_global_init ()
 
void grpc_cq_internal_ref (grpc_completion_queue *cq, const char *reason, const char *file, int line)
 
void grpc_cq_internal_unref (grpc_completion_queue *cq, const char *reason, const char *file, int line)
 
grpc_pollset * grpc_cq_pollset (grpc_completion_queue *cq)
 
grpc_cq_completion_type grpc_get_cq_completion_type (grpc_completion_queue *cq)
 
int grpc_get_cq_poll_num (grpc_completion_queue *cq)
 
static void on_pollset_shutdown_done (void *arg, grpc_error_handle error)
 

Variables

static const cq_vtable g_cq_vtable []
 
grpc_core::TraceFlag grpc_cq_pluck_trace (false, "queue_pluck")
 
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount (false, "cq_refcount")
 
grpc_core::TraceFlag grpc_trace_operation_failures (false, "op_failure")
 
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags (false, "pending_tags")
 

Macro Definition Documentation

◆ DATA_FROM_CQ

#define DATA_FROM_CQ (   cq)    ((void*)((cq) + 1))

Definition at line 432 of file completion_queue.cc.

◆ GRPC_SURFACE_TRACE_RETURNED_EVENT

#define GRPC_SURFACE_TRACE_RETURNED_EVENT (   cq,
  event 
)
Value:
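do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) && \
(GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
(event)->type != GRPC_QUEUE_TIMEOUT)) { \
gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, \
grpc_event_string(event).c_str()); \
} \
} while (0)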

Definition at line 438 of file completion_queue.cc.

◆ POLLSET_FROM_CQ

#define POLLSET_FROM_CQ (   cq)    ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))

Definition at line 433 of file completion_queue.cc.

Function Documentation

◆ add_plucker()

static int add_plucker ( grpc_completion_queue *  cq,
void *  tag,
grpc_pollset_worker **  worker 
)
static

Definition at line 1138 of file completion_queue.cc.

◆ cq_begin_op_for_callback()

static bool cq_begin_op_for_callback ( grpc_completion_queue *  cq,
void *  tag 
)
static

Definition at line 667 of file completion_queue.cc.

◆ cq_begin_op_for_next()

static bool cq_begin_op_for_next ( grpc_completion_queue *  cq,
void *  tag 
)
static

Definition at line 657 of file completion_queue.cc.

◆ cq_begin_op_for_pluck()

static bool cq_begin_op_for_pluck ( grpc_completion_queue *  cq,
void *  tag 
)
static

Definition at line 662 of file completion_queue.cc.

◆ cq_check_tag()

static void cq_check_tag ( grpc_completion_queue *  cq,
void *  tag,
bool  lock_cq 
)
static

Definition at line 630 of file completion_queue.cc.

◆ cq_destroy_callback()

static void cq_destroy_callback ( void *  data)
static

Definition at line 576 of file completion_queue.cc.

◆ cq_destroy_next()

static void cq_destroy_next ( void *  data)
static

Definition at line 556 of file completion_queue.cc.

◆ cq_destroy_pluck()

static void cq_destroy_pluck ( void *  data)
static

Definition at line 566 of file completion_queue.cc.

◆ cq_end_op_for_callback()

static void cq_end_op_for_callback ( grpc_completion_queue *  cq,
void *  tag,
grpc_error_handle  error,
void(*)(void *done_arg, grpc_cq_completion *storage)  done,
void *  done_arg,
grpc_cq_completion *  storage,
bool  internal 
)
static

Definition at line 839 of file completion_queue.cc.

◆ cq_end_op_for_next()

static void cq_end_op_for_next ( grpc_completion_queue *  cq,
void *  tag,
grpc_error_handle  error,
void(*)(void *done_arg, grpc_cq_completion *storage)  done,
void *  done_arg,
grpc_cq_completion *  storage,
bool  internal 
)
static

Definition at line 691 of file completion_queue.cc.

◆ cq_end_op_for_pluck()

static void cq_end_op_for_pluck ( grpc_completion_queue *  cq,
void *  tag,
grpc_error_handle  error,
void(*)(void *done_arg, grpc_cq_completion *storage)  done,
void *  done_arg,
grpc_cq_completion *  storage,
bool  internal 
)
static

Definition at line 769 of file completion_queue.cc.

◆ cq_finish_shutdown_callback()

static void cq_finish_shutdown_callback ( grpc_completion_queue *  cq)
static

Definition at line 1369 of file completion_queue.cc.

◆ cq_finish_shutdown_next()

static void cq_finish_shutdown_next ( grpc_completion_queue *  cq)
static

Definition at line 1097 of file completion_queue.cc.

◆ cq_finish_shutdown_pluck()

static void cq_finish_shutdown_pluck ( grpc_completion_queue *  cq)
static

Definition at line 1333 of file completion_queue.cc.

◆ cq_init_callback()

static void cq_init_callback ( void *  data,
grpc_completion_queue_functor *  shutdown_callback 
)
static

Definition at line 571 of file completion_queue.cc.

◆ cq_init_next()

static void cq_init_next ( void *  data,
grpc_completion_queue_functor *  shutdown_callback 
)
static

Definition at line 551 of file completion_queue.cc.

◆ cq_init_pluck()

static void cq_init_pluck ( void *  data,
grpc_completion_queue_functor *  shutdown_callback 
)
static

Definition at line 561 of file completion_queue.cc.

◆ cq_next()

static grpc_event cq_next ( grpc_completion_queue *  cq,
gpr_timespec  deadline,
void *  reserved 
)
static

Definition at line 963 of file completion_queue.cc.

◆ cq_pluck()

static grpc_event cq_pluck ( grpc_completion_queue *  cq,
void *  tag,
gpr_timespec  deadline,
void *  reserved 
)
static

Definition at line 1208 of file completion_queue.cc.

◆ cq_shutdown_callback()

static void cq_shutdown_callback ( grpc_completion_queue *  cq)
static

Definition at line 1388 of file completion_queue.cc.

◆ cq_shutdown_next()

static void cq_shutdown_next ( grpc_completion_queue *  cq)
static

Definition at line 1106 of file completion_queue.cc.

◆ cq_shutdown_pluck()

static void cq_shutdown_pluck ( grpc_completion_queue *  cq)
static

Definition at line 1345 of file completion_queue.cc.

◆ del_plucker()

static void del_plucker ( grpc_completion_queue *  cq,
void *  tag,
grpc_pollset_worker **  worker 
)
static

Definition at line 1150 of file completion_queue.cc.

◆ dump_pending_tags()

static void dump_pending_tags ( grpc_completion_queue *  cq)
static

Definition at line 948 of file completion_queue.cc.

◆ functor_callback()

static void functor_callback ( void *  arg,
grpc_error_handle  error 
)
static

Definition at line 833 of file completion_queue.cc.

◆ grpc_completion_queue_create_internal()

grpc_completion_queue* grpc_completion_queue_create_internal ( grpc_cq_completion_type  completion_type,
grpc_cq_polling_type  polling_type,
grpc_completion_queue_functor *  shutdown_callback 
)

Definition at line 514 of file completion_queue.cc.

◆ grpc_completion_queue_destroy()

void grpc_completion_queue_destroy ( grpc_completion_queue *  cq)

Destroy a completion queue. The caller must ensure that the queue is drained and that no threads are executing grpc_completion_queue_next.

Definition at line 1424 of file completion_queue.cc.

◆ grpc_completion_queue_next()

grpc_event grpc_completion_queue_next ( grpc_completion_queue *  cq,
gpr_timespec  deadline,
void *  reserved 
)

Blocks until an event is available, the completion queue is being shut down, or deadline is reached.

Returns a grpc_event with type GRPC_QUEUE_TIMEOUT on timeout, otherwise a grpc_event describing the event that occurred.

Callers must not call grpc_completion_queue_next and grpc_completion_queue_pluck simultaneously on the same completion queue.

Definition at line 1133 of file completion_queue.cc.

◆ grpc_completion_queue_pluck()

grpc_event grpc_completion_queue_pluck ( grpc_completion_queue *  cq,
void *  tag,
gpr_timespec  deadline,
void *  reserved 
)

Blocks until an event with tag 'tag' is available, the completion queue is being shut down, or deadline is reached.

Returns a grpc_event with type GRPC_QUEUE_TIMEOUT on timeout, otherwise a grpc_event describing the event that occurred.

Callers must not call grpc_completion_queue_next and grpc_completion_queue_pluck simultaneously on the same completion queue.

Completion queues support a maximum of GRPC_MAX_COMPLETION_QUEUE_PLUCKERS concurrently executing plucks at any time.

Definition at line 1328 of file completion_queue.cc.

◆ grpc_completion_queue_shutdown()

void grpc_completion_queue_shutdown ( grpc_completion_queue *  cq)

Begin destruction of a completion queue. Once all possible events are drained then grpc_completion_queue_next will start to produce GRPC_QUEUE_SHUTDOWN events only. At that point it's safe to call grpc_completion_queue_destroy.

After calling this function applications should ensure that no NEW work is added to be published on this completion queue.

Definition at line 1416 of file completion_queue.cc.

◆ grpc_completion_queue_thread_local_cache_flush()

int grpc_completion_queue_thread_local_cache_flush ( grpc_completion_queue *  cq,
void **  tag,
int *  ok 
)

Flushes the thread-local cache for cq. Returns 1 if there were contents in the cache. If there was an event in the cq's thread-local cache, its tag is placed in tag, and ok is set to the event's success value.

Definition at line 459 of file completion_queue.cc.

◆ grpc_completion_queue_thread_local_cache_init()

void grpc_completion_queue_thread_local_cache_init ( grpc_completion_queue *  cq)

Initializes a thread-local cache for cq. grpc_completion_queue_thread_local_cache_flush() MUST be called on the same thread, with the same cq.

Definition at line 452 of file completion_queue.cc.

◆ grpc_cq_begin_op()

bool grpc_cq_begin_op ( grpc_completion_queue *  cq,
void *  tag 
)

Definition at line 672 of file completion_queue.cc.

◆ grpc_cq_can_listen()

bool grpc_cq_can_listen ( grpc_completion_queue *  cq)

Definition at line 1437 of file completion_queue.cc.

◆ grpc_cq_end_op()

void grpc_cq_end_op ( grpc_completion_queue *  cq,
void *  tag,
grpc_error_handle  error,
void(*)(void *done_arg, grpc_cq_completion *storage)  done,
void *  done_arg,
grpc_cq_completion *  storage,
bool  internal 
)

Definition at line 894 of file completion_queue.cc.

◆ grpc_cq_global_init()

void grpc_cq_global_init ( )

Definition at line 450 of file completion_queue.cc.

◆ grpc_cq_internal_ref()

void grpc_cq_internal_ref ( grpc_completion_queue *  cq,
const char *  reason,
const char *  file,
int  line 
)

Definition at line 594 of file completion_queue.cc.

◆ grpc_cq_internal_unref()

void grpc_cq_internal_unref ( grpc_completion_queue *  cq,
const char *  reason,
const char *  file,
int  line 
)

Definition at line 611 of file completion_queue.cc.

◆ grpc_cq_pollset()

grpc_pollset* grpc_cq_pollset ( grpc_completion_queue *  cq)

Definition at line 1433 of file completion_queue.cc.

◆ grpc_get_cq_completion_type()

grpc_cq_completion_type grpc_get_cq_completion_type ( grpc_completion_queue *  cq)

Definition at line 581 of file completion_queue.cc.

◆ grpc_get_cq_poll_num()

int grpc_get_cq_poll_num ( grpc_completion_queue *  cq)

Definition at line 585 of file completion_queue.cc.

◆ on_pollset_shutdown_done()

static void on_pollset_shutdown_done ( void *  arg,
grpc_error_handle  error 
)
static

Definition at line 605 of file completion_queue.cc.

Variable Documentation

◆ g_cq_vtable

const cq_vtable g_cq_vtable[]
static

◆ grpc_cq_pluck_trace

grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck")

◆ grpc_trace_cq_refcount

grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount")

◆ grpc_trace_operation_failures

grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure")

◆ grpc_trace_pending_tags

grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags")
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
GRPC_CQ_NEXT
@ GRPC_CQ_NEXT
Definition: grpc_types.h:760
cq_end_op_for_callback
static void cq_end_op_for_callback(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:839
cq_init_callback
static void cq_init_callback(void *data, grpc_completion_queue_functor *shutdown_callback)
Definition: completion_queue.cc:571
cq_begin_op_for_pluck
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:662
cq_shutdown_callback
static void cq_shutdown_callback(grpc_completion_queue *cq)
Definition: completion_queue.cc:1388
cq_init_pluck
static void cq_init_pluck(void *data, grpc_completion_queue_functor *shutdown_callback)
Definition: completion_queue.cc:561
cq_end_op_for_next
static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:691
cq_destroy_callback
static void cq_destroy_callback(void *data)
Definition: completion_queue.cc:576
cq_pluck
static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1208
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
cq_shutdown_next
static void cq_shutdown_next(grpc_completion_queue *cq)
Definition: completion_queue.cc:1106
cq_shutdown_pluck
static void cq_shutdown_pluck(grpc_completion_queue *cq)
Definition: completion_queue.cc:1345
grpc_cq_pluck_trace
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck")
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
GRPC_CQ_PLUCK
@ GRPC_CQ_PLUCK
Definition: grpc_types.h:763
cq_begin_op_for_callback
static bool cq_begin_op_for_callback(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:667
cq_begin_op_for_next
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:657
grpc_api_trace
grpc_core::TraceFlag grpc_api_trace(false, "api")
cq_init_next
static void cq_init_next(void *data, grpc_completion_queue_functor *shutdown_callback)
Definition: completion_queue.cc:551
cq_end_op_for_pluck
static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:769
grpc_event_string
std::string grpc_event_string(grpc_event *ev)
Definition: event_string.cc:39
cq_next
static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:963
GRPC_CQ_CALLBACK
@ GRPC_CQ_CALLBACK
Definition: grpc_types.h:766
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
cq_destroy_next
static void cq_destroy_next(void *data)
Definition: completion_queue.cc:556
cq_destroy_pluck
static void cq_destroy_pluck(void *data)
Definition: completion_queue.cc:566
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37


grpc
Author(s):
autogenerated on Fri May 16 2025 03:01:07