waiter.cc
Go to the documentation of this file.
00001 // Copyright 2017 The Abseil Authors.
00002 //
00003 // Licensed under the Apache License, Version 2.0 (the "License");
00004 // you may not use this file except in compliance with the License.
00005 // You may obtain a copy of the License at
00006 //
00007 //      https://www.apache.org/licenses/LICENSE-2.0
00008 //
00009 // Unless required by applicable law or agreed to in writing, software
00010 // distributed under the License is distributed on an "AS IS" BASIS,
00011 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00012 // See the License for the specific language governing permissions and
00013 // limitations under the License.
00014 
00015 #include "absl/synchronization/internal/waiter.h"
00016 
00017 #include "absl/base/config.h"
00018 
00019 #ifdef _WIN32
00020 #include <windows.h>
00021 #else
00022 #include <pthread.h>
00023 #include <sys/time.h>
00024 #include <unistd.h>
00025 #endif
00026 
00027 #ifdef __linux__
00028 #include <linux/futex.h>
00029 #include <sys/syscall.h>
00030 #endif
00031 
00032 #ifdef ABSL_HAVE_SEMAPHORE_H
00033 #include <semaphore.h>
00034 #endif
00035 
00036 #include <errno.h>
00037 #include <stdio.h>
00038 #include <time.h>
00039 
00040 #include <atomic>
00041 #include <cassert>
00042 #include <cstdint>
00043 #include <new>
00044 #include <type_traits>
00045 
00046 #include "absl/base/internal/raw_logging.h"
00047 #include "absl/base/internal/thread_identity.h"
00048 #include "absl/base/optimization.h"
00049 #include "absl/synchronization/internal/kernel_timeout.h"
00050 
00051 namespace absl {
00052 namespace synchronization_internal {
00053 
00054 static void MaybeBecomeIdle() {
00055   base_internal::ThreadIdentity *identity =
00056       base_internal::CurrentThreadIdentityIfPresent();
00057   assert(identity != nullptr);
00058   const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
00059   const int ticker = identity->ticker.load(std::memory_order_relaxed);
00060   const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
00061   if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
00062     identity->is_idle.store(true, std::memory_order_relaxed);
00063   }
00064 }
00065 
00066 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
00067 
00068 // Some Android headers are missing these definitions even though they
00069 // support these futex operations.
00070 #ifdef __BIONIC__
00071 #ifndef SYS_futex
00072 #define SYS_futex __NR_futex
00073 #endif
00074 #ifndef FUTEX_WAIT_BITSET
00075 #define FUTEX_WAIT_BITSET 9
00076 #endif
00077 #ifndef FUTEX_PRIVATE_FLAG
00078 #define FUTEX_PRIVATE_FLAG 128
00079 #endif
00080 #ifndef FUTEX_CLOCK_REALTIME
00081 #define FUTEX_CLOCK_REALTIME 256
00082 #endif
00083 #ifndef FUTEX_BITSET_MATCH_ANY
00084 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
00085 #endif
00086 #endif
00087 
00088 class Futex {
00089  public:
00090   static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
00091                        KernelTimeout t) {
00092     int err = 0;
00093     if (t.has_timeout()) {
00094       // https://locklessinc.com/articles/futex_cheat_sheet/
00095       // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
00096       struct timespec abs_timeout = t.MakeAbsTimespec();
00097       // Atomically check that the futex value is still 0, and if it
00098       // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
00099       err = syscall(
00100           SYS_futex, reinterpret_cast<int32_t *>(v),
00101           FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
00102           &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
00103     } else {
00104       // Atomically check that the futex value is still 0, and if it
00105       // is, sleep until woken by FUTEX_WAKE.
00106       err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
00107                     FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
00108     }
00109     if (err != 0) {
00110       err = -errno;
00111     }
00112     return err;
00113   }
00114 
00115   static int Wake(std::atomic<int32_t> *v, int32_t count) {
00116     int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
00117                       FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
00118     if (ABSL_PREDICT_FALSE(err < 0)) {
00119       err = -errno;
00120     }
00121     return err;
00122   }
00123 };
00124 
00125 void Waiter::Init() {
00126   futex_.store(0, std::memory_order_relaxed);
00127 }
00128 
00129 bool Waiter::Wait(KernelTimeout t) {
00130   // Loop until we can atomically decrement futex from a positive
00131   // value, waiting on a futex while we believe it is zero.
00132   while (true) {
00133     int32_t x = futex_.load(std::memory_order_relaxed);
00134     if (x != 0) {
00135       if (!futex_.compare_exchange_weak(x, x - 1,
00136                                         std::memory_order_acquire,
00137                                         std::memory_order_relaxed)) {
00138         continue;  // Raced with someone, retry.
00139       }
00140       return true;  // Consumed a wakeup, we are done.
00141     }
00142 
00143     const int err = Futex::WaitUntil(&futex_, 0, t);
00144     if (err != 0) {
00145       if (err == -EINTR || err == -EWOULDBLOCK) {
00146         // Do nothing, the loop will retry.
00147       } else if (err == -ETIMEDOUT) {
00148         return false;
00149       } else {
00150         ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
00151       }
00152     }
00153 
00154     MaybeBecomeIdle();
00155   }
00156 }
00157 
00158 void Waiter::Post() {
00159   if (futex_.fetch_add(1, std::memory_order_release) == 0) {
00160     // We incremented from 0, need to wake a potential waker.
00161     Poke();
00162   }
00163 }
00164 
00165 void Waiter::Poke() {
00166   // Wake one thread waiting on the futex.
00167   const int err = Futex::Wake(&futex_, 1);
00168   if (ABSL_PREDICT_FALSE(err < 0)) {
00169     ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
00170   }
00171 }
00172 
00173 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
00174 
00175 class PthreadMutexHolder {
00176  public:
00177   explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
00178     const int err = pthread_mutex_lock(mu_);
00179     if (err != 0) {
00180       ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
00181     }
00182   }
00183 
00184   PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
00185   PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
00186 
00187   ~PthreadMutexHolder() {
00188     const int err = pthread_mutex_unlock(mu_);
00189     if (err != 0) {
00190       ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
00191     }
00192   }
00193 
00194  private:
00195   pthread_mutex_t *mu_;
00196 };
00197 
00198 void Waiter::Init() {
00199   const int err = pthread_mutex_init(&mu_, 0);
00200   if (err != 0) {
00201     ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
00202   }
00203 
00204   const int err2 = pthread_cond_init(&cv_, 0);
00205   if (err2 != 0) {
00206     ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
00207   }
00208 
00209   waiter_count_.store(0, std::memory_order_relaxed);
00210   wakeup_count_.store(0, std::memory_order_relaxed);
00211 }
00212 
00213 bool Waiter::Wait(KernelTimeout t) {
00214   struct timespec abs_timeout;
00215   if (t.has_timeout()) {
00216     abs_timeout = t.MakeAbsTimespec();
00217   }
00218 
00219   PthreadMutexHolder h(&mu_);
00220   waiter_count_.fetch_add(1, std::memory_order_relaxed);
00221   // Loop until we find a wakeup to consume or timeout.
00222   while (true) {
00223     int x = wakeup_count_.load(std::memory_order_relaxed);
00224     if (x != 0) {
00225       if (!wakeup_count_.compare_exchange_weak(x, x - 1,
00226                                                std::memory_order_acquire,
00227                                                std::memory_order_relaxed)) {
00228         continue;  // Raced with someone, retry.
00229       }
00230       // Successfully consumed a wakeup, we're done.
00231       waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00232       return true;
00233     }
00234 
00235     // No wakeups available, time to wait.
00236     if (!t.has_timeout()) {
00237       const int err = pthread_cond_wait(&cv_, &mu_);
00238       if (err != 0) {
00239         ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
00240       }
00241     } else {
00242       const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
00243       if (err == ETIMEDOUT) {
00244         waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00245         return false;
00246       }
00247       if (err != 0) {
00248         ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
00249       }
00250     }
00251     MaybeBecomeIdle();
00252   }
00253 }
00254 
00255 void Waiter::Post() {
00256   wakeup_count_.fetch_add(1, std::memory_order_release);
00257   Poke();
00258 }
00259 
00260 void Waiter::Poke() {
00261   if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00262     return;
00263   }
00264   // Potentially a waker. Take the lock and check again.
00265   PthreadMutexHolder h(&mu_);
00266   if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00267     return;
00268   }
00269   const int err = pthread_cond_signal(&cv_);
00270   if (err != 0) {
00271     ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
00272   }
00273 }
00274 
00275 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
00276 
00277 void Waiter::Init() {
00278   if (sem_init(&sem_, 0, 0) != 0) {
00279     ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
00280   }
00281   wakeups_.store(0, std::memory_order_relaxed);
00282 }
00283 
00284 bool Waiter::Wait(KernelTimeout t) {
00285   struct timespec abs_timeout;
00286   if (t.has_timeout()) {
00287     abs_timeout = t.MakeAbsTimespec();
00288   }
00289 
00290   // Loop until we timeout or consume a wakeup.
00291   while (true) {
00292     int x = wakeups_.load(std::memory_order_relaxed);
00293     if (x != 0) {
00294       if (!wakeups_.compare_exchange_weak(x, x - 1,
00295                                           std::memory_order_acquire,
00296                                           std::memory_order_relaxed)) {
00297         continue;  // Raced with someone, retry.
00298       }
00299       // Successfully consumed a wakeup, we're done.
00300       return true;
00301     }
00302 
00303     // Nothing to consume, wait (looping on EINTR).
00304     while (true) {
00305       if (!t.has_timeout()) {
00306         if (sem_wait(&sem_) == 0) break;
00307         if (errno == EINTR) continue;
00308         ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
00309       } else {
00310         if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
00311         if (errno == EINTR) continue;
00312         if (errno == ETIMEDOUT) return false;
00313         ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
00314       }
00315     }
00316     MaybeBecomeIdle();
00317   }
00318 }
00319 
00320 void Waiter::Post() {
00321   wakeups_.fetch_add(1, std::memory_order_release);  // Post a wakeup.
00322   Poke();
00323 }
00324 
00325 void Waiter::Poke() {
00326   if (sem_post(&sem_) != 0) {  // Wake any semaphore waiter.
00327     ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
00328   }
00329 }
00330 
00331 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
00332 
00333 class Waiter::WinHelper {
00334  public:
00335   static SRWLOCK *GetLock(Waiter *w) {
00336     return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
00337   }
00338 
00339   static CONDITION_VARIABLE *GetCond(Waiter *w) {
00340     return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
00341   }
00342 
00343   static_assert(sizeof(SRWLOCK) == sizeof(Waiter::SRWLockStorage),
00344                 "SRWLockStorage does not have the same size as SRWLOCK");
00345   static_assert(
00346       alignof(SRWLOCK) == alignof(Waiter::SRWLockStorage),
00347       "SRWLockStorage does not have the same alignment as SRWLOCK");
00348 
00349   static_assert(sizeof(CONDITION_VARIABLE) ==
00350                     sizeof(Waiter::ConditionVariableStorage),
00351                 "ABSL_CONDITION_VARIABLE_STORAGE does not have the same size "
00352                 "as CONDITION_VARIABLE");
00353   static_assert(alignof(CONDITION_VARIABLE) ==
00354                     alignof(Waiter::ConditionVariableStorage),
00355                 "ConditionVariableStorage does not have the same "
00356                 "alignment as CONDITION_VARIABLE");
00357 
00358   // The SRWLOCK and CONDITION_VARIABLE types must be trivially constuctible
00359   // and destructible because we never call their constructors or destructors.
00360   static_assert(std::is_trivially_constructible<SRWLOCK>::value,
00361                 "The SRWLOCK type must be trivially constructible");
00362   static_assert(std::is_trivially_constructible<CONDITION_VARIABLE>::value,
00363                 "The CONDITION_VARIABLE type must be trivially constructible");
00364   static_assert(std::is_trivially_destructible<SRWLOCK>::value,
00365                 "The SRWLOCK type must be trivially destructible");
00366   static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
00367                 "The CONDITION_VARIABLE type must be trivially destructible");
00368 };
00369 
00370 class LockHolder {
00371  public:
00372   explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
00373     AcquireSRWLockExclusive(mu_);
00374   }
00375 
00376   LockHolder(const LockHolder&) = delete;
00377   LockHolder& operator=(const LockHolder&) = delete;
00378 
00379   ~LockHolder() {
00380     ReleaseSRWLockExclusive(mu_);
00381   }
00382 
00383  private:
00384   SRWLOCK* mu_;
00385 };
00386 
00387 void Waiter::Init() {
00388   auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
00389   auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
00390   InitializeSRWLock(mu);
00391   InitializeConditionVariable(cv);
00392   waiter_count_.store(0, std::memory_order_relaxed);
00393   wakeup_count_.store(0, std::memory_order_relaxed);
00394 }
00395 
00396 bool Waiter::Wait(KernelTimeout t) {
00397   SRWLOCK *mu = WinHelper::GetLock(this);
00398   CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
00399 
00400   LockHolder h(mu);
00401   waiter_count_.fetch_add(1, std::memory_order_relaxed);
00402 
00403   // Loop until we find a wakeup to consume or timeout.
00404   while (true) {
00405     int x = wakeup_count_.load(std::memory_order_relaxed);
00406     if (x != 0) {
00407       if (!wakeup_count_.compare_exchange_weak(x, x - 1,
00408                                                std::memory_order_acquire,
00409                                                std::memory_order_relaxed)) {
00410         continue;  // Raced with someone, retry.
00411       }
00412       // Successfully consumed a wakeup, we're done.
00413       waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00414       return true;
00415     }
00416 
00417     // No wakeups available, time to wait.
00418     if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
00419       // GetLastError() returns a Win32 DWORD, but we assign to
00420       // unsigned long to simplify the ABSL_RAW_LOG case below.  The uniform
00421       // initialization guarantees this is not a narrowing conversion.
00422       const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
00423       if (err == ERROR_TIMEOUT) {
00424         waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00425         return false;
00426       } else {
00427         ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
00428       }
00429     }
00430 
00431     MaybeBecomeIdle();
00432   }
00433 }
00434 
00435 void Waiter::Post() {
00436   wakeup_count_.fetch_add(1, std::memory_order_release);
00437   Poke();
00438 }
00439 
00440 void Waiter::Poke() {
00441   if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00442     return;
00443   }
00444   // Potentially a waker. Take the lock and check again.
00445   LockHolder h(WinHelper::GetLock(this));
00446   if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00447     return;
00448   }
00449   WakeConditionVariable(WinHelper::GetCond(this));
00450 }
00451 
00452 #else
00453 #error Unknown ABSL_WAITER_MODE
00454 #endif
00455 
00456 }  // namespace synchronization_internal
00457 }  // namespace absl


abseil_cpp
Author(s):
autogenerated on Wed Jun 19 2019 19:42:16