bm_cq_multiple_threads.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <string.h>
20 
21 #include <atomic>
22 
23 #include <benchmark/benchmark.h>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 
36 
/* Pollset type used by the custom polling engine in this benchmark.
 * Code in this file reaches a mutex through ps->mu (see pollset_init and
 * pollset_destroy).
 * NOTE(review): the member declaration line (38) is not present in this
 * listing; presumably it is `gpr_mu mu;` -- confirm against the real file. */
37 struct grpc_pollset {
39 };
40 
/* State shared by all benchmark threads for the setup/teardown hand-off. */
// g_mu: locked by each benchmark thread before the setup and teardown
// hand-offs, and passed to gpr_cv_wait() together with g_cv.
41 static gpr_mu g_mu;
// g_cv: condition variable threads wait on for g_active / g_threads_active.
42 static gpr_cv g_cv;
// g_threads_active: threads wait at the end of the benchmark while this is
// still greater than zero.
43 static int g_threads_active;
// g_active: set to true by thread 0 after setup() and back to false after
// teardown(); the other threads wait for it before starting.
44 static bool g_active;
45 
46 namespace grpc {
47 namespace testing {
49 
52 }
53 
/* Initializes the pollset's mutex and hands its address back to the caller.
 *
 * ps: pollset whose `mu` member is initialized with gpr_mu_init().
 * mu: out-parameter; on return *mu points at ps->mu. */
54 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
55  gpr_mu_init(&ps->mu);
56  *mu = &ps->mu;
57 }
58 
/* Destroys the mutex embedded in the pollset (the one pollset_init set up). */
59 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
60 
62  grpc_pollset_worker* /*worker*/) {
   // The worker argument is ignored; this hook always reports success.
   // NOTE(review): the first line of this signature (61) is not present in
   // this listing.
63  return GRPC_ERROR_NONE;
64 }
65 
66 /* Callback invoked when the tag is dequeued from the completion queue.
 * Releases the grpc_cq_completion storage with gpr_free(); the done_arg
 * parameter is unused. pollset_work() passes this callback together with a
 * gpr_malloc()'ed grpc_cq_completion. */
67 static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
68  gpr_free(cq_completion);
69 }
70 
71 /* Queues a completion tag unless the deadline is the process epoch.
72  * If deadline == grpc_core::Timestamp::ProcessEpoch() it only logs "no-op"
 * and returns GRPC_ERROR_NONE without queueing anything.
 * Otherwise ps->mu is released, the completion is queued, and ps->mu is
 * re-acquired before returning GRPC_ERROR_NONE.
 * NOTE(review): the first signature line (73) and lines 84-85 and 88 are not
 * present in this listing; the call that consumes the arguments on lines
 * 86-87 presumably begins there -- confirm against the real file. */
74  grpc_pollset_worker** /*worker*/,
75  grpc_core::Timestamp deadline) {
   // Zero-deadline fast path: nothing is queued.
76  if (deadline == grpc_core::Timestamp::ProcessEpoch()) {
77  gpr_log(GPR_DEBUG, "no-op");
78  return GRPC_ERROR_NONE;
79  }
80 
   // Drop the pollset mutex while the completion is being queued.
81  gpr_mu_unlock(&ps->mu);
82 
83  void* tag = reinterpret_cast<void*>(10); // Some random number
   // Arguments: queue, tag, no error, cq_done_cb as the done callback with a
   // null done_arg, and freshly gpr_malloc()'ed completion storage that
   // cq_done_cb later frees.
86  g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
87  static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
   // Re-take the pollset mutex before returning.
89  gpr_mu_lock(&ps->mu);
90  return GRPC_ERROR_NONE;
91 }
92 
// Builds an event-engine vtable whose pollset hooks are the functions defined
// in this file; the remaining hooks are trivial lambdas.
// NOTE(review): the function header and the declaration of `vtable` (lines
// 93-94) are not present in this listing.
// Zero the whole struct first so that any hook not assigned below is zeroed.
95  memset(&vtable, 0, sizeof(vtable));
96 
   // Pollset hooks implemented in this file.
97  vtable.pollset_size = sizeof(grpc_pollset);
98  vtable.pollset_init = pollset_init;
99  vtable.pollset_shutdown = pollset_shutdown;
100  vtable.pollset_destroy = pollset_destroy;
101  vtable.pollset_work = pollset_work;
102  vtable.pollset_kick = pollset_kick;
   // Background-poller hooks: always answer false.
103  vtable.is_any_background_poller_thread = [] { return false; };
104  vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
105  grpc_error_handle /*error*/) {
106  return false;
107  };
   // Engine lifecycle hooks are empty; availability check always returns true.
108  vtable.shutdown_background_closure = [] {};
109  vtable.shutdown_engine = [] {};
110  vtable.check_engine_available = [](bool) { return true; };
111  vtable.init_engine = [] {};
   // The engine is identified by the caller-supplied name.
112  vtable.name = name;
113 
114  return vtable;
115 }
116 
/* One-time benchmark setup: initializes gRPC and asserts that the active poll
 * strategy is either "none" or "bm_cq_multiple_threads".
 * NOTE(review): line 123 is not present in this listing; presumably it
 * creates the completion queue g_cq -- confirm against the real file. */
117 static void setup() {
118  grpc_init();
    // Abort unless one of the two engine names this benchmark expects is active.
119  GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
120  strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
121  0);
122 
124 }
125 
/* One-time benchmark teardown: drains events from g_cq with
 * grpc_completion_queue_next() and then calls grpc_shutdown().
 * NOTE(review): lines 127, 130, 132 and 136 are not present in this listing
 * (including the definition of `deadline` and the loop's terminating event
 * type); confirm the exact shutdown/destroy sequence against the real file. */
126 static void teardown() {
128 
129  /* Drain any events */
131  while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
133  /* Do nothing */
134  }
135 
137  grpc_shutdown();
138 }
139 
140 /* A few notes about Multi-threaded benchmarks:
141 
142  Setup:
143  The benchmark framework ensures that none of the threads proceed beyond the
144  state.KeepRunning() call unless all the threads have called state.KeepRunning()
145  at least once. So it is safe to do the initialization in one of the threads
146  before state.KeepRunning() is called.
147 
148  Teardown:
149  The benchmark framework also ensures that no thread is running the benchmark
150  code (i.e. the code between two successive calls of state.KeepRunning()) if
151  state.KeepRunning() returns false. So it is safe to do the teardown in one
152  of the threads after state.KeepRunning() returns false.
153 
154  However, our use requires synchronization because we do additional work at
155  each thread that requires specific ordering (TrackCounters must be constructed
156  after grpc_init because it needs the number of cores, initialized by grpc,
157  and its Finish call must take place before grpc_shutdown so that it can use
158  grpc_stats).
159 */
// Benchmark body run by every benchmark thread.
// NOTE(review): the function header (lines 160-161) and lines 165, 169, 175,
// 183, 190, 192 and 198 are not present in this listing; the definition of
// `deadline`, the updates to g_threads_active, the condition-variable
// broadcasts and the mutex unlocks presumably live there -- confirm.
// Thread 0 is the one that performs setup() and teardown().
162  auto thd_idx = state.thread_index();
163 
    // Start-up hand-off: thread 0 runs setup() and sets g_active; every other
    // thread waits on g_cv until g_active becomes true.
164  gpr_mu_lock(&g_mu);
166  if (thd_idx == 0) {
167  setup();
168  g_active = true;
170  } else {
171  while (!g_active) {
172  gpr_cv_wait(&g_cv, &g_mu, deadline);
173  }
174  }
176 
177  // Use a TrackCounters object to monitor the gRPC performance statistics
178  // (optionally including low-level counters) before and after the test
179  TrackCounters track_counters;
180 
    // Timed region: each iteration pulls one event from g_cq and asserts on
    // its type (the expected value is on a line not shown here).
181  for (auto _ : state) {
182  GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
184  }
185 
    // Report one processed item per iteration, then finalize the counters.
186  state.SetItemsProcessed(state.iterations());
187  track_counters.Finish(state);
188 
    // Shutdown hand-off: wait on g_cv while g_threads_active is still positive.
189  gpr_mu_lock(&g_mu);
191  if (g_threads_active == 0) {
193  } else {
194  while (g_threads_active > 0) {
195  gpr_cv_wait(&g_cv, &g_mu, deadline);
196  }
197  }
199 
    // Only thread 0 tears down and clears g_active.
200  if (thd_idx == 0) {
201  teardown();
202  g_active = false;
203  }
204 }
205 
// Register BM_Cq_Throughput with a thread range of 1 to 16, using real time
// for the measurements.
206 BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
207 
// Event-engine vtables that main() registers with
// grpc_register_event_engine_factory().
// NOTE(review): the initializer of g_none_vtable (line 210) is not present in
// this listing; presumably it is make_engine_vtable("none") -- confirm.
208 namespace {
209 const grpc_event_engine_vtable g_none_vtable =
211 const grpc_event_engine_vtable g_bm_vtable =
212  grpc::testing::make_engine_vtable("bm_cq_multiple_threads");
213 } // namespace
214 
215 } // namespace testing
216 } // namespace grpc
217 
218 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
219 // and others do not. This allows us to support both modes.
220 namespace benchmark {
222 } // namespace benchmark
223 
// Entry point: registers the two benchmark event-engine vtables, initializes
// the shared mutex/condition variable and the benchmark/test frameworks.
// NOTE(review): line 235 is not present in this listing; presumably it runs
// the registered benchmarks -- confirm against the real file.
224 int main(int argc, char** argv) {
225  // This test should only ever be run with the "none" polling engine or the
226  // custom engine registered below (setup() asserts the strategy name).
227  // Override the non-polling engine and add the custom engine at the head.
228  grpc_register_event_engine_factory(&grpc::testing::g_none_vtable, false);
229  grpc_register_event_engine_factory(&grpc::testing::g_bm_vtable, true);
230  grpc::testing::TestEnvironment env(&argc, argv);
    // g_mu / g_cv are the primitives the benchmark threads synchronize on.
231  gpr_mu_init(&g_mu);
232  gpr_cv_init(&g_cv);
233  ::benchmark::Initialize(&argc, argv);
234  grpc::testing::InitTest(&argc, &argv, false);
236  return 0;
237 }
grpc_pollset_worker
struct grpc_pollset_worker grpc_pollset_worker
Definition: pollset.h:39
grpc::testing::InitTest
void InitTest(int *argc, char ***argv, bool remove_flags)
Definition: test_config_cc.cc:28
testing
Definition: aws_request_signer_test.cc:25
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
vtable
static const grpc_transport_vtable vtable
Definition: binder_transport.cc:680
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc::testing::pollset_work
static grpc_error_handle pollset_work(grpc_pollset *ps, grpc_pollset_worker **, grpc_core::Timestamp deadline)
Definition: bm_cq_multiple_threads.cc:73
log.h
grpc_pollset::mu
gpr_mu mu
Definition: bm_cq_multiple_threads.cc:38
bool
bool
Definition: setup_once.h:312
benchmark
Definition: bm_alarm.cc:55
generate.env
env
Definition: generate.py:37
grpc::gpr_free
gpr_free(creds_file_name)
memset
return memset(p, 0, total)
grpc
Definition: grpcpp/alarm.h:33
gpr_time_0
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:47
gpr_cv
pthread_cond_t gpr_cv
Definition: impl/codegen/sync_posix.h:48
string.h
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_register_event_engine_factory
void grpc_register_event_engine_factory(const grpc_event_engine_vtable *vtable, bool add_at_head)
grpc_event_engine_vtable
Definition: ev_posix.h:46
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Definition: grpc_types.h:554
GRPC_OP_COMPLETE
@ GRPC_OP_COMPLETE
Definition: grpc_types.h:558
completion_queue.h
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
grpc_cq_completion
Definition: src/core/lib/surface/completion_queue.h:43
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_cq_end_op
void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:894
setup.name
name
Definition: setup.py:542
benchmark::RunTheBenchmarksNamespaced
void RunTheBenchmarksNamespaced()
Definition: bm_alarm.cc:56
grpc_get_poll_strategy_name
const char * grpc_get_poll_strategy_name()
TrackCounters::Finish
virtual void Finish(benchmark::State &state)
Definition: helpers.cc:44
g_cv
static gpr_cv g_cv
Definition: bm_cq_multiple_threads.cc:42
grpc::testing::cq_done_cb
static void cq_done_cb(void *, grpc_cq_completion *cq_completion)
Definition: bm_cq_multiple_threads.cc:67
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
TrackCounters
Definition: helpers.h:51
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
benchmark::RunSpecifiedBenchmarks
size_t RunSpecifiedBenchmarks()
Definition: benchmark/src/benchmark.cc:437
grpc::testing::g_cq
static grpc_completion_queue * g_cq
Definition: bm_cq_multiple_threads.cc:48
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
ev_posix.h
grpc_core::ExecCtx::Flush
bool Flush()
Definition: exec_ctx.cc:69
g_active
static bool g_active
Definition: bm_cq_multiple_threads.cc:44
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
grpc_completion_queue
Definition: completion_queue.cc:347
g_mu
static gpr_mu g_mu
Definition: bm_cq_multiple_threads.cc:41
grpc.h
gmock_output_test._
_
Definition: bloaty/third_party/googletest/googlemock/test/gmock_output_test.py:175
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
time.h
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
benchmark::Initialize
void Initialize(int *argc, char **argv)
Definition: benchmark/src/benchmark.cc:602
grpc_pollset
struct grpc_pollset grpc_pollset
Definition: pollset.h:38
helpers.h
grpc_core::Timestamp::ProcessEpoch
static constexpr Timestamp ProcessEpoch()
Definition: src/core/lib/gprpp/time.h:77
g_threads_active
static int g_threads_active
Definition: bm_cq_multiple_threads.cc:43
grpc::testing::BM_Cq_Throughput
static void BM_Cq_Throughput(benchmark::State &state)
Definition: bm_cq_multiple_threads.cc:160
grpc::testing::tag
static void * tag(intptr_t t)
Definition: h2_ssl_cert_test.cc:263
test_config.h
grpc_cq_begin_op
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:672
grpc::testing::BENCHMARK
static const int BENCHMARK
Definition: inproc_sync_unary_ping_pong_test.cc:35
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
grpc_completion_queue_destroy
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
Definition: completion_queue.cc:1424
port.h
benchmark::State
Definition: benchmark/include/benchmark/benchmark.h:503
alloc.h
grpc::testing::pollset_destroy
static void pollset_destroy(grpc_pollset *ps)
Definition: bm_cq_multiple_threads.cc:59
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
grpc_completion_queue_next
GRPCAPI grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1133
grpc_completion_queue_shutdown
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
Definition: completion_queue.cc:1416
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
closure
Definition: proxy.cc:59
grpc::testing::pollset_shutdown
static void pollset_shutdown(grpc_pollset *, grpc_closure *closure)
Definition: bm_cq_multiple_threads.cc:50
test_config.h
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
gpr_cv_broadcast
GPRAPI void gpr_cv_broadcast(gpr_cv *cv)
grpc::testing::make_engine_vtable
static grpc_event_engine_vtable make_engine_vtable(const char *name)
Definition: bm_cq_multiple_threads.cc:93
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
grpc_completion_queue_create_for_next
GRPCAPI grpc_completion_queue * grpc_completion_queue_create_for_next(void *reserved)
Definition: completion_queue_factory.cc:62
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
main
int main(int argc, char **argv)
Definition: bm_cq_multiple_threads.cc:224
gpr_timespec
Definition: gpr_types.h:50
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_error
Definition: error_internal.h:42
grpc::testing::pollset_kick
static grpc_error_handle pollset_kick(grpc_pollset *, grpc_pollset_worker *)
Definition: bm_cq_multiple_threads.cc:61
grpc_transport_vtable::name
const char * name
Definition: transport_impl.h:43
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_closure
Definition: closure.h:56
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
grpc::testing::setup
static void setup()
Definition: bm_cq_multiple_threads.cc:117
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)
grpc::testing::teardown
static void teardown()
Definition: bm_cq_multiple_threads.cc:126
grpc::testing::mu
static gpr_mu mu
Definition: bm_cq.cc:162
grpc::testing::pollset_init
static void pollset_init(grpc_pollset *ps, gpr_mu **mu)
Definition: bm_cq_multiple_threads.cc:54


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:48