combiner.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <assert.h>
24 #include <inttypes.h>
25 #include <string.h>
26 
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 
33 
35 
// Runs the statement `fn` only while the combiner trace flag
// (grpc_combiner_trace) is enabled. The do { ... } while (0) wrapper makes the
// expansion a single statement, so the macro can be followed by `;` wherever a
// statement is allowed, including un-braced if/else arms.
36 #define GRPC_COMBINER_TRACE(fn) \
37  do { \
38  if (grpc_combiner_trace.enabled()) { \
39  fn; \
40  } \
41  } while (0)
42 
// Bit layout of the combiner's `state` word, as used by the code in this file:
//   bit 0         - STATE_UNORPHANED: set while the combiner has not been
//                   orphaned (see the GPR_ASSERT(last & STATE_UNORPHANED)
//                   check and the OLD_STATE_WAS() helper macro).
//   bits 1 and up - number of queued elements, counted in units of
//                   STATE_ELEM_COUNT_LOW_BIT (the count is read back as
//                   `state >> 1`).
43 #define STATE_UNORPHANED 1
44 #define STATE_ELEM_COUNT_LOW_BIT 2
45 
51 
// Forward declaration: offload() is defined further down in this file, but its
// address is needed earlier to initialise the combiner's `offload` closure.
52 static void offload(void* arg, grpc_error_handle error);
53 
56  gpr_ref_init(&lock->refs, 1);
59  GRPC_CLOSURE_INIT(&lock->offload, offload, lock, nullptr);
60  GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
61  return lock;
62 }
63 
// Frees the combiner: emits a trace line (when combiner tracing is enabled) and
// then deletes `lock`. Called from start_destroy() and from the
// orphaned-and-unlocked case of the state switch later in this file.
// NOTE(review): the embedded listing numbers jump from 65 to 67 here, so a
// statement may be missing from this view - confirm against the real source.
64 static void really_destroy(grpc_core::Combiner* lock) {
65  GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock));
67  delete lock;
68 }
69 
70 static void start_destroy(grpc_core::Combiner* lock) {
73  GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
74  if (old_state == 1) {
75  really_destroy(lock);
76  }
77 }
78 
// Debug-only reference-count tracing, used next to the gpr_ref()/gpr_unref()
// calls on lock->refs in this file.
//
// When NDEBUG is not defined and combiner tracing is enabled, logs the
// operation name `op`, the current value of lock->refs.count, the value it
// will have after applying `delta`, and the caller-supplied reason. The
// expansion relies on `lock`, `file`, `line` and `reason` being in scope at
// the point of use. When NDEBUG is defined the macro expands to nothing.
//
// The body is wrapped in do { ... } while (0), matching GRPC_COMBINER_TRACE,
// so the macro behaves as exactly one statement: it can be used as an
// un-braced if/else arm and can never capture a following `else`.
#ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \
  do { \
    if (grpc_combiner_trace.enabled()) { \
      gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \
              "C:%p %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \
              gpr_atm_no_barrier_load(&lock->refs.count), \
              gpr_atm_no_barrier_load(&lock->refs.count) + (delta), reason); \
    } \
  } while (0)
#else
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
90 
92  GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
93  if (gpr_unref(&lock->refs)) {
94  start_destroy(lock);
95  }
96 }
97 
100  GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
101  gpr_ref(&lock->refs);
102  return lock;
103 }
104 
106  lock->next_combiner_on_this_exec_ctx = nullptr;
107  if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
110  } else {
112  ->combiner_data()
115  }
116 }
117 
122  if (lock->next_combiner_on_this_exec_ctx == nullptr) {
124  }
125 }
126 
131  "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
132  lock, cl, last));
133  if (last == 1) {
136  reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get()));
137  // first element on this list: add it to the list of combiner locks
138  // executing within this exec_ctx
139  push_last_on_exec_ctx(lock);
140  } else {
141  // there may be a race with setting here: if that happens, we may delay
142  // offload for one or two actions, and that's fine
143  gpr_atm initiator =
145  if (initiator != 0 &&
146  initiator != reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get())) {
148  }
149  }
150  GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
151  assert(cl->cb);
152 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
154 #else
155  cl->error_data.error = reinterpret_cast<intptr_t>(error);
156 #endif
157  lock->queue.Push(cl->next_data.mpscq_node.get());
158 }
159 
160 static void move_next() {
163  ->combiner_data()
165  if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
167  }
168 }
169 
// Closure callback stored in the combiner's `offload` closure (see the
// GRPC_CLOSURE_INIT call earlier in this file). `arg` is the combiner the
// closure was initialised with; the error argument is ignored. Hands that
// combiner to push_last_on_exec_ctx() so it is added to the list of combiners
// executing within the current exec ctx.
170 static void offload(void* arg, grpc_error_handle /*error*/) {
171  grpc_core::Combiner* lock = static_cast<grpc_core::Combiner*>(arg);
172  push_last_on_exec_ctx(lock);
173 }
174 
// Used when the current exec ctx should stop running this combiner and the
// remaining work is to be picked up elsewhere: calls move_next() and then
// emits a trace line for `lock`.
// NOTE(review): the embedded listing numbers skip 178, so the statement that
// actually schedules lock->offload is presumably missing from this view -
// confirm against the real source.
175 static void queue_offload(grpc_core::Combiner* lock) {
176  move_next();
177  GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
179 }
180 
182  grpc_core::Combiner* lock =
184  if (lock == nullptr) {
185  return false;
186  }
187 
188  bool contended =
190 
192  "C:%p grpc_combiner_continue_exec_ctx "
193  "contended=%d "
194  "exec_ctx_ready_to_finish=%d "
195  "time_to_execute_final_list=%d",
196  lock, contended,
197  grpc_core::ExecCtx::Get()->IsReadyToFinish(),
199 
200  // offload only if all the following conditions are true:
201  // 1. the combiner is contended and has more than one closure to execute
202  // 2. the current execution context needs to finish as soon as possible
203  // 3. the current thread is not a worker for any background poller
204  // 4. the DEFAULT executor is threaded
205  if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
208  // this execution context wants to move on: schedule remaining work to be
209  // picked up on the executor
210  queue_offload(lock);
211  return true;
212  }
213 
214  if (!lock->time_to_execute_final_list ||
215  // peek to see if something new has shown up, and execute that with
216  // priority
217  (gpr_atm_acq_load(&lock->state) >> 1) > 1) {
220  gpr_log(GPR_INFO, "C:%p maybe_finish_one n=%p", lock, n));
221  if (n == nullptr) {
222  // queue is in an inconsistent state: use this as a cue that we should
223  // go off and do something else for a while (and come back later)
224  queue_offload(lock);
225  return true;
226  }
227  grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
228 #ifndef NDEBUG
229  cl->scheduled = false;
230 #endif
231 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
232  grpc_error_handle cl_err =
234  cl->error_data.error = 0;
235  cl->cb(cl->cb_arg, std::move(cl_err));
236 #else
237  grpc_error_handle cl_err =
238  reinterpret_cast<grpc_error_handle>(cl->error_data.error);
239  cl->error_data.error = 0;
240  cl->cb(cl->cb_arg, cl_err);
241  GRPC_ERROR_UNREF(cl_err);
242 #endif
243  } else {
244  grpc_closure* c = lock->final_list.head;
245  GPR_ASSERT(c != nullptr);
247  int loops = 0;
248  while (c != nullptr) {
250  gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c));
251  grpc_closure* next = c->next_data.next;
252 #ifndef NDEBUG
253  c->scheduled = false;
254 #endif
255 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
258  c->error_data.error = 0;
259  c->cb(c->cb_arg, std::move(error));
260 #else
262  reinterpret_cast<grpc_error_handle>(c->error_data.error);
263  c->error_data.error = 0;
264  c->cb(c->cb_arg, error);
266 #endif
267  c = next;
268  }
269  }
270 
271  move_next();
272  lock->time_to_execute_final_list = false;
273  gpr_atm old_state =
276  gpr_log(GPR_INFO, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
277 // Define a macro to ease readability of the following switch statement.
278 #define OLD_STATE_WAS(orphaned, elem_count) \
279  (((orphaned) ? 0 : STATE_UNORPHANED) | \
280  ((elem_count)*STATE_ELEM_COUNT_LOW_BIT))
281  // Depending on what the previous state was, we need to perform different
282  // actions.
283  switch (old_state) {
284  default:
285  // we have multiple queued work items: just continue executing them
286  break;
287  case OLD_STATE_WAS(false, 2):
288  case OLD_STATE_WAS(true, 2):
289  // we're down to one queued item: if it's the final list we should do that
290  if (!grpc_closure_list_empty(lock->final_list)) {
291  lock->time_to_execute_final_list = true;
292  }
293  break;
294  case OLD_STATE_WAS(false, 1):
295  // had one count, one unorphaned --> unlocked unorphaned
296  return true;
297  case OLD_STATE_WAS(true, 1):
298  // and one count, one orphaned --> unlocked and orphaned
299  really_destroy(lock);
300  return true;
301  case OLD_STATE_WAS(false, 0):
302  case OLD_STATE_WAS(true, 0):
303  // these values are illegal - representing an already unlocked or
304  // deleted lock
305  GPR_UNREACHABLE_CODE(return true);
306  }
308  return true;
309 }
310 
// Forward declaration; enqueue_finally() is defined later in this file.
311 static void enqueue_finally(void* closure, grpc_error_handle error);
312 
316  GPR_ASSERT(lock != nullptr);
318  GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure,
319  grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
320  if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
321  // Using error_data.scratch to store the combiner so that it can be accessed
322  // in enqueue_finally.
323  closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);
325  return;
326  }
327 
328  if (grpc_closure_list_empty(lock->final_list)) {
330  }
332 }
333 
335  grpc_closure* cl = static_cast<grpc_closure*>(closure);
336  grpc_core::Combiner* lock =
337  reinterpret_cast<grpc_core::Combiner*>(cl->error_data.scratch);
338  cl->error_data.scratch = 0;
340 }
341 
342 namespace grpc_core {
344  combiner_exec(this, closure, error);
345 }
346 
349 }
350 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_core::ManualConstructor::get
Type * get()
Definition: manual_constructor.h:110
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
gpr_atm_no_barrier_load
#define gpr_atm_no_barrier_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:53
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
gpr_atm_full_fetch_add
#define gpr_atm_full_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:62
log.h
gpr_atm_no_barrier_store
#define gpr_atm_no_barrier_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:56
mpscq.h
grpc_core::MultiProducerSingleConsumerQueue::Pop
Node * Pop()
Definition: mpscq.cc:37
grpc_core::ExecCtx::combiner_data
CombinerData * combiner_data()
Definition: exec_ctx.h:143
grpc_core::Combiner::refs
gpr_refcount refs
Definition: combiner.h:53
grpc_closure::cb
grpc_iomgr_cb_func cb
Definition: closure.h:68
iomgr_internal.h
grpc_core::Executor::IsThreadedDefault
static bool IsThreadedDefault()
Definition: executor.cc:442
grpc_core
Definition: call_metric_recorder.h:31
string.h
grpc_core::internal::StatusAllocHeapPtr
uintptr_t StatusAllocHeapPtr(absl::Status s)
Definition: status_helper.cc:421
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_core::ExecCtx::CombinerData::last_combiner
Combiner * last_combiner
Definition: exec_ctx.h:139
start_destroy
static void start_destroy(grpc_core::Combiner *lock)
Definition: combiner.cc:70
grpc_closure::mpscq_node
grpc_core::ManualConstructor< grpc_core::MultiProducerSingleConsumerQueue::Node > mpscq_node
Definition: closure.h:63
grpc_core::Combiner::next_combiner_on_this_exec_ctx
Combiner * next_combiner_on_this_exec_ctx
Definition: combiner.h:39
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
grpc_iomgr_platform_is_any_background_poller_thread
bool grpc_iomgr_platform_is_any_background_poller_thread()
Definition: iomgr_internal.cc:45
grpc_core::Executor::Run
static void Run(grpc_closure *closure, grpc_error_handle error, ExecutorType executor_type=ExecutorType::DEFAULT, ExecutorJobType job_type=ExecutorJobType::SHORT)
Definition: executor.cc:398
grpc_closure_list_append
bool grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure)
Definition: closure.h:176
grpc_core::Combiner::queue
MultiProducerSingleConsumerQueue queue
Definition: combiner.h:40
push_last_on_exec_ctx
static void push_last_on_exec_ctx(grpc_core::Combiner *lock)
Definition: combiner.cc:105
grpc_combiner_create
grpc_core::Combiner * grpc_combiner_create(void)
Definition: combiner.cc:54
c
void c(T a)
Definition: miscompile_with_no_unique_address_test.cc:40
grpc_combiner_ref
grpc_core::Combiner * grpc_combiner_ref(grpc_core::Combiner *lock GRPC_COMBINER_DEBUG_ARGS)
Definition: combiner.cc:98
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_closure::next_data
union grpc_closure::@14 next_data
grpc_closure_list::head
grpc_closure * head
Definition: closure.h:42
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_closure::error
uintptr_t error
Definition: closure.h:75
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
gpr_atm_acq_load
#define gpr_atm_acq_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:52
grpc_closure::cb_arg
void * cb_arg
Definition: closure.h:71
grpc_core::MultiProducerSingleConsumerQueue::Push
bool Push(Node *node)
Definition: mpscq.cc:29
STATE_UNORPHANED
#define STATE_UNORPHANED
Definition: combiner.cc:43
grpc_closure::scratch
uintptr_t scratch
Definition: closure.h:64
grpc_combiner_trace
grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner")
grpc_combiner_continue_exec_ctx
bool grpc_combiner_continue_exec_ctx()
Definition: combiner.cc:181
arg
Definition: cmdline.cc:40
grpc_core::Combiner::Run
void Run(grpc_closure *closure, grpc_error_handle error)
Definition: combiner.cc:343
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
queue_offload
static void queue_offload(grpc_core::Combiner *lock)
Definition: combiner.cc:175
uintptr_t
_W64 unsigned int uintptr_t
Definition: stdint-msvc2008.h:119
grpc_closure_list_init
void grpc_closure_list_init(grpc_closure_list *closure_list)
Definition: closure.h:170
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
grpc_core::Combiner::initiating_exec_ctx_or_null
gpr_atm initiating_exec_ctx_or_null
Definition: combiner.h:45
grpc_core::Combiner::state
gpr_atm state
Definition: combiner.h:49
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
GRPC_COMBINER_DEBUG_SPAM
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
Definition: combiner.cc:80
executor.h
grpc_core::TraceFlag
Definition: debug/trace.h:63
push_first_on_exec_ctx
static void push_first_on_exec_ctx(grpc_core::Combiner *lock)
Definition: combiner.cc:118
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
really_destroy
static void really_destroy(grpc_core::Combiner *lock)
Definition: combiner.cc:64
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc_closure::error_data
union grpc_closure::@15 error_data
GRPC_COMBINER_TRACE
#define GRPC_COMBINER_TRACE(fn)
Definition: combiner.cc:36
alloc.h
grpc_core::MultiProducerSingleConsumerQueue::Node
Definition: mpscq.h:38
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc_core::Combiner::time_to_execute_final_list
bool time_to_execute_final_list
Definition: combiner.h:50
grpc_core::Combiner::final_list
grpc_closure_list final_list
Definition: combiner.h:51
grpc_core::Combiner::FinallyRun
void FinallyRun(grpc_closure *closure, grpc_error_handle error)
Definition: combiner.cc:347
combiner.h
arg
struct arg arg
closure
Definition: proxy.cc:59
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::internal::StatusMoveFromHeapPtr
absl::Status StatusMoveFromHeapPtr(uintptr_t ptr)
Move the status from a heap ptr. (GetFrom & FreeHeap)
Definition: status_helper.cc:440
grpc_core::Combiner
Definition: combiner.h:34
move_next
static void move_next()
Definition: combiner.cc:160
grpc_core::ExecCtx::CombinerData::active_combiner
Combiner * active_combiner
Definition: exec_ctx.h:137
grpc_core::Combiner::offload
grpc_closure offload
Definition: combiner.h:52
GRPC_COMBINER_DEBUG_ARGS
#define GRPC_COMBINER_DEBUG_ARGS
Definition: combiner.h:67
combiner_finally_exec
static void combiner_finally_exec(grpc_core::Combiner *lock, grpc_closure *closure, grpc_error_handle error)
Definition: combiner.cc:313
combiner_exec
static void combiner_exec(grpc_core::Combiner *lock, grpc_closure *closure, grpc_error_handle error)
Definition: combiner.cc:127
gpr_ref_init
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
Definition: sync.cc:86
grpc_closure::scheduled
bool scheduled
Definition: closure.h:82
gpr_unref
GPRAPI int gpr_unref(gpr_refcount *r)
Definition: sync.cc:103
grpc_error
Definition: error_internal.h:42
enqueue_finally
static void enqueue_finally(void *closure, grpc_error_handle error)
Definition: combiner.cc:334
grpc_closure_list_empty
bool grpc_closure_list_empty(grpc_closure_list closure_list)
Definition: closure.h:243
STATE_ELEM_COUNT_LOW_BIT
#define STATE_ELEM_COUNT_LOW_BIT
Definition: combiner.cc:44
offload
static void offload(void *arg, grpc_error_handle error)
Definition: combiner.cc:170
grpc_closure
Definition: closure.h:56
grpc_combiner_unref
void grpc_combiner_unref(grpc_core::Combiner *lock GRPC_COMBINER_DEBUG_ARGS)
Definition: combiner.cc:91
gpr_ref
GPRAPI void gpr_ref(gpr_refcount *r)
Definition: sync.cc:88
OLD_STATE_WAS
#define OLD_STATE_WAS(orphaned, elem_count)
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:50