00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "absl/synchronization/internal/waiter.h"
00016
00017 #include "absl/base/config.h"
00018
00019 #ifdef _WIN32
00020 #include <windows.h>
00021 #else
00022 #include <pthread.h>
00023 #include <sys/time.h>
00024 #include <unistd.h>
00025 #endif
00026
00027 #ifdef __linux__
00028 #include <linux/futex.h>
00029 #include <sys/syscall.h>
00030 #endif
00031
00032 #ifdef ABSL_HAVE_SEMAPHORE_H
00033 #include <semaphore.h>
00034 #endif
00035
00036 #include <errno.h>
00037 #include <stdio.h>
00038 #include <time.h>
00039
00040 #include <atomic>
00041 #include <cassert>
00042 #include <cstdint>
00043 #include <new>
00044 #include <type_traits>
00045
00046 #include "absl/base/internal/raw_logging.h"
00047 #include "absl/base/internal/thread_identity.h"
00048 #include "absl/base/optimization.h"
00049 #include "absl/synchronization/internal/kernel_timeout.h"
00050
00051 namespace absl {
00052 namespace synchronization_internal {
00053
00054 static void MaybeBecomeIdle() {
00055 base_internal::ThreadIdentity *identity =
00056 base_internal::CurrentThreadIdentityIfPresent();
00057 assert(identity != nullptr);
00058 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
00059 const int ticker = identity->ticker.load(std::memory_order_relaxed);
00060 const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
00061 if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
00062 identity->is_idle.store(true, std::memory_order_relaxed);
00063 }
00064 }
00065
00066 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
00067
00068
00069
00070 #ifdef __BIONIC__
00071 #ifndef SYS_futex
00072 #define SYS_futex __NR_futex
00073 #endif
00074 #ifndef FUTEX_WAIT_BITSET
00075 #define FUTEX_WAIT_BITSET 9
00076 #endif
00077 #ifndef FUTEX_PRIVATE_FLAG
00078 #define FUTEX_PRIVATE_FLAG 128
00079 #endif
00080 #ifndef FUTEX_CLOCK_REALTIME
00081 #define FUTEX_CLOCK_REALTIME 256
00082 #endif
00083 #ifndef FUTEX_BITSET_MATCH_ANY
00084 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
00085 #endif
00086 #endif
00087
00088 class Futex {
00089 public:
00090 static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
00091 KernelTimeout t) {
00092 int err = 0;
00093 if (t.has_timeout()) {
00094
00095
00096 struct timespec abs_timeout = t.MakeAbsTimespec();
00097
00098
00099 err = syscall(
00100 SYS_futex, reinterpret_cast<int32_t *>(v),
00101 FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
00102 &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
00103 } else {
00104
00105
00106 err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
00107 FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
00108 }
00109 if (err != 0) {
00110 err = -errno;
00111 }
00112 return err;
00113 }
00114
00115 static int Wake(std::atomic<int32_t> *v, int32_t count) {
00116 int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
00117 FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
00118 if (ABSL_PREDICT_FALSE(err < 0)) {
00119 err = -errno;
00120 }
00121 return err;
00122 }
00123 };
00124
00125 void Waiter::Init() {
00126 futex_.store(0, std::memory_order_relaxed);
00127 }
00128
00129 bool Waiter::Wait(KernelTimeout t) {
00130
00131
00132 while (true) {
00133 int32_t x = futex_.load(std::memory_order_relaxed);
00134 if (x != 0) {
00135 if (!futex_.compare_exchange_weak(x, x - 1,
00136 std::memory_order_acquire,
00137 std::memory_order_relaxed)) {
00138 continue;
00139 }
00140 return true;
00141 }
00142
00143 const int err = Futex::WaitUntil(&futex_, 0, t);
00144 if (err != 0) {
00145 if (err == -EINTR || err == -EWOULDBLOCK) {
00146
00147 } else if (err == -ETIMEDOUT) {
00148 return false;
00149 } else {
00150 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
00151 }
00152 }
00153
00154 MaybeBecomeIdle();
00155 }
00156 }
00157
00158 void Waiter::Post() {
00159 if (futex_.fetch_add(1, std::memory_order_release) == 0) {
00160
00161 Poke();
00162 }
00163 }
00164
00165 void Waiter::Poke() {
00166
00167 const int err = Futex::Wake(&futex_, 1);
00168 if (ABSL_PREDICT_FALSE(err < 0)) {
00169 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
00170 }
00171 }
00172
00173 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
00174
00175 class PthreadMutexHolder {
00176 public:
00177 explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
00178 const int err = pthread_mutex_lock(mu_);
00179 if (err != 0) {
00180 ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
00181 }
00182 }
00183
00184 PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
00185 PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
00186
00187 ~PthreadMutexHolder() {
00188 const int err = pthread_mutex_unlock(mu_);
00189 if (err != 0) {
00190 ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
00191 }
00192 }
00193
00194 private:
00195 pthread_mutex_t *mu_;
00196 };
00197
00198 void Waiter::Init() {
00199 const int err = pthread_mutex_init(&mu_, 0);
00200 if (err != 0) {
00201 ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
00202 }
00203
00204 const int err2 = pthread_cond_init(&cv_, 0);
00205 if (err2 != 0) {
00206 ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
00207 }
00208
00209 waiter_count_.store(0, std::memory_order_relaxed);
00210 wakeup_count_.store(0, std::memory_order_relaxed);
00211 }
00212
00213 bool Waiter::Wait(KernelTimeout t) {
00214 struct timespec abs_timeout;
00215 if (t.has_timeout()) {
00216 abs_timeout = t.MakeAbsTimespec();
00217 }
00218
00219 PthreadMutexHolder h(&mu_);
00220 waiter_count_.fetch_add(1, std::memory_order_relaxed);
00221
00222 while (true) {
00223 int x = wakeup_count_.load(std::memory_order_relaxed);
00224 if (x != 0) {
00225 if (!wakeup_count_.compare_exchange_weak(x, x - 1,
00226 std::memory_order_acquire,
00227 std::memory_order_relaxed)) {
00228 continue;
00229 }
00230
00231 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00232 return true;
00233 }
00234
00235
00236 if (!t.has_timeout()) {
00237 const int err = pthread_cond_wait(&cv_, &mu_);
00238 if (err != 0) {
00239 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
00240 }
00241 } else {
00242 const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
00243 if (err == ETIMEDOUT) {
00244 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00245 return false;
00246 }
00247 if (err != 0) {
00248 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
00249 }
00250 }
00251 MaybeBecomeIdle();
00252 }
00253 }
00254
00255 void Waiter::Post() {
00256 wakeup_count_.fetch_add(1, std::memory_order_release);
00257 Poke();
00258 }
00259
00260 void Waiter::Poke() {
00261 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00262 return;
00263 }
00264
00265 PthreadMutexHolder h(&mu_);
00266 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00267 return;
00268 }
00269 const int err = pthread_cond_signal(&cv_);
00270 if (err != 0) {
00271 ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
00272 }
00273 }
00274
00275 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
00276
00277 void Waiter::Init() {
00278 if (sem_init(&sem_, 0, 0) != 0) {
00279 ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
00280 }
00281 wakeups_.store(0, std::memory_order_relaxed);
00282 }
00283
00284 bool Waiter::Wait(KernelTimeout t) {
00285 struct timespec abs_timeout;
00286 if (t.has_timeout()) {
00287 abs_timeout = t.MakeAbsTimespec();
00288 }
00289
00290
00291 while (true) {
00292 int x = wakeups_.load(std::memory_order_relaxed);
00293 if (x != 0) {
00294 if (!wakeups_.compare_exchange_weak(x, x - 1,
00295 std::memory_order_acquire,
00296 std::memory_order_relaxed)) {
00297 continue;
00298 }
00299
00300 return true;
00301 }
00302
00303
00304 while (true) {
00305 if (!t.has_timeout()) {
00306 if (sem_wait(&sem_) == 0) break;
00307 if (errno == EINTR) continue;
00308 ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
00309 } else {
00310 if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
00311 if (errno == EINTR) continue;
00312 if (errno == ETIMEDOUT) return false;
00313 ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
00314 }
00315 }
00316 MaybeBecomeIdle();
00317 }
00318 }
00319
00320 void Waiter::Post() {
00321 wakeups_.fetch_add(1, std::memory_order_release);
00322 Poke();
00323 }
00324
00325 void Waiter::Poke() {
00326 if (sem_post(&sem_) != 0) {
00327 ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
00328 }
00329 }
00330
00331 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
00332
00333 class Waiter::WinHelper {
00334 public:
00335 static SRWLOCK *GetLock(Waiter *w) {
00336 return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
00337 }
00338
00339 static CONDITION_VARIABLE *GetCond(Waiter *w) {
00340 return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
00341 }
00342
00343 static_assert(sizeof(SRWLOCK) == sizeof(Waiter::SRWLockStorage),
00344 "SRWLockStorage does not have the same size as SRWLOCK");
00345 static_assert(
00346 alignof(SRWLOCK) == alignof(Waiter::SRWLockStorage),
00347 "SRWLockStorage does not have the same alignment as SRWLOCK");
00348
00349 static_assert(sizeof(CONDITION_VARIABLE) ==
00350 sizeof(Waiter::ConditionVariableStorage),
00351 "ABSL_CONDITION_VARIABLE_STORAGE does not have the same size "
00352 "as CONDITION_VARIABLE");
00353 static_assert(alignof(CONDITION_VARIABLE) ==
00354 alignof(Waiter::ConditionVariableStorage),
00355 "ConditionVariableStorage does not have the same "
00356 "alignment as CONDITION_VARIABLE");
00357
00358
00359
00360 static_assert(std::is_trivially_constructible<SRWLOCK>::value,
00361 "The SRWLOCK type must be trivially constructible");
00362 static_assert(std::is_trivially_constructible<CONDITION_VARIABLE>::value,
00363 "The CONDITION_VARIABLE type must be trivially constructible");
00364 static_assert(std::is_trivially_destructible<SRWLOCK>::value,
00365 "The SRWLOCK type must be trivially destructible");
00366 static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
00367 "The CONDITION_VARIABLE type must be trivially destructible");
00368 };
00369
00370 class LockHolder {
00371 public:
00372 explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
00373 AcquireSRWLockExclusive(mu_);
00374 }
00375
00376 LockHolder(const LockHolder&) = delete;
00377 LockHolder& operator=(const LockHolder&) = delete;
00378
00379 ~LockHolder() {
00380 ReleaseSRWLockExclusive(mu_);
00381 }
00382
00383 private:
00384 SRWLOCK* mu_;
00385 };
00386
00387 void Waiter::Init() {
00388 auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
00389 auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
00390 InitializeSRWLock(mu);
00391 InitializeConditionVariable(cv);
00392 waiter_count_.store(0, std::memory_order_relaxed);
00393 wakeup_count_.store(0, std::memory_order_relaxed);
00394 }
00395
00396 bool Waiter::Wait(KernelTimeout t) {
00397 SRWLOCK *mu = WinHelper::GetLock(this);
00398 CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
00399
00400 LockHolder h(mu);
00401 waiter_count_.fetch_add(1, std::memory_order_relaxed);
00402
00403
00404 while (true) {
00405 int x = wakeup_count_.load(std::memory_order_relaxed);
00406 if (x != 0) {
00407 if (!wakeup_count_.compare_exchange_weak(x, x - 1,
00408 std::memory_order_acquire,
00409 std::memory_order_relaxed)) {
00410 continue;
00411 }
00412
00413 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00414 return true;
00415 }
00416
00417
00418 if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
00419
00420
00421
00422 const unsigned long err{GetLastError()};
00423 if (err == ERROR_TIMEOUT) {
00424 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
00425 return false;
00426 } else {
00427 ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
00428 }
00429 }
00430
00431 MaybeBecomeIdle();
00432 }
00433 }
00434
00435 void Waiter::Post() {
00436 wakeup_count_.fetch_add(1, std::memory_order_release);
00437 Poke();
00438 }
00439
00440 void Waiter::Poke() {
00441 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00442 return;
00443 }
00444
00445 LockHolder h(WinHelper::GetLock(this));
00446 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
00447 return;
00448 }
00449 WakeConditionVariable(WinHelper::GetCond(this));
00450 }
00451
00452 #else
00453 #error Unknown ABSL_WAITER_MODE
00454 #endif
00455
00456 }
00457 }