completion_queue_cc.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2015 gRPC authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 #include <vector>
19 
20 #include "absl/base/thread_annotations.h"
21 
22 #include <grpc/grpc.h>
25 #include <grpc/support/cpu.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/sync.h>
28 #include <grpc/support/time.h>
32 
35 #include "src/core/lib/gprpp/thd.h"
36 
37 namespace grpc {
38 namespace {
39 
40 internal::GrpcLibraryInitializer g_gli_initializer;
41 
42 gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
43 grpc_core::Mutex* g_callback_alternative_mu;
44 
45 // Implement a ref-counted callback CQ for global use in the alternative
46 // implementation so that its threads are only created once. Do this using
47 // explicit ref-counts and raw pointers rather than a shared-ptr since that
48 // has a non-trivial destructor and thus can't be used for global variables.
// Holder for the shared callback-alternative CompletionQueue together with the
// threads that poll it. All three fields are annotated as guarded by
// g_callback_alternative_mu, and both Ref() and Unref() take that mutex before
// touching them.
49 struct CallbackAlternativeCQ {
// Count of Ref() calls that have not yet been matched by Unref().
50  int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
// The shared queue; allocated when refs goes from 0 to 1 and deleted when it
// drops back to 0.
51  CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
// Heap-allocated list of the polling threads; created and destroyed together
// with `cq`.
52  std::vector<grpc_core::Thread>* nexting_threads
53  ABSL_GUARDED_BY(g_callback_alternative_mu);
54 
// Takes one reference and returns the shared queue.
// The first reference allocates the queue, creates the polling threads
// (half of gpr_cpu_num_cores(), clamped to the range [2, 16]) and starts them
// only after every thread object has been constructed.
55  CompletionQueue* Ref() {
56  grpc_core::MutexLock lock(&*g_callback_alternative_mu);
57  refs++;
58  if (refs == 1) {
59  cq = new CompletionQueue;
60  int num_nexting_threads =
61  grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
62  nexting_threads = new std::vector<grpc_core::Thread>;
63  for (int i = 0; i < num_nexting_threads; i++) {
64  nexting_threads->emplace_back(
65  "nexting_thread",
// Thread body: `arg` is the CompletionQueue* passed as the last
// emplace_back argument below.
66  [](void* arg) {
// NOTE(review): embedded line 67 is absent from this listing; presumably it
// declares the lambda-local `cq` that the next expression initialises --
// confirm against the original file.
68  static_cast<CompletionQueue*>(arg)->cq();
69  while (true) {
70  // Use the raw Core next function rather than the C++ Next since
71  // Next incorporates FinalizeResult and we actually want that
72  // called from the callback functor itself.
73  // TODO(vjpai): Migrate below to next without a timeout or idle
74  // phase. That's currently starving out some other polling,
75  // though.
// NOTE(review): embedded lines 76 and 78-79 are absent from this listing, so
// the call that produces `ev` (and its deadline argument) is only partly
// visible here.
77  cq,
80  nullptr);
// A shutdown event ends the thread.
81  if (ev.type == GRPC_QUEUE_SHUTDOWN) {
82  return;
83  }
// A timeout event just loops again.
// NOTE(review): embedded lines 85-87 are absent from this listing; whatever
// happens before `continue` cannot be seen here.
84  if (ev.type == GRPC_QUEUE_TIMEOUT) {
88  continue;
89  }
// NOTE(review): embedded line 90 is absent from this listing.
91  // We can always execute the callback inline rather than
92  // pushing it to another Executor thread because this
93  // thread is definitely running on a background thread, does not
94  // hold any application locks before executing the callback,
95  // and cannot be entered recursively.
// The event tag is treated as a grpc_completion_queue_functor and invoked
// with the event's success value.
96  auto* functor =
97  static_cast<grpc_completion_queue_functor*>(ev.tag);
98  functor->functor_run(functor, ev.success);
99  }
100  },
101  cq);
102  }
103  for (auto& th : *nexting_threads) {
104  th.Start();
105  }
106  }
107  return cq;
108  }
109 
// Releases one reference. When the count reaches zero the queue is shut down,
// every polling thread is joined, and then the thread list and the queue are
// deleted. The joins happen while g_callback_alternative_mu is still held.
110  void Unref() {
111  grpc_core::MutexLock lock(g_callback_alternative_mu);
112  refs--;
113  if (refs == 0) {
// Shutdown precedes the joins; presumably it is what makes the polling
// threads observe GRPC_QUEUE_SHUTDOWN and return -- confirm.
114  cq->Shutdown();
115  for (auto& th : *nexting_threads) {
116  th.Join();
117  }
118  delete nexting_threads;
119  delete cq;
120  }
121  }
122 };
123 
124 CallbackAlternativeCQ g_callback_alternative_cq;
125 
126 } // namespace
127 
128 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
129 // i.e., not have GrpcLibraryCodegen call grpc_init(). This is because, to create
130 // a 'grpc_completion_queue' instance (which is being passed as the input to
131 // this constructor), one must have already called grpc_init().
133  : GrpcLibraryCodegen(false), cq_(take) {
135 }
136 
139 #ifndef NDEBUG
140  if (!ServerListEmpty()) {
142  "CompletionQueue shutdown being shutdown before its server.");
143  }
144 #endif
146 }
147 
149  void** tag, bool* ok, gpr_timespec deadline) {
150  for (;;) {
151  auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
152  switch (ev.type) {
153  case GRPC_QUEUE_TIMEOUT:
154  return TIMEOUT;
155  case GRPC_QUEUE_SHUTDOWN:
156  return SHUTDOWN;
157  case GRPC_OP_COMPLETE:
158  auto core_cq_tag =
159  static_cast<grpc::internal::CompletionQueueTag*>(ev.tag);
160  *ok = ev.success != 0;
161  *tag = core_cq_tag;
162  if (core_cq_tag->FinalizeResult(tag, ok)) {
163  return GOT_EVENT;
164  }
165  break;
166  }
167  }
168 }
169 
172  : cq_(cq), flushed_(false) {
174 }
175 
177  GPR_ASSERT(flushed_);
178 }
179 
181  int res = 0;
182  void* res_tag;
183  flushed_ = true;
185  &res)) {
186  auto core_cq_tag =
187  static_cast<grpc::internal::CompletionQueueTag*>(res_tag);
188  *ok = res == 1;
189  if (core_cq_tag->FinalizeResult(tag, ok)) {
190  return true;
191  }
192  }
193  return false;
194 }
195 
197  gpr_once_init(&g_once_init_callback_alternative,
198  [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
199  return g_callback_alternative_cq.Ref();
200 }
201 
204  (void)cq;
205  // This accesses g_callback_alternative_cq without acquiring the mutex
206  // but it's considered safe because it just reads the pointer address.
207  GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
208  g_callback_alternative_cq.Unref();
209 }
210 
211 } // namespace grpc
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
cq_
grpc_completion_queue * cq_
Definition: channel_connectivity.cc:210
gpr_cpu_num_cores
GPRAPI unsigned gpr_cpu_num_cores(void)
grpc::CompletionQueue::SHUTDOWN
@ SHUTDOWN
The completion queue has been shutdown and fully-drained.
Definition: include/grpcpp/impl/codegen/completion_queue.h:125
log.h
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
grpc
Definition: grpcpp/alarm.h:33
gpr_once
pthread_once_t gpr_once
Definition: impl/codegen/sync_posix.h:50
grpc::CompletionQueue::GOT_EVENT
@ GOT_EVENT
Definition: include/grpcpp/impl/codegen/completion_queue.h:126
grpc::CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache
CompletionQueueTLSCache(CompletionQueue *cq)
Definition: completion_queue_cc.cc:170
false
#define false
Definition: setup_once.h:323
grpc::CompletionQueue::CompleteAvalanching
void CompleteAvalanching()
Definition: include/grpcpp/impl/codegen/completion_queue.h:390
grpc::internal::GrpcLibraryInitializer::summon
int summon()
Definition: grpcpp/impl/grpc_library.h:54
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc::g_gli_initializer
static grpc::internal::GrpcLibraryInitializer g_gli_initializer
Definition: channel_cc.cc:52
grpc_completion_queue_thread_local_cache_flush
GRPCAPI int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, void **tag, int *ok)
Definition: completion_queue.cc:459
grpc::CompletionQueue::CompletionQueueTLSCache::cq_
CompletionQueue * cq_
Definition: include/grpcpp/impl/codegen/completion_queue.h:316
useful.h
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Definition: grpc_types.h:554
GRPC_OP_COMPLETE
@ GRPC_OP_COMPLETE
Definition: grpc_types.h:558
grpc::CompletionQueue::NextStatus
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: include/grpcpp/impl/codegen/completion_queue.h:124
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
GPR_ONCE_INIT
#define GPR_ONCE_INIT
Definition: impl/codegen/sync_posix.h:52
time.h
grpc::GrpcLibraryCodegen
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpcpp/impl/codegen/grpc_library.h:40
gpr_once_init
GPRAPI void gpr_once_init(gpr_once *once, void(*init_function)(void))
grpc_types.h
grpc_completion_queue_thread_local_cache_init
GRPCAPI void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq)
Definition: completion_queue.cc:452
refs
std::vector< CordRep * > refs
Definition: cordz_info_statistics_test.cc:81
grpc::CompletionQueue::cq_
grpc_completion_queue * cq_
Definition: include/grpcpp/impl/codegen/completion_queue.h:422
grpc::CompletionQueue::cq
grpc_completion_queue * cq()
Definition: include/grpcpp/impl/codegen/completion_queue.h:249
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_completion_queue_functor::functor_run
void(* functor_run)(struct grpc_completion_queue_functor *, int)
Definition: grpc_types.h:778
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
grpc::CompletionQueue::ReleaseCallbackAlternativeCQ
static void ReleaseCallbackAlternativeCQ(CompletionQueue *cq)
Definition: completion_queue_cc.cc:202
grpc::CompletionQueue::CompletionQueue
CompletionQueue()
Definition: include/grpcpp/impl/codegen/completion_queue.h:108
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_completion_queue
Definition: completion_queue.cc:347
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
grpc.h
completion_queue.h
completion_queue_tag.h
cpu.h
grpc::CompletionQueue::CallbackAlternativeCQ
static CompletionQueue * CallbackAlternativeCQ()
Definition: completion_queue_cc.cc:196
arg
Definition: cmdline.cc:40
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
gpr_types.h
grpc_core::Mutex
Definition: src/core/lib/gprpp/sync.h:61
grpc_core::Clamp
T Clamp(T val, T min, T max)
Definition: useful.h:31
grpc_library.h
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
grpc::CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache
~CompletionQueueTLSCache()
Definition: completion_queue_cc.cc:176
grpc::CompletionQueue::ServerListEmpty
bool ServerListEmpty() const
Definition: include/grpcpp/impl/codegen/completion_queue.h:411
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
grpc::CompletionQueue::CompletionQueueTLSCache::Flush
bool Flush(void **tag, bool *ok)
Definition: completion_queue_cc.cc:180
grpc::CompletionQueue::TIMEOUT
@ TIMEOUT
deadline was reached.
Definition: include/grpcpp/impl/codegen/completion_queue.h:128
grpc::CompletionQueue::Shutdown
void Shutdown()
Definition: completion_queue_cc.cc:137
thd.h
grpc_completion_queue_next
GRPCAPI grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1133
grpc::CompletionQueue::InitialAvalanching
void InitialAvalanching()
Definition: include/grpcpp/impl/codegen/completion_queue.h:383
ok
bool ok
Definition: async_end2end_test.cc:197
grpc_completion_queue_functor
Definition: grpc_types.h:773
arg
struct arg arg
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
grpc::CompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:104
gpr_timespec
Definition: gpr_types.h:50
grpc::internal::CompletionQueueTag
An interface allowing implementors to process and filter event tags.
Definition: grpcpp/impl/codegen/completion_queue_tag.h:28
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
sync.h
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
grpc::CompletionQueue::AsyncNextInternal
NextStatus AsyncNextInternal(void **tag, bool *ok, gpr_timespec deadline)
Definition: completion_queue_cc.cc:148
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
sync.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
ABSL_NO_THREAD_SAFETY_ANALYSIS
#define ABSL_NO_THREAD_SAFETY_ANALYSIS
Definition: abseil-cpp/absl/base/thread_annotations.h:280


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52