Go to the documentation of this file.
38 #define EXECUTOR_TRACE(format, ...) \
40 if (GRPC_TRACE_FLAG_ENABLED(executor_trace)) { \
41 gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
45 #define EXECUTOR_TRACE0(str) \
47 if (GRPC_TRACE_FLAG_ENABLED(executor_trace)) { \
48 gpr_log(GPR_INFO, "EXECUTOR " str); \
84 {{default_enqueue_short, default_enqueue_long},
85 {resolver_enqueue_short, resolver_enqueue_long}};
115 while (c !=
nullptr) {
118 EXECUTOR_TRACE(
"(%s) run %p [created by %s:%d]", executor_name, c,
119 c->file_created, c->line_created);
120 c->scheduled =
false;
124 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
127 c->error_data.error = 0;
132 c->error_data.error = 0;
133 c->cb(c->cb_arg,
error);
153 if (curr_num_threads > 0) {
175 if (curr_num_threads == 0) {
193 for (
gpr_atm i = 0;
i < curr_num_threads;
i++) {
196 i + 1, curr_num_threads);
224 g_this_thread_state = ts;
228 size_t subtract_depth = 0;
230 EXECUTOR_TRACE(
"(%s) [%" PRIdPTR
"]: step (sub_depth=%" PRIdPTR
")",
231 ts->
name, ts->
id, subtract_depth);
234 ts->
depth -= subtract_depth;
257 g_this_thread_state =
nullptr;
266 size_t cur_thread_count =
271 if (cur_thread_count == 0) {
292 bool try_new_thread =
false;
297 "(%s) try to schedule %p (%s) (created %s:%d) to thread "
303 closure, is_short ?
"short" :
"long", ts->
id);
326 try_new_thread =
true;
371 }
while (retry_push);
400 executor_enqueue_fns_[
static_cast<size_t>(executor_type)]
439 return executors[
static_cast<size_t>(executor_type)]->
IsThreaded();
450 executors[
i]->SetThreading(enable);
455 EXECUTOR_TRACE(
"Executor::SetThreadingDefault(%d) called", enable);
GPRAPI void gpr_cv_signal(gpr_cv *cv)
GPRAPI unsigned gpr_cpu_num_cores(void)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
#define gpr_atm_no_barrier_load(p)
void SetThreading(bool threading)
static bool IsThreadedDefault()
#define GRPC_CLOSURE_LIST_INIT
GPRAPI void gpr_free(void *ptr)
void grpc_iomgr_platform_shutdown_background_closure()
OPENSSL_EXPORT pem_password_cb void * u
Executor(const char *executor_name)
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
bool grpc_iomgr_platform_add_closure_to_background_poller(grpc_closure *closure, grpc_error_handle error)
static PerThreadSynch * Enqueue(PerThreadSynch *head, SynchWaitParams *waitp, intptr_t mu, int flags)
static void ShutdownAll()
static void SetThreadingAll(bool enable)
#define gpr_spinlock_lock(lock)
static void Run(grpc_closure *closure, grpc_error_handle error, ExecutorType executor_type=ExecutorType::DEFAULT, ExecutorJobType job_type=ExecutorJobType::SHORT)
bool grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure)
TraceFlag executor_trace(false, "executor")
GPRAPI void * gpr_zalloc(size_t size)
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
GPRAPI void gpr_mu_init(gpr_mu *mu)
static size_t RunClosures(const char *executor_name, grpc_closure_list list)
#define gpr_atm_acq_load(p)
static void SetThreadingDefault(bool enable)
#define gpr_spinlock_unlock(lock)
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
#define EXECUTOR_TRACE(format,...)
#define gpr_atm_rel_store(p, value)
GPRAPI void gpr_mu_lock(gpr_mu *mu)
constexpr size_t HashPointer(T *p, size_t range)
#define GPR_THREAD_LOCAL(type)
#define gpr_spinlock_trylock(lock)
#define GPR_SPINLOCK_STATIC_INITIALIZER
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD
void Enqueue(grpc_closure *closure, grpc_error_handle error, bool is_short)
grpc_core::ExecCtx exec_ctx
#define EXECUTOR_TRACE0(str)
AllocList * next[kMaxLevel]
#define GRPC_ERROR_UNREF(err)
absl::Status StatusMoveFromHeapPtr(uintptr_t ptr)
Move the status from a heap ptr. (GetFrom & FreeHeap)
static void ThreadMain(void *arg)
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD
bool grpc_closure_list_empty(grpc_closure_list closure_list)
gpr_spinlock adding_thread_lock_
void grpc_executor_global_init()
GPRAPI void gpr_cv_init(gpr_cv *cv)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:16