thread_pool.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include "src/core/lib/gprpp/thd.h"
24 
25 namespace grpc_event_engine {
26 namespace iomgr_engine {
27 
29  : pool_(pool),
30  thd_(
31  "iomgr_eventengine_pool",
32  [](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); },
33  this, nullptr, grpc_core::Thread::Options().set_tracked(false)) {
34  thd_.Start();
35 }
// Destructor: calls thd_.Join() on the underlying thread before this Thread
// object is destroyed.
36 ThreadPool::Thread::~Thread() { thd_.Join(); }
37 
39  pool_->ThreadFunc();
40  // Now that we have killed ourselves, we should reduce the thread count
41  grpc_core::MutexLock lock(&pool_->mu_);
42  pool_->nthreads_--;
43  // Move ourselves to dead list
44  pool_->dead_threads_.push_back(this);
45 
46  if (pool_->nthreads_ == 0) {
47  if (pool_->forking_) pool_->fork_cv_.Signal();
48  if (pool_->shutdown_) pool_->shutdown_cv_.Signal();
49  }
50 }
51 
53  for (;;) {
54  // Wait until work is available or we are shutting down.
56  if (!forking_ && !shutdown_ && callbacks_.empty()) {
57  // If there are too many threads waiting, then quit this thread
59  break;
60  }
62  cv_.Wait(&mu_);
64  }
65  // a fork could be initiated while the thread was waiting
66  if (forking_) return;
67  // Drain callbacks before considering shutdown to ensure all work
68  // gets completed.
69  if (!callbacks_.empty()) {
70  auto cb = callbacks_.front();
71  callbacks_.pop();
72  lock.Release();
73  cb();
74  } else if (shutdown_) {
75  break;
76  }
77  }
78 }
79 
80 ThreadPool::ThreadPool(int reserve_threads)
81  : shutdown_(false),
82  reserve_threads_(reserve_threads),
83  nthreads_(0),
85  forking_(false) {
88 }
89 
91  for (int i = 0; i < n; i++) {
92  nthreads_++;
93  new Thread(this);
94  }
95 }
96 
// Deletes every Thread pointed to by the entries of *tlist, then empties the
// list. Deleting a Thread runs ~Thread(), which calls thd_.Join().
//
// tlist: Thread objects to destroy; the vector is empty on return.
97 void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) {
98  for (auto* t : *tlist) delete t;
99  tlist->clear();
100 }
101 
103  grpc_core::MutexLock lock(&mu_);
104  shutdown_ = true;
105  cv_.SignalAll();
106  while (nthreads_ != 0) {
108  }
110 }
111 
// Queues `callback` on callbacks_ while holding mu_ for the whole call.
//
// The callback is copied into the queue. If forking_ is set, the callback is
// only queued: no thread is started and no waiter is signalled. Otherwise a
// new Thread is created when threads_waiting_ is zero, or one waiter on cv_
// is signalled.
//
// callback: the work item to enqueue.
112 void ThreadPool::Add(const std::function<void()>& callback) {
113  grpc_core::MutexLock lock(&mu_);
114  // Add the work item to the callbacks list
115  callbacks_.push(callback);
116  // Store the callback for later if we are forking.
117  // TODO(hork): should we block instead?
118  if (forking_) return;
119  // Increase pool size or notify as needed
120  if (threads_waiting_ == 0) {
121  // Kick off a new thread
122  nthreads_++;  // counted here, under mu_, before the Thread object exists
123  new Thread(this);  // not stored here; Thread::ThreadFunc later pushes it onto dead_threads_
124  } else {
125  cv_.Signal();
126  }
127  // Also use this chance to harvest dead threads
128  if (!dead_threads_.empty()) {  // NOTE(review): no body is visible for this branch; presumably it should call ReapThreads(&dead_threads_) — confirm against the original source
130  }
131 }
132 
134  grpc_core::MutexLock lock(&mu_);
135  forking_ = true;
136  cv_.SignalAll();
137  while (nthreads_ != 0) {
138  fork_cv_.Wait(&mu_);
139  }
141 }
142 
144  grpc_core::MutexLock lock(&mu_);
145  forking_ = false;
147 }
148 
150  grpc_core::MutexLock lock(&mu_);
151  forking_ = false;
153 }
154 
155 } // namespace iomgr_engine
156 } // namespace grpc_event_engine
grpc_event_engine::iomgr_engine::ThreadPool::callbacks_
std::queue< std::function< void()> > callbacks_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:70
grpc_event_engine::iomgr_engine::ThreadPool::fork_cv_
grpc_core::CondVar fork_cv_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:68
grpc_event_engine::iomgr_engine::ThreadPool::Add
void Add(const std::function< void()> &callback)
Definition: thread_pool.cc:112
grpc_event_engine::iomgr_engine::ThreadPool::Thread::Thread
Thread(ThreadPool *pool)
Definition: thread_pool.cc:28
false
#define false
Definition: setup_once.h:323
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::ReleasableMutexLock::Release
void Release() ABSL_UNLOCK_FUNCTION()
Definition: src/core/lib/gprpp/sync.h:115
grpc_event_engine::iomgr_engine::ThreadPool::nthreads_
int nthreads_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:72
grpc_event_engine::iomgr_engine::ThreadPool::forking_
bool forking_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:75
pool_
DescriptorPool pool_
Definition: bloaty/third_party/protobuf/src/google/protobuf/compiler/parser_unittest.cc:181
grpc_event_engine::iomgr_engine::ThreadPool::ThreadFunc
void ThreadFunc()
Definition: thread_pool.cc:52
grpc_event_engine::iomgr_engine::ThreadPool::Thread::ThreadFunc
void ThreadFunc()
Definition: thread_pool.cc:38
grpc_event_engine::iomgr_engine::ThreadPool::StartNThreadsLocked
void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_)
Definition: thread_pool.cc:90
grpc_core::CondVar::SignalAll
void SignalAll()
Definition: src/core/lib/gprpp/sync.h:135
grpc_core::ReleasableMutexLock
Definition: src/core/lib/gprpp/sync.h:102
grpc_event_engine::iomgr_engine::ThreadPool::PrepareFork
void PrepareFork() override
Definition: thread_pool.cc:133
grpc_event_engine::iomgr_engine::ThreadPool::cv_
grpc_core::CondVar cv_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:66
grpc_core::CondVar::Signal
void Signal()
Definition: src/core/lib/gprpp/sync.h:134
grpc_event_engine::iomgr_engine::ThreadPool::PostforkChild
void PostforkChild() override
Definition: thread_pool.cc:149
thread_pool.h
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc_core::Thread::Options
Definition: thd.h:45
grpc_event_engine::iomgr_engine::ThreadPool::Thread
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:50
grpc_event_engine::iomgr_engine::ThreadPool::Thread::~Thread
~Thread()
Definition: thread_pool.cc:36
grpc_event_engine::iomgr_engine::ThreadPool::dead_threads_
std::vector< Thread * > dead_threads_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:74
grpc_event_engine::iomgr_engine::ThreadPool::ThreadPool
ThreadPool(int reserve_threads)
Definition: thread_pool.cc:80
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
grpc_event_engine::iomgr_engine::ThreadPool::~ThreadPool
~ThreadPool() override
Definition: thread_pool.cc:102
grpc_event_engine::iomgr_engine::ThreadPool::ReapThreads
static void ReapThreads(std::vector< Thread * > *tlist)
Definition: thread_pool.cc:97
grpc_event_engine::iomgr_engine::ThreadPool::mu_
grpc_core::Mutex mu_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:65
grpc_core::CondVar::Wait
void Wait(Mutex *mu)
Definition: src/core/lib/gprpp/sync.h:137
grpc_event_engine
Definition: endpoint_config.h:24
thd.h
grpc_event_engine::iomgr_engine::ThreadPool::reserve_threads_
int reserve_threads_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:71
grpc_event_engine::iomgr_engine::ThreadPool::shutdown_cv_
grpc_core::CondVar shutdown_cv_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:67
grpc_event_engine::iomgr_engine::ThreadPool::shutdown_
bool shutdown_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:69
pool
InternalDescriptorPool * pool
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:807
grpc_event_engine::iomgr_engine::ThreadPool::PostforkParent
void PostforkParent() override
Definition: thread_pool.cc:143
grpc_core::Thread::Options::set_tracked
Options & set_tracked(bool tracked)
Set whether the thread is tracked for fork support.
Definition: thd.h:56
grpc_event_engine::iomgr_engine::ThreadPool::threads_waiting_
int threads_waiting_
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:73
grpc_event_engine::iomgr_engine::ThreadPool
Definition: src/core/lib/event_engine/iomgr_engine/thread_pool.h:37
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:37