thread_manager.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <stdlib.h>
22 
23 #include <climits>
24 
25 #include <grpc/support/log.h>
26 
28 #include "src/core/lib/gprpp/thd.h"
30 
31 namespace grpc {
32 
34  : thd_mgr_(thd_mgr) {
35  // Make thread creation exclusive with respect to its join happening in
36  // ~WorkerThread().
38  "grpcpp_sync_server",
39  [](void* th) { static_cast<ThreadManager::WorkerThread*>(th)->Run(); },
40  this, &created_);
41  if (!created_) {
42  gpr_log(GPR_ERROR, "Could not create grpc_sync_server worker-thread");
43  }
44 }
45 
47  thd_mgr_->MainWorkLoop();
48  thd_mgr_->MarkAsCompleted(this);
49 }
50 
52  // Don't join until the thread is fully constructed.
53  thd_.Join();
54 }
55 
57  int min_pollers, int max_pollers)
58  : shutdown_(false),
60  grpc_core::ResourceQuota::FromC(resource_quota)->thread_quota()),
61  num_pollers_(0),
62  min_pollers_(min_pollers),
63  max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
64  num_threads_(0),
66 
68  {
71  }
72 
74 }
75 
78  while (num_threads_ != 0) {
80  }
81 }
82 
85  shutdown_ = true;
86 }
87 
90  return shutdown_;
91 }
92 
94  grpc_core::MutexLock list_lock(&list_mu_);
96 }
97 
99  {
100  grpc_core::MutexLock list_lock(&list_mu_);
101  completed_threads_.push_back(thd);
102  }
103 
104  {
105  grpc_core::MutexLock lock(&mu_);
106  num_threads_--;
107  if (num_threads_ == 0) {
109  }
110  }
111 
112  // Give a thread back to the resource quota
113  thread_quota_->Release(1);
114 }
115 
117  std::list<WorkerThread*> completed_threads;
118  {
119  // swap out the completed threads list: allows other threads to clean up
120  // more quickly
122  completed_threads.swap(completed_threads_);
123  }
124  for (auto thd : completed_threads) delete thd;
125 }
126 
128  if (!thread_quota_->Reserve(min_pollers_)) {
130  "No thread quota available to even create the minimum required "
131  "polling threads (i.e %d). Unable to start the thread manager",
132  min_pollers_);
133  abort();
134  }
135 
136  {
137  grpc_core::MutexLock lock(&mu_);
141  }
142 
143  for (int i = 0; i < min_pollers_; i++) {
144  WorkerThread* worker = new WorkerThread(this);
145  GPR_ASSERT(worker->created()); // Must be able to create the minimum
146  worker->Start();
147  }
148 }
149 
151  while (true) {
152  void* tag;
153  bool ok;
154  WorkStatus work_status = PollForWork(&tag, &ok);
155 
157  // Reduce the number of pollers by 1 and check what happened with the poll
158  num_pollers_--;
159  bool done = false;
160  switch (work_status) {
161  case TIMEOUT:
162  // If we timed out and we have more pollers than we need (or we are
163  // shut down), finish this thread
164  if (shutdown_ || num_pollers_ > max_pollers_) done = true;
165  break;
166  case SHUTDOWN:
167  // If the thread manager is shut down, finish this thread
168  done = true;
169  break;
170  case WORK_FOUND:
171  // If we got work and there are now insufficient pollers and there is
172  // quota available to create a new thread, start a new poller thread
173  bool resource_exhausted = false;
174  if (!shutdown_ && num_pollers_ < min_pollers_) {
175  if (thread_quota_->Reserve(1)) {
176  // We can allocate a new poller thread
177  num_pollers_++;
178  num_threads_++;
181  }
182  // Drop lock before spawning thread to avoid contention
183  lock.Release();
184  WorkerThread* worker = new WorkerThread(this);
185  if (worker->created()) {
186  worker->Start();
187  } else {
188  // Get lock again to undo changes to poller/thread counters.
189  grpc_core::MutexLock failure_lock(&mu_);
190  num_pollers_--;
191  num_threads_--;
192  resource_exhausted = true;
193  delete worker;
194  }
195  } else if (num_pollers_ > 0) {
196  // There is still at least some thread polling, so we can go on
197  // even though we are below the number of pollers that we would
198  // like to have (min_pollers_)
199  lock.Release();
200  } else {
201  // There are no pollers to spare and we couldn't allocate
202  // a new thread, so resources are exhausted!
203  lock.Release();
204  resource_exhausted = true;
205  }
206  } else {
207  // There are a sufficient number of pollers available so we can do
208  // the work and continue polling with our existing poller threads
209  lock.Release();
210  }
211  // Lock is always released at this point - do the application work
212  // or return resource exhausted if there is new work but we couldn't
213  // get a thread in which to do it.
214  DoWork(tag, ok, !resource_exhausted);
215  // Take the lock again to check post conditions
216  lock.Lock();
217  // If we're shut down, we should finish at this point.
218  if (shutdown_) done = true;
219  break;
220  }
221  // If we decided to finish the thread, break out of the while loop
222  if (done) break;
223 
224  // Otherwise go back to polling as long as it doesn't exceed max_pollers_
225  //
226  // **WARNING**:
227  // There is a possibility of thread thrashing here (i.e. more thread
228  // shutdowns and creations than in the ideal case). This happens if the
229  // max_pollers_ count is small and the rate of incoming requests is also
230  // small. In such scenarios we can possibly configure max_pollers_ to a
231  // higher value and/or increase the cq timeout.
232  //
233  // However, not doing this check here and unconditionally incrementing
234  // num_pollers_ (and hoping that the system will eventually settle down)
235  // has far worse consequences, i.e. a huge number of threads getting
236  // created to the point of thread exhaustion. For example: if the incoming
237  // request rate is very high, all the polling threads will return very
238  // quickly from PollForWork() with WORK_FOUND. They all briefly decrement
239  // the num_pollers_ counter, thereby possibly - and briefly - making it go
240  // below min_pollers_. This will most likely result in the creation of a
241  // new poller since num_pollers_ dipped below min_pollers_.
242  //
243  // Now, if we didn't do the max_pollers_ check here, all these threads would
244  // go back to doing PollForWork() and the whole cycle would repeat (with a
245  // new thread being added in each cycle). Once the total number of threads
246  // in the system crosses a certain threshold (around 1500), there is heavy
247  // contention on mutexes (the mu_ here or the mutexes in gRPC core like the
248  // pollset mutex) that makes DoWork() take longer to finish, thereby causing
249  // new poller threads to be created even faster. This results in a thread
250  // avalanche.
251  if (num_pollers_ < max_pollers_) {
252  num_pollers_++;
253  } else {
254  break;
255  }
256  };
257 
258  // This thread is exiting. Do some cleanup work, i.e. delete already
259  // completed worker threads.
261 
262  // If we are here, either ThreadManager is shutting down or it already has
263  // enough threads.
264 }
265 
266 } // namespace grpc
grpc::ThreadManager::SHUTDOWN
@ SHUTDOWN
Definition: src/cpp/thread_manager/thread_manager.h:41
thread_manager.h
grpc_core::LockableAndReleasableMutexLock::Lock
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:181
log.h
grpc
Definition: grpcpp/alarm.h:33
grpc::ThreadManager::shutdown_
bool shutdown_
Definition: src/cpp/thread_manager/thread_manager.h:146
grpc_resource_quota
struct grpc_resource_quota grpc_resource_quota
Definition: grpc_types.h:729
false
#define false
Definition: setup_once.h:323
grpc::ThreadManager::IsShutdown
bool IsShutdown()
Definition: thread_manager.cc:88
grpc_core
Definition: call_metric_recorder.h:31
worker
static void worker(void *arg)
Definition: threadpool.c:57
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc::ThreadManager::Shutdown
virtual void Shutdown()
Definition: thread_manager.cc:83
grpc::ThreadManager::ThreadManager
ThreadManager(const char *name, grpc_resource_quota *resource_quota, int min_pollers, int max_pollers)
Definition: thread_manager.cc:56
grpc::ThreadManager::PollForWork
virtual WorkStatus PollForWork(void **tag, bool *ok)=0
grpc::ThreadManager::WorkerThread::WorkerThread
WorkerThread(ThreadManager *thd_mgr)
Definition: thread_manager.cc:33
grpc::ThreadManager::thread_quota_
grpc_core::ThreadQuotaPtr thread_quota_
Definition: src/cpp/thread_manager/thread_manager.h:155
grpc::ResourceQuota
Definition: include/grpcpp/resource_quota.h:34
resource_quota
ResourceQuotaRefPtr resource_quota
Definition: filter_fuzzer.cc:145
grpc::ThreadManager::num_threads_
int num_threads_
Definition: src/cpp/thread_manager/thread_manager.h:166
grpc::ThreadManager::num_pollers_
int num_pollers_
Definition: src/cpp/thread_manager/thread_manager.h:158
grpc::ThreadManager::completed_threads_
std::list< WorkerThread * > completed_threads_
Definition: src/cpp/thread_manager/thread_manager.h:174
grpc_core::LockableAndReleasableMutexLock
Definition: src/core/lib/gprpp/sync.h:165
grpc::ThreadManager::WorkerThread
Definition: src/cpp/thread_manager/thread_manager.h:118
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
grpc::ThreadManager::WORK_FOUND
@ WORK_FOUND
Definition: src/cpp/thread_manager/thread_manager.h:41
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
worker
Definition: worker.py:1
grpc_core::CondVar::Signal
void Signal()
Definition: src/core/lib/gprpp/sync.h:134
grpc::ThreadManager::Initialize
void Initialize()
Definition: thread_manager.cc:127
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc::ThreadManager::WorkerThread::thd_
grpc_core::Thread thd_
Definition: src/cpp/thread_manager/thread_manager.h:132
grpc::ThreadManager::mu_
grpc_core::Mutex mu_
Definition: src/cpp/thread_manager/thread_manager.h:144
grpc::ThreadManager::min_pollers_
int min_pollers_
Definition: src/cpp/thread_manager/thread_manager.h:161
grpc_core::LockableAndReleasableMutexLock::Release
void Release() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:187
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc::ThreadManager::WorkerThread::created_
bool created_
Definition: src/cpp/thread_manager/thread_manager.h:133
resource_quota.h
grpc::ThreadManager::WorkerThread::~WorkerThread
~WorkerThread()
Definition: thread_manager.cc:51
grpc::ThreadManager::MainWorkLoop
void MainWorkLoop()
Definition: thread_manager.cc:150
grpc::ThreadManager::DoWork
virtual void DoWork(void *tag, bool ok, bool resources)=0
grpc::ThreadManager::Wait
virtual void Wait()
Definition: thread_manager.cc:76
grpc::ThreadManager::~ThreadManager
virtual ~ThreadManager()
Definition: thread_manager.cc:67
grpc::ThreadManager::WorkerThread::Run
void Run()
Definition: thread_manager.cc:46
grpc::ThreadManager::MarkAsCompleted
void MarkAsCompleted(WorkerThread *thd)
Definition: thread_manager.cc:98
grpc_core::CondVar::Wait
void Wait(Mutex *mu)
Definition: src/core/lib/gprpp/sync.h:137
thd.h
grpc::ThreadManager::shutdown_cv_
grpc_core::CondVar shutdown_cv_
Definition: src/cpp/thread_manager/thread_manager.h:147
ok
bool ok
Definition: async_end2end_test.cc:197
grpc::ThreadManager::max_pollers_
int max_pollers_
Definition: src/cpp/thread_manager/thread_manager.h:162
grpc_core::Thread
Definition: thd.h:43
grpc::ThreadManager
Definition: src/cpp/thread_manager/thread_manager.h:31
ref_counted_ptr.h
grpc::ThreadManager::GetMaxActiveThreadsSoFar
int GetMaxActiveThreadsSoFar()
Definition: thread_manager.cc:93
profile_analyzer.thd
thd
Definition: profile_analyzer.py:168
grpc::ThreadManager::max_active_threads_sofar_
int max_active_threads_sofar_
Definition: src/cpp/thread_manager/thread_manager.h:171
grpc::ThreadManager::CleanupCompletedThreads
void CleanupCompletedThreads()
Definition: thread_manager.cc:116
grpc::ThreadManager::WorkStatus
WorkStatus
Definition: src/cpp/thread_manager/thread_manager.h:41
grpc::ThreadManager::TIMEOUT
@ TIMEOUT
Definition: src/cpp/thread_manager/thread_manager.h:41
grpc::ThreadManager::list_mu_
grpc_core::Mutex list_mu_
Definition: src/cpp/thread_manager/thread_manager.h:173
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:37