mpscq_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
#include "src/core/lib/gprpp/mpscq.h"

#include <inttypes.h>
#include <stdlib.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
31 
33 
34 typedef struct test_node {
36  size_t i;
37  size_t* ctr;
38 } test_node;
39 
40 static test_node* new_node(size_t i, size_t* ctr) {
41  test_node* n = new test_node();
42  n->i = i;
43  n->ctr = ctr;
44  return n;
45 }
46 
47 static void test_serial(void) {
48  gpr_log(GPR_DEBUG, "test_serial");
50  for (size_t i = 0; i < 10000000; i++) {
51  q.Push(&new_node(i, nullptr)->node);
52  }
53  for (size_t i = 0; i < 10000000; i++) {
54  test_node* n = reinterpret_cast<test_node*>(q.Pop());
55  GPR_ASSERT(n);
56  GPR_ASSERT(n->i == i);
57  delete n;
58  }
59 }
60 
61 typedef struct {
62  size_t ctr;
65 } thd_args;
66 
// Number of nodes each producer thread pushes. A popped node whose sequence
// number equals this value marks its producer as finished.
#define THREAD_ITERATIONS 10000
68 
69 static void test_thread(void* args) {
70  thd_args* a = static_cast<thd_args*>(args);
72  for (size_t i = 1; i <= THREAD_ITERATIONS; i++) {
73  a->q->Push(&new_node(i, &a->ctr)->node);
74  }
75 }
76 
77 static void test_mt(void) {
78  gpr_log(GPR_DEBUG, "test_mt");
81  grpc_core::Thread thds[100];
82  thd_args ta[GPR_ARRAY_SIZE(thds)];
84  for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
85  ta[i].ctr = 0;
86  ta[i].q = &q;
87  ta[i].start = &start;
88  thds[i] = grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]);
89  thds[i].Start();
90  }
91  size_t num_done = 0;
92  size_t spins = 0;
93  gpr_event_set(&start, reinterpret_cast<void*>(1));
94  while (num_done != GPR_ARRAY_SIZE(thds)) {
96  while ((n = q.Pop()) == nullptr) {
97  spins++;
98  }
99  test_node* tn = reinterpret_cast<test_node*>(n);
100  GPR_ASSERT(*tn->ctr == tn->i - 1);
101  *tn->ctr = tn->i;
102  if (tn->i == THREAD_ITERATIONS) num_done++;
103  delete tn;
104  }
105  gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, spins);
106  for (auto& th : thds) {
107  th.Join();
108  }
109 }
110 
111 typedef struct {
113  size_t num_thds;
115  size_t num_done;
116  size_t spins;
119 } pull_args;
120 
121 static void pull_thread(void* arg) {
122  pull_args* pa = static_cast<pull_args*>(arg);
124 
125  for (;;) {
126  gpr_mu_lock(&pa->mu);
127  if (pa->num_done == pa->num_thds) {
128  gpr_mu_unlock(&pa->mu);
129  return;
130  }
132  while ((n = pa->q->Pop()) == nullptr) {
133  pa->spins++;
134  }
135  test_node* tn = reinterpret_cast<test_node*>(n);
136  GPR_ASSERT(*tn->ctr == tn->i - 1);
137  *tn->ctr = tn->i;
138  if (tn->i == THREAD_ITERATIONS) pa->num_done++;
139  delete tn;
140  gpr_mu_unlock(&pa->mu);
141  }
142 }
143 
144 static void test_mt_multipop(void) {
145  gpr_log(GPR_DEBUG, "test_mt_multipop");
148  grpc_core::Thread thds[50];
149  grpc_core::Thread pull_thds[50];
150  thd_args ta[GPR_ARRAY_SIZE(thds)];
152  for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
153  ta[i].ctr = 0;
154  ta[i].q = &q;
155  ta[i].start = &start;
156  thds[i] = grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]);
157  thds[i].Start();
158  }
159  pull_args pa;
160  pa.ta = ta;
161  pa.num_thds = GPR_ARRAY_SIZE(thds);
162  pa.spins = 0;
163  pa.num_done = 0;
164  pa.q = &q;
165  pa.start = &start;
166  gpr_mu_init(&pa.mu);
167  for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
168  pull_thds[i] = grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
169  pull_thds[i].Start();
170  }
171  gpr_event_set(&start, reinterpret_cast<void*>(1));
172  for (auto& pth : pull_thds) {
173  pth.Join();
174  }
175  gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, pa.spins);
176  for (auto& th : thds) {
177  th.Join();
178  }
179  gpr_mu_destroy(&pa.mu);
180 }
181 
182 int main(int argc, char** argv) {
183  grpc::testing::TestEnvironment env(&argc, argv);
184  test_serial();
185  test_mt();
187  return 0;
188 }
test_node::node
MultiProducerSingleConsumerQueue::Node node
Definition: mpscq_test.cc:35
test_thread
Definition: test-thread.c:49
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
test_node
Definition: mpscq_test.cc:34
log.h
mpscq.h
grpc_core::MultiProducerSingleConsumerQueue::Pop
Node * Pop()
Definition: mpscq.cc:37
opencensus.proto.agent.common.v1.common_pb2.Node
Node
Definition: common_pb2.py:308
generate.env
env
Definition: generate.py:37
main
int main(int argc, char **argv)
Definition: mpscq_test.cc:182
pull_args::q
MultiProducerSingleConsumerQueue * q
Definition: mpscq_test.cc:117
gpr_event_set
GPRAPI void gpr_event_set(gpr_event *ev, void *value)
Definition: sync.cc:59
useful.h
grpc_core::MultiProducerSingleConsumerQueue
Definition: mpscq.h:35
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
a
int a
Definition: abseil-cpp/absl/container/internal/hash_policy_traits_test.cc:88
pull_thread
static void pull_thread(void *arg)
Definition: mpscq_test.cc:121
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
thd_args::q
MultiProducerSingleConsumerQueue * q
Definition: mpscq_test.cc:63
start
static uint64_t start
Definition: benchmark-pound.c:74
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
pull_args::num_done
size_t num_done
Definition: mpscq_test.cc:115
test_serial
static void test_serial(void)
Definition: mpscq_test.cc:47
THREAD_ITERATIONS
#define THREAD_ITERATIONS
Definition: mpscq_test.cc:67
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
test_node
struct test_node test_node
pull_args::num_thds
size_t num_thds
Definition: mpscq_test.cc:113
arg
Definition: cmdline.cc:40
test_mt
static void test_mt(void)
Definition: mpscq_test.cc:77
test_mt_multipop
static void test_mt_multipop(void)
Definition: mpscq_test.cc:144
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
gpr_event_init
GPRAPI void gpr_event_init(gpr_event *ev)
Definition: sync.cc:54
grpc_core::Thread::Start
void Start()
Definition: thd.h:125
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
gpr_event_wait
GPRAPI void * gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline)
Definition: sync.cc:73
test_thread
static void test_thread(void *args)
Definition: mpscq_test.cc:69
pull_args::ta
thd_args * ta
Definition: mpscq_test.cc:112
test_config.h
test_node::i
size_t i
Definition: mpscq_test.cc:36
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
gpr_event
Definition: impl/codegen/sync_generic.h:31
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
alloc.h
pull_args::start
gpr_event * start
Definition: mpscq_test.cc:118
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
thd.h
pull_args::mu
gpr_mu mu
Definition: mpscq_test.cc:114
arg
struct arg arg
new_node
static test_node * new_node(size_t i, size_t *ctr)
Definition: mpscq_test.cc:40
grpc_core::Thread
Definition: thd.h:43
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
pull_args::spins
size_t spins
Definition: mpscq_test.cc:116
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
test_node::ctr
size_t * ctr
Definition: mpscq_test.cc:37
thd_args
Definition: bad_client.cc:44
pull_args
Definition: mpscq_test.cc:111
sync.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
thd_args::ctr
size_t ctr
Definition: mpscq_test.cc:62
thd_args::start
gpr_event * start
Definition: mpscq_test.cc:64


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:30