call_combiner.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <inttypes.h>
24 
25 #include <grpc/support/log.h>
26 
29 
30 namespace grpc_core {
31 
32 DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");
33 
34 namespace {
35 
36 // grpc_error LSB can be used
37 constexpr intptr_t kErrorBit = 1;
38 
39 grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
40  if (cancel_state & kErrorBit) {
41 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
42  return internal::StatusGetFromHeapPtr(cancel_state & ~kErrorBit);
43 #else
44  return reinterpret_cast<grpc_error_handle>(cancel_state & ~kErrorBit);
45 #endif
46  }
47  return GRPC_ERROR_NONE;
48 }
49 
50 } // namespace
51 
55 #ifdef GRPC_TSAN_ENABLED
56  GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
57  grpc_schedule_on_exec_ctx);
58 #endif
59 }
60 
62  if (cancel_state_ & kErrorBit) {
63 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
65 #else
66  GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
67  cancel_state_ & ~static_cast<gpr_atm>(kErrorBit)));
68 #endif
69  }
70 }
71 
72 #ifdef GRPC_TSAN_ENABLED
73 void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) {
74  CallCombiner* self = static_cast<CallCombiner*>(arg);
75  // We ref-count the lock, and check if it's already taken.
76  // If it was taken, we should do nothing. Otherwise, we will mark it as
77  // locked. Note that if two different threads try to do this, only one of
78  // them will be able to mark the lock as acquired, while they both run their
79  // callbacks. In such cases (which should never happen for call_combiner),
80  // TSAN will correctly produce an error.
81  //
82  // TODO(soheil): This only covers the callbacks scheduled by
83  // CallCombiner::Start() and CallCombiner::Stop().
84  // If in the future, a callback gets scheduled using other
85  // mechanisms, we will need to add APIs to externally lock
86  // call combiners.
87  RefCountedPtr<TsanLock> lock = self->tsan_lock_;
88  bool prev = false;
89  if (lock->taken.compare_exchange_strong(prev, true)) {
90  TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
91  } else {
92  lock.reset();
93  }
94  Closure::Run(DEBUG_LOCATION, self->original_closure_, GRPC_ERROR_REF(error));
95  if (lock != nullptr) {
96  TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
97  bool prev = true;
98  GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
99  }
100 }
101 #endif
102 
105 #ifdef GRPC_TSAN_ENABLED
106  original_closure_ = closure;
107  ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
108 #else
110 #endif
111 }
112 
113 #ifndef NDEBUG
114 #define DEBUG_ARGS const char *file, int line,
115 #define DEBUG_FMT_STR "%s:%d: "
116 #define DEBUG_FMT_ARGS , file, line
117 #else
118 #define DEBUG_ARGS
119 #define DEBUG_FMT_STR
120 #define DEBUG_FMT_ARGS
121 #endif
122 
124  DEBUG_ARGS const char* reason) {
125  GPR_TIMER_SCOPE("CallCombiner::Start", 0);
128  "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
129  "%s] error=%s",
130  this, closure DEBUG_FMT_ARGS, reason,
132  }
133  size_t prev_size =
134  static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
136  gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
137  prev_size + 1);
138  }
140  if (prev_size == 0) {
142  GPR_TIMER_MARK("call_combiner_initiate", 0);
144  gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY");
145  }
146  // Queue was empty, so execute this closure immediately.
148  } else {
150  gpr_log(GPR_INFO, " QUEUING");
151  }
152  // Queue was not empty, so add closure to queue.
153 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
154  closure->error_data.error = internal::StatusAllocHeapPtr(error);
155 #else
156  closure->error_data.error = reinterpret_cast<intptr_t>(error);
157 #endif
158  queue_.Push(
159  reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
160  }
161 }
162 
163 void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
164  GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
166  gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
167  this DEBUG_FMT_ARGS, reason);
168  }
169  size_t prev_size =
170  static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
172  gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
173  prev_size - 1);
174  }
175  GPR_ASSERT(prev_size >= 1);
176  if (prev_size > 1) {
177  while (true) {
179  gpr_log(GPR_INFO, " checking queue");
180  }
181  bool empty;
183  reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
184  if (closure == nullptr) {
185  // This can happen either due to a race condition within the mpscq
186  // code or because of a race with Start().
188  gpr_log(GPR_INFO, " queue returned no result; checking again");
189  }
190  continue;
191  }
192 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
194  internal::StatusMoveFromHeapPtr(closure->error_data.error);
195 #else
197  reinterpret_cast<grpc_error_handle>(closure->error_data.error);
198 #endif
199  closure->error_data.error = 0;
201  gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
203  }
205  break;
206  }
208  gpr_log(GPR_INFO, " queue empty");
209  }
210 }
211 
214  while (true) {
215  // Decode original state.
216  gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
217  grpc_error_handle original_error = DecodeCancelStateError(original_state);
218  // If error is set, invoke the cancellation closure immediately.
219  // Otherwise, store the new closure.
220  if (!GRPC_ERROR_IS_NONE(original_error)) {
223  "call_combiner=%p: scheduling notify_on_cancel callback=%p "
224  "for pre-existing cancellation",
225  this, closure);
226  }
228  break;
229  } else {
230  if (gpr_atm_full_cas(&cancel_state_, original_state,
231  reinterpret_cast<gpr_atm>(closure))) {
233  gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
234  this, closure);
235  }
236  // If we replaced an earlier closure, invoke the original
237  // closure with GRPC_ERROR_NONE. This allows callers to clean
238  // up any resources they may be holding for the callback.
239  if (original_state != 0) {
240  closure = reinterpret_cast<grpc_closure*>(original_state);
243  "call_combiner=%p: scheduling old cancel callback=%p", this,
244  closure);
245  }
247  }
248  break;
249  }
250  }
251  // cas failed, try again.
252  }
253 }
254 
257 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
259  gpr_atm new_state = kErrorBit | status_ptr;
260 #else
261  gpr_atm new_state = kErrorBit | reinterpret_cast<gpr_atm>(error);
262 #endif
263  while (true) {
264  gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
265  grpc_error_handle original_error = DecodeCancelStateError(original_state);
266  if (!GRPC_ERROR_IS_NONE(original_error)) {
267 #ifdef GRPC_ERROR_IS_ABSEIL_STATUS
268  internal::StatusFreeHeapPtr(status_ptr);
269 #else
271 #endif
272  break;
273  }
274  if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) {
275  if (original_state != 0) {
276  grpc_closure* notify_on_cancel =
277  reinterpret_cast<grpc_closure*>(original_state);
280  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
281  this, notify_on_cancel);
282  }
283  ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
284  }
285  break;
286  }
287  // cas failed, try again.
288  }
289 }
290 
291 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::CallCombiner
Definition: call_combiner.h:50
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
gpr_atm_full_fetch_add
#define gpr_atm_full_fetch_add(p, delta)
Definition: impl/codegen/atm_gcc_atomic.h:62
log.h
gpr_atm_no_barrier_store
#define gpr_atm_no_barrier_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:56
grpc_core::CallCombiner::SetNotifyOnCancel
void SetNotifyOnCancel(grpc_closure *closure)
Definition: call_combiner.cc:212
timers.h
GRPC_STATS_INC_CALL_COMBINER_CANCELLED
#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED()
Definition: stats_data.h:369
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::internal::StatusAllocHeapPtr
uintptr_t StatusAllocHeapPtr(absl::Status s)
Definition: status_helper.cc:421
grpc_core::RefCountedPtr::reset
void reset(T *value=nullptr)
Definition: ref_counted_ptr.h:111
error
grpc_error_handle error
Definition: retry_filter.cc:499
TSAN_ANNOTATE_RWLOCK_RELEASED
#define TSAN_ANNOTATE_RWLOCK_RELEASED(addr, is_w)
Definition: src/core/lib/iomgr/dynamic_annotations.h:63
TSAN_ANNOTATE_RWLOCK_ACQUIRED
#define TSAN_ANNOTATE_RWLOCK_ACQUIRED(addr, is_w)
Definition: src/core/lib/iomgr/dynamic_annotations.h:62
GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED
#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED()
Definition: stats_data.h:363
grpc_core::CallCombiner::Start
void Start(grpc_closure *closure, grpc_error_handle error, const char *file, int line, const char *reason)
Starts processing closure.
Definition: call_combiner.cc:123
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_core::CallCombiner::CallCombiner
CallCombiner()
Definition: call_combiner.cc:52
stats.h
grpc_core::CallCombiner::Cancel
void Cancel(grpc_error_handle error)
Indicates that the call has been cancelled.
Definition: call_combiner.cc:255
GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS
#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS()
Definition: stats_data.h:365
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::CallCombiner::ScheduleClosure
void ScheduleClosure(grpc_closure *closure, grpc_error_handle error)
Definition: call_combiner.cc:103
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
grpc_core::CallCombiner::size_
gpr_atm size_
Definition: call_combiner.h:109
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
gpr_atm_acq_load
#define gpr_atm_acq_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:52
grpc_core::internal::StatusGetFromHeapPtr
absl::Status StatusGetFromHeapPtr(uintptr_t ptr)
Get the status from a heap ptr.
Definition: status_helper.cc:432
call_combiner.h
GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL
#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL()
Definition: stats_data.h:367
grpc_core::MultiProducerSingleConsumerQueue::Push
bool Push(Node *node)
Definition: mpscq.cc:29
DEBUG_ARGS
#define DEBUG_ARGS
Definition: call_combiner.cc:114
grpc_core::CallCombiner::cancel_state_
gpr_atm cancel_state_
Definition: call_combiner.h:114
arg
Definition: cmdline.cc:40
DEBUG_FMT_STR
#define DEBUG_FMT_STR
Definition: call_combiner.cc:115
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc_core::CallCombiner::Stop
void Stop(const char *file, int line, const char *reason)
Yields the call combiner to the next closure in the queue, if any.
Definition: call_combiner.cc:163
DEBUG_FMT_ARGS
#define DEBUG_FMT_ARGS
Definition: call_combiner.cc:116
grpc_core::CallCombiner::~CallCombiner
~CallCombiner()
Definition: call_combiner.cc:61
grpc_core::internal::StatusFreeHeapPtr
void StatusFreeHeapPtr(uintptr_t ptr)
Frees the allocated status at heap ptr.
Definition: status_helper.cc:427
google_benchmark.example.empty
def empty(state)
Definition: example.py:31
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc_core::MultiProducerSingleConsumerQueue::PopAndCheckEnd
Node * PopAndCheckEnd(bool *empty)
Definition: mpscq.cc:43
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
grpc_core::CallCombiner::queue_
MultiProducerSingleConsumerQueue queue_
Definition: call_combiner.h:110
grpc_core::MultiProducerSingleConsumerQueue::Node
Definition: mpscq.h:38
arg
struct arg arg
grpc_core::grpc_call_combiner_trace
DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner")
Definition: call_combiner.h:48
closure
Definition: proxy.cc:59
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
grpc_core::internal::StatusMoveFromHeapPtr
absl::Status StatusMoveFromHeapPtr(uintptr_t ptr)
Move the status from a heap ptr. (GetFrom & FreeHeap)
Definition: status_helper.cc:440
grpc_error
Definition: error_internal.h:42
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
grpc_closure
Definition: closure.h:56
grpc_core::DebugOnlyTraceFlag
TraceFlag DebugOnlyTraceFlag
Definition: debug/trace.h:117
GPR_TIMER_MARK
#define GPR_TIMER_MARK(tag, important)
Definition: src/core/lib/profiling/timers.h:39
grpc_core::Closure::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: closure.h:250
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h
gpr_atm_full_cas
#define gpr_atm_full_cas(p, o, n)
Definition: impl/codegen/atm_gcc_sync.h:77


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:51