completion_queue_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24 #include <grpc/support/time.h>
25 
31 
32 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
33 
34 static void* create_test_tag(void) {
35  static intptr_t i = 0;
36  return reinterpret_cast<void*>(++i);
37 }
38 
39 /* helper for tests to shutdown correctly and tersely */
41  grpc_event ev;
43 
44  switch (grpc_get_cq_completion_type(cc)) {
45  case GRPC_CQ_NEXT: {
47  nullptr);
49  break;
50  }
51  case GRPC_CQ_PLUCK: {
55  break;
56  }
57  case GRPC_CQ_CALLBACK: {
58  // Nothing to do here. The shutdown callback will be invoked when
59  // possible.
60  break;
61  }
62  default: {
63  gpr_log(GPR_ERROR, "Unknown completion type");
64  break;
65  }
66  }
67 
69 }
70 
71 /* ensure we can create and destroy a completion channel */
72 static void test_no_op(void) {
73  grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
74  grpc_cq_polling_type polling_types[] = {
77  LOG_TEST("test_no_op");
78 
79  attr.version = 1;
80  for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
81  for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
82  attr.cq_completion_type = completion_types[i];
83  attr.cq_polling_type = polling_types[j];
86  }
87  }
88 }
89 
90 static void test_pollset_conversion(void) {
91  grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
96 
97  LOG_TEST("test_pollset_conversion");
98 
99  attr.version = 1;
100  for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
101  for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
102  attr.cq_completion_type = completion_types[i];
103  attr.cq_polling_type = polling_types[j];
106  GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
108  }
109  }
110 }
111 
112 static void test_wait_empty(void) {
113  grpc_cq_polling_type polling_types[] = {
117  grpc_event event;
118 
119  LOG_TEST("test_wait_empty");
120 
121  attr.version = 1;
122  attr.cq_completion_type = GRPC_CQ_NEXT;
123  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
124  attr.cq_polling_type = polling_types[i];
127  event =
131  }
132 }
133 
134 static void do_nothing_end_completion(void* /*arg*/,
135  grpc_cq_completion* /*c*/) {}
136 
137 static void test_cq_end_op(void) {
138  grpc_event ev;
141  grpc_cq_polling_type polling_types[] = {
144  void* tag = create_test_tag();
145 
146  LOG_TEST("test_cq_end_op");
147 
148  attr.version = 1;
149  attr.cq_completion_type = GRPC_CQ_NEXT;
150  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
152  attr.cq_polling_type = polling_types[i];
155 
158  &completion);
159 
161  nullptr);
163  GPR_ASSERT(ev.tag == tag);
164  GPR_ASSERT(ev.success);
165 
167  }
168 }
169 
170 static void test_cq_tls_cache_full(void) {
171  grpc_event ev;
174  grpc_cq_polling_type polling_types[] = {
177  void* tag = create_test_tag();
178  void* res_tag;
179  int ok;
180 
181  LOG_TEST("test_cq_tls_cache_full");
182 
183  attr.version = 1;
184  attr.cq_completion_type = GRPC_CQ_NEXT;
185  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
186  grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
187  attr.cq_polling_type = polling_types[i];
190 
194  &completion);
195 
197  nullptr);
199 
200  GPR_ASSERT(
202  GPR_ASSERT(res_tag == tag);
203  GPR_ASSERT(ok);
204 
206  nullptr);
208 
210  }
211 }
212 
213 static void test_cq_tls_cache_empty(void) {
215  grpc_cq_polling_type polling_types[] = {
218  void* res_tag;
219  int ok;
220 
221  LOG_TEST("test_cq_tls_cache_empty");
222 
223  attr.version = 1;
224  attr.cq_completion_type = GRPC_CQ_NEXT;
225  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
226  grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
227  attr.cq_polling_type = polling_types[i];
230 
231  GPR_ASSERT(
234  GPR_ASSERT(
237  }
238 }
239 
241  grpc_cq_polling_type polling_types[] = {
245  grpc_event event;
246  LOG_TEST("test_shutdown_then_next_polling");
247 
248  attr.version = 1;
249  attr.cq_completion_type = GRPC_CQ_NEXT;
250  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
251  attr.cq_polling_type = polling_types[i];
256  nullptr);
259  }
260 }
261 
263  grpc_cq_polling_type polling_types[] = {
267  grpc_event event;
268  LOG_TEST("test_shutdown_then_next_with_timeout");
269 
270  attr.version = 1;
271  attr.cq_completion_type = GRPC_CQ_NEXT;
272  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
273  attr.cq_polling_type = polling_types[i];
276 
279  nullptr);
282  }
283 }
284 
285 static void test_pluck(void) {
286  grpc_event ev;
288  void* tags[128];
289  grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
290  grpc_cq_polling_type polling_types[] = {
293  unsigned i, j;
294 
295  LOG_TEST("test_pluck");
296 
297  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
298  tags[i] = create_test_tag();
299  for (j = 0; j < i; j++) {
300  GPR_ASSERT(tags[i] != tags[j]);
301  }
302  }
303 
304  attr.version = 1;
305  attr.cq_completion_type = GRPC_CQ_PLUCK;
306  for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
307  grpc_core::ExecCtx exec_ctx; // reset exec_ctx
308  attr.cq_polling_type = polling_types[pidx];
311 
312  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
315  nullptr, &completions[i]);
316  }
317 
318  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
320  cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
321  GPR_ASSERT(ev.tag == tags[i]);
322  }
323 
324  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
327  nullptr, &completions[i]);
328  }
329 
330  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
333  nullptr);
334  GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
335  }
336 
338  }
339 }
340 
341 static void test_pluck_after_shutdown(void) {
342  grpc_cq_polling_type polling_types[] = {
344  grpc_event ev;
347 
348  LOG_TEST("test_pluck_after_shutdown");
349 
350  attr.version = 1;
351  attr.cq_completion_type = GRPC_CQ_PLUCK;
352  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
353  attr.cq_polling_type = polling_types[i];
358  cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
361  }
362 }
363 
364 static void test_callback(void) {
366  static void* tags[128];
367  grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
368  grpc_cq_polling_type polling_types[] = {
371  unsigned i;
372  static gpr_mu mu, shutdown_mu;
373  static gpr_cv cv, shutdown_cv;
374  static int cb_counter;
375  gpr_mu_init(&mu);
377  gpr_cv_init(&cv);
379 
380  LOG_TEST("test_callback");
381 
382  bool got_shutdown = false;
383  class ShutdownCallback : public grpc_completion_queue_functor {
384  public:
385  explicit ShutdownCallback(bool* done) : done_(done) {
386  functor_run = &ShutdownCallback::Run;
387  inlineable = false;
388  }
389  ~ShutdownCallback() {}
390  static void Run(grpc_completion_queue_functor* cb, int ok) {
392  *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
393  // Signal when the shutdown callback is completed.
396  }
397 
398  private:
399  bool* done_;
400  };
401  ShutdownCallback shutdown_cb(&got_shutdown);
402 
403  attr.version = 2;
404  attr.cq_completion_type = GRPC_CQ_CALLBACK;
405  attr.cq_shutdown_cb = &shutdown_cb;
406 
407  for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
408  int sumtags = 0;
409  int counter = 0;
410  cb_counter = 0;
411  {
412  // reset exec_ctx types
414  attr.cq_polling_type = polling_types[pidx];
417 
418  class TagCallback : public grpc_completion_queue_functor {
419  public:
420  TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
421  functor_run = &TagCallback::Run;
422  // Inlineable should be false since this callback takes locks.
423  inlineable = false;
424  }
425  ~TagCallback() {}
426  static void Run(grpc_completion_queue_functor* cb, int ok) {
427  GPR_ASSERT(static_cast<bool>(ok));
428  auto* callback = static_cast<TagCallback*>(cb);
429  gpr_mu_lock(&mu);
430  cb_counter++;
431  *callback->counter_ += callback->tag_;
432  if (cb_counter == GPR_ARRAY_SIZE(tags)) {
433  gpr_cv_signal(&cv);
434  }
435  gpr_mu_unlock(&mu);
436  delete callback;
437  };
438 
439  private:
440  int* counter_;
441  int tag_;
442  };
443 
444  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
445  tags[i] = static_cast<void*>(new TagCallback(&counter, i));
446  sumtags += i;
447  }
448 
449  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
452  nullptr, &completions[i]);
453  }
454 
455  gpr_mu_lock(&mu);
456  while (cb_counter != GPR_ARRAY_SIZE(tags)) {
457  // Wait for all the callbacks to complete.
459  }
460  gpr_mu_unlock(&mu);
461 
463 
465  while (!got_shutdown) {
466  // Wait for the shutdown callback to complete.
469  }
471  }
472 
473  // Run the assertions to check if the test ran successfully.
474  GPR_ASSERT(sumtags == counter);
475  GPR_ASSERT(got_shutdown);
476  got_shutdown = false;
477  }
478 
479  gpr_cv_destroy(&cv);
481  gpr_mu_destroy(&mu);
483 }
484 
485 struct thread_state {
487  void* tag;
488 };
489 
490 int main(int argc, char** argv) {
491  grpc::testing::TestEnvironment env(&argc, argv);
492  grpc_init();
493  test_no_op();
495  test_wait_empty();
498  test_cq_end_op();
499  test_pluck();
503  test_callback();
504  grpc_shutdown();
505  return 0;
506 }
gpr_cv_signal
GPRAPI void gpr_cv_signal(gpr_cv *cv)
test_cq_end_op
static void test_cq_end_op(void)
Definition: completion_queue_test.cc:137
iomgr.h
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
do_nothing_end_completion
static void do_nothing_end_completion(void *, grpc_cq_completion *)
Definition: completion_queue_test.cc:134
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
generate.env
env
Definition: generate.py:37
GRPC_CQ_NEXT
@ GRPC_CQ_NEXT
Definition: grpc_types.h:760
main
int main(int argc, char **argv)
Definition: completion_queue_test.cc:490
grpc_cq_polling_type
grpc_cq_polling_type
Definition: grpc_types.h:740
grpc.framework.interfaces.base.utilities.completion
def completion(terminal_metadata, code, message)
Definition: framework/interfaces/base/utilities.py:45
counter_
int counter_
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/tokenizer_unittest.cc:150
gpr_cv
pthread_cond_t gpr_cv
Definition: impl/codegen/sync_posix.h:48
grpc_cq_completion_type
grpc_cq_completion_type
Definition: grpc_types.h:758
grpc_completion_queue_thread_local_cache_flush
GRPCAPI int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, void **tag, int *ok)
Definition: completion_queue.cc:459
test_pluck_after_shutdown
static void test_pluck_after_shutdown(void)
Definition: completion_queue_test.cc:341
useful.h
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Definition: grpc_types.h:554
GRPC_OP_COMPLETE
@ GRPC_OP_COMPLETE
Definition: grpc_types.h:558
completion_queue.h
grpc_cq_completion
Definition: src/core/lib/surface/completion_queue.h:43
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_cq_end_op
void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:894
time.h
test_callback
static void test_callback(void)
Definition: completion_queue_test.cc:364
grpc_completion_queue_create
GRPCAPI grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)
Definition: completion_queue_factory.cc:84
done_
std::atomic< bool > done_
Definition: fuzzing_event_engine_test.cc:57
grpc::testing::shutdown_mu
static gpr_mu shutdown_mu
Definition: bm_cq.cc:162
GRPC_CQ_NON_POLLING
@ GRPC_CQ_NON_POLLING
Definition: grpc_types.h:754
tags
bool tags[kAvailableTags]
Definition: inproc_callback_test.cc:114
memory.h
grpc::testing::shutdown_cv
static gpr_cv shutdown_cv
Definition: bm_cq.cc:163
test_cq_tls_cache_full
static void test_cq_tls_cache_full(void)
Definition: completion_queue_test.cc:170
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
grpc_completion_queue_thread_local_cache_init
GRPCAPI void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq)
Definition: completion_queue.cc:452
test_pollset_conversion
static void test_pollset_conversion(void)
Definition: completion_queue_test.cc:90
test_wait_empty
static void test_wait_empty(void)
Definition: completion_queue_test.cc:112
test_cq_tls_cache_empty
static void test_cq_tls_cache_empty(void)
Definition: completion_queue_test.cc:213
shutdown_and_destroy
static void shutdown_and_destroy(grpc_completion_queue *cc)
Definition: completion_queue_test.cc:40
GRPC_CQ_DEFAULT_POLLING
@ GRPC_CQ_DEFAULT_POLLING
Definition: grpc_types.h:743
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
GRPC_CQ_NON_LISTENING
@ GRPC_CQ_NON_LISTENING
Definition: grpc_types.h:748
gpr_cv_destroy
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
grpc_get_cq_completion_type
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq)
Definition: completion_queue.cc:581
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
grpc_completion_queue_factory_lookup
const GRPCAPI grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)
Definition: completion_queue_factory.cc:48
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
grpc_event
Definition: grpc_types.h:564
grpc_completion_queue
Definition: completion_queue.cc:347
grpc.h
grpc_cq_pollset
grpc_pollset * grpc_cq_pollset(grpc_completion_queue *cq)
Definition: completion_queue.cc:1433
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
counter
static int counter
Definition: abseil-cpp/absl/flags/reflection_test.cc:131
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
test_pluck
static void test_pluck(void)
Definition: completion_queue_test.cc:285
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
create_test_tag
static void * create_test_tag(void)
Definition: completion_queue_test.cc:34
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
LOG_TEST
#define LOG_TEST(x)
Definition: completion_queue_test.cc:32
GRPC_CQ_PLUCK
@ GRPC_CQ_PLUCK
Definition: grpc_types.h:763
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_completion_queue_attributes
Definition: grpc_types.h:791
test_shutdown_then_next_with_timeout
static void test_shutdown_then_next_with_timeout(void)
Definition: completion_queue_test.cc:262
thread_state
Definition: completion_queue_test.cc:485
test_config.h
gpr_inf_past
GPRAPI gpr_timespec gpr_inf_past(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:63
shutdown_cb
static void shutdown_cb(uv_shutdown_t *req, int status)
Definition: benchmark-tcp-write-batch.c:80
grpc_completion_queue_pluck
GRPCAPI grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1328
grpc_cq_begin_op
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:672
attr
OPENSSL_EXPORT X509_ATTRIBUTE * attr
Definition: x509.h:1666
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
thread_state::cc
grpc_completion_queue * cc
Definition: completion_queue_test.cc:486
cv
unsigned cv
Definition: cxa_demangle.cpp:4908
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
grpc_completion_queue_destroy
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
Definition: completion_queue.cc:1424
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
alloc.h
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
grpc_completion_queue_next
GRPCAPI grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1133
grpc_completion_queue_shutdown
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
Definition: completion_queue.cc:1416
ok
bool ok
Definition: async_end2end_test.cc:197
grpc_completion_queue_functor
Definition: grpc_types.h:773
thread_state::tag
void * tag
Definition: completion_queue_test.cc:487
tag_
void * tag_
Definition: channel_connectivity.cc:211
GRPC_CQ_CALLBACK
@ GRPC_CQ_CALLBACK
Definition: grpc_types.h:766
grpc_event::type
grpc_completion_type type
Definition: grpc_types.h:566
googletest-break-on-failure-unittest.Run
def Run(command)
Definition: bloaty/third_party/googletest/googletest/test/googletest-break-on-failure-unittest.py:76
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_event::success
int success
Definition: grpc_types.h:572
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
test_shutdown_then_next_polling
static void test_shutdown_then_next_polling(void)
Definition: completion_queue_test.cc:240
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
grpc_event::tag
void * tag
Definition: grpc_types.h:576
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
sync.h
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)
test_no_op
static void test_no_op(void)
Definition: completion_queue_test.cc:72


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52