abseil-cpp/absl/synchronization/internal/waiter.cc
Go to the documentation of this file.
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "absl/synchronization/internal/waiter.h"
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50 
51 
52 namespace absl {
54 namespace synchronization_internal {
55 
56 static void MaybeBecomeIdle() {
59  assert(identity != nullptr);
60  const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
61  const int ticker = identity->ticker.load(std::memory_order_relaxed);
62  const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
63  if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
64  identity->is_idle.store(true, std::memory_order_relaxed);
65  }
66 }
67 
68 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
69 
71  futex_.store(0, std::memory_order_relaxed);
72 }
73 
75  // Loop until we can atomically decrement futex from a positive
76  // value, waiting on a futex while we believe it is zero.
77  // Note that, since the thread ticker is just reset, we don't need to check
78  // whether the thread is idle on the very first pass of the loop.
79  bool first_pass = true;
80 
81  while (true) {
82  int32_t x = futex_.load(std::memory_order_relaxed);
83  while (x != 0) {
84  if (!futex_.compare_exchange_weak(x, x - 1,
85  std::memory_order_acquire,
86  std::memory_order_relaxed)) {
87  continue; // Raced with someone, retry.
88  }
89  return true; // Consumed a wakeup, we are done.
90  }
91 
92  if (!first_pass) MaybeBecomeIdle();
93  const int err = Futex::WaitUntil(&futex_, 0, t);
94  if (err != 0) {
95  if (err == -EINTR || err == -EWOULDBLOCK) {
96  // Do nothing, the loop will retry.
97  } else if (err == -ETIMEDOUT) {
98  return false;
99  } else {
100  ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
101  }
102  }
103  first_pass = false;
104  }
105 }
106 
107 void Waiter::Post() {
108  if (futex_.fetch_add(1, std::memory_order_release) == 0) {
109  // We incremented from 0, need to wake a potential waiter.
110  Poke();
111  }
112 }
113 
114 void Waiter::Poke() {
115  // Wake one thread waiting on the futex.
116  const int err = Futex::Wake(&futex_, 1);
117  if (ABSL_PREDICT_FALSE(err < 0)) {
118  ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
119  }
120 }
121 
122 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
123 
124 class PthreadMutexHolder {
125  public:
126  explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
127  const int err = pthread_mutex_lock(mu_);
128  if (err != 0) {
129  ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
130  }
131  }
132 
133  PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
134  PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
135 
136  ~PthreadMutexHolder() {
137  const int err = pthread_mutex_unlock(mu_);
138  if (err != 0) {
139  ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
140  }
141  }
142 
143  private:
144  pthread_mutex_t *mu_;
145 };
146 
147 Waiter::Waiter() {
148  const int err = pthread_mutex_init(&mu_, 0);
149  if (err != 0) {
150  ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
151  }
152 
153  const int err2 = pthread_cond_init(&cv_, 0);
154  if (err2 != 0) {
155  ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
156  }
157 
158  waiter_count_ = 0;
159  wakeup_count_ = 0;
160 }
161 
162 bool Waiter::Wait(KernelTimeout t) {
163  struct timespec abs_timeout;
164  if (t.has_timeout()) {
165  abs_timeout = t.MakeAbsTimespec();
166  }
167 
168  PthreadMutexHolder h(&mu_);
169  ++waiter_count_;
170  // Loop until we find a wakeup to consume or timeout.
171  // Note that, since the thread ticker is just reset, we don't need to check
172  // whether the thread is idle on the very first pass of the loop.
173  bool first_pass = true;
174  while (wakeup_count_ == 0) {
175  if (!first_pass) MaybeBecomeIdle();
176  // No wakeups available, time to wait.
177  if (!t.has_timeout()) {
178  const int err = pthread_cond_wait(&cv_, &mu_);
179  if (err != 0) {
180  ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
181  }
182  } else {
183  const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
184  if (err == ETIMEDOUT) {
185  --waiter_count_;
186  return false;
187  }
188  if (err != 0) {
189  ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
190  }
191  }
192  first_pass = false;
193  }
194  // Consume a wakeup and we're done.
195  --wakeup_count_;
196  --waiter_count_;
197  return true;
198 }
199 
200 void Waiter::Post() {
201  PthreadMutexHolder h(&mu_);
202  ++wakeup_count_;
203  InternalCondVarPoke();
204 }
205 
206 void Waiter::Poke() {
207  PthreadMutexHolder h(&mu_);
208  InternalCondVarPoke();
209 }
210 
211 void Waiter::InternalCondVarPoke() {
212  if (waiter_count_ != 0) {
213  const int err = pthread_cond_signal(&cv_);
214  if (ABSL_PREDICT_FALSE(err != 0)) {
215  ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
216  }
217  }
218 }
219 
220 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
221 
222 Waiter::Waiter() {
223  if (sem_init(&sem_, 0, 0) != 0) {
224  ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
225  }
226  wakeups_.store(0, std::memory_order_relaxed);
227 }
228 
229 bool Waiter::Wait(KernelTimeout t) {
230  struct timespec abs_timeout;
231  if (t.has_timeout()) {
232  abs_timeout = t.MakeAbsTimespec();
233  }
234 
235  // Loop until we timeout or consume a wakeup.
236  // Note that, since the thread ticker is just reset, we don't need to check
237  // whether the thread is idle on the very first pass of the loop.
238  bool first_pass = true;
239  while (true) {
240  int x = wakeups_.load(std::memory_order_relaxed);
241  while (x != 0) {
242  if (!wakeups_.compare_exchange_weak(x, x - 1,
243  std::memory_order_acquire,
244  std::memory_order_relaxed)) {
245  continue; // Raced with someone, retry.
246  }
247  // Successfully consumed a wakeup, we're done.
248  return true;
249  }
250 
251  if (!first_pass) MaybeBecomeIdle();
252  // Nothing to consume, wait (looping on EINTR).
253  while (true) {
254  if (!t.has_timeout()) {
255  if (sem_wait(&sem_) == 0) break;
256  if (errno == EINTR) continue;
257  ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
258  } else {
259  if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
260  if (errno == EINTR) continue;
261  if (errno == ETIMEDOUT) return false;
262  ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
263  }
264  }
265  first_pass = false;
266  }
267 }
268 
269 void Waiter::Post() {
270  // Post a wakeup.
271  if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
272  // We incremented from 0, need to wake a potential waiter.
273  Poke();
274  }
275 }
276 
277 void Waiter::Poke() {
278  if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
279  ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
280  }
281 }
282 
283 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
284 
285 class Waiter::WinHelper {
286  public:
287  static SRWLOCK *GetLock(Waiter *w) {
288  return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
289  }
290 
291  static CONDITION_VARIABLE *GetCond(Waiter *w) {
292  return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
293  }
294 
295  static_assert(sizeof(SRWLOCK) == sizeof(void *),
296  "`mu_storage_` does not have the same size as SRWLOCK");
297  static_assert(alignof(SRWLOCK) == alignof(void *),
298  "`mu_storage_` does not have the same alignment as SRWLOCK");
299 
300  static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
301  "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
302  "as `CONDITION_VARIABLE`");
303  static_assert(
304  alignof(CONDITION_VARIABLE) == alignof(void *),
305  "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
306 
307  // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
308  // and destructible because we never call their constructors or destructors.
310  "The `SRWLOCK` type must be trivially constructible");
311  static_assert(
313  "The `CONDITION_VARIABLE` type must be trivially constructible");
315  "The `SRWLOCK` type must be trivially destructible");
317  "The `CONDITION_VARIABLE` type must be trivially destructible");
318 };
319 
320 class LockHolder {
321  public:
322  explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
323  AcquireSRWLockExclusive(mu_);
324  }
325 
326  LockHolder(const LockHolder&) = delete;
327  LockHolder& operator=(const LockHolder&) = delete;
328 
329  ~LockHolder() {
330  ReleaseSRWLockExclusive(mu_);
331  }
332 
333  private:
334  SRWLOCK* mu_;
335 };
336 
337 Waiter::Waiter() {
338  auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
339  auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
340  InitializeSRWLock(mu);
341  InitializeConditionVariable(cv);
342  waiter_count_ = 0;
343  wakeup_count_ = 0;
344 }
345 
346 bool Waiter::Wait(KernelTimeout t) {
347  SRWLOCK *mu = WinHelper::GetLock(this);
348  CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
349 
350  LockHolder h(mu);
351  ++waiter_count_;
352 
353  // Loop until we find a wakeup to consume or timeout.
354  // Note that, since the thread ticker is just reset, we don't need to check
355  // whether the thread is idle on the very first pass of the loop.
356  bool first_pass = true;
357  while (wakeup_count_ == 0) {
358  if (!first_pass) MaybeBecomeIdle();
359  // No wakeups available, time to wait.
360  if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
361  // GetLastError() returns a Win32 DWORD, but we assign to
362  // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
363  // initialization guarantees this is not a narrowing conversion.
364  const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
365  if (err == ERROR_TIMEOUT) {
366  --waiter_count_;
367  return false;
368  } else {
369  ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
370  }
371  }
372  first_pass = false;
373  }
374  // Consume a wakeup and we're done.
375  --wakeup_count_;
376  --waiter_count_;
377  return true;
378 }
379 
380 void Waiter::Post() {
381  LockHolder h(WinHelper::GetLock(this));
382  ++wakeup_count_;
383  InternalCondVarPoke();
384 }
385 
386 void Waiter::Poke() {
387  LockHolder h(WinHelper::GetLock(this));
388  InternalCondVarPoke();
389 }
390 
391 void Waiter::InternalCondVarPoke() {
392  if (waiter_count_ != 0) {
393  WakeConditionVariable(WinHelper::GetCond(this));
394  }
395 }
396 
397 #else
398 #error Unknown ABSL_WAITER_MODE
399 #endif
400 
401 } // namespace synchronization_internal
403 } // namespace absl
ABSL_PREDICT_FALSE
#define ABSL_PREDICT_FALSE(x)
Definition: abseil-cpp/absl/base/optimization.h:180
absl::base_internal::ThreadIdentity
Definition: abseil-cpp/absl/base/internal/thread_identity.h:137
absl::base_internal::ThreadIdentity::ticker
std::atomic< int > ticker
Definition: abseil-cpp/absl/base/internal/thread_identity.h:155
cv_
std::condition_variable cv_
Definition: client_callback_end2end_test.cc:733
absl::synchronization_internal::Waiter::Post
void Post()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:107
absl::base_internal::ThreadIdentity::wait_start
std::atomic< int > wait_start
Definition: abseil-cpp/absl/base/internal/thread_identity.h:156
absl::synchronization_internal::Waiter::Waiter
Waiter()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:70
error_ref_leak.err
err
Definition: error_ref_leak.py:35
sem_wait
int sem_wait(UV_PLATFORM_SEM_T *semid)
Definition: os390-syscalls.c:583
grpc::internal::WaitUntil
static void WaitUntil(CondVar *cv, Mutex *mu, Predicate pred)
Definition: include/grpcpp/impl/codegen/sync.h:149
CONDITION_VARIABLE
PVOID CONDITION_VARIABLE
Definition: win.h:203
env.new
def new
Definition: env.py:51
ABSL_NAMESPACE_END
#define ABSL_NAMESPACE_END
Definition: third_party/abseil-cpp/absl/base/config.h:171
wakeups_
std::vector< uint32_t > wakeups_
Definition: filter_fuzzer.cc:572
mu_
Mutex mu_
Definition: oob_backend_metric.cc:115
sem_post
int sem_post(UV_PLATFORM_SEM_T *semid)
Definition: os390-syscalls.c:573
absl::base_internal::CurrentThreadIdentityIfPresent
ThreadIdentity * CurrentThreadIdentityIfPresent()
Definition: abseil-cpp/absl/base/internal/thread_identity.cc:128
absl::synchronization_internal::Waiter::kIdlePeriods
static constexpr int kIdlePeriods
Definition: abseil-cpp/absl/synchronization/internal/waiter.h:97
ABSL_NAMESPACE_BEGIN
#define ABSL_NAMESPACE_BEGIN
Definition: third_party/abseil-cpp/absl/base/config.h:170
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
absl::synchronization_internal::KernelTimeout
Definition: abseil-cpp/absl/synchronization/internal/kernel_timeout.h:44
absl::synchronization_internal::MaybeBecomeIdle
static void MaybeBecomeIdle()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:56
x
int x
Definition: bloaty/third_party/googletest/googlemock/test/gmock-matchers_test.cc:3610
value
const char * value
Definition: hpack_parser_table.cc:165
FATAL
#define FATAL(msg)
Definition: task.h:88
cv
unsigned cv
Definition: cxa_demangle.cpp:4908
sem_init
int sem_init(UV_PLATFORM_SEM_T *semid, int pshared, unsigned int value)
Definition: os390-syscalls.c:563
absl::str_format_internal::LengthMod::t
@ t
absl::synchronization_internal::Waiter::Poke
void Poke()
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:114
absl::synchronization_internal::Waiter::Wait
bool Wait(KernelTimeout t)
Definition: abseil-cpp/absl/synchronization/internal/waiter.cc:74
SRWLOCK
RTL_SRWLOCK SRWLOCK
Definition: win.h:174
absl
Definition: abseil-cpp/absl/algorithm/algorithm.h:31
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
absl::str_format_internal::LengthMod::h
@ h
ABSL_RAW_LOG
#define ABSL_RAW_LOG(severity,...)
Definition: abseil-cpp/absl/base/internal/raw_logging.h:44
errno.h
absl::base_internal::ThreadIdentity::is_idle
std::atomic< bool > is_idle
Definition: abseil-cpp/absl/base/internal/thread_identity.h:157


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:52