dynamic_thread_pool.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include "src/core/lib/gprpp/thd.h"
22 
23 namespace grpc {
24 
26  : pool_(pool),
27  thd_(
28  "grpcpp_dynamic_pool",
29  [](void* th) {
30  static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
31  },
32  this) {
33  thd_.Start();
34 }
36 
38  pool_->ThreadFunc();
39  // Now that we have killed ourselves, we should reduce the thread count
40  grpc_core::MutexLock lock(&pool_->mu_);
41  pool_->nthreads_--;
42  // Move ourselves to dead list
43  pool_->dead_threads_.push_back(this);
44 
45  if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
46  pool_->shutdown_cv_.Signal();
47  }
48 }
49 
51  for (;;) {
52  // Wait until work is available or we are shutting down.
54  if (!shutdown_ && callbacks_.empty()) {
55  // If there are too many threads waiting, then quit this thread
57  break;
58  }
60  cv_.Wait(&mu_);
62  }
63  // Drain callbacks before considering shutdown to ensure all work
64  // gets completed.
65  if (!callbacks_.empty()) {
66  auto cb = callbacks_.front();
67  callbacks_.pop();
68  lock.Release();
69  cb();
70  } else if (shutdown_) {
71  break;
72  }
73  }
74 }
75 
77  : shutdown_(false),
78  reserve_threads_(reserve_threads),
79  nthreads_(0),
80  threads_waiting_(0) {
81  for (int i = 0; i < reserve_threads_; i++) {
83  nthreads_++;
84  new DynamicThread(this);
85  }
86 }
87 
88 void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
89  for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
90  delete *t;
91  }
92 }
93 
96  shutdown_ = true;
97  cv_.SignalAll();
98  while (nthreads_ != 0) {
100  }
102 }
103 
105  grpc_core::MutexLock lock(&mu_);
106  // Add works to the callbacks list
107  callbacks_.push(callback);
108  // Increase pool size or notify as needed
109  if (threads_waiting_ == 0) {
110  // Kick off a new thread
111  nthreads_++;
112  new DynamicThread(this);
113  } else {
114  cv_.Signal();
115  }
116  // Also use this chance to harvest dead threads
117  if (!dead_threads_.empty()) {
119  }
120 }
121 
122 } // namespace grpc
grpc::DynamicThreadPool::DynamicThread::DynamicThread
DynamicThread(DynamicThreadPool *pool)
Definition: dynamic_thread_pool.cc:25
grpc::DynamicThreadPool::callbacks_
std::queue< std::function< void()> > callbacks_
Definition: dynamic_thread_pool.h:54
grpc::DynamicThreadPool::Add
void Add(const std::function< void()> &callback) override
Definition: dynamic_thread_pool.cc:104
grpc::DynamicThreadPool::~DynamicThreadPool
~DynamicThreadPool() override
Definition: dynamic_thread_pool.cc:94
grpc
Definition: grpcpp/alarm.h:33
grpc::DynamicThreadPool::threads_waiting_
int threads_waiting_
Definition: dynamic_thread_pool.h:57
false
#define false
Definition: setup_once.h:323
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::ReleasableMutexLock::Release
void Release() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:115
pool_
DescriptorPool pool_
Definition: bloaty/third_party/protobuf/src/google/protobuf/compiler/parser_unittest.cc:181
grpc::DynamicThreadPool::dead_threads_
std::list< DynamicThread * > dead_threads_
Definition: dynamic_thread_pool.h:58
grpc::DynamicThreadPool::cv_
grpc_core::CondVar cv_
Definition: dynamic_thread_pool.h:51
grpc::DynamicThreadPool::reserve_threads_
int reserve_threads_
Definition: dynamic_thread_pool.h:55
grpc_core::CondVar::SignalAll
void SignalAll()
Definition: src/core/lib/gprpp/sync.h:135
grpc::DynamicThreadPool::DynamicThread::ThreadFunc
void ThreadFunc()
Definition: dynamic_thread_pool.cc:37
grpc::DynamicThreadPool
Definition: dynamic_thread_pool.h:32
grpc_core::ReleasableMutexLock
Definition: src/core/lib/gprpp/sync.h:102
grpc_core::CondVar::Signal
void Signal()
Definition: src/core/lib/gprpp/sync.h:134
grpc::DynamicThreadPool::shutdown_
bool shutdown_
Definition: dynamic_thread_pool.h:53
grpc::DynamicThreadPool::ReapThreads
static void ReapThreads(std::list< DynamicThread * > *tlist)
Definition: dynamic_thread_pool.cc:88
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc::DynamicThreadPool::DynamicThread::~DynamicThread
~DynamicThread()
Definition: dynamic_thread_pool.cc:35
dynamic_thread_pool.h
grpc_core::CondVar::Wait
void Wait(Mutex *mu)
Definition: src/core/lib/gprpp/sync.h:137
thd.h
grpc::DynamicThreadPool::ThreadFunc
void ThreadFunc()
Definition: dynamic_thread_pool.cc:50
grpc::DynamicThreadPool::nthreads_
int nthreads_
Definition: dynamic_thread_pool.h:56
pool
InternalDescriptorPool * pool
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:807
grpc::DynamicThreadPool::DynamicThreadPool
DynamicThreadPool(int reserve_threads)
Definition: dynamic_thread_pool.cc:76
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc::DynamicThreadPool::shutdown_cv_
grpc_core::CondVar shutdown_cv_
Definition: dynamic_thread_pool.h:52
grpc::DynamicThreadPool::mu_
grpc_core::Mutex mu_
Definition: dynamic_thread_pool.h:50
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc::DynamicThreadPool::DynamicThread
Definition: dynamic_thread_pool.h:40


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:14