pollset_windows_starvation_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <vector>
19 
20 #include <grpc/grpc.h>
21 #include <grpc/support/time.h>
22 
23 #include "src/core/lib/gprpp/thd.h"
31 
32 #if defined(GRPC_WINSOCK_SOCKET)
33 
34 // At least three threads are required to reproduce #18848
35 const size_t THREADS = 3;
36 
37 struct ThreadParams {
38  gpr_cv cv;
39  gpr_mu mu;
40  int complete;
41  int queuing;
42  gpr_mu* pollset_mu[THREADS];
43 };
44 
45 int main(int argc, char** argv) {
46  grpc_init();
47 
48  // Create the threads that all start queueing for work.
49  //
50  // The first one becomes the active poller for work and the two other
51  // threads go into the poller queue.
52  //
53  // When work arrives, the first one notifies the next queued poller,
54  // this wakes the second thread - however all this does is return from
55  // the grpc_pollset_work function. It's up to that thread to figure
56  // out if it still wants to queue for more work or if it should kick
57  // other pollers.
58  //
59  // Previously that kick only affected pollers in the same pollset, thus
60  // leaving the other threads stuck in the poller queue. Now the pollset-
61  // specific grpc_pollset_kick will also kick pollers from other pollsets
62  // if there are no pollers in the current pollset. This frees up the
63  // last threads and completes the test.
64  ThreadParams params = {};
65  gpr_cv_init(&params.cv);
66  gpr_mu_init(&params.mu);
67  std::vector<grpc_core::Thread> threads;
68  for (int i = 0; i < THREADS; i++) {
70  "Poller",
71  [](void* params) {
72  ThreadParams* tparams = static_cast<ThreadParams*>(params);
74 
75  gpr_mu* mu;
76  grpc_pollset pollset = {};
77  grpc_pollset_init(&pollset, &mu);
78 
79  // Lock the pollset mutex before notifying the test runner thread that
80  // one more thread is queuing. This allows the test runner thread to
81  // wait for all threads to be queued before sending the first kick by
82  // waiting for the mutexes to be released, which happens in
83  // gpr_pollset_work when the poller is queued.
84  gpr_mu_lock(mu);
85 
86  gpr_mu_lock(&tparams->mu);
87  tparams->pollset_mu[tparams->queuing] = mu;
88  tparams->queuing++;
89  gpr_cv_signal(&tparams->cv);
90  gpr_mu_unlock(&tparams->mu);
91 
92  // Queue for work and once we're done, make sure to kick the remaining
93  // threads.
95  error = grpc_pollset_work(&pollset, NULL,
97  error = grpc_pollset_kick(&pollset, NULL);
98 
100 
101  gpr_mu_lock(&tparams->mu);
102  tparams->complete++;
103  gpr_cv_signal(&tparams->cv);
104  gpr_mu_unlock(&tparams->mu);
105  },
106  &params);
107  thd.Start();
108  threads.push_back(std::move(thd));
109  }
110 
111  // Wait for all three threads to be queuing.
112  gpr_mu_lock(&params.mu);
113  while (
114  params.queuing != THREADS &&
115  !gpr_cv_wait(&params.cv, &params.mu, gpr_inf_future(GPR_CLOCK_REALTIME)))
116  ;
117  gpr_mu_unlock(&params.mu);
118 
119  // Wait for the mutexes to be released. This indicates that the threads have
120  // entered the work wait.
121  //
122  // At least currently these are essentially all references to the same global
123  // pollset mutex, but we are still waiting on them once for each thread in
124  // the case this ever changes.
125  for (int i = 0; i < THREADS; i++) {
126  gpr_mu_lock(params.pollset_mu[i]);
127  gpr_mu_unlock(params.pollset_mu[i]);
128  }
129 
130  grpc_iocp_kick();
131 
132  // Wait for the threads to complete.
133  gpr_mu_lock(&params.mu);
134  while (
135  params.complete != THREADS &&
136  !gpr_cv_wait(&params.cv, &params.mu, gpr_inf_future(GPR_CLOCK_REALTIME)))
137  ;
138  gpr_mu_unlock(&params.mu);
139 
140  for (auto& t : threads) t.Join();
141  return EXIT_SUCCESS;
142 }
143 #else /* defined(GRPC_WINSOCK_SOCKET) */
144 int main(int /*argc*/, char** /*argv*/) { return 0; }
145 #endif
gpr_cv_signal
GPRAPI void gpr_cv_signal(gpr_cv *cv)
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
pollset.h
pollset_windows.h
iomgr_internal.h
gpr_cv
pthread_cond_t gpr_cv
Definition: impl/codegen/sync_posix.h:48
main
int main(int, char **)
Definition: pollset_windows_starvation_test.cc:144
error
grpc_error_handle error
Definition: retry_filter.cc:499
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_pollset_work
grpc_error_handle grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker **worker, grpc_core::Timestamp deadline)
Definition: pollset.cc:45
time.h
iocp_windows.h
threads
static uv_thread_t * threads
Definition: threadpool.c:38
grpc_pollset_init
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu)
Definition: pollset.cc:33
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
init.h
grpc.h
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_pollset_kick
grpc_error_handle grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker)
Definition: pollset.cc:51
grpc_core::ExecCtx
Definition: exec_ctx.h:97
test_config.h
cv
unsigned cv
Definition: cxa_demangle.cpp:4908
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
absl::str_format_internal::LengthMod::t
@ t
thd.h
exec_ctx.h
grpc_core::Thread
Definition: thd.h:43
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
profile_analyzer.thd
thd
Definition: profile_analyzer.py:168
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_error
Definition: error_internal.h:42
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:44