completion_queue_threading_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/log.h>
22 #include <grpc/support/time.h>
23 
25 #include "src/core/lib/gprpp/thd.h"
29 
30 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
31 
32 static void* create_test_tag(void) {
33  static intptr_t i = 0;
34  return reinterpret_cast<void*>(++i);
35 }
36 
37 /* helper for tests to shutdown correctly and tersely */
39  grpc_event ev;
41 
42  switch (grpc_get_cq_completion_type(cc)) {
43  case GRPC_CQ_NEXT: {
45  nullptr);
46  break;
47  }
48  case GRPC_CQ_PLUCK: {
51  break;
52  }
53  default: {
54  gpr_log(GPR_ERROR, "Unknown completion type");
55  break;
56  }
57  }
58 
61 }
62 
63 static void do_nothing_end_completion(void* /*arg*/,
64  grpc_cq_completion* /*c*/) {}
65 
66 struct thread_state {
68  void* tag;
69 };
70 
71 static void pluck_one(void* arg) {
72  struct thread_state* state = static_cast<struct thread_state*>(arg);
75 }
76 
77 static void test_too_many_plucks(void) {
78  grpc_event ev;
83  struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
85  unsigned i, j;
86 
87  LOG_TEST("test_too_many_plucks");
88 
90 
91  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
92  tags[i] = create_test_tag();
93  for (j = 0; j < i; j++) {
94  GPR_ASSERT(tags[i] != tags[j]);
95  }
96  thread_states[i].cc = cc;
97  thread_states[i].tag = tags[i];
98  threads[i] =
99  grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
100  threads[i].Start();
101  }
102 
103  /* wait until all other threads are plucking */
105 
109 
110  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
113  nullptr, &completions[i]);
114  }
115 
116  for (auto& th : threads) {
117  th.Join();
118  }
119 
121 }
122 
123 #define TEST_THREAD_EVENTS 10000
124 
125 typedef struct test_thread_options {
132  int id;
135 
138 }
139 
140 static void free_completion(void* /*arg*/, grpc_cq_completion* completion) {
142 }
143 
144 static void producer_thread(void* arg) {
145  test_thread_options* opt = static_cast<test_thread_options*>(arg);
146  int i;
147 
148  gpr_log(GPR_INFO, "producer %d started", opt->id);
149  gpr_event_set(&opt->on_started, reinterpret_cast<void*>(1));
151 
152  gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
153  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
154  GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void*)(intptr_t)1));
155  }
156 
157  gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
158  gpr_event_set(&opt->on_phase1_done, reinterpret_cast<void*>(1));
160 
161  gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
162  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
164  grpc_cq_end_op(opt->cc, reinterpret_cast<void*>(1), GRPC_ERROR_NONE,
165  free_completion, nullptr,
166  static_cast<grpc_cq_completion*>(
167  gpr_malloc(sizeof(grpc_cq_completion))));
168  opt->events_triggered++;
169  }
170 
171  gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
172  gpr_event_set(&opt->on_finished, reinterpret_cast<void*>(1));
173 }
174 
175 static void consumer_thread(void* arg) {
176  test_thread_options* opt = static_cast<test_thread_options*>(arg);
177  grpc_event ev;
178 
179  gpr_log(GPR_INFO, "consumer %d started", opt->id);
180  gpr_event_set(&opt->on_started, reinterpret_cast<void*>(1));
182 
183  gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
184 
185  gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
186  gpr_event_set(&opt->on_phase1_done, reinterpret_cast<void*>(1));
188 
189  gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
190  for (;;) {
192  opt->cc, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
193  switch (ev.type) {
194  case GRPC_OP_COMPLETE:
195  GPR_ASSERT(ev.success);
196  opt->events_triggered++;
197  break;
198  case GRPC_QUEUE_SHUTDOWN:
199  gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
200  gpr_event_set(&opt->on_finished, reinterpret_cast<void*>(1));
201  return;
202  case GRPC_QUEUE_TIMEOUT:
203  gpr_log(GPR_ERROR, "Invalid timeout received");
204  abort();
205  }
206  }
207 }
208 
209 static void test_threading(size_t producers, size_t consumers) {
211  gpr_malloc((producers + consumers) * sizeof(test_thread_options)));
212  gpr_event phase1 = GPR_EVENT_INIT;
213  gpr_event phase2 = GPR_EVENT_INIT;
215  size_t i;
216  size_t total_consumed = 0;
217  static int optid = 101;
218 
219  gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
220  "test_threading", producers, consumers);
221 
222  /* start all threads: they will wait for phase1 */
224  gpr_malloc(sizeof(*threads) * (producers + consumers)));
225  for (i = 0; i < producers + consumers; i++) {
226  gpr_event_init(&options[i].on_started);
227  gpr_event_init(&options[i].on_phase1_done);
228  gpr_event_init(&options[i].on_finished);
229  options[i].phase1 = &phase1;
230  options[i].phase2 = &phase2;
231  options[i].events_triggered = 0;
232  options[i].cc = cc;
233  options[i].id = optid++;
234 
235  bool ok;
237  i < producers ? "grpc_producer" : "grpc_consumer",
238  i < producers ? producer_thread : consumer_thread, options + i, &ok);
239  GPR_ASSERT(ok);
240  threads[i].Start();
241  gpr_event_wait(&options[i].on_started, ten_seconds_time());
242  }
243 
244  /* start phase1: producers will pre-declare all operations they will
245  complete */
246  gpr_log(GPR_INFO, "start phase 1");
247  gpr_event_set(&phase1, reinterpret_cast<void*>(1));
248 
249  gpr_log(GPR_INFO, "wait phase 1");
250  for (i = 0; i < producers + consumers; i++) {
251  GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
252  }
253  gpr_log(GPR_INFO, "done phase 1");
254 
255  /* start phase2: operations will complete, and consumers will consume them */
256  gpr_log(GPR_INFO, "start phase 2");
257  gpr_event_set(&phase2, reinterpret_cast<void*>(1));
258 
259  /* in parallel, we shutdown the completion channel - all events should still
260  be consumed */
262 
263  /* join all threads */
264  gpr_log(GPR_INFO, "wait phase 2");
265  for (i = 0; i < producers + consumers; i++) {
267  }
268  gpr_log(GPR_INFO, "done phase 2");
269 
270  /* destroy the completion channel */
272 
273  for (i = 0; i < producers + consumers; i++) {
274  threads[i].Join();
275  }
276  gpr_free(threads);
277 
278  /* verify that everything was produced and consumed */
279  for (i = 0; i < producers + consumers; i++) {
280  if (i < producers) {
281  GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
282  } else {
283  total_consumed += options[i].events_triggered;
284  }
285  }
286  GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
287 
288  gpr_free(options);
289 }
290 
291 int main(int argc, char** argv) {
292  grpc::testing::TestEnvironment env(&argc, argv);
293  grpc_init();
295  test_threading(1, 1);
296  test_threading(1, 10);
297  test_threading(10, 1);
298  test_threading(10, 10);
299  grpc_shutdown();
300  return 0;
301 }
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
pluck_one
static void pluck_one(void *arg)
Definition: completion_queue_threading_test.cc:71
iomgr.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
log.h
test_thread_options::events_triggered
size_t events_triggered
Definition: completion_queue_threading_test.cc:131
test_thread_options
struct test_thread_options test_thread_options
generate.env
env
Definition: generate.py:37
GRPC_CQ_NEXT
@ GRPC_CQ_NEXT
Definition: grpc_types.h:760
grpc_completion_queue_create_for_pluck
GRPCAPI grpc_completion_queue * grpc_completion_queue_create_for_pluck(void *reserved)
Definition: completion_queue_factory.cc:69
grpc.framework.interfaces.base.utilities.completion
def completion(terminal_metadata, code, message)
Definition: framework/interfaces/base/utilities.py:45
options
double_dict options[]
Definition: capstone_test.c:55
gpr_event_set
GPRAPI void gpr_event_set(gpr_event *ev, void *value)
Definition: sync.cc:59
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Definition: grpc_types.h:554
GRPC_OP_COMPLETE
@ GRPC_OP_COMPLETE
Definition: grpc_types.h:558
completion_queue.h
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
grpc_cq_completion
Definition: src/core/lib/surface/completion_queue.h:43
test_thread_options::on_finished
gpr_event on_finished
Definition: completion_queue_threading_test.cc:130
ten_seconds_time
gpr_timespec ten_seconds_time(void)
Definition: completion_queue_threading_test.cc:136
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_cq_end_op
void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:894
time.h
do_nothing_end_completion
static void do_nothing_end_completion(void *, grpc_cq_completion *)
Definition: completion_queue_threading_test.cc:63
threads
static uv_thread_t * threads
Definition: threadpool.c:38
gen_build_yaml.struct
def struct(**kwargs)
Definition: test/core/end2end/gen_build_yaml.py:30
main
int main(int argc, char **argv)
Definition: completion_queue_threading_test.cc:291
tags
bool tags[kAvailableTags]
Definition: inproc_callback_test.cc:114
consumer_thread
static void consumer_thread(void *arg)
Definition: completion_queue_threading_test.cc:175
test_threading
static void test_threading(size_t producers, size_t consumers)
Definition: completion_queue_threading_test.cc:209
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_get_cq_completion_type
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq)
Definition: completion_queue.cc:581
grpc_timeout_milliseconds_to_deadline
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
Definition: test/core/util/test_config.cc:89
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_event
Definition: grpc_types.h:564
grpc_completion_queue
Definition: completion_queue.cc:347
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
grpc.h
arg
Definition: cmdline.cc:40
create_test_tag
static void * create_test_tag(void)
Definition: completion_queue_threading_test.cc:32
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
gpr_event_init
GPRAPI void gpr_event_init(gpr_event *ev)
Definition: sync.cc:54
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
#define GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
Definition: grpc.h:154
GRPC_CQ_PLUCK
@ GRPC_CQ_PLUCK
Definition: grpc_types.h:763
free_completion
static void free_completion(void *, grpc_cq_completion *completion)
Definition: completion_queue_threading_test.cc:140
grpc_core::ExecCtx
Definition: exec_ctx.h:97
test_thread_options::id
int id
Definition: completion_queue_threading_test.cc:132
gpr_event_wait
GPRAPI void * gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline)
Definition: sync.cc:73
thread_state
Definition: completion_queue_test.cc:485
test_thread_options::phase2
gpr_event * phase2
Definition: completion_queue_threading_test.cc:129
test_config.h
TEST_THREAD_EVENTS
#define TEST_THREAD_EVENTS
Definition: completion_queue_threading_test.cc:123
gpr_inf_past
GPRAPI gpr_timespec gpr_inf_past(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:63
grpc_completion_queue_pluck
GRPCAPI grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1328
grpc_cq_begin_op
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:672
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
gpr_event
Definition: impl/codegen/sync_generic.h:31
thread_state::cc
grpc_completion_queue * cc
Definition: completion_queue_test.cc:486
grpc_completion_queue_destroy
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
Definition: completion_queue.cc:1424
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
test_thread_options::on_phase1_done
gpr_event on_phase1_done
Definition: completion_queue_threading_test.cc:128
alloc.h
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
thd.h
grpc_completion_queue_next
GRPCAPI grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1133
grpc_completion_queue_shutdown
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
Definition: completion_queue.cc:1416
ok
bool ok
Definition: async_end2end_test.cc:197
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_core::Thread
Definition: thd.h:43
thread_state::tag
void * tag
Definition: completion_queue_test.cc:487
producer_thread
static void producer_thread(void *arg)
Definition: completion_queue_threading_test.cc:144
test_too_many_plucks
static void test_too_many_plucks(void)
Definition: completion_queue_threading_test.cc:77
test_thread_options::phase1
gpr_event * phase1
Definition: completion_queue_threading_test.cc:127
shutdown_and_destroy
static void shutdown_and_destroy(grpc_completion_queue *cc)
Definition: completion_queue_threading_test.cc:38
grpc_completion_queue_create_for_next
GRPCAPI grpc_completion_queue * grpc_completion_queue_create_for_next(void *reserved)
Definition: completion_queue_factory.cc:62
gpr_timespec
Definition: gpr_types.h:50
grpc_event::type
grpc_completion_type type
Definition: grpc_types.h:566
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
GPR_EVENT_INIT
#define GPR_EVENT_INIT
Definition: impl/codegen/sync_generic.h:35
test_thread_options::on_started
gpr_event on_started
Definition: completion_queue_threading_test.cc:126
grpc_event::success
int success
Definition: grpc_types.h:572
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
LOG_TEST
#define LOG_TEST(x)
Definition: completion_queue_threading_test.cc:30
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
test_thread_options::cc
grpc_completion_queue * cc
Definition: completion_queue_threading_test.cc:133
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
test_thread_options
Definition: completion_queue_threading_test.cc:125


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52