thd_posix.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* Posix implementation for gpr threads. */
20 
22 
24 #include <grpc/support/time.h>
25 
26 #ifdef GPR_POSIX_SYNC
27 
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd_id.h>

#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/thd.h"
40 
41 namespace grpc_core {
42 namespace {
class ThreadInternalsPosix;

// Start-up parameters handed to a newly created pthread.
//
// The creating thread fills one of these in and passes its address to
// pthread_create(); the new thread copies it by value and frees the original.
// Keep it a plain aggregate: it lives in malloc'd storage and is copied with
// a simple struct assignment.
struct thd_arg {
  ThreadInternalsPosix* thread;  // internals object that created the thread
  void (*body)(void* arg);       // body of a thread
  void* arg;                     // argument to a thread
  const char* name;              // name of thread. Can be nullptr.
  bool joinable;                 // false => the thread deletes `thread` itself
  bool tracked;                  // copy of Thread::Options::tracked()
};
53 
// Rounds `size` up to the next multiple of the system page size.
//
// Returns `size` unchanged when it is already a multiple of the page size, or
// when the page size cannot be queried.
size_t RoundUpToPageSize(size_t size) {
  // TODO(yunjiaw): Change this variable (page_size) to a function-level static
  // when possible
  const long reported_page_size = sysconf(_SC_PAGESIZE);
  if (reported_page_size <= 0) {
    // sysconf() reports failure as -1. Casting that to size_t would give a
    // huge "page size" and a meaningless result, so leave the size alone.
    return size;
  }
  const size_t page_size = static_cast<size_t>(reported_page_size);
  // Remainder-based rounding is correct for any positive page size, not only
  // powers of two.
  const size_t remainder = size % page_size;
  return remainder == 0 ? size : size + (page_size - remainder);
}
60 
61 // Returns the minimum valid stack size that can be passed to
62 // pthread_attr_setstacksize.
63 size_t MinValidStackSize(size_t request_size) {
64  size_t min_stacksize = sysconf(_SC_THREAD_STACK_MIN);
65  if (request_size < min_stacksize) {
66  request_size = min_stacksize;
67  }
68 
69  // On some systems, pthread_attr_setstacksize() can fail if stacksize is
70  // not a multiple of the system page size.
71  return RoundUpToPageSize(request_size);
72 }
73 
74 class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
75  public:
76  ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
77  void* arg, bool* success, const Thread::Options& options)
78  : started_(false) {
79  gpr_mu_init(&mu_);
80  gpr_cv_init(&ready_);
81  pthread_attr_t attr;
82  /* don't use gpr_malloc as we may cause an infinite recursion with
83  * the profiling code */
84  thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
85  GPR_ASSERT(info != nullptr);
86  info->thread = this;
87  info->body = thd_body;
88  info->arg = arg;
89  info->name = thd_name;
90  info->joinable = options.joinable();
91  info->tracked = options.tracked();
92  if (options.tracked()) {
94  }
95 
96  GPR_ASSERT(pthread_attr_init(&attr) == 0);
97  if (options.joinable()) {
98  GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
99  0);
100  } else {
101  GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
102  0);
103  }
104 
105  if (options.stack_size() != 0) {
106  size_t stack_size = MinValidStackSize(options.stack_size());
107  GPR_ASSERT(pthread_attr_setstacksize(&attr, stack_size) == 0);
108  }
109 
110  *success = (pthread_create(
111  &pthread_id_, &attr,
112  [](void* v) -> void* {
113  thd_arg arg = *static_cast<thd_arg*>(v);
114  free(v);
115  if (arg.name != nullptr) {
116 #if GPR_APPLE_PTHREAD_NAME
117  /* Apple supports 64 characters, and will
118  * truncate if it's longer. */
119  pthread_setname_np(arg.name);
120 #elif GPR_LINUX_PTHREAD_NAME
121  /* Linux supports 16 characters max, and will
122  * error if it's longer. */
123  char buf[16];
124  size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
125  strncpy(buf, arg.name, buf_len);
126  buf[buf_len] = '\0';
127  pthread_setname_np(pthread_self(), buf);
128 #endif // GPR_APPLE_PTHREAD_NAME
129  }
130 
131  gpr_mu_lock(&arg.thread->mu_);
132  while (!arg.thread->started_) {
133  gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
135  }
136  gpr_mu_unlock(&arg.thread->mu_);
137 
138  if (!arg.joinable) {
139  delete arg.thread;
140  }
141 
142  (*arg.body)(arg.arg);
143  if (arg.tracked) {
145  }
146  return nullptr;
147  },
148  info) == 0);
149 
150  GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
151 
152  if (!(*success)) {
153  /* don't use gpr_free, as this was allocated using malloc (see above) */
154  free(info);
155  if (options.tracked()) {
157  }
158  }
159  }
160 
161  ~ThreadInternalsPosix() override {
163  gpr_cv_destroy(&ready_);
164  }
165 
166  void Start() override {
167  gpr_mu_lock(&mu_);
168  started_ = true;
169  gpr_cv_signal(&ready_);
170  gpr_mu_unlock(&mu_);
171  }
172 
173  void Join() override { pthread_join(pthread_id_, nullptr); }
174 
175  private:
176  gpr_mu mu_;
177  gpr_cv ready_;
178  bool started_;
179  pthread_t pthread_id_;
180 };
181 
182 } // namespace
183 
184 Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
185  bool* success, const Options& options)
186  : options_(options) {
187  bool outcome = false;
188  impl_ = new ThreadInternalsPosix(thd_name, thd_body, arg, &outcome, options);
189  if (outcome) {
190  state_ = ALIVE;
191  } else {
192  state_ = FAILED;
193  delete impl_;
194  impl_ = nullptr;
195  }
196 
197  if (success != nullptr) {
198  *success = outcome;
199  }
200 }
201 } // namespace grpc_core
202 
203 // The following is in the external namespace as it is exposed as C89 API
205  // Use C-style casting because Linux and OSX have different definitions
206  // of pthread_t so that a single C++ cast doesn't handle it.
207  // NOLINTNEXTLINE(google-readability-casting)
208  return (gpr_thd_id)pthread_self();
209 }
210 
211 #endif /* GPR_POSIX_SYNC */
gpr_cv_signal
GPRAPI void gpr_cv_signal(gpr_cv *cv)
fork.h
grpc_core::Fork::IncThreadCount
static void IncThreadCount()
Definition: fork.cc:219
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
log.h
false
#define false
Definition: setup_once.h:323
grpc_core
Definition: call_metric_recorder.h:31
gpr_cv
pthread_cond_t gpr_cv
Definition: impl/codegen/sync_posix.h:48
string.h
options
double_dict options[]
Definition: capstone_test.c:55
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
useful.h
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
FAILED
@ FAILED
Definition: alts_tsi_handshaker_test.cc:85
setup.name
name
Definition: setup.py:542
thd_id.h
time.h
mu_
Mutex mu_
Definition: oob_backend_metric.cc:115
gpr_thd_id
uintptr_t gpr_thd_id
Definition: thd_id.h:35
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
gpr_thd_currentid
GPRAPI gpr_thd_id gpr_thd_currentid(void)
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
arg::name
const char * name
Definition: cmdline.cc:41
gpr_cv_destroy
GPRAPI void gpr_cv_destroy(gpr_cv *cv)
impl_
std::shared_ptr< ExternalConnectionAcceptorImpl > impl_
Definition: external_connection_acceptor_impl.cc:43
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
setup.v
v
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:42
started_
bool started_
Definition: xds_cluster_impl.cc:357
arg
Definition: cmdline.cc:40
gpr_cv_wait
GPRAPI int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline)
grpc_core::Thread::Thread
Thread()
Definition: thd.h:78
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
grpc_core::Fork::DecThreadCount
static void DecThreadCount()
Definition: fork.cc:225
gpr_types.h
attr
OPENSSL_EXPORT X509_ATTRIBUTE * attr
Definition: x509.h:1666
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
options_
DebugStringOptions options_
Definition: bloaty/third_party/protobuf/src/google/protobuf/descriptor.cc:2390
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
thd.h
arg
struct arg arg
state_
grpc_connectivity_state state_
Definition: channel_connectivity.cc:213
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
sync.h
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
gpr_cv_init
GPRAPI void gpr_cv_init(gpr_cv *cv)
grpc_core::Join
promise_detail::Join< Promise... > Join(Promise... promises)
Definition: join.h:49
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:35