bloaty/third_party/abseil-cpp/absl/synchronization/internal/waiter.cc
Go to the documentation of this file.
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "absl/synchronization/internal/waiter.h"
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50 
51 
52 namespace absl {
54 namespace synchronization_internal {
55 
56 static void MaybeBecomeIdle() {
59  assert(identity != nullptr);
60  const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
61  const int ticker = identity->ticker.load(std::memory_order_relaxed);
62  const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
63  if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
64  identity->is_idle.store(true, std::memory_order_relaxed);
65  }
66 }
67 
68 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
69 
71  futex_.store(0, std::memory_order_relaxed);
72 }
73 
74 Waiter::~Waiter() = default;
75 
76 bool Waiter::Wait(KernelTimeout t) {
77  // Loop until we can atomically decrement futex from a positive
78  // value, waiting on a futex while we believe it is zero.
79  // Note that, since the thread ticker is just reset, we don't need to check
80  // whether the thread is idle on the very first pass of the loop.
81  bool first_pass = true;
82 
83  while (true) {
84  int32_t x = futex_.load(std::memory_order_relaxed);
85  while (x != 0) {
86  if (!futex_.compare_exchange_weak(x, x - 1,
87  std::memory_order_acquire,
88  std::memory_order_relaxed)) {
89  continue; // Raced with someone, retry.
90  }
91  return true; // Consumed a wakeup, we are done.
92  }
93 
94  if (!first_pass) MaybeBecomeIdle();
95  const int err = Futex::WaitUntil(&futex_, 0, t);
96  if (err != 0) {
97  if (err == -EINTR || err == -EWOULDBLOCK) {
98  // Do nothing, the loop will retry.
99  } else if (err == -ETIMEDOUT) {
100  return false;
101  } else {
102  ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
103  }
104  }
105  first_pass = false;
106  }
107 }
108 
109 void Waiter::Post() {
110  if (futex_.fetch_add(1, std::memory_order_release) == 0) {
111  // We incremented from 0, need to wake a potential waiter.
112  Poke();
113  }
114 }
115 
116 void Waiter::Poke() {
117  // Wake one thread waiting on the futex.
118  const int err = Futex::Wake(&futex_, 1);
119  if (ABSL_PREDICT_FALSE(err < 0)) {
120  ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
121  }
122 }
123 
124 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
125 
126 class PthreadMutexHolder {
127  public:
128  explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
129  const int err = pthread_mutex_lock(mu_);
130  if (err != 0) {
131  ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
132  }
133  }
134 
135  PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
136  PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
137 
138  ~PthreadMutexHolder() {
139  const int err = pthread_mutex_unlock(mu_);
140  if (err != 0) {
141  ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
142  }
143  }
144 
145  private:
146  pthread_mutex_t *mu_;
147 };
148 
149 Waiter::Waiter() {
150  const int err = pthread_mutex_init(&mu_, 0);
151  if (err != 0) {
152  ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
153  }
154 
155  const int err2 = pthread_cond_init(&cv_, 0);
156  if (err2 != 0) {
157  ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
158  }
159 
160  waiter_count_ = 0;
161  wakeup_count_ = 0;
162 }
163 
164 Waiter::~Waiter() {
165  const int err = pthread_mutex_destroy(&mu_);
166  if (err != 0) {
167  ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
168  }
169 
170  const int err2 = pthread_cond_destroy(&cv_);
171  if (err2 != 0) {
172  ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
173  }
174 }
175 
176 bool Waiter::Wait(KernelTimeout t) {
177  struct timespec abs_timeout;
178  if (t.has_timeout()) {
179  abs_timeout = t.MakeAbsTimespec();
180  }
181 
182  PthreadMutexHolder h(&mu_);
183  ++waiter_count_;
184  // Loop until we find a wakeup to consume or timeout.
185  // Note that, since the thread ticker is just reset, we don't need to check
186  // whether the thread is idle on the very first pass of the loop.
187  bool first_pass = true;
188  while (wakeup_count_ == 0) {
189  if (!first_pass) MaybeBecomeIdle();
190  // No wakeups available, time to wait.
191  if (!t.has_timeout()) {
192  const int err = pthread_cond_wait(&cv_, &mu_);
193  if (err != 0) {
194  ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
195  }
196  } else {
197  const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
198  if (err == ETIMEDOUT) {
199  --waiter_count_;
200  return false;
201  }
202  if (err != 0) {
203  ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
204  }
205  }
206  first_pass = false;
207  }
208  // Consume a wakeup and we're done.
209  --wakeup_count_;
210  --waiter_count_;
211  return true;
212 }
213 
214 void Waiter::Post() {
215  PthreadMutexHolder h(&mu_);
216  ++wakeup_count_;
217  InternalCondVarPoke();
218 }
219 
220 void Waiter::Poke() {
221  PthreadMutexHolder h(&mu_);
222  InternalCondVarPoke();
223 }
224 
225 void Waiter::InternalCondVarPoke() {
226  if (waiter_count_ != 0) {
227  const int err = pthread_cond_signal(&cv_);
228  if (ABSL_PREDICT_FALSE(err != 0)) {
229  ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
230  }
231  }
232 }
233 
234 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
235 
236 Waiter::Waiter() {
237  if (sem_init(&sem_, 0, 0) != 0) {
238  ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
239  }
240  wakeups_.store(0, std::memory_order_relaxed);
241 }
242 
243 Waiter::~Waiter() {
244  if (sem_destroy(&sem_) != 0) {
245  ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
246  }
247 }
248 
249 bool Waiter::Wait(KernelTimeout t) {
250  struct timespec abs_timeout;
251  if (t.has_timeout()) {
252  abs_timeout = t.MakeAbsTimespec();
253  }
254 
255  // Loop until we timeout or consume a wakeup.
256  // Note that, since the thread ticker is just reset, we don't need to check
257  // whether the thread is idle on the very first pass of the loop.
258  bool first_pass = true;
259  while (true) {
260  int x = wakeups_.load(std::memory_order_relaxed);
261  while (x != 0) {
262  if (!wakeups_.compare_exchange_weak(x, x - 1,
263  std::memory_order_acquire,
264  std::memory_order_relaxed)) {
265  continue; // Raced with someone, retry.
266  }
267  // Successfully consumed a wakeup, we're done.
268  return true;
269  }
270 
271  if (!first_pass) MaybeBecomeIdle();
272  // Nothing to consume, wait (looping on EINTR).
273  while (true) {
274  if (!t.has_timeout()) {
275  if (sem_wait(&sem_) == 0) break;
276  if (errno == EINTR) continue;
277  ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
278  } else {
279  if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
280  if (errno == EINTR) continue;
281  if (errno == ETIMEDOUT) return false;
282  ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
283  }
284  }
285  first_pass = false;
286  }
287 }
288 
289 void Waiter::Post() {
290  // Post a wakeup.
291  if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
292  // We incremented from 0, need to wake a potential waiter.
293  Poke();
294  }
295 }
296 
297 void Waiter::Poke() {
298  if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
299  ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
300  }
301 }
302 
303 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
304 
305 class Waiter::WinHelper {
306  public:
307  static SRWLOCK *GetLock(Waiter *w) {
308  return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
309  }
310 
311  static CONDITION_VARIABLE *GetCond(Waiter *w) {
312  return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
313  }
314 
315  static_assert(sizeof(SRWLOCK) == sizeof(void *),
316  "`mu_storage_` does not have the same size as SRWLOCK");
317  static_assert(alignof(SRWLOCK) == alignof(void *),
318  "`mu_storage_` does not have the same alignment as SRWLOCK");
319 
320  static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
321  "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
322  "as `CONDITION_VARIABLE`");
323  static_assert(
324  alignof(CONDITION_VARIABLE) == alignof(void *),
325  "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
326 
327  // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
328  // and destructible because we never call their constructors or destructors.
330  "The `SRWLOCK` type must be trivially constructible");
331  static_assert(
333  "The `CONDITION_VARIABLE` type must be trivially constructible");
335  "The `SRWLOCK` type must be trivially destructible");
337  "The `CONDITION_VARIABLE` type must be trivially destructible");
338 };
339 
340 class LockHolder {
341  public:
342  explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
343  AcquireSRWLockExclusive(mu_);
344  }
345 
346  LockHolder(const LockHolder&) = delete;
347  LockHolder& operator=(const LockHolder&) = delete;
348 
349  ~LockHolder() {
350  ReleaseSRWLockExclusive(mu_);
351  }
352 
353  private:
354  SRWLOCK* mu_;
355 };
356 
357 Waiter::Waiter() {
358  auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
359  auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
360  InitializeSRWLock(mu);
361  InitializeConditionVariable(cv);
362  waiter_count_ = 0;
363  wakeup_count_ = 0;
364 }
365 
366 // SRW locks and condition variables do not need to be explicitly destroyed.
367 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
368 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
369 Waiter::~Waiter() = default;
370 
371 bool Waiter::Wait(KernelTimeout t) {
372  SRWLOCK *mu = WinHelper::GetLock(this);
373  CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
374 
375  LockHolder h(mu);
376  ++waiter_count_;
377 
378  // Loop until we find a wakeup to consume or timeout.
379  // Note that, since the thread ticker is just reset, we don't need to check
380  // whether the thread is idle on the very first pass of the loop.
381  bool first_pass = true;
382  while (wakeup_count_ == 0) {
383  if (!first_pass) MaybeBecomeIdle();
384  // No wakeups available, time to wait.
385  if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
386  // GetLastError() returns a Win32 DWORD, but we assign to
387  // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
388  // initialization guarantees this is not a narrowing conversion.
389  const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
390  if (err == ERROR_TIMEOUT) {
391  --waiter_count_;
392  return false;
393  } else {
394  ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
395  }
396  }
397  first_pass = false;
398  }
399  // Consume a wakeup and we're done.
400  --wakeup_count_;
401  --waiter_count_;
402  return true;
403 }
404 
405 void Waiter::Post() {
406  LockHolder h(WinHelper::GetLock(this));
407  ++wakeup_count_;
408  InternalCondVarPoke();
409 }
410 
411 void Waiter::Poke() {
412  LockHolder h(WinHelper::GetLock(this));
413  InternalCondVarPoke();
414 }
415 
416 void Waiter::InternalCondVarPoke() {
417  if (waiter_count_ != 0) {
418  WakeConditionVariable(WinHelper::GetCond(this));
419  }
420 }
421 
422 #else
423 #error Unknown ABSL_WAITER_MODE
424 #endif
425 
426 } // namespace synchronization_internal
428 } // namespace absl
ABSL_PREDICT_FALSE
#define ABSL_PREDICT_FALSE(x)
Definition: abseil-cpp/absl/base/optimization.h:180
absl::base_internal::ThreadIdentity
Definition: abseil-cpp/absl/base/internal/thread_identity.h:137
absl::base_internal::ThreadIdentity::ticker
std::atomic< int > ticker
Definition: abseil-cpp/absl/base/internal/thread_identity.h:155
cv_
std::condition_variable cv_
Definition: client_callback_end2end_test.cc:733
absl::synchronization_internal::Waiter::Post
void Post()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:107
absl::base_internal::ThreadIdentity::wait_start
std::atomic< int > wait_start
Definition: abseil-cpp/absl/base/internal/thread_identity.h:156
absl::synchronization_internal::Waiter::Waiter
Waiter()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:70
error_ref_leak.err
err
Definition: error_ref_leak.py:35
sem_wait
int sem_wait(UV_PLATFORM_SEM_T *semid)
Definition: os390-syscalls.c:583
grpc::internal::WaitUntil
static void WaitUntil(CondVar *cv, Mutex *mu, Predicate pred)
Definition: include/grpcpp/impl/codegen/sync.h:149
CONDITION_VARIABLE
PVOID CONDITION_VARIABLE
Definition: win.h:203
env.new
def new
Definition: env.py:51
ABSL_NAMESPACE_END
#define ABSL_NAMESPACE_END
Definition: third_party/abseil-cpp/absl/base/config.h:171
wakeups_
std::vector< uint32_t > wakeups_
Definition: filter_fuzzer.cc:572
mu_
Mutex mu_
Definition: oob_backend_metric.cc:115
sem_post
int sem_post(UV_PLATFORM_SEM_T *semid)
Definition: os390-syscalls.c:573
absl::base_internal::CurrentThreadIdentityIfPresent
ThreadIdentity * CurrentThreadIdentityIfPresent()
Definition: abseil-cpp/absl/base/internal/thread_identity.cc:128
absl::synchronization_internal::Waiter::kIdlePeriods
static constexpr int kIdlePeriods
Definition: abseil-cpp/absl/synchronization/internal/waiter.h:97
ABSL_NAMESPACE_BEGIN
#define ABSL_NAMESPACE_BEGIN
Definition: third_party/abseil-cpp/absl/base/config.h:170
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
absl::synchronization_internal::MaybeBecomeIdle
static void MaybeBecomeIdle()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:56
x
int x
Definition: bloaty/third_party/googletest/googlemock/test/gmock-matchers_test.cc:3610
sem_destroy
int sem_destroy(UV_PLATFORM_SEM_T *semid)
Definition: os390-syscalls.c:568
value
const char * value
Definition: hpack_parser_table.cc:165
FATAL
#define FATAL(msg)
Definition: task.h:88
absl::synchronization_internal::Waiter::~Waiter
~Waiter()=delete
cv
unsigned cv
Definition: cxa_demangle.cpp:4908
sem_init
int sem_init(UV_PLATFORM_SEM_T *semid, int pshared, unsigned int value)
Definition: os390-syscalls.c:563
absl::str_format_internal::LengthMod::t
@ t
absl::synchronization_internal::Waiter::Poke
void Poke()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:114
absl::synchronization_internal::Waiter::Wait
bool Wait(KernelTimeout t)
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:74
SRWLOCK
RTL_SRWLOCK SRWLOCK
Definition: win.h:174
absl
Definition: abseil-cpp/absl/algorithm/algorithm.h:31
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
absl::str_format_internal::LengthMod::h
@ h
ABSL_RAW_LOG
#define ABSL_RAW_LOG(severity,...)
Definition: abseil-cpp/absl/base/internal/raw_logging.h:44
errno.h
absl::base_internal::ThreadIdentity::is_idle
std::atomic< bool > is_idle
Definition: abseil-cpp/absl/base/internal/thread_identity.h:157


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:52