threadpool.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 namespace grpc_core {
24 
// Body of ThreadPoolWorker::Run(), which the symbol index at the end of this
// page places at threadpool.cc:25.
// NOTE(review): listing lines 25, 29, 31, 33-34 and 36 (among them the
// signature and the condition that opens the if/else below) are absent from
// this extraction.
// Each iteration takes one element from queue_, leaves the loop when that
// element is nullptr, and otherwise runs it as a
// grpc_completion_queue_functor.
26  while (true) {
27  void* elem;
28 
30  // Updates stats and prints them.
32  elem = queue_->Get(&wait_time);
35  "ThreadPool Worker [%s %d] Stats: sleep_time %f",
37  } else {
// This branch passes nullptr for Get()'s wait_time argument.
38  elem = queue_->Get(nullptr);
39  }
// A nullptr element ends the loop; ~ThreadPool() puts one nullptr on the
// queue per worker thread.
40  if (elem == nullptr) {
41  break;
42  }
43  // Runs closure
44  auto* closure = static_cast<grpc_completion_queue_functor*>(elem);
// The functor is called with itself and its own internal_success field.
45  closure->functor_run(closure, closure->internal_success);
46  }
47 }
48 
// Body of ThreadPool::SharedThreadPoolConstructor(), which the symbol index
// at the end of this page places at threadpool.cc:49.
// NOTE(review): listing lines 49, 51, 58 and 60 are absent from this
// extraction: the signature, the statement that follows the "joinable"
// comment, the argument of the static_cast on line 57, and the first
// statement of the loop body.
50  // All worker threads in thread pool must be joinable.
52 
53  // Create at least 1 worker thread.
54  if (num_threads_ <= 0) num_threads_ = 1;
55 
// The queue is heap-allocated here and deleted in ~ThreadPool().
56  queue_ = new InfLenFIFOQueue();
57  threads_ = static_cast<ThreadPoolWorker**>(
// One pass per worker slot; Start() is called on each threads_[i].
59  for (int i = 0; i < num_threads_; ++i) {
61  threads_[i]->Start();
62  }
63 }
64 
// Body of ThreadPool::DefaultStackSize(); the symbol index at the end of this
// page gives its signature as `size_t DefaultStackSize()` at threadpool.cc:65.
// NOTE(review): listing line 65 (the signature) is absent from this
// extraction.
// Returns 1952 * 1024 when __ANDROID__ or __APPLE__ is defined and 64 * 1024
// otherwise. The unit is presumably bytes (Thread::Options::set_stack_size
// names its parameter `bytes`) - confirm against the header.
66 #if defined(__ANDROID__) || defined(__APPLE__)
67  return 1952 * 1024;
68 #else
69  return 64 * 1024;
70 #endif
71 }
72 
// Body of ThreadPool::AssertHasNotBeenShutDown(), which the symbol index at
// the end of this page places at threadpool.cc:73.
// NOTE(review): listing line 73 (the signature) is absent from this
// extraction.
// Asserts, through GPR_DEBUG_ASSERT, that shut_down_ is still false.
74  // This load only feeds a debug check, so relaxed ordering is sufficient.
75  GPR_DEBUG_ASSERT(!shut_down_.load(std::memory_order_relaxed));
76 }
77 
// Visible remainder of the single-argument constructor
// `ThreadPool(int num_threads)`, which the symbol index at the end of this
// page places at threadpool.cc:78.
// NOTE(review): listing lines 78 and 80-82 are absent from this extraction,
// so only the name assignment below can be described.
// This overload takes no name argument and uses the fixed string
// "ThreadPoolWorker".
79  thd_name_ = "ThreadPoolWorker";
83 }
84 
// Constructs a pool from a requested thread count and a thread name.
// Both arguments are stored in members by the initializer list. thd_name_ is
// a `const char*`, so the pointer is stored rather than the characters.
// NOTE(review): the string presumably has to outlive the pool - confirm
// against callers.
// NOTE(review): listing lines 87-89 (the constructor body) are absent from
// this extraction.
85 ThreadPool::ThreadPool(int num_threads, const char* thd_name)
86  : num_threads_(num_threads), thd_name_(thd_name) {
90 }
91 
// Constructs a pool from a requested thread count, a thread name and
// caller-supplied thread options.
// thread_options is copied into the thread_options_ member; thd_name is
// stored as a pointer (thd_name_ is a `const char*`).
// NOTE(review): listing lines 98 and 100 are absent from this extraction.
92 ThreadPool::ThreadPool(int num_threads, const char* thd_name,
93  const Thread::Options& thread_options)
94  : num_threads_(num_threads),
95  thd_name_(thd_name),
96  thread_options_(thread_options) {
// A stack size of 0 is special-cased. The statement that handles it (line 98)
// is missing; it presumably installs DefaultStackSize() through
// set_stack_size() - TODO confirm against the original file.
97  if (thread_options_.stack_size() == 0) {
99  }
101 }
102 
// Body of ThreadPool::~ThreadPool(), which the symbol index at the end of
// this page places at threadpool.cc:103.
// NOTE(review): listing lines 103 (the signature) and 118 are absent from
// this extraction.
// Order of operations: flag the pool as shut down, enqueue one nullptr per
// worker, join every worker, delete every worker, then delete the queue.
104  // This store only feeds debug checks, so relaxed ordering is sufficient.
105  shut_down_.store(true, std::memory_order_relaxed);
106 
// ThreadPoolWorker::Run() breaks out of its loop when it dequeues nullptr, so
// one nullptr is queued for each of the num_threads_ workers.
107  for (int i = 0; i < num_threads_; ++i) {
108  queue_->Put(nullptr);
109  }
110 
// Every worker is joined before any of them is deleted.
111  for (int i = 0; i < num_threads_; ++i) {
112  threads_[i]->Join();
113  }
114 
115  for (int i = 0; i < num_threads_; ++i) {
116  delete threads_[i];
117  }
// NOTE(review): line 118 is missing here; it presumably releases the threads_
// array itself (the index lists gpr_free) - confirm against the original.
119  delete queue_;
120 }
121 
// Visible remainder of ThreadPool::Add(); the symbol index at the end of this
// page gives its signature as
// `void Add(grpc_completion_queue_functor* closure) override` at
// threadpool.cc:122.
// NOTE(review): listing lines 122-123 are absent from this extraction.
// The closure is put on queue_ as a void*; ThreadPoolWorker::Run() casts each
// dequeued element back to grpc_completion_queue_functor* before running it.
124  queue_->Put(static_cast<void*>(closure));
125 }
126 
128 
130 
// Body of ThreadPool::thread_options(); the symbol index at the end of this
// page gives its signature as
// `const Thread::Options& thread_options() const override` at
// threadpool.cc:131.
// NOTE(review): listing line 131 (the signature) is absent from this
// extraction.
// Returns the thread_options_ member.
132  return thread_options_;
133 }
134 
// Returns the thd_name_ pointer held by this pool; the string is not copied.
135 const char* ThreadPool::thread_name() const { return thd_name_; }
136 } // namespace grpc_core
grpc_core::ThreadPool::DefaultStackSize
size_t DefaultStackSize()
Definition: threadpool.cc:65
gpr_timespec_to_micros
GPRAPI double gpr_timespec_to_micros(gpr_timespec t)
Definition: src/core/lib/gpr/time.cc:237
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
grpc_core::ThreadPoolWorker::index_
int index_
Definition: src/core/lib/iomgr/executor/threadpool.h:91
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::ThreadPool::thread_name
const char * thread_name() const override
Definition: threadpool.cc:135
grpc_core::InfLenFIFOQueue
Definition: mpmcqueue.h:52
grpc_core::ThreadPoolWorker::Stats::sleep_time
gpr_timespec sleep_time
Definition: src/core/lib/iomgr/executor/threadpool.h:81
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
grpc_core::ThreadPool::threads_
ThreadPoolWorker ** threads_
Definition: src/core/lib/iomgr/executor/threadpool.h:134
gpr_time_0
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:47
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::ThreadPool::ThreadPool
ThreadPool(int num_threads)
Definition: threadpool.cc:78
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
grpc_core::ThreadPoolWorker::Run
void Run()
Definition: threadpool.cc:25
grpc_core::MPMCQueueInterface::count
virtual int count() const =0
grpc_core::ThreadPool::num_pending_closures
int num_pending_closures() const override
Definition: threadpool.cc:127
grpc_core::grpc_thread_pool_trace
DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool")
Definition: mpmcqueue.h:31
grpc_core::ThreadPool::thd_name_
const char * thd_name_
Definition: src/core/lib/iomgr/executor/threadpool.h:132
grpc_core::ThreadPoolWorker::Join
void Join()
Definition: src/core/lib/iomgr/executor/threadpool.h:76
grpc_core::ThreadPool::~ThreadPool
~ThreadPool() override
Definition: threadpool.cc:103
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_core::ThreadPool::thread_options
const Thread::Options & thread_options() const override
Definition: threadpool.cc:131
gpr_zalloc
GPRAPI void * gpr_zalloc(size_t size)
Definition: alloc.cc:40
grpc_core::ThreadPool::SharedThreadPoolConstructor
void SharedThreadPoolConstructor()
Definition: threadpool.cc:49
grpc_core::ThreadPoolWorker
Definition: src/core/lib/iomgr/executor/threadpool.h:63
threadpool.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::ThreadPool::shut_down_
std::atomic< bool > shut_down_
Definition: src/core/lib/iomgr/executor/threadpool.h:137
grpc_core::Thread::Options::set_stack_size
Options & set_stack_size(size_t bytes)
Definition: thd.h:64
grpc_core::Thread::Options::stack_size
size_t stack_size() const
Definition: thd.h:68
grpc_core::Thread::Options
Definition: thd.h:45
grpc_core::MPMCQueueInterface::Get
virtual void * Get(gpr_timespec *wait_time)=0
grpc_core::Thread::Options::set_joinable
Options & set_joinable(bool joinable)
Set whether the thread is joinable or detached.
Definition: thd.h:49
grpc_core::ThreadPoolWorker::thd_name_
const char * thd_name_
Definition: src/core/lib/iomgr/executor/threadpool.h:90
grpc_core::ThreadPool::thread_options_
Thread::Options thread_options_
Definition: src/core/lib/iomgr/executor/threadpool.h:133
grpc_core::ThreadPool::AssertHasNotBeenShutDown
void AssertHasNotBeenShutDown()
Definition: threadpool.cc:73
grpc_core::ThreadPoolWorker::queue_
MPMCQueueInterface * queue_
Definition: src/core/lib/iomgr/executor/threadpool.h:87
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
num_threads
static volatile int num_threads
Definition: benchmark-thread.c:30
grpc_core::ThreadPool::queue_
MPMCQueueInterface * queue_
Definition: src/core/lib/iomgr/executor/threadpool.h:135
grpc_completion_queue_functor
Definition: grpc_types.h:773
grpc_core::ThreadPool::Add
void Add(grpc_completion_queue_functor *closure) override
Definition: threadpool.cc:122
closure
Definition: proxy.cc:59
grpc_core::ThreadPoolWorker::stats_
Stats stats_
Definition: src/core/lib/iomgr/executor/threadpool.h:89
grpc_core::ThreadPoolWorker::Start
void Start()
Definition: src/core/lib/iomgr/executor/threadpool.h:75
gpr_timespec
Definition: gpr_types.h:50
grpc_core::ThreadPool::num_threads_
int num_threads_
Definition: src/core/lib/iomgr/executor/threadpool.h:131
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc_core::MPMCQueueInterface::Put
virtual void Put(void *elem)=0
grpc_core::ThreadPool::pool_capacity
int pool_capacity() const override
Definition: threadpool.cc:129
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:37