bm_threadpool.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <condition_variable>
20 #include <mutex>
21 
22 #include <benchmark/benchmark.h>
23 
24 #include <grpc/grpc.h>
25 
30 
31 namespace grpc {
32 namespace testing {
33 
34 // This helper class allows a thread to block for a pre-specified number of
35 // actions. BlockingCounter has an initial non-negative count on initialization.
36 // Each call to DecrementCount decreases the count by 1. A call to Wait
37 // blocks the calling thread while the count is greater than 0, returning
38 // once the count reaches 0.
40  public:
41  explicit BlockingCounter(int count) : count_(count) {}
42  void DecrementCount() {  // Decrements count_ under mu_; wakes all waiters when it reaches 0.
43  std::lock_guard<std::mutex> l(mu_);
44  count_--;
45  if (count_ == 0) cv_.notify_all();  // Only the call that brings count_ to exactly 0 notifies.
46  }
47 
48  void Wait() {  // Blocks the caller until count_ is no longer positive.
49  std::unique_lock<std::mutex> l(mu_);
50  while (count_ > 0) {  // Re-checked after every wakeup, so spurious wakeups are harmless.
51  cv_.wait(l);
52  }
53  }
54 
55  private:
56  int count_;
58  std::condition_variable cv_;
59 };
60 
61 // A functor (closure) class for the threadpool microbenchmark.
62 // When run, it decrements its remaining count (num_add); if the count is still
63 // greater than 0, it adds another functor into the pool. Otherwise, it
64 // decrements the counter to signal that its task is finished. The functor
65 // deletes itself at the end, so the caller does not need to clean up.
67  public:
69  int num_add)
70  : pool_(pool), counter_(counter), num_add_(num_add) {
72  inlineable = false;
73  internal_next = this;
74  internal_success = 0;
75  }
76  // When the functor runs in the thread pool, it receives itself as the first
77  // argument and internal_success as the second.
78  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {  // Invoked by the pool; cb is this functor.
79  auto* callback = static_cast<AddAnotherFunctor*>(cb);
80  if (--callback->num_add_ > 0) {  // Adds remain: hand a fresh functor the remaining count.
81  callback->pool_->Add(new AddAnotherFunctor(
82  callback->pool_, callback->counter_, callback->num_add_));
83  } else {
84  callback->counter_->DecrementCount();  // Chain finished: signal the BlockingCounter.
85  }
86  // Deletes itself in both branches; any successor is a separate object.
87  delete callback;
88  }
89 
90  private:
93  int num_add_;
94 };
95 
96 template <int kConcurrentFunctor>
98  const int num_iterations = state.range(0);
99  const int num_threads = state.range(1);
100  // Number of adds done by each closure.
101  const int num_add = num_iterations / kConcurrentFunctor;
103  while (state.KeepRunningBatch(num_iterations)) {
104  BlockingCounter counter(kConcurrentFunctor);
105  for (int i = 0; i < kConcurrentFunctor; ++i) {
106  pool.Add(new AddAnotherFunctor(&pool, &counter, num_add));
107  }
108  counter.Wait();
109  }
110  state.SetItemsProcessed(state.iterations());
111 }
112 
113 // First pair of arguments is range for number of iterations (num_iterations).
114 // Second pair of arguments is range for thread pool size (num_threads).
115 BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 1)->RangePair(524288, 524288, 1, 1024);  // Template arg: number of concurrent functor chains.
116 BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 4)->RangePair(524288, 524288, 1, 1024);
117 BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 8)->RangePair(524288, 524288, 1, 1024);
119  ->RangePair(524288, 524288, 1, 1024);
121  ->RangePair(524288, 524288, 1, 1024);
123  ->RangePair(524288, 524288, 1, 1024);
125  ->RangePair(524288, 524288, 1, 1024);
127  ->RangePair(524288, 524288, 1, 1024);
129  ->RangePair(524288, 524288, 1, 1024);
130 
131 // A functor class that deletes itself when it finishes running.
133  public:
136  inlineable = false;
137  internal_next = this;
138  internal_success = 0;
139  }
140 
141  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
142  // When run, the first argument is this functor itself.
143  auto* callback = static_cast<SuicideFunctorForAdd*>(cb);
144  callback->counter_->DecrementCount();  // One unit of work done.
145  delete callback;  // Self-deletes, so the adding thread never frees it.
146  }
147 
148  private:
150 };
151 
152 // Benchmarks the scenario of external thread(s) adding closures into the pool.
154  static grpc_core::ThreadPool* external_add_pool = nullptr;
155  int thread_idx = state.thread_index();
156  // Setup for each run of test.
157  if (thread_idx == 0) {
158  const int num_threads = state.range(1);
159  external_add_pool = new grpc_core::ThreadPool(num_threads);
160  }
161  const int num_iterations = state.range(0) / state.threads();
162  while (state.KeepRunningBatch(num_iterations)) {
163  BlockingCounter counter(num_iterations);
164  for (int i = 0; i < num_iterations; ++i) {
165  external_add_pool->Add(new SuicideFunctorForAdd(&counter));
166  }
167  counter.Wait();
168  }
169 
170  // Teardown at the end of each test run.
171  if (thread_idx == 0) {
172  state.SetItemsProcessed(state.range(0));
173  delete external_add_pool;
174  }
175 }
177  // First pair is range for number of iterations (num_iterations).
178  // Second pair is range for thread pool size (num_threads).
179  ->RangePair(524288, 524288, 1, 1024)
180  ->ThreadRange(1, 256); // Concurrent external thread(s) up to 256
181 
182 // Functor (closure) that repeatedly re-adds itself into the pool. Because no
183 // new functor is allocated per add, overhead stays low and Add is timed more accurately.
185  public:
187  int num_add)
188  : pool_(pool), counter_(counter), num_add_(num_add) {
190  inlineable = false;
191  internal_next = this;
192  internal_success = 0;
193  }
194  // When the functor runs in the thread pool, it receives itself as the first
195  // argument and internal_success as the second.
196  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
197  auto* callback = static_cast<AddSelfFunctor*>(cb);
198  if (--callback->num_add_ > 0) {
199  callback->pool_->Add(cb);  // Re-queues this same object; no allocation per add.
200  } else {
201  callback->counter_->DecrementCount();
202  // Last run of the chain: signal completion, then delete itself.
203  delete callback;
204  }
205  }
206 
207  private:
210  int num_add_;
211 };
212 
213 template <int kConcurrentFunctor>
215  const int num_iterations = state.range(0);
216  const int num_threads = state.range(1);
217  // Number of adds done by each closure.
218  const int num_add = num_iterations / kConcurrentFunctor;
220  while (state.KeepRunningBatch(num_iterations)) {
221  BlockingCounter counter(kConcurrentFunctor);
222  for (int i = 0; i < kConcurrentFunctor; ++i) {
223  pool.Add(new AddSelfFunctor(&pool, &counter, num_add));
224  }
225  counter.Wait();
226  }
227  state.SetItemsProcessed(state.iterations());
228 }
229 
230 // First pair of arguments is range for number of iterations (num_iterations).
231 // Second pair of arguments is range for thread pool size (num_threads).
232 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 1)->RangePair(524288, 524288, 1, 1024);  // Template arg: number of concurrent self-re-adding functors.
233 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 4)->RangePair(524288, 524288, 1, 1024);
234 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 8)->RangePair(524288, 524288, 1, 1024);
235 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 16)->RangePair(524288, 524288, 1, 1024);
236 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 32)->RangePair(524288, 524288, 1, 1024);
237 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 64)->RangePair(524288, 524288, 1, 1024);
238 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 128)->RangePair(524288, 524288, 1, 1024);
239 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 512)->RangePair(524288, 524288, 1, 1024);
240 BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 2048)->RangePair(524288, 524288, 1, 1024);
241 
242 #if defined(__GNUC__) && !defined(SWIG)  // Per-architecture cache line size, used to size ShortWorkFunctorForAdd::pad.
243 #if defined(__i386__) || defined(__x86_64__)
244 #define CACHELINE_SIZE 64
245 #elif defined(__powerpc64__)
246 #define CACHELINE_SIZE 128
247 #elif defined(__aarch64__)
248 #define CACHELINE_SIZE 64
249 #elif defined(__arm__)
250 #if defined(__ARM_ARCH_5T__)
251 #define CACHELINE_SIZE 32
252 #elif defined(__ARM_ARCH_7A__)
253 #define CACHELINE_SIZE 64
254 #endif
255 #endif
256 #endif  // __GNUC__ && !SWIG
257 #ifndef CACHELINE_SIZE  // Fallback now outside the GCC guard, so non-GCC compilers (e.g. MSVC) still get a value.
258 #define CACHELINE_SIZE 64
259 #endif
260 
261 // A functor (closure) that simulates closures with a small but non-trivial
262 // amount of work.
264  public:
266 
269  inlineable = false;
270  internal_next = this;
271  internal_success = 0;
272  val_ = 0;
273  }
274  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
275  auto* callback = static_cast<ShortWorkFunctorForAdd*>(cb);
276  // Touches pad so the compiler does not warn that the member is unused.
277  callback->pad[0] = 0;
278  for (int i = 0; i < 1000; ++i) {  // Fixed small workload; val_ is volatile, so each increment is a real access.
279  callback->val_++;
280  }
281  callback->counter_->DecrementCount();  // No delete: instances live in BM_SpikyLoad's work_vector and are re-added.
282  }
283 
284  private:
285  char pad[CACHELINE_SIZE];
286  volatile int val_;
287 };
288 
289 // Simulates workloads where many short running callbacks are added to the
290 // threadpool. There are not enough callbacks to keep all the workers busy
291 // continuously, so the number of running workers changes over time.
292 //
293 // In effect this tests how well the threadpool avoids spurious wakeups.
295  const int num_threads = state.range(0);
296 
297  const int kNumSpikes = 1000;
298  const int batch_size = 3 * num_threads;
299  std::vector<ShortWorkFunctorForAdd> work_vector(batch_size);
301  while (state.KeepRunningBatch(kNumSpikes * batch_size)) {
302  for (int i = 0; i != kNumSpikes; ++i) {
303  BlockingCounter counter(batch_size);
304  for (auto& w : work_vector) {
305  w.counter_ = &counter;
306  pool.Add(&w);
307  }
308  counter.Wait();
309  }
310  }
311  state.SetItemsProcessed(state.iterations() * batch_size);
312 }
313 BENCHMARK(BM_SpikyLoad)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16);  // Arg: num_threads (state.range(0)).
314 
315 } // namespace testing
316 } // namespace grpc
317 
318 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
319 // and others do not. This allows us to support both modes.
320 namespace benchmark {
322 } // namespace benchmark
323 
324 int main(int argc, char* argv[]) {
325  grpc::testing::TestEnvironment env(&argc, argv);
326  LibraryInitializer libInit;
327  ::benchmark::Initialize(&argc, argv);
328  grpc::testing::InitTest(&argc, &argv, false);
330  return 0;
331 }
grpc::testing::InitTest
void InitTest(int *argc, char ***argv, bool remove_flags)
Definition: test_config_cc.cc:28
grpc::testing::AddAnotherFunctor::AddAnotherFunctor
AddAnotherFunctor(grpc_core::ThreadPool *pool, BlockingCounter *counter, int num_add)
Definition: bm_threadpool.cc:68
testing
Definition: aws_request_signer_test.cc:25
grpc::testing::AddSelfFunctor::pool_
grpc_core::ThreadPool * pool_
Definition: bm_threadpool.cc:208
grpc::testing::BM_SpikyLoad
static void BM_SpikyLoad(benchmark::State &state)
Definition: bm_threadpool.cc:294
benchmark
Definition: bm_alarm.cc:55
generate.env
env
Definition: generate.py:37
grpc::testing::SuicideFunctorForAdd::SuicideFunctorForAdd
SuicideFunctorForAdd(BlockingCounter *counter)
Definition: bm_threadpool.cc:134
grpc
Definition: grpcpp/alarm.h:33
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
grpc::testing::AddSelfFunctor::Run
static void Run(grpc_completion_queue_functor *cb, int)
Definition: bm_threadpool.cc:196
grpc_completion_queue_functor::internal_success
int internal_success
Definition: grpc_types.h:785
main
int main(int argc, char *argv[])
Definition: bm_threadpool.cc:324
grpc::testing::BlockingCounter::DecrementCount
void DecrementCount()
Definition: bm_threadpool.cc:42
benchmark::RunTheBenchmarksNamespaced
void RunTheBenchmarksNamespaced()
Definition: bm_alarm.cc:56
grpc::testing::ShortWorkFunctorForAdd::pad
char pad[CACHELINE_SIZE]
Definition: bm_threadpool.cc:285
grpc::testing::ThreadPoolAddSelf
static void ThreadPoolAddSelf(benchmark::State &state)
Definition: bm_threadpool.cc:214
grpc::testing::BM_ThreadPoolExternalAdd
static void BM_ThreadPoolExternalAdd(benchmark::State &state)
Definition: bm_threadpool.cc:153
grpc::testing::BENCHMARK_TEMPLATE
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator, NoOpMutator) -> Apply(StreamingPingPongMsgSizeArgs)
grpc::testing::SuicideFunctorForAdd::counter_
BlockingCounter * counter_
Definition: bm_threadpool.cc:149
grpc::testing::ShortWorkFunctorForAdd
Definition: bm_threadpool.cc:263
grpc::testing::ShortWorkFunctorForAdd::ShortWorkFunctorForAdd
ShortWorkFunctorForAdd()
Definition: bm_threadpool.cc:267
grpc::testing::AddAnotherFunctor::pool_
grpc_core::ThreadPool * pool_
Definition: bm_threadpool.cc:91
benchmark::RunSpecifiedBenchmarks
size_t RunSpecifiedBenchmarks()
Definition: benchmark/src/benchmark.cc:437
grpc::testing::AddAnotherFunctor::counter_
BlockingCounter * counter_
Definition: bm_threadpool.cc:92
grpc::testing::BlockingCounter::Wait
void Wait()
Definition: bm_threadpool.cc:48
grpc::testing::BlockingCounter
Definition: bm_threadpool.cc:39
grpc_completion_queue_functor::functor_run
void(* functor_run)(struct grpc_completion_queue_functor *, int)
Definition: grpc_types.h:778
grpc::testing::ThreadPoolAddAnother
static void ThreadPoolAddAnother(benchmark::State &state)
Definition: bm_threadpool.cc:97
grpc::testing::AddSelfFunctor::AddSelfFunctor
AddSelfFunctor(grpc_core::ThreadPool *pool, BlockingCounter *counter, int num_add)
Definition: bm_threadpool.cc:186
threadpool.h
grpc.h
counter
static int counter
Definition: abseil-cpp/absl/flags/reflection_test.cc:131
grpc::testing::AddSelfFunctor::num_add_
int num_add_
Definition: bm_threadpool.cc:210
grpc_completion_queue_functor::internal_next
struct grpc_completion_queue_functor * internal_next
Definition: grpc_types.h:786
benchmark::Initialize
void Initialize(int *argc, char **argv)
Definition: benchmark/src/benchmark.cc:602
grpc::testing::AddSelfFunctor
Definition: bm_threadpool.cc:184
grpc::testing::BlockingCounter::count_
int count_
Definition: bm_threadpool.cc:56
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc::testing::SuicideFunctorForAdd
Definition: bm_threadpool.cc:132
grpc::testing::AddAnotherFunctor
Definition: bm_threadpool.cc:66
grpc::testing::AddAnotherFunctor::Run
static void Run(grpc_completion_queue_functor *cb, int)
Definition: bm_threadpool.cc:78
helpers.h
grpc::testing::SuicideFunctorForAdd::Run
static void Run(grpc_completion_queue_functor *cb, int)
Definition: bm_threadpool.cc:141
LibraryInitializer
Definition: helpers.h:33
test_config.h
grpc::testing::ShortWorkFunctorForAdd::counter_
BlockingCounter * counter_
Definition: bm_threadpool.cc:265
grpc::testing::BENCHMARK
static const int BENCHMARK
Definition: inproc_sync_unary_ping_pong_test.cc:35
grpc::testing::ShortWorkFunctorForAdd::Run
static void Run(grpc_completion_queue_functor *cb, int)
Definition: bm_threadpool.cc:274
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
grpc::testing::BlockingCounter::cv_
std::condition_variable cv_
Definition: bm_threadpool.cc:58
grpc::testing::BlockingCounter::mu_
std::mutex mu_
Definition: bm_threadpool.cc:57
grpc_core::ThreadPool
Definition: src/core/lib/iomgr/executor/threadpool.h:97
benchmark::State
Definition: benchmark/include/benchmark/benchmark.h:503
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
num_threads
static volatile int num_threads
Definition: benchmark-thread.c:30
grpc::testing::ShortWorkFunctorForAdd::val_
volatile int val_
Definition: bm_threadpool.cc:286
grpc::testing::BlockingCounter::BlockingCounter
BlockingCounter(int count)
Definition: bm_threadpool.cc:41
grpc_completion_queue_functor
Definition: grpc_types.h:773
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_core::ThreadPool::Add
void Add(grpc_completion_queue_functor *closure) override
Definition: threadpool.cc:122
grpc::testing::AddAnotherFunctor::num_add_
int num_add_
Definition: bm_threadpool.cc:93
test_config.h
pool
InternalDescriptorPool * pool
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:807
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc::testing::AddSelfFunctor::counter_
BlockingCounter * counter_
Definition: bm_threadpool.cc:209
grpc_completion_queue_functor::inlineable
int inlineable
Definition: grpc_types.h:782


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:49