waiter.cc
Go to the documentation of this file.
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
48 #include "absl/base/optimization.h"
50 
51 namespace absl {
52 namespace synchronization_internal {
53 
54 static void MaybeBecomeIdle() {
57  assert(identity != nullptr);
58  const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
59  const int ticker = identity->ticker.load(std::memory_order_relaxed);
60  const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
61  if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
62  identity->is_idle.store(true, std::memory_order_relaxed);
63  }
64 }
65 
66 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
67 
68 // Some Android headers are missing these definitions even though they
69 // support these futex operations.
70 #ifdef __BIONIC__
71 #ifndef SYS_futex
72 #define SYS_futex __NR_futex
73 #endif
74 #ifndef FUTEX_WAIT_BITSET
75 #define FUTEX_WAIT_BITSET 9
76 #endif
77 #ifndef FUTEX_PRIVATE_FLAG
78 #define FUTEX_PRIVATE_FLAG 128
79 #endif
80 #ifndef FUTEX_CLOCK_REALTIME
81 #define FUTEX_CLOCK_REALTIME 256
82 #endif
83 #ifndef FUTEX_BITSET_MATCH_ANY
84 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
85 #endif
86 #endif
87 
88 class Futex {
89  public:
90  static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
91  KernelTimeout t) {
92  int err = 0;
93  if (t.has_timeout()) {
94  // https://locklessinc.com/articles/futex_cheat_sheet/
95  // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
96  struct timespec abs_timeout = t.MakeAbsTimespec();
97  // Atomically check that the futex value is still 0, and if it
98  // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
99  err = syscall(
100  SYS_futex, reinterpret_cast<int32_t *>(v),
101  FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
102  &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
103  } else {
104  // Atomically check that the futex value is still 0, and if it
105  // is, sleep until woken by FUTEX_WAKE.
106  err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
107  FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
108  }
109  if (err != 0) {
110  err = -errno;
111  }
112  return err;
113  }
114 
115  static int Wake(std::atomic<int32_t> *v, int32_t count) {
116  int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
117  FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
118  if (ABSL_PREDICT_FALSE(err < 0)) {
119  err = -errno;
120  }
121  return err;
122  }
123 };
124 
125 void Waiter::Init() {
126  futex_.store(0, std::memory_order_relaxed);
127 }
128 
130  // Loop until we can atomically decrement futex from a positive
131  // value, waiting on a futex while we believe it is zero.
132  while (true) {
133  int32_t x = futex_.load(std::memory_order_relaxed);
134  if (x != 0) {
135  if (!futex_.compare_exchange_weak(x, x - 1,
136  std::memory_order_acquire,
137  std::memory_order_relaxed)) {
138  continue; // Raced with someone, retry.
139  }
140  return true; // Consumed a wakeup, we are done.
141  }
142 
143  const int err = Futex::WaitUntil(&futex_, 0, t);
144  if (err != 0) {
145  if (err == -EINTR || err == -EWOULDBLOCK) {
146  // Do nothing, the loop will retry.
147  } else if (err == -ETIMEDOUT) {
148  return false;
149  } else {
150  ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
151  }
152  }
153 
154  MaybeBecomeIdle();
155  }
156 }
157 
158 void Waiter::Post() {
159  if (futex_.fetch_add(1, std::memory_order_release) == 0) {
160  // We incremented from 0, need to wake a potential waker.
161  Poke();
162  }
163 }
164 
165 void Waiter::Poke() {
166  // Wake one thread waiting on the futex.
167  const int err = Futex::Wake(&futex_, 1);
168  if (ABSL_PREDICT_FALSE(err < 0)) {
169  ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
170  }
171 }
172 
173 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
174 
175 class PthreadMutexHolder {
176  public:
177  explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
178  const int err = pthread_mutex_lock(mu_);
179  if (err != 0) {
180  ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
181  }
182  }
183 
184  PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
185  PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
186 
187  ~PthreadMutexHolder() {
188  const int err = pthread_mutex_unlock(mu_);
189  if (err != 0) {
190  ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
191  }
192  }
193 
194  private:
195  pthread_mutex_t *mu_;
196 };
197 
198 void Waiter::Init() {
199  const int err = pthread_mutex_init(&mu_, 0);
200  if (err != 0) {
201  ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
202  }
203 
204  const int err2 = pthread_cond_init(&cv_, 0);
205  if (err2 != 0) {
206  ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
207  }
208 
209  waiter_count_.store(0, std::memory_order_relaxed);
210  wakeup_count_.store(0, std::memory_order_relaxed);
211 }
212 
213 bool Waiter::Wait(KernelTimeout t) {
214  struct timespec abs_timeout;
215  if (t.has_timeout()) {
216  abs_timeout = t.MakeAbsTimespec();
217  }
218 
219  PthreadMutexHolder h(&mu_);
220  waiter_count_.fetch_add(1, std::memory_order_relaxed);
221  // Loop until we find a wakeup to consume or timeout.
222  while (true) {
223  int x = wakeup_count_.load(std::memory_order_relaxed);
224  if (x != 0) {
225  if (!wakeup_count_.compare_exchange_weak(x, x - 1,
226  std::memory_order_acquire,
227  std::memory_order_relaxed)) {
228  continue; // Raced with someone, retry.
229  }
230  // Successfully consumed a wakeup, we're done.
231  waiter_count_.fetch_sub(1, std::memory_order_relaxed);
232  return true;
233  }
234 
235  // No wakeups available, time to wait.
236  if (!t.has_timeout()) {
237  const int err = pthread_cond_wait(&cv_, &mu_);
238  if (err != 0) {
239  ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
240  }
241  } else {
242  const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
243  if (err == ETIMEDOUT) {
244  waiter_count_.fetch_sub(1, std::memory_order_relaxed);
245  return false;
246  }
247  if (err != 0) {
248  ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
249  }
250  }
251  MaybeBecomeIdle();
252  }
253 }
254 
255 void Waiter::Post() {
256  wakeup_count_.fetch_add(1, std::memory_order_release);
257  Poke();
258 }
259 
260 void Waiter::Poke() {
261  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
262  return;
263  }
264  // Potentially a waker. Take the lock and check again.
265  PthreadMutexHolder h(&mu_);
266  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
267  return;
268  }
269  const int err = pthread_cond_signal(&cv_);
270  if (err != 0) {
271  ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
272  }
273 }
274 
275 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
276 
277 void Waiter::Init() {
278  if (sem_init(&sem_, 0, 0) != 0) {
279  ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
280  }
281  wakeups_.store(0, std::memory_order_relaxed);
282 }
283 
284 bool Waiter::Wait(KernelTimeout t) {
285  struct timespec abs_timeout;
286  if (t.has_timeout()) {
287  abs_timeout = t.MakeAbsTimespec();
288  }
289 
290  // Loop until we timeout or consume a wakeup.
291  while (true) {
292  int x = wakeups_.load(std::memory_order_relaxed);
293  if (x != 0) {
294  if (!wakeups_.compare_exchange_weak(x, x - 1,
295  std::memory_order_acquire,
296  std::memory_order_relaxed)) {
297  continue; // Raced with someone, retry.
298  }
299  // Successfully consumed a wakeup, we're done.
300  return true;
301  }
302 
303  // Nothing to consume, wait (looping on EINTR).
304  while (true) {
305  if (!t.has_timeout()) {
306  if (sem_wait(&sem_) == 0) break;
307  if (errno == EINTR) continue;
308  ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
309  } else {
310  if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
311  if (errno == EINTR) continue;
312  if (errno == ETIMEDOUT) return false;
313  ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
314  }
315  }
316  MaybeBecomeIdle();
317  }
318 }
319 
320 void Waiter::Post() {
321  wakeups_.fetch_add(1, std::memory_order_release); // Post a wakeup.
322  Poke();
323 }
324 
325 void Waiter::Poke() {
326  if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
327  ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
328  }
329 }
330 
331 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
332 
333 class Waiter::WinHelper {
334  public:
335  static SRWLOCK *GetLock(Waiter *w) {
336  return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
337  }
338 
339  static CONDITION_VARIABLE *GetCond(Waiter *w) {
340  return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
341  }
342 
343  static_assert(sizeof(SRWLOCK) == sizeof(Waiter::SRWLockStorage),
344  "SRWLockStorage does not have the same size as SRWLOCK");
345  static_assert(
346  alignof(SRWLOCK) == alignof(Waiter::SRWLockStorage),
347  "SRWLockStorage does not have the same alignment as SRWLOCK");
348 
349  static_assert(sizeof(CONDITION_VARIABLE) ==
350  sizeof(Waiter::ConditionVariableStorage),
351  "ABSL_CONDITION_VARIABLE_STORAGE does not have the same size "
352  "as CONDITION_VARIABLE");
353  static_assert(alignof(CONDITION_VARIABLE) ==
354  alignof(Waiter::ConditionVariableStorage),
355  "ConditionVariableStorage does not have the same "
356  "alignment as CONDITION_VARIABLE");
357 
358  // The SRWLOCK and CONDITION_VARIABLE types must be trivially constuctible
359  // and destructible because we never call their constructors or destructors.
361  "The SRWLOCK type must be trivially constructible");
363  "The CONDITION_VARIABLE type must be trivially constructible");
365  "The SRWLOCK type must be trivially destructible");
367  "The CONDITION_VARIABLE type must be trivially destructible");
368 };
369 
370 class LockHolder {
371  public:
372  explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
373  AcquireSRWLockExclusive(mu_);
374  }
375 
376  LockHolder(const LockHolder&) = delete;
377  LockHolder& operator=(const LockHolder&) = delete;
378 
379  ~LockHolder() {
380  ReleaseSRWLockExclusive(mu_);
381  }
382 
383  private:
384  SRWLOCK* mu_;
385 };
386 
387 void Waiter::Init() {
388  auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
389  auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
390  InitializeSRWLock(mu);
391  InitializeConditionVariable(cv);
392  waiter_count_.store(0, std::memory_order_relaxed);
393  wakeup_count_.store(0, std::memory_order_relaxed);
394 }
395 
396 bool Waiter::Wait(KernelTimeout t) {
397  SRWLOCK *mu = WinHelper::GetLock(this);
398  CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
399 
400  LockHolder h(mu);
401  waiter_count_.fetch_add(1, std::memory_order_relaxed);
402 
403  // Loop until we find a wakeup to consume or timeout.
404  while (true) {
405  int x = wakeup_count_.load(std::memory_order_relaxed);
406  if (x != 0) {
407  if (!wakeup_count_.compare_exchange_weak(x, x - 1,
408  std::memory_order_acquire,
409  std::memory_order_relaxed)) {
410  continue; // Raced with someone, retry.
411  }
412  // Successfully consumed a wakeup, we're done.
413  waiter_count_.fetch_sub(1, std::memory_order_relaxed);
414  return true;
415  }
416 
417  // No wakeups available, time to wait.
418  if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
419  // GetLastError() returns a Win32 DWORD, but we assign to
420  // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
421  // initialization guarantees this is not a narrowing conversion.
422  const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
423  if (err == ERROR_TIMEOUT) {
424  waiter_count_.fetch_sub(1, std::memory_order_relaxed);
425  return false;
426  } else {
427  ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
428  }
429  }
430 
431  MaybeBecomeIdle();
432  }
433 }
434 
435 void Waiter::Post() {
436  wakeup_count_.fetch_add(1, std::memory_order_release);
437  Poke();
438 }
439 
440 void Waiter::Poke() {
441  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
442  return;
443  }
444  // Potentially a waker. Take the lock and check again.
445  LockHolder h(WinHelper::GetLock(this));
446  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
447  return;
448  }
449  WakeConditionVariable(WinHelper::GetCond(this));
450 }
451 
452 #else
453 #error Unknown ABSL_WAITER_MODE
454 #endif
455 
456 } // namespace synchronization_internal
457 } // namespace absl
int v
Definition: variant_test.cc:81
bool Wait(KernelTimeout t)
Definition: waiter.cc:129
static int WaitUntil(std::atomic< int32_t > *v, int32_t val, KernelTimeout t)
Definition: waiter.cc:90
#define ABSL_RAW_LOG(severity,...)
Definition: raw_logging.h:42
#define ABSL_PREDICT_FALSE(x)
Definition: optimization.h:177
ThreadIdentity * CurrentThreadIdentityIfPresent()
Definition: algorithm.h:29
static int Wake(std::atomic< int32_t > *v, int32_t count)
Definition: waiter.cc:115
size_t value
static void MaybeBecomeIdle()
Definition: waiter.cc:54


abseil_cpp
Author(s):
autogenerated on Mon Feb 28 2022 21:31:21