work_serializer_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <memory>
22 #include <thread>
23 
24 #include <gtest/gtest.h>
25 
26 #include "absl/memory/memory.h"
27 #include "absl/synchronization/barrier.h"
28 #include "absl/synchronization/notification.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 
35 #include "src/core/lib/gprpp/thd.h"
38 
39 namespace {
// Smoke test: default-constructs a grpc_core::WorkSerializer and lets it go
// out of scope without submitting any callback to it.
40 TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; }
41 
// Submits one callback through WorkSerializer::Run(); the callback sets the
// `done` event to a non-null value.
// NOTE(review): listing lines 43-45 and 47-48 are absent from this extract, so
// the declarations of `lock` and `done`, the remaining Run() argument, and the
// start of the statement that ends in `nullptr);` are not visible here.
// Restore them from the upstream file before compiling.
42 TEST(WorkSerializerTest, ExecuteOneRun) {
46  lock.Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
49  nullptr);
50 }
51 
// Queues one callback with WorkSerializer::Schedule() and checks that it has
// not yet set `done` (gpr_event_get() returns nullptr) before DrainQueue() is
// called.
// NOTE(review): listing lines 53-55, 57 and 60 are absent from this extract,
// so the declarations of `lock` and `done`, the remaining Schedule() argument,
// and the start of the statement that ends in `nullptr);` are not visible
// here. Restore them from the upstream file before compiling.
52 TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
56  lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
  // The callback was only scheduled, so the event must still be unset here.
58  EXPECT_EQ(gpr_event_get(&done), nullptr);
59  lock.DrainQueue();
61  nullptr);
62 }
63 
// Worker used by the ExecuteMany* tests. Constructing one starts a thread
// named "grpc_execute_many" that runs ExecuteManyLoop() against the shared
// WorkSerializer; the destructor joins that thread.
// NOTE(review): listing lines 68, 73, 97, 100, 104, 108 and 110 are absent
// from this extract. As a result the Run() calls below lack their trailing
// argument, the statement ending in `nullptr);` in the destructor has no
// beginning, and the declarations of the `thread_` and `done_` members are not
// shown. Restore them from the upstream file before compiling.
64 class TestThread {
65  public:
  // `lock` is stored as a raw pointer and is not owned by this object.
66  explicit TestThread(grpc_core::WorkSerializer* lock)
67  : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
69  thread_.Start();
70  }
71 
72  ~TestThread() {
74  nullptr);
75  thread_.Join();
76  }
77 
78  private:
  // Thread entry point; `arg` is the owning TestThread. Performs 10 rounds of
  // 10000 Run() calls. `n` keeps increasing across rounds, so the callbacks
  // carry the values 1, 2, 3, ... in submission order.
79  static void ExecuteManyLoop(void* arg) {
80  TestThread* self = static_cast<TestThread*>(arg);
81  size_t n = 1;
82  for (size_t i = 0; i < 10; i++) {
83  for (size_t j = 0; j < 10000; j++) {
84  struct ExecutionArgs {
85  size_t* counter;
86  size_t value;
87  };
  // Allocated per callback; the callback itself deletes it after use.
88  ExecutionArgs* c = new ExecutionArgs;
89  c->counter = &self->counter_;
90  c->value = n++;
91  self->lock_->Run(
92  [c]() {
  // This worker's callbacks must execute in submission order: the counter
  // has to hold the previous value before it is advanced to this one.
93  EXPECT_TRUE(*c->counter == c->value - 1);
94  *c->counter = c->value;
95  delete c;
96  },
98  }
99  // sleep for a little bit, to test other threads picking up the load
101  }
  // Final callback sets the done_ event to a non-null value. Presumably the
  // destructor waits on it (listing line 73 is missing) - confirm upstream.
102  self->lock_->Run(
103  [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
105  }
106 
107  grpc_core::WorkSerializer* lock_ = nullptr;
  // Value carried by the most recently executed callback of this worker.
109  size_t counter_ = 0;
111 };
112 
// Creates 100 TestThread workers that all submit callbacks to the same
// WorkSerializer. Leaving the inner scope destroys the vector, which runs each
// worker's destructor (and therefore joins its thread).
// NOTE(review): listing line 114 is absent from this extract; it presumably
// declares the `lock` referenced below - confirm against the upstream file.
113 TEST(WorkSerializerTest, ExecuteMany) {
115  {
116  std::vector<std::unique_ptr<TestThread>> threads;
117  for (size_t i = 0; i < 100; ++i) {
118  threads.push_back(absl::make_unique<TestThread>(&lock));
119  }
120  }
121 }
122 
// Worker used by the ExecuteMany* tests. Like TestThread, but each callback is
// queued with Schedule() and DrainQueue() is called once per round of 10000
// callbacks. Constructing one starts a thread named "grpc_execute_many" that
// runs ExecuteManyLoop(); the destructor joins that thread.
// NOTE(review): listing lines 127, 132, 157, 161, 165, 169 and 171 are absent
// from this extract. As a result the Schedule()/Run() calls below lack their
// trailing argument, the statement ending in `nullptr);` in the destructor has
// no beginning, and the declarations of the `thread_` and `done_` members are
// not shown. Restore them from the upstream file before compiling.
123 class TestThreadScheduleAndDrain {
124  public:
  // `lock` is stored as a raw pointer and is not owned by this object.
125  explicit TestThreadScheduleAndDrain(grpc_core::WorkSerializer* lock)
126  : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
128  thread_.Start();
129  }
130 
131  ~TestThreadScheduleAndDrain() {
133  nullptr);
134  thread_.Join();
135  }
136 
137  private:
  // Thread entry point; `arg` is the owning TestThreadScheduleAndDrain.
  // Performs 10 rounds; each round schedules 10000 callbacks and then drains
  // the queue. `n` keeps increasing across rounds, so the callbacks carry the
  // values 1, 2, 3, ... in submission order.
138  static void ExecuteManyLoop(void* arg) {
139  TestThreadScheduleAndDrain* self =
140  static_cast<TestThreadScheduleAndDrain*>(arg);
141  size_t n = 1;
142  for (size_t i = 0; i < 10; i++) {
143  for (size_t j = 0; j < 10000; j++) {
144  struct ExecutionArgs {
145  size_t* counter;
146  size_t value;
147  };
  // Allocated per callback; the callback itself deletes it after use.
148  ExecutionArgs* c = new ExecutionArgs;
149  c->counter = &self->counter_;
150  c->value = n++;
151  self->lock_->Schedule(
152  [c]() {
  // This worker's callbacks must execute in submission order: the counter
  // has to hold the previous value before it is advanced to this one.
153  EXPECT_TRUE(*c->counter == c->value - 1);
154  *c->counter = c->value;
155  delete c;
156  },
158  }
159  self->lock_->DrainQueue();
160  // sleep for a little bit, to test other threads picking up the load
162  }
  // Final callback (submitted with Run(), not Schedule()) sets the done_ event
  // to a non-null value. Presumably the destructor waits on it (listing line
  // 132 is missing) - confirm upstream.
163  self->lock_->Run(
164  [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
166  }
167 
168  grpc_core::WorkSerializer* lock_ = nullptr;
  // Value carried by the most recently executed callback of this worker.
170  size_t counter_ = 0;
172 };
173 
// Creates 100 TestThreadScheduleAndDrain workers that all use the same
// WorkSerializer. Leaving the inner scope destroys the vector, which runs each
// worker's destructor (and therefore joins its thread).
// NOTE(review): listing line 175 is absent from this extract; it presumably
// declares the `lock` referenced below - confirm against the upstream file.
174 TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
176  {
177  std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
178  for (size_t i = 0; i < 100; ++i) {
179  threads.push_back(absl::make_unique<TestThreadScheduleAndDrain>(&lock));
180  }
181  }
182 }
183 
// Mixes both worker kinds on one WorkSerializer: 50 TestThread workers (Run)
// and 50 TestThreadScheduleAndDrain workers (Schedule + DrainQueue), created
// alternately. Leaving the inner scope destroys both vectors, which runs each
// worker's destructor (and therefore joins its thread).
// NOTE(review): listing line 185 is absent from this extract; it presumably
// declares the `lock` referenced below - confirm against the upstream file.
184 TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
186  {
187  std::vector<std::unique_ptr<TestThread>> run_threads;
188  std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
189  for (size_t i = 0; i < 50; ++i) {
190  run_threads.push_back(absl::make_unique<TestThread>(&lock));
191  schedule_threads.push_back(
192  absl::make_unique<TestThreadScheduleAndDrain>(&lock));
193  }
194  }
195 }
196 
197 // Tests that work serializers allow destruction from the last callback
// The callback captures `lock` by reference and resets it, dropping the only
// shared_ptr this test holds while the serializer is still inside Run().
198 TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
199  auto lock = std::make_shared<grpc_core::WorkSerializer>();
200  lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
201 }
202 
203 // Tests additional racy conditions when the last callback triggers work
204 // serializer destruction.
// Repeated 1000 times: thread t1 waits for the notification and then resets
// the shared_ptr, while the callback submitted through Run() on the test
// thread is what fires that notification. The reset can therefore overlap
// with Run() finishing on the test thread.
// NOTE(review): listing line 208 is absent from this extract; it presumably
// declares `notification` as an absl::Notification (that header is included
// at the top of the file) - confirm against the upstream file.
205 TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
206  for (int i = 0; i < 1000; ++i) {
207  auto lock = std::make_shared<grpc_core::WorkSerializer>();
209  std::thread t1([&]() {
210  notification.WaitForNotification();
211  lock.reset();
212  });
213  lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
214  t1.join();
215  }
216 }
217 
218 // Tests racy conditions when the last callback triggers work
219 // serializer destruction.
// 50 threads each hold their own shared_ptr copy of the serializer. All of
// them, plus the test thread, meet at a barrier so that the Run() calls and
// the test thread's reset() start at about the same time.
220 TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
221  auto lock = std::make_shared<grpc_core::WorkSerializer>();
  // 51 participants: the 50 worker threads below plus this test thread.
222  absl::Barrier barrier(51);
223  std::vector<std::thread> threads;
224  threads.reserve(50);
225  for (int i = 0; i < 50; ++i) {
226  threads.emplace_back([lock, &barrier]() mutable {
227  barrier.Block();
  // The callback captures a further copy of the shared_ptr and resets that
  // copy from inside the serializer.
228  lock->Run([lock]() mutable { lock.reset(); }, DEBUG_LOCATION);
229  });
230  }
231  barrier.Block();
  // Drop the test thread's own reference; the remaining owners are the copies
  // captured by the worker threads and their callbacks.
232  lock.reset();
233  for (auto& thread : threads) {
234  thread.join();
235  }
236 }
237 
238 } // namespace
239 
// Test binary entry point: sets up the gRPC test environment and GoogleTest,
// brackets the test run with grpc_init()/grpc_shutdown(), and returns the
// RUN_ALL_TESTS() result as the process exit code.
240 int main(int argc, char** argv) {
241  grpc::testing::TestEnvironment env(&argc, argv);
242  ::testing::InitGoogleTest(&argc, argv);
243  grpc_init();
  // Keep the result so that grpc_shutdown() still runs before returning.
244  int retval = RUN_ALL_TESTS();
245  grpc_shutdown();
246  return retval;
247 }
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
log.h
absl::str_format_internal::LengthMod::j
@ j
grpc_core::WorkSerializer::DrainQueue
void DrainQueue()
Definition: work_serializer.cc:229
gpr_event_get
GPRAPI void * gpr_event_get(gpr_event *ev)
Definition: sync.cc:69
generate.env
env
Definition: generate.py:37
NoOp
Definition: bm_call_create.cc:477
counter_
int counter_
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/tokenizer_unittest.cc:150
grpc_core::WorkSerializer
Definition: work_serializer.h:51
gpr_event_set
GPRAPI void gpr_event_set(gpr_event *ev, void *value)
Definition: sync.cc:59
useful.h
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_core::WorkSerializer::Run
void Run(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:219
done_
std::atomic< bool > done_
Definition: fuzzing_event_engine_test.cc:57
threads
static uv_thread_t * threads
Definition: threadpool.c:38
grpc_core::WorkSerializer::Schedule
void Schedule(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:224
EXPECT_EQ
#define EXPECT_EQ(a, b)
Definition: iomgr/time_averaged_stats_test.cc:27
absl::Notification
Definition: abseil-cpp/absl/synchronization/notification.h:66
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
c
void c(T a)
Definition: miscompile_with_no_unique_address_test.cc:40
TEST
#define TEST(name, init_size,...)
Definition: arena_test.cc:75
EXPECT_NE
#define EXPECT_NE(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2028
grpc_timeout_milliseconds_to_deadline
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
Definition: test/core/util/test_config.cc:89
work_serializer.h
absl::Barrier
Definition: abseil-cpp/absl/synchronization/barrier.h:50
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
grpc.h
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
counter
static int counter
Definition: abseil-cpp/absl/flags/reflection_test.cc:131
arg
Definition: cmdline.cc:40
gpr_event_init
GPRAPI void gpr_event_init(gpr_event *ev)
Definition: sync.cc:54
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
gpr_event_wait
GPRAPI void * gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline)
Definition: sync.cc:73
executor.h
test_config.h
value
const char * value
Definition: hpack_parser_table.cc:165
main
int main(int argc, char **argv)
Definition: work_serializer_test.cc:240
gpr_event
Definition: impl/codegen/sync_generic.h:31
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
notification
Definition: alts_tsi_handshaker_test.cc:76
notification
struct notification notification
alloc.h
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
thd.h
arg
struct arg arg
grpc_core::Thread
Definition: thd.h:43
EXPECT_TRUE
#define EXPECT_TRUE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1967
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
t1
Table t1
Definition: abseil-cpp/absl/container/internal/raw_hash_set_allocator_test.cc:185
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
thread_
std::unique_ptr< std::thread > thread_
Definition: settings_timeout_test.cc:104


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:53