mpmcqueue_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <grpc/grpc.h>
22 
23 #include "src/core/lib/gprpp/thd.h"
25 
26 #define TEST_NUM_ITEMS 10000
27 
28 // Testing items for queue
29 struct WorkItem {
30  int index;
31  bool done;
32 
33  explicit WorkItem(int i) : index(i) { done = false; }
34 };
35 
36 // Thread to "produce" items and put items into queue
37 // It will also check that all items has been marked done and clean up all
38 // produced items on destructing.
40  public:
42  int num_items)
43  : start_index_(start_index), num_items_(num_items), queue_(queue) {
44  items_ = nullptr;
46  "mpmcq_test_producer_thd",
47  [](void* th) { static_cast<ProducerThread*>(th)->Run(); }, this);
48  }
50  for (int i = 0; i < num_items_; ++i) {
52  delete items_[i];
53  }
54  delete[] items_;
55  }
56 
57  void Start() { thd_.Start(); }
58  void Join() { thd_.Join(); }
59 
60  private:
61  void Run() {
62  items_ = new WorkItem*[num_items_];
63  for (int i = 0; i < num_items_; ++i) {
64  items_[i] = new WorkItem(start_index_ + i);
65  queue_->Put(items_[i]);
66  }
67  }
68 
74 };
75 
76 // Thread to pull out items from queue
78  public:
81  "mpmcq_test_consumer_thd",
82  [](void* th) { static_cast<ConsumerThread*>(th)->Run(); }, this);
83  }
85 
86  void Start() { thd_.Start(); }
87  void Join() { thd_.Join(); }
88 
89  private:
90  void Run() {
91  // count number of Get() called in this thread
92  int count = 0;
93 
94  WorkItem* item;
95  while ((item = static_cast<WorkItem*>(queue_->Get(nullptr))) != nullptr) {
96  count++;
97  GPR_ASSERT(!item->done);
98  item->done = true;
99  }
100 
101  gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count);
102  }
105 };
106 
107 static void test_FIFO(void) {
108  gpr_log(GPR_INFO, "test_FIFO");
109  grpc_core::InfLenFIFOQueue large_queue;
110  for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
111  large_queue.Put(static_cast<void*>(new WorkItem(i)));
112  }
113  GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS);
114  for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
115  WorkItem* item = static_cast<WorkItem*>(large_queue.Get(nullptr));
116  GPR_ASSERT(i == item->index);
117  delete item;
118  }
119 }
120 
121 // Test if queue's behavior of expanding is correct. (Only does expansion when
122 // it gets full, and each time expands to doubled size).
123 static void test_space_efficiency(void) {
124  gpr_log(GPR_INFO, "test_space_efficiency");
126  for (int i = 0; i < queue.init_num_nodes(); ++i) {
127  queue.Put(static_cast<void*>(new WorkItem(i)));
128  }
129  // Queue should not have been expanded at this time.
130  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
131  for (int i = 0; i < queue.init_num_nodes(); ++i) {
132  WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
133  queue.Put(item);
134  }
135  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
136  for (int i = 0; i < queue.init_num_nodes(); ++i) {
137  WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
138  delete item;
139  }
140  // Queue never shrinks even it is empty.
141  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
142  GPR_ASSERT(queue.count() == 0);
143  // queue empty now
144  for (int i = 0; i < queue.init_num_nodes() * 2; ++i) {
145  queue.Put(static_cast<void*>(new WorkItem(i)));
146  }
147  GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2);
148  // Queue should have been expanded once.
149  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
150  for (int i = 0; i < queue.init_num_nodes(); ++i) {
151  WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
152  delete item;
153  }
154  GPR_ASSERT(queue.count() == queue.init_num_nodes());
155  // Queue will never shrink, should keep same number of node as before.
156  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
157  for (int i = 0; i < queue.init_num_nodes() + 1; ++i) {
158  queue.Put(static_cast<void*>(new WorkItem(i)));
159  }
160  GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1);
161  // Queue should have been expanded twice.
162  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
163  for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) {
164  WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
165  delete item;
166  }
167  GPR_ASSERT(queue.count() == 0);
168  GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
169  gpr_log(GPR_DEBUG, "Done.");
170 }
171 
172 static void test_many_thread(void) {
173  gpr_log(GPR_INFO, "test_many_thread");
174  const int num_producer_threads = 10;
175  const int num_consumer_threads = 20;
177  ProducerThread** producer_threads = new ProducerThread*[num_producer_threads];
178  ConsumerThread** consumer_threads = new ConsumerThread*[num_consumer_threads];
179 
180  gpr_log(GPR_DEBUG, "Fork ProducerThreads...");
181  for (int i = 0; i < num_producer_threads; ++i) {
182  producer_threads[i] =
184  producer_threads[i]->Start();
185  }
186  gpr_log(GPR_DEBUG, "ProducerThreads Started.");
187  gpr_log(GPR_DEBUG, "Fork ConsumerThreads...");
188  for (int i = 0; i < num_consumer_threads; ++i) {
189  consumer_threads[i] = new ConsumerThread(&queue);
190  consumer_threads[i]->Start();
191  }
192  gpr_log(GPR_DEBUG, "ConsumerThreads Started.");
193  gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish...");
194  for (int i = 0; i < num_producer_threads; ++i) {
195  producer_threads[i]->Join();
196  }
197  gpr_log(GPR_DEBUG, "All ProducerThreads Terminated.");
198  gpr_log(GPR_DEBUG, "Terminating ConsumerThreads...");
199  for (int i = 0; i < num_consumer_threads; ++i) {
200  queue.Put(nullptr);
201  }
202  for (int i = 0; i < num_consumer_threads; ++i) {
203  consumer_threads[i]->Join();
204  }
205  gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated.");
206  gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up...");
207  for (int i = 0; i < num_producer_threads; ++i) {
208  // Destructor of ProducerThread will do the check of WorkItems
209  delete producer_threads[i];
210  }
211  delete[] producer_threads;
212  for (int i = 0; i < num_consumer_threads; ++i) {
213  delete consumer_threads[i];
214  }
215  delete[] consumer_threads;
216  gpr_log(GPR_DEBUG, "Done.");
217 }
218 
219 int main(int argc, char** argv) {
220  grpc::testing::TestEnvironment env(&argc, argv);
221  grpc_init();
222  test_FIFO();
225  grpc_shutdown();
226  return 0;
227 }
ConsumerThread::Join
void Join()
Definition: mpmcqueue_test.cc:87
test_FIFO
static void test_FIFO(void)
Definition: mpmcqueue_test.cc:107
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::InfLenFIFOQueue
Definition: mpmcqueue.h:52
ConsumerThread
Definition: mpmcqueue_test.cc:77
ConsumerThread::Start
void Start()
Definition: mpmcqueue_test.cc:86
generate.env
env
Definition: generate.py:37
ConsumerThread::ConsumerThread
ConsumerThread(grpc_core::InfLenFIFOQueue *queue)
Definition: mpmcqueue_test.cc:79
mpmcqueue.h
ConsumerThread::Run
void Run()
Definition: mpmcqueue_test.cc:90
WorkItem
Definition: mpmcqueue_test.cc:29
grpc_core::InfLenFIFOQueue::Put
void Put(void *elem) override
Definition: mpmcqueue.cc:102
ProducerThread::items_
WorkItem ** items_
Definition: mpmcqueue_test.cc:73
ConsumerThread::~ConsumerThread
~ConsumerThread()
Definition: mpmcqueue_test.cc:84
ProducerThread::Join
void Join()
Definition: mpmcqueue_test.cc:58
grpc_core::InfLenFIFOQueue::count
int count() const override
Definition: mpmcqueue.h:74
test_space_efficiency
static void test_space_efficiency(void)
Definition: mpmcqueue_test.cc:123
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
queue
Definition: sync_test.cc:39
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc.h
ProducerThread::start_index_
int start_index_
Definition: mpmcqueue_test.cc:69
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc_core::Thread::Join
void Join()
Definition: thd.h:141
ProducerThread::Start
void Start()
Definition: mpmcqueue_test.cc:57
TEST_NUM_ITEMS
#define TEST_NUM_ITEMS
Definition: mpmcqueue_test.cc:26
WorkItem::WorkItem
WorkItem(int i)
Definition: mpmcqueue_test.cc:33
ConsumerThread::queue_
grpc_core::InfLenFIFOQueue * queue_
Definition: mpmcqueue_test.cc:103
grpc_core::Thread::Start
void Start()
Definition: thd.h:125
main
int main(int argc, char **argv)
Definition: mpmcqueue_test.cc:219
test_config.h
ProducerThread::~ProducerThread
~ProducerThread()
Definition: mpmcqueue_test.cc:49
queue
struct queue queue
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
ProducerThread::queue_
grpc_core::InfLenFIFOQueue * queue_
Definition: mpmcqueue_test.cc:71
WorkItem::done
bool done
Definition: mpmcqueue_test.cc:31
WorkItem::index
int index
Definition: mpmcqueue_test.cc:30
ProducerThread::ProducerThread
ProducerThread(grpc_core::InfLenFIFOQueue *queue, int start_index, int num_items)
Definition: mpmcqueue_test.cc:41
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
ProducerThread
Definition: mpmcqueue_test.cc:39
thd.h
ConsumerThread::thd_
grpc_core::Thread thd_
Definition: mpmcqueue_test.cc:104
grpc_core::Thread
Definition: thd.h:43
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_core::InfLenFIFOQueue::Get
void * Get(gpr_timespec *wait_time) override
Definition: mpmcqueue.cc:142
ProducerThread::num_items_
int num_items_
Definition: mpmcqueue_test.cc:70
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
ProducerThread::thd_
grpc_core::Thread thd_
Definition: mpmcqueue_test.cc:72
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
ProducerThread::Run
void Run()
Definition: mpmcqueue_test.cc:61
test_many_thread
static void test_many_thread(void)
Definition: mpmcqueue_test.cc:172


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:41