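// Implementation of the Mutex class declared in absl/synchronization/mutex.h.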
00015 #include "absl/synchronization/mutex.h"
00016
00017 #ifdef _WIN32
00018 #include <windows.h>
00019 #ifdef ERROR
00020 #undef ERROR
00021 #endif
00022 #else
00023 #include <fcntl.h>
00024 #include <pthread.h>
00025 #include <sched.h>
00026 #include <sys/time.h>
00027 #endif
00028
00029 #include <assert.h>
00030 #include <errno.h>
00031 #include <stdio.h>
00032 #include <stdlib.h>
00033 #include <string.h>
00034 #include <time.h>
00035
00036 #include <algorithm>
00037 #include <atomic>
00038 #include <cinttypes>
00039 #include <thread>
00040
00041 #include "absl/base/attributes.h"
00042 #include "absl/base/config.h"
00043 #include "absl/base/dynamic_annotations.h"
00044 #include "absl/base/internal/atomic_hook.h"
00045 #include "absl/base/internal/cycleclock.h"
00046 #include "absl/base/internal/hide_ptr.h"
00047 #include "absl/base/internal/low_level_alloc.h"
00048 #include "absl/base/internal/raw_logging.h"
00049 #include "absl/base/internal/spinlock.h"
00050 #include "absl/base/internal/sysinfo.h"
00051 #include "absl/base/internal/thread_identity.h"
00052 #include "absl/base/port.h"
00053 #include "absl/debugging/stacktrace.h"
00054 #include "absl/debugging/symbolize.h"
00055 #include "absl/synchronization/internal/graphcycles.h"
00056 #include "absl/synchronization/internal/per_thread_sem.h"
00057 #include "absl/time/time.h"
00058
00059 using absl::base_internal::CurrentThreadIdentityIfPresent;
00060 using absl::base_internal::PerThreadSynch;
00061 using absl::base_internal::ThreadIdentity;
00062 using absl::synchronization_internal::GetOrCreateCurrentThreadIdentity;
00063 using absl::synchronization_internal::GraphCycles;
00064 using absl::synchronization_internal::GraphId;
00065 using absl::synchronization_internal::InvalidGraphId;
00066 using absl::synchronization_internal::KernelTimeout;
00067 using absl::synchronization_internal::PerThreadSem;
00068
00069 extern "C" {
00070 ABSL_ATTRIBUTE_WEAK void AbslInternalMutexYield() { std::this_thread::yield(); }
00071 }
00072
00073 namespace absl {
00074
00075 namespace {
00076
00077 #if defined(THREAD_SANITIZER)
00078 constexpr OnDeadlockCycle kDeadlockDetectionDefault = OnDeadlockCycle::kIgnore;
00079 #else
00080 constexpr OnDeadlockCycle kDeadlockDetectionDefault = OnDeadlockCycle::kAbort;
00081 #endif
00082
00083 ABSL_CONST_INIT std::atomic<OnDeadlockCycle> synch_deadlock_detection(
00084 kDeadlockDetectionDefault);
00085 ABSL_CONST_INIT std::atomic<bool> synch_check_invariants(false);
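// Global state shared by all Mutex instances: the number of CPUs and the
// number of spin-loop iterations to attempt before blocking.  The struct is
// padded and cache-line aligned so that it does not share (and contend for)
// a cache line with unrelated globals.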
00092 static struct MutexGlobals {
00093 MutexGlobals() {
00094
00095
00096
00097 num_cpus = absl::base_internal::NumCPUs();
00098 spinloop_iterations = num_cpus > 1 ? 1500 : 0;
00099 }
00100 int num_cpus;
00101 int spinloop_iterations;
00102
00103 char padding[ABSL_CACHELINE_SIZE - 2 * sizeof(int)];
00104 } ABSL_CACHELINE_ALIGNED mutex_globals;
00105 static_assert(
00106 sizeof(MutexGlobals) == ABSL_CACHELINE_SIZE,
00107 "MutexGlobals must occupy an entire cacheline to prevent false sharing");
00108
00109 ABSL_CONST_INIT absl::base_internal::AtomicHook<void (*)(int64_t wait_cycles)>
00110 submit_profile_data;
00111 ABSL_CONST_INIT absl::base_internal::AtomicHook<
00112 void (*)(const char *msg, const void *obj, int64_t wait_cycles)> mutex_tracer;
00113 ABSL_CONST_INIT absl::base_internal::AtomicHook<
00114 void (*)(const char *msg, const void *cv)> cond_var_tracer;
00115 ABSL_CONST_INIT absl::base_internal::AtomicHook<
00116 bool (*)(const void *pc, char *out, int out_size)>
00117 symbolizer(absl::Symbolize);
00118
00119 }
00120
00121 static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu,
00122 bool locking, bool trylock,
00123 bool read_lock);
00124
00125 void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)) {
00126 submit_profile_data.Store(fn);
00127 }
00128
00129 void RegisterMutexTracer(void (*fn)(const char *msg, const void *obj,
00130 int64_t wait_cycles)) {
00131 mutex_tracer.Store(fn);
00132 }
00133
00134 void RegisterCondVarTracer(void (*fn)(const char *msg, const void *cv)) {
00135 cond_var_tracer.Store(fn);
00136 }
00137
00138 void RegisterSymbolizer(bool (*fn)(const void *pc, char *out, int out_size)) {
00139 symbolizer.Store(fn);
00140 }
00141
00142
00143 namespace {
00144 enum DelayMode { AGGRESSIVE, GENTLE };
}  // namespace
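// Back off while spinning on a contended word.  The caller passes its current
// iteration count c and receives the updated count: below the mode-dependent
// limit the thread simply keeps spinning; at the limit it yields the CPU;
// beyond it, it sleeps for 10us and the count resets.  On single-CPU machines
// the limit is zero, so spinning is skipped entirely.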
00146 static int Delay(int32_t c, DelayMode mode) {
00152 int32_t limit = (mutex_globals.num_cpus > 1) ?
00153 ((mode == AGGRESSIVE) ? 5000 : 250) : 0;
00154 if (c < limit) {
00155 c++;
00156 } else {
00157 ABSL_TSAN_MUTEX_PRE_DIVERT(nullptr, 0);
00158 if (c == limit) {
00159 AbslInternalMutexYield();
00160 c++;
00161 } else {
00162 absl::SleepFor(absl::Microseconds(10));
00163 c = 0;
00164 }
00165 ABSL_TSAN_MUTEX_POST_DIVERT(nullptr, 0);
00166 }
00167 return (c);
00168 }
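// Atomically set the bits in "bits" in *pv, but wait (by spinning on the
// load) while any bit in "wait_until_clear" is currently set in *pv.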
00175 static void AtomicSetBits(std::atomic<intptr_t>* pv, intptr_t bits,
00176 intptr_t wait_until_clear) {
00177 intptr_t v;
00178 do {
00179 v = pv->load(std::memory_order_relaxed);
00180 } while ((v & bits) != bits &&
00181 ((v & wait_until_clear) != 0 ||
00182 !pv->compare_exchange_weak(v, v | bits,
00183 std::memory_order_release,
00184 std::memory_order_relaxed)));
00185 }
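// Atomically clear the bits in "bits" in *pv, but wait while any bit in
// "wait_until_clear" is currently set in *pv.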
00191 static void AtomicClearBits(std::atomic<intptr_t>* pv, intptr_t bits,
00192 intptr_t wait_until_clear) {
00193 intptr_t v;
00194 do {
00195 v = pv->load(std::memory_order_relaxed);
00196 } while ((v & bits) != 0 &&
00197 ((v & wait_until_clear) != 0 ||
00198 !pv->compare_exchange_weak(v, v & ~bits,
00199 std::memory_order_release,
00200 std::memory_order_relaxed)));
00201 }
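// Data for deadlock detection: a graph recording the order in which Mutexes
// are acquired.  deadlock_graph is allocated lazily and is guarded by
// deadlock_graph_mu.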
00206 static absl::base_internal::SpinLock deadlock_graph_mu(
00207 absl::base_internal::kLinkerInitialized);
00208
00209
00210 static GraphCycles *deadlock_graph GUARDED_BY(deadlock_graph_mu)
00211 PT_GUARDED_BY(deadlock_graph_mu);
00219 namespace {
00220 enum {
00221
00222 SYNCH_EV_TRYLOCK_SUCCESS,
00223 SYNCH_EV_TRYLOCK_FAILED,
00224 SYNCH_EV_READERTRYLOCK_SUCCESS,
00225 SYNCH_EV_READERTRYLOCK_FAILED,
00226 SYNCH_EV_LOCK,
00227 SYNCH_EV_LOCK_RETURNING,
00228 SYNCH_EV_READERLOCK,
00229 SYNCH_EV_READERLOCK_RETURNING,
00230 SYNCH_EV_UNLOCK,
00231 SYNCH_EV_READERUNLOCK,
00232
00233
00234 SYNCH_EV_WAIT,
00235 SYNCH_EV_WAIT_RETURNING,
00236 SYNCH_EV_SIGNAL,
00237 SYNCH_EV_SIGNALALL,
00238 };
00239
00240 enum {
00241 SYNCH_F_R = 0x01,
00242 SYNCH_F_LCK = 0x02,
00243 SYNCH_F_TRY = 0x04,
00244 SYNCH_F_UNLOCK = 0x08,
00245
00246 SYNCH_F_LCK_W = SYNCH_F_LCK,
00247 SYNCH_F_LCK_R = SYNCH_F_LCK | SYNCH_F_R,
00248 };
00249 }
00250
00251
00252 static const struct {
00253 int flags;
00254 const char *msg;
00255 } event_properties[] = {
00256 {SYNCH_F_LCK_W | SYNCH_F_TRY, "TryLock succeeded "},
00257 {0, "TryLock failed "},
00258 {SYNCH_F_LCK_R | SYNCH_F_TRY, "ReaderTryLock succeeded "},
00259 {0, "ReaderTryLock failed "},
00260 {0, "Lock blocking "},
00261 {SYNCH_F_LCK_W, "Lock returning "},
00262 {0, "ReaderLock blocking "},
00263 {SYNCH_F_LCK_R, "ReaderLock returning "},
00264 {SYNCH_F_LCK_W | SYNCH_F_UNLOCK, "Unlock "},
00265 {SYNCH_F_LCK_R | SYNCH_F_UNLOCK, "ReaderUnlock "},
00266 {0, "Wait on "},
00267 {0, "Wait unblocked "},
00268 {0, "Signal on "},
00269 {0, "SignalAll on "},
00270 };
00271
00272 static absl::base_internal::SpinLock synch_event_mu(
00273 absl::base_internal::kLinkerInitialized);
00274
00275
00276
00277
00278 static const uint32_t kNSynchEvent = 1031;
00279
00280 static struct SynchEvent {
00281
00282 int refcount GUARDED_BY(synch_event_mu);
00283
00284
00285 SynchEvent *next GUARDED_BY(synch_event_mu);
00286
00287
00288 uintptr_t masked_addr;
00289
00290
00291
00292
00293 void (*invariant)(void *arg);
00294 void *arg;
00295 bool log;
00296
00297
00298 char name[1];
00299 } *synch_event[kNSynchEvent] GUARDED_BY(synch_event_mu);
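// Return the SynchEvent record for the object at address "addr", creating one
// (and atomically setting "bits" in *addr while waiting for "lockbit" to
// clear) if none exists.  The returned record has its reference count
// incremented; release it with UnrefSynchEvent().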
00308 static SynchEvent *EnsureSynchEvent(std::atomic<intptr_t> *addr,
00309 const char *name, intptr_t bits,
00310 intptr_t lockbit) {
00311 uint32_t h = reinterpret_cast<intptr_t>(addr) % kNSynchEvent;
00312 SynchEvent *e;
00313
00314 synch_event_mu.Lock();
00315 for (e = synch_event[h];
00316 e != nullptr && e->masked_addr != base_internal::HidePtr(addr);
00317 e = e->next) {
00318 }
00319 if (e == nullptr) {
00320 if (name == nullptr) {
00321 name = "";
00322 }
00323 size_t l = strlen(name);
00324 e = reinterpret_cast<SynchEvent *>(
00325 base_internal::LowLevelAlloc::Alloc(sizeof(*e) + l));
00326 e->refcount = 2;
00327 e->masked_addr = base_internal::HidePtr(addr);
00328 e->invariant = nullptr;
00329 e->arg = nullptr;
00330 e->log = false;
00331 strcpy(e->name, name);
00332 e->next = synch_event[h];
00333 AtomicSetBits(addr, bits, lockbit);
00334 synch_event[h] = e;
00335 } else {
00336 e->refcount++;
00337 }
00338 synch_event_mu.Unlock();
00339 return e;
00340 }
00341
00342
00343 static void DeleteSynchEvent(SynchEvent *e) {
00344 base_internal::LowLevelAlloc::Free(e);
00345 }
00346
00347
00348 static void UnrefSynchEvent(SynchEvent *e) {
00349 if (e != nullptr) {
00350 synch_event_mu.Lock();
00351 bool del = (--(e->refcount) == 0);
00352 synch_event_mu.Unlock();
00353 if (del) {
00354 DeleteSynchEvent(e);
00355 }
00356 }
00357 }
00358
00359
00360
00361
00362 static void ForgetSynchEvent(std::atomic<intptr_t> *addr, intptr_t bits,
00363 intptr_t lockbit) {
00364 uint32_t h = reinterpret_cast<intptr_t>(addr) % kNSynchEvent;
00365 SynchEvent **pe;
00366 SynchEvent *e;
00367 synch_event_mu.Lock();
00368 for (pe = &synch_event[h];
00369 (e = *pe) != nullptr && e->masked_addr != base_internal::HidePtr(addr);
00370 pe = &e->next) {
00371 }
00372 bool del = false;
00373 if (e != nullptr) {
00374 *pe = e->next;
00375 del = (--(e->refcount) == 0);
00376 }
00377 AtomicClearBits(addr, bits, lockbit);
00378 synch_event_mu.Unlock();
00379 if (del) {
00380 DeleteSynchEvent(e);
00381 }
00382 }
00383
00384
00385
00386
00387 static SynchEvent *GetSynchEvent(const void *addr) {
00388 uint32_t h = reinterpret_cast<intptr_t>(addr) % kNSynchEvent;
00389 SynchEvent *e;
00390 synch_event_mu.Lock();
00391 for (e = synch_event[h];
00392 e != nullptr && e->masked_addr != base_internal::HidePtr(addr);
00393 e = e->next) {
00394 }
00395 if (e != nullptr) {
00396 e->refcount++;
00397 }
00398 synch_event_mu.Unlock();
00399 return e;
00400 }
00401
00402
00403
00404 static void PostSynchEvent(void *obj, int ev) {
00405 SynchEvent *e = GetSynchEvent(obj);
00406
00407
00408 if (e == nullptr || e->log) {
00409 void *pcs[40];
00410 int n = absl::GetStackTrace(pcs, ABSL_ARRAYSIZE(pcs), 1);
00411
00412
00413 char buffer[ABSL_ARRAYSIZE(pcs) * 24];
00414 int pos = snprintf(buffer, sizeof (buffer), " @");
00415 for (int i = 0; i != n; i++) {
00416 pos += snprintf(&buffer[pos], sizeof (buffer) - pos, " %p", pcs[i]);
00417 }
00418 ABSL_RAW_LOG(INFO, "%s%p %s %s", event_properties[ev].msg, obj,
00419 (e == nullptr ? "" : e->name), buffer);
00420 }
00421 const int flags = event_properties[ev].flags;
00422 if ((flags & SYNCH_F_LCK) != 0 && e != nullptr && e->invariant != nullptr) {
00429 struct local {
00430 static bool pred(SynchEvent *ev) {
00431 (*ev->invariant)(ev->arg);
00432 return false;
00433 }
00434 };
00435 Condition cond(&local::pred, e);
00436 Mutex *mu = static_cast<Mutex *>(obj);
00437 const bool locking = (flags & SYNCH_F_UNLOCK) == 0;
00438 const bool trylock = (flags & SYNCH_F_TRY) != 0;
00439 const bool read_lock = (flags & SYNCH_F_R) != 0;
00440 EvalConditionAnnotated(&cond, mu, locking, trylock, read_lock);
00441 }
00442 UnrefSynchEvent(e);
00443 }
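// SynchWaitParams gathers everything a thread needs while it waits on a Mutex
// or CondVar: the lock mode requested, the condition (if any), the timeout,
// the CondVar's Mutex and cv word (for CondVar waits), the waiting thread's
// PerThreadSynch, and the cycle count at which contention began (used for
// contention profiling).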
00461 struct SynchWaitParams {
00462 SynchWaitParams(Mutex::MuHow how_arg, const Condition *cond_arg,
00463 KernelTimeout timeout_arg, Mutex *cvmu_arg,
00464 PerThreadSynch *thread_arg,
00465 std::atomic<intptr_t> *cv_word_arg)
00466 : how(how_arg),
00467 cond(cond_arg),
00468 timeout(timeout_arg),
00469 cvmu(cvmu_arg),
00470 thread(thread_arg),
00471 cv_word(cv_word_arg),
00472 contention_start_cycles(base_internal::CycleClock::Now()) {}
00473
00474 const Mutex::MuHow how;
00475 const Condition *cond;
00476
00477
00478 KernelTimeout timeout;
00479
00480
00481 Mutex *const cvmu;
00482 PerThreadSynch *const thread;
00483
00484
00485
00486 std::atomic<intptr_t> *cv_word;
00487
00488 int64_t contention_start_cycles;
00489
00490 };
00491
00492 struct SynchLocksHeld {
00493 int n;
00494 bool overflow;
00495 struct {
00496 Mutex *mu;
00497 int32_t count;
00498 GraphId id;
00499 } locks[40];
00500
00501
00502
00503
00504 };
00505
00506
00507
00508 static PerThreadSynch *const kPerThreadSynchNull =
00509 reinterpret_cast<PerThreadSynch *>(1);
00510
00511 static SynchLocksHeld *LocksHeldAlloc() {
00512 SynchLocksHeld *ret = reinterpret_cast<SynchLocksHeld *>(
00513 base_internal::LowLevelAlloc::Alloc(sizeof(SynchLocksHeld)));
00514 ret->n = 0;
00515 ret->overflow = false;
00516 return ret;
00517 }
00518
00519
00520 static PerThreadSynch *Synch_GetPerThread() {
00521 ThreadIdentity *identity = GetOrCreateCurrentThreadIdentity();
00522 return &identity->per_thread_synch;
00523 }
00524
00525 static PerThreadSynch *Synch_GetPerThreadAnnotated(Mutex *mu) {
00526 if (mu) {
00527 ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0);
00528 }
00529 PerThreadSynch *w = Synch_GetPerThread();
00530 if (mu) {
00531 ABSL_TSAN_MUTEX_POST_DIVERT(mu, 0);
00532 }
00533 return w;
00534 }
00535
00536 static SynchLocksHeld *Synch_GetAllLocks() {
00537 PerThreadSynch *s = Synch_GetPerThread();
00538 if (s->all_locks == nullptr) {
00539 s->all_locks = LocksHeldAlloc();
00540 }
00541 return s->all_locks;
00542 }
00543
00544
00545 inline void Mutex::IncrementSynchSem(Mutex *mu, PerThreadSynch *w) {
00546 if (mu) {
00547 ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0);
00548 }
00549 PerThreadSem::Post(w->thread_identity());
00550 if (mu) {
00551 ABSL_TSAN_MUTEX_POST_DIVERT(mu, 0);
00552 }
00553 }
00554
00555
00556 bool Mutex::DecrementSynchSem(Mutex *mu, PerThreadSynch *w, KernelTimeout t) {
00557 if (mu) {
00558 ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0);
00559 }
00560 assert(w == Synch_GetPerThread());
00561 static_cast<void>(w);
00562 bool res = PerThreadSem::Wait(t);
00563 if (mu) {
00564 ABSL_TSAN_MUTEX_POST_DIVERT(mu, 0);
00565 }
00566 return res;
00567 }
00575 void Mutex::InternalAttemptToUseMutexInFatalSignalHandler() {
00576
00577 ThreadIdentity *identity = CurrentThreadIdentityIfPresent();
00578 if (identity != nullptr) {
00579 identity->per_thread_synch.suppress_fatal_errors = true;
00580 }
00581
00582 synch_deadlock_detection.store(OnDeadlockCycle::kIgnore,
00583 std::memory_order_release);
00584 }
00585
00586
00587
00588
00589
00590
00591 static absl::Time DeadlineFromTimeout(absl::Duration timeout) {
00592 #ifndef _WIN32
00593 struct timeval tv;
00594 gettimeofday(&tv, nullptr);
00595 return absl::TimeFromTimeval(tv) + timeout;
00596 #else
00597 return absl::Now() + timeout;
00598 #endif
00599 }
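// Bit layout of the Mutex word mu_: the low byte (kMuLow) holds the flag bits
// below, while the high bits hold either the count of blocked readers (in
// units of kMuOne) or, when waiters are queued, a pointer to the head of the
// waiter list.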
00611 static const intptr_t kMuReader = 0x0001L;
00612 static const intptr_t kMuDesig = 0x0002L;
00613 static const intptr_t kMuWait = 0x0004L;
00614 static const intptr_t kMuWriter = 0x0008L;
00615 static const intptr_t kMuEvent = 0x0010L;
00624 static const intptr_t kMuWrWait = 0x0020L;
00625
00626 static const intptr_t kMuSpin = 0x0040L;
00627 static const intptr_t kMuLow = 0x00ffL;
00628 static const intptr_t kMuHigh = ~kMuLow;
00629
00630
00631 enum {
00632 kGdbMuSpin = kMuSpin,
00633 kGdbMuEvent = kMuEvent,
00634 kGdbMuWait = kMuWait,
00635 kGdbMuWriter = kMuWriter,
00636 kGdbMuDesig = kMuDesig,
00637 kGdbMuWrWait = kMuWrWait,
00638 kGdbMuReader = kMuReader,
00639 kGdbMuLow = kMuLow,
00640 };
00649 static const intptr_t kMuOne = 0x0100;
00650
00651
00652 static const int kMuHasBlocked = 0x01;
00653 static const int kMuIsCond = 0x02;
00654
00655 static_assert(PerThreadSynch::kAlignment > kMuLow,
00656 "PerThreadSynch::kAlignment must be greater than kMuLow");
00657
00658
00659
00660 struct MuHowS {
00661
00662
00663
00664 intptr_t fast_need_zero;
00665 intptr_t fast_or;
00666 intptr_t fast_add;
00667
00668 intptr_t slow_need_zero;
00669
00670 intptr_t slow_inc_need_zero;
00671
00672
00673
00674
00675
00676 };
00677
00678 static const MuHowS kSharedS = {
00679
00680 kMuWriter | kMuWait | kMuEvent,
00681 kMuReader,
00682 kMuOne,
00683 kMuWriter | kMuWait,
00684 kMuSpin | kMuWriter | kMuWrWait,
00685 };
00686 static const MuHowS kExclusiveS = {
00687
00688 kMuWriter | kMuReader | kMuEvent,
00689 kMuWriter,
00690 0,
00691 kMuWriter | kMuReader,
00692 ~static_cast<intptr_t>(0),
00693 };
00694 static const Mutex::MuHow kShared = &kSharedS;
00695 static const Mutex::MuHow kExclusive = &kExclusiveS;
00696
00697 #ifdef NDEBUG
00698 static constexpr bool kDebugMode = false;
00699 #else
00700 static constexpr bool kDebugMode = true;
00701 #endif
00702
00703 #ifdef THREAD_SANITIZER
00704 static unsigned TsanFlags(Mutex::MuHow how) {
00705 return how == kShared ? __tsan_mutex_read_lock : 0;
00706 }
00707 #endif
00708
00709 static bool DebugOnlyIsExiting() {
00710 return false;
00711 }
00712
00713 Mutex::~Mutex() {
00714 intptr_t v = mu_.load(std::memory_order_relaxed);
00715 if ((v & kMuEvent) != 0 && !DebugOnlyIsExiting()) {
00716 ForgetSynchEvent(&this->mu_, kMuEvent, kMuSpin);
00717 }
00718 if (kDebugMode) {
00719 this->ForgetDeadlockInfo();
00720 }
00721 ABSL_TSAN_MUTEX_DESTROY(this, __tsan_mutex_not_static);
00722 }
00723
00724 void Mutex::EnableDebugLog(const char *name) {
00725 SynchEvent *e = EnsureSynchEvent(&this->mu_, name, kMuEvent, kMuSpin);
00726 e->log = true;
00727 UnrefSynchEvent(e);
00728 }
00729
00730 void EnableMutexInvariantDebugging(bool enabled) {
00731 synch_check_invariants.store(enabled, std::memory_order_release);
00732 }
00733
00734 void Mutex::EnableInvariantDebugging(void (*invariant)(void *),
00735 void *arg) {
00736 if (synch_check_invariants.load(std::memory_order_acquire) &&
00737 invariant != nullptr) {
00738 SynchEvent *e = EnsureSynchEvent(&this->mu_, nullptr, kMuEvent, kMuSpin);
00739 e->invariant = invariant;
00740 e->arg = arg;
00741 UnrefSynchEvent(e);
00742 }
00743 }
00744
00745 void SetMutexDeadlockDetectionMode(OnDeadlockCycle mode) {
00746 synch_deadlock_detection.store(mode, std::memory_order_release);
00747 }
00748
00749
00750
00751
00752 static bool MuSameCondition(PerThreadSynch *x, PerThreadSynch *y) {
00753 return x->waitp->how == y->waitp->how &&
00754 Condition::GuaranteedEqual(x->waitp->cond, y->waitp->cond);
00755 }
00756
00757
00758
00759 static inline PerThreadSynch *GetPerThreadSynch(intptr_t v) {
00760 return reinterpret_cast<PerThreadSynch *>(v & kMuHigh);
00761 }
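// Waiters that share the same wakeup condition are chained through "skip"
// pointers so that long runs of equivalent waiters can be stepped over in one
// hop.  Skip(x) returns the last waiter reachable from x through skip
// pointers, compressing the skip chain along the way so that later traversals
// are faster.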
00817 static PerThreadSynch *Skip(PerThreadSynch *x) {
00818 PerThreadSynch *x0 = nullptr;
00819 PerThreadSynch *x1 = x;
00820 PerThreadSynch *x2 = x->skip;
00821 if (x2 != nullptr) {
00822
00823
00824 while ((x0 = x1, x1 = x2, x2 = x2->skip) != nullptr) {
00825 x0->skip = x2;
00826 }
00827 x->skip = x1;
00828 }
00829 return x1;
00830 }
00831
00832
00833
00834
00835
00836 static void FixSkip(PerThreadSynch *ancestor, PerThreadSynch *to_be_removed) {
00837 if (ancestor->skip == to_be_removed) {
00838 if (to_be_removed->skip != nullptr) {
00839 ancestor->skip = to_be_removed->skip;
00840 } else if (ancestor->next != to_be_removed) {
00841 ancestor->skip = ancestor->next;
00842 } else {
00843 ancestor->skip = nullptr;
00844 }
00845 }
00846 }
00847
00848 static void CondVarEnqueue(SynchWaitParams *waitp);
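// Enqueue the thread described by "waitp" on the waiter list whose head is
// "head" (nullptr if the list is empty) and return the new head.  "mu" is the
// current value of the mutex word and "flags" carries kMuIsCond/kMuHasBlocked.
// If the waiter is actually waiting on a condition variable
// (waitp->cv_word != nullptr) it is placed on the CondVar queue instead and
// the head is returned unchanged.  When thread priorities are available, a
// higher-priority waiter may be inserted ahead of lower-priority ones.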
00869 static PerThreadSynch *Enqueue(PerThreadSynch *head,
00870 SynchWaitParams *waitp, intptr_t mu, int flags) {
00871
00872
00873 if (waitp->cv_word != nullptr) {
00874 CondVarEnqueue(waitp);
00875 return head;
00876 }
00877
00878 PerThreadSynch *s = waitp->thread;
00879 ABSL_RAW_CHECK(
00880 s->waitp == nullptr ||
00881 s->waitp == waitp ||
00882 s->suppress_fatal_errors,
00883 "detected illegal recursion into Mutex code");
00884 s->waitp = waitp;
00885 s->skip = nullptr;
00886 s->may_skip = true;
00887 s->wake = false;
00888 s->cond_waiter = ((flags & kMuIsCond) != 0);
00889 if (head == nullptr) {
00890 s->next = s;
00891 s->readers = mu;
00892 s->maybe_unlocking = false;
00893 head = s;
00894 } else {
00895 PerThreadSynch *enqueue_after = nullptr;
00896 #ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
00897 int64_t now_cycles = base_internal::CycleClock::Now();
00898 if (s->next_priority_read_cycles < now_cycles) {
00899
00900
00901
00902 int policy;
00903 struct sched_param param;
const int err = pthread_getschedparam(pthread_self(), &policy, &param);
00905 if (err != 0) {
00906 ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err);
00907 } else {
00908 s->priority = param.sched_priority;
00909 s->next_priority_read_cycles =
00910 now_cycles +
00911 static_cast<int64_t>(base_internal::CycleClock::Frequency());
00912 }
00913 }
00914 if (s->priority > head->priority) {
00915
00916 if (!head->maybe_unlocking) {
00917
00918
00919
00920
00921 PerThreadSynch *advance_to = head;
00922 PerThreadSynch *cur;
00923 do {
00924 enqueue_after = advance_to;
00925 cur = enqueue_after->next;
00926 advance_to = Skip(cur);
00927
00928 if (advance_to != cur && s->priority > advance_to->priority &&
00929 MuSameCondition(s, cur)) {
00930
00931
00932
00933 advance_to = cur;
00934 }
00935 } while (s->priority <= advance_to->priority);
00936
00937
00938 } else if (waitp->how == kExclusive &&
00939 Condition::GuaranteedEqual(waitp->cond, nullptr)) {
00940
00941
00942
00943 enqueue_after = head;
00944 }
00945 }
00946 #endif
00947 if (enqueue_after != nullptr) {
00948 s->next = enqueue_after->next;
00949 enqueue_after->next = s;
00958 ABSL_RAW_CHECK(
00959 enqueue_after->skip == nullptr || MuSameCondition(enqueue_after, s),
00960 "Mutex Enqueue failure");
00961
00962 if (enqueue_after != head && enqueue_after->may_skip &&
00963 MuSameCondition(enqueue_after, enqueue_after->next)) {
00964
00965 enqueue_after->skip = enqueue_after->next;
00966 }
00967 if (MuSameCondition(s, s->next)) {
00968 s->skip = s->next;
00969 }
00970 } else {
00971
00972
00973 s->next = head->next;
00974 head->next = s;
00975 s->readers = head->readers;
00976 s->maybe_unlocking = head->maybe_unlocking;
00977 if (head->may_skip && MuSameCondition(head, s)) {
00978
00979 head->skip = s;
00980 }
00981 head = s;
00982 }
00983 }
00984 s->state.store(PerThreadSynch::kQueued, std::memory_order_relaxed);
00985 return head;
00986 }
00987
00988
00989
00990
00991
00992 static PerThreadSynch *Dequeue(PerThreadSynch *head, PerThreadSynch *pw) {
00993 PerThreadSynch *w = pw->next;
00994 pw->next = w->next;
00995 if (head == w) {
00996 head = (pw == w) ? nullptr : pw;
00997 } else if (pw != head && MuSameCondition(pw, pw->next)) {
00998
if (pw->next->skip != nullptr) {
01001 pw->skip = pw->next->skip;
01002 } else {
01003 pw->skip = pw->next;
01004 }
01005 }
01006 return head;
01007 }
01008
01009
01010
01011
01012
01013
01014
01015 static PerThreadSynch *DequeueAllWakeable(PerThreadSynch *head,
01016 PerThreadSynch *pw,
01017 PerThreadSynch **wake_tail) {
01018 PerThreadSynch *orig_h = head;
01019 PerThreadSynch *w = pw->next;
01020 bool skipped = false;
01021 do {
01022 if (w->wake) {
01023 ABSL_RAW_CHECK(pw->skip == nullptr, "bad skip in DequeueAllWakeable");
01024
01025
01026
01027 head = Dequeue(head, pw);
01028 w->next = *wake_tail;
01029 *wake_tail = w;
01030 wake_tail = &w->next;
01031 if (w->waitp->how == kExclusive) {
01032 break;
01033 }
01034 } else {
01035 pw = Skip(w);
01036 skipped = true;
01037 }
01038 w = pw->next;
01047 } while (orig_h == head && (pw != head || !skipped));
01048 return head;
01049 }
01050
01051
01052
01053 void Mutex::TryRemove(PerThreadSynch *s) {
01054 intptr_t v = mu_.load(std::memory_order_relaxed);
01055
01056 if ((v & (kMuWait | kMuSpin | kMuWriter | kMuReader)) == kMuWait &&
01057 mu_.compare_exchange_strong(v, v | kMuSpin | kMuWriter,
01058 std::memory_order_acquire,
01059 std::memory_order_relaxed)) {
01060 PerThreadSynch *h = GetPerThreadSynch(v);
01061 if (h != nullptr) {
01062 PerThreadSynch *pw = h;
01063 PerThreadSynch *w;
01064 if ((w = pw->next) != s) {
01065 do {
01066 if (!MuSameCondition(s, w)) {
01067 pw = Skip(w);
01068
01069
01070
01071 } else {
01072 FixSkip(w, s);
01073 pw = w;
01074 }
01075
01076
01077 } while ((w = pw->next) != s && pw != h);
01078 }
01079 if (w == s) {
01080
01081
01082 h = Dequeue(h, pw);
01083 s->next = nullptr;
01084 s->state.store(PerThreadSynch::kAvailable, std::memory_order_release);
01085 }
01086 }
01087 intptr_t nv;
01088 do {
01089 v = mu_.load(std::memory_order_relaxed);
01090 nv = v & (kMuDesig | kMuEvent);
01091 if (h != nullptr) {
01092 nv |= kMuWait | reinterpret_cast<intptr_t>(h);
01093 h->readers = 0;
01094 h->maybe_unlocking = false;
01095 }
01096 } while (!mu_.compare_exchange_weak(v, nv,
01097 std::memory_order_release,
01098 std::memory_order_relaxed));
01099 }
01100 }
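// Block the calling thread until its PerThreadSynch node "s" has been
// dequeued and marked available by a wakeup.  If the wait times out, the
// thread removes itself from the queue via TryRemove() and then waits without
// a timeout for the removal to complete.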
01107 ABSL_XRAY_LOG_ARGS(1) void Mutex::Block(PerThreadSynch *s) {
01108 while (s->state.load(std::memory_order_acquire) == PerThreadSynch::kQueued) {
01109 if (!DecrementSynchSem(this, s, s->waitp->timeout)) {
01110
01111
01112
01113
01114
01115 this->TryRemove(s);
01116 int c = 0;
01117 while (s->next != nullptr) {
01118 c = Delay(c, GENTLE);
01119 this->TryRemove(s);
01120 }
01121 if (kDebugMode) {
01122
01123
01124 this->TryRemove(s);
01125 }
01126 s->waitp->timeout = KernelTimeout::Never();
01127 s->waitp->cond = nullptr;
01128 }
01129 }
01130 ABSL_RAW_CHECK(s->waitp != nullptr || s->suppress_fatal_errors,
01131 "detected illegal recursion in Mutex code");
01132 s->waitp = nullptr;
01133 }
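// Wake thread "w": detach it from the wake list, mark its PerThreadSynch
// available, and post its per-thread semaphore.  Returns the next thread on
// the wake list.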
01136 PerThreadSynch *Mutex::Wakeup(PerThreadSynch *w) {
01137 PerThreadSynch *next = w->next;
01138 w->next = nullptr;
01139 w->state.store(PerThreadSynch::kAvailable, std::memory_order_release);
01140 IncrementSynchSem(this, w);
01141
01142 return next;
01143 }
01144
01145 static GraphId GetGraphIdLocked(Mutex *mu)
01146 EXCLUSIVE_LOCKS_REQUIRED(deadlock_graph_mu) {
01147 if (!deadlock_graph) {
01148 deadlock_graph =
01149 new (base_internal::LowLevelAlloc::Alloc(sizeof(*deadlock_graph)))
01150 GraphCycles;
01151 }
01152 return deadlock_graph->GetId(mu);
01153 }
01154
01155 static GraphId GetGraphId(Mutex *mu) LOCKS_EXCLUDED(deadlock_graph_mu) {
01156 deadlock_graph_mu.Lock();
01157 GraphId id = GetGraphIdLocked(mu);
01158 deadlock_graph_mu.Unlock();
01159 return id;
01160 }
01161
01162
01163
01164
01165 static void LockEnter(Mutex* mu, GraphId id, SynchLocksHeld *held_locks) {
01166 int n = held_locks->n;
01167 int i = 0;
01168 while (i != n && held_locks->locks[i].id != id) {
01169 i++;
01170 }
01171 if (i == n) {
01172 if (n == ABSL_ARRAYSIZE(held_locks->locks)) {
01173 held_locks->overflow = true;
01174 } else {
01175 held_locks->locks[i].mu = mu;
01176 held_locks->locks[i].count = 1;
01177 held_locks->locks[i].id = id;
01178 held_locks->n = n + 1;
01179 }
01180 } else {
01181 held_locks->locks[i].count++;
01182 }
01183 }
01184
01185
01186
01187
01188
01189 static void LockLeave(Mutex* mu, GraphId id, SynchLocksHeld *held_locks) {
01190 int n = held_locks->n;
01191 int i = 0;
01192 while (i != n && held_locks->locks[i].id != id) {
01193 i++;
01194 }
01195 if (i == n) {
01196 if (!held_locks->overflow) {
01197
01198
01199 i = 0;
01200 while (i != n && held_locks->locks[i].mu != mu) {
01201 i++;
01202 }
01203 if (i == n) {
01204 SynchEvent *mu_events = GetSynchEvent(mu);
ABSL_RAW_LOG(FATAL,
             "thread releasing lock it does not hold: %p %s; ",
             static_cast<void *>(mu),
             mu_events == nullptr ? "" : mu_events->name);
01210 }
01211 }
01212 } else if (held_locks->locks[i].count == 1) {
01213 held_locks->n = n - 1;
01214 held_locks->locks[i] = held_locks->locks[n - 1];
01215 held_locks->locks[n - 1].id = InvalidGraphId();
held_locks->locks[n - 1].mu = nullptr;
01218 } else {
01219 assert(held_locks->locks[i].count > 0);
01220 held_locks->locks[i].count--;
01221 }
01222 }
01223
01224
01225 static inline void DebugOnlyLockEnter(Mutex *mu) {
01226 if (kDebugMode) {
01227 if (synch_deadlock_detection.load(std::memory_order_acquire) !=
01228 OnDeadlockCycle::kIgnore) {
01229 LockEnter(mu, GetGraphId(mu), Synch_GetAllLocks());
01230 }
01231 }
01232 }
01233
01234
01235 static inline void DebugOnlyLockEnter(Mutex *mu, GraphId id) {
01236 if (kDebugMode) {
01237 if (synch_deadlock_detection.load(std::memory_order_acquire) !=
01238 OnDeadlockCycle::kIgnore) {
01239 LockEnter(mu, id, Synch_GetAllLocks());
01240 }
01241 }
01242 }
01243
01244
01245 static inline void DebugOnlyLockLeave(Mutex *mu) {
01246 if (kDebugMode) {
01247 if (synch_deadlock_detection.load(std::memory_order_acquire) !=
01248 OnDeadlockCycle::kIgnore) {
01249 LockLeave(mu, GetGraphId(mu), Synch_GetAllLocks());
01250 }
01251 }
01252 }
01253
01254 static char *StackString(void **pcs, int n, char *buf, int maxlen,
01255 bool symbolize) {
01256 static const int kSymLen = 200;
01257 char sym[kSymLen];
01258 int len = 0;
01259 for (int i = 0; i != n; i++) {
01260 if (symbolize) {
01261 if (!symbolizer(pcs[i], sym, kSymLen)) {
01262 sym[0] = '\0';
01263 }
01264 snprintf(buf + len, maxlen - len, "%s\t@ %p %s\n",
01265 (i == 0 ? "\n" : ""),
01266 pcs[i], sym);
01267 } else {
01268 snprintf(buf + len, maxlen - len, " %p", pcs[i]);
01269 }
01270 len += strlen(&buf[len]);
01271 }
01272 return buf;
01273 }
01274
01275 static char *CurrentStackString(char *buf, int maxlen, bool symbolize) {
01276 void *pcs[40];
01277 return StackString(pcs, absl::GetStackTrace(pcs, ABSL_ARRAYSIZE(pcs), 2), buf,
01278 maxlen, symbolize);
01279 }
01280
01281 namespace {
01282 enum { kMaxDeadlockPathLen = 10 };
01283
01284
01285
01286 struct DeadlockReportBuffers {
01287 char buf[6100];
01288 GraphId path[kMaxDeadlockPathLen];
01289 };
01290
01291 struct ScopedDeadlockReportBuffers {
01292 ScopedDeadlockReportBuffers() {
01293 b = reinterpret_cast<DeadlockReportBuffers *>(
01294 base_internal::LowLevelAlloc::Alloc(sizeof(*b)));
01295 }
01296 ~ScopedDeadlockReportBuffers() { base_internal::LowLevelAlloc::Free(b); }
01297 DeadlockReportBuffers *b;
01298 };
01299
01300
01301 int GetStack(void** stack, int max_depth) {
01302 return absl::GetStackTrace(stack, max_depth, 3);
01303 }
01304 }
01305
01306
01307
01308 static GraphId DeadlockCheck(Mutex *mu) {
01309 if (synch_deadlock_detection.load(std::memory_order_acquire) ==
01310 OnDeadlockCycle::kIgnore) {
01311 return InvalidGraphId();
01312 }
01313
01314 SynchLocksHeld *all_locks = Synch_GetAllLocks();
01315
01316 absl::base_internal::SpinLockHolder lock(&deadlock_graph_mu);
01317 const GraphId mu_id = GetGraphIdLocked(mu);
01318
01319 if (all_locks->n == 0) {
01320
01321
01322
01323
01324 return mu_id;
01325 }
01326
01327
01328
01329
01330
01331 deadlock_graph->UpdateStackTrace(mu_id, all_locks->n + 1, GetStack);
01332
01333
01334 for (int i = 0; i != all_locks->n; i++) {
01335 const GraphId other_node_id = all_locks->locks[i].id;
01336 const Mutex *other =
01337 static_cast<const Mutex *>(deadlock_graph->Ptr(other_node_id));
01338 if (other == nullptr) {
01339
01340 continue;
01341 }
01342
01343
01344 if (!deadlock_graph->InsertEdge(other_node_id, mu_id)) {
01345 ScopedDeadlockReportBuffers scoped_buffers;
01346 DeadlockReportBuffers *b = scoped_buffers.b;
01347 static int number_of_reported_deadlocks = 0;
01348 number_of_reported_deadlocks++;
01349
01350 bool symbolize = number_of_reported_deadlocks <= 2;
01351 ABSL_RAW_LOG(ERROR, "Potential Mutex deadlock: %s",
01352 CurrentStackString(b->buf, sizeof (b->buf), symbolize));
01353 int len = 0;
01354 for (int j = 0; j != all_locks->n; j++) {
01355 void* pr = deadlock_graph->Ptr(all_locks->locks[j].id);
01356 if (pr != nullptr) {
01357 snprintf(b->buf + len, sizeof (b->buf) - len, " %p", pr);
01358 len += static_cast<int>(strlen(&b->buf[len]));
01359 }
01360 }
01361 ABSL_RAW_LOG(ERROR, "Acquiring %p Mutexes held: %s",
01362 static_cast<void *>(mu), b->buf);
01363 ABSL_RAW_LOG(ERROR, "Cycle: ");
01364 int path_len = deadlock_graph->FindPath(
01365 mu_id, other_node_id, ABSL_ARRAYSIZE(b->path), b->path);
01366 for (int j = 0; j != path_len; j++) {
01367 GraphId id = b->path[j];
01368 Mutex *path_mu = static_cast<Mutex *>(deadlock_graph->Ptr(id));
01369 if (path_mu == nullptr) continue;
01370 void** stack;
01371 int depth = deadlock_graph->GetStackTrace(id, &stack);
01372 snprintf(b->buf, sizeof(b->buf),
01373 "mutex@%p stack: ", static_cast<void *>(path_mu));
01374 StackString(stack, depth, b->buf + strlen(b->buf),
01375 static_cast<int>(sizeof(b->buf) - strlen(b->buf)),
01376 symbolize);
01377 ABSL_RAW_LOG(ERROR, "%s", b->buf);
01378 }
01379 if (synch_deadlock_detection.load(std::memory_order_acquire) ==
01380 OnDeadlockCycle::kAbort) {
01381 deadlock_graph_mu.Unlock();
01382 ABSL_RAW_LOG(FATAL, "dying due to potential deadlock");
01383 return mu_id;
01384 }
01385 break;
01386 }
01387 }
01388
01389 return mu_id;
01390 }
01391
01392
01393
01394 static inline GraphId DebugOnlyDeadlockCheck(Mutex *mu) {
01395 if (kDebugMode && synch_deadlock_detection.load(std::memory_order_acquire) !=
01396 OnDeadlockCycle::kIgnore) {
01397 return DeadlockCheck(mu);
01398 } else {
01399 return InvalidGraphId();
01400 }
01401 }
01402
01403 void Mutex::ForgetDeadlockInfo() {
01404 if (kDebugMode && synch_deadlock_detection.load(std::memory_order_acquire) !=
01405 OnDeadlockCycle::kIgnore) {
01406 deadlock_graph_mu.Lock();
01407 if (deadlock_graph != nullptr) {
01408 deadlock_graph->RemoveNode(this);
01409 }
01410 deadlock_graph_mu.Unlock();
01411 }
01412 }
01413
01414 void Mutex::AssertNotHeld() const {
01415
01416
01417 if (kDebugMode &&
01418 (mu_.load(std::memory_order_relaxed) & (kMuWriter | kMuReader)) != 0 &&
01419 synch_deadlock_detection.load(std::memory_order_acquire) !=
01420 OnDeadlockCycle::kIgnore) {
01421 GraphId id = GetGraphId(const_cast<Mutex *>(this));
01422 SynchLocksHeld *locks = Synch_GetAllLocks();
01423 for (int i = 0; i != locks->n; i++) {
01424 if (locks->locks[i].id == id) {
01425 SynchEvent *mu_events = GetSynchEvent(this);
01426 ABSL_RAW_LOG(FATAL, "thread should not hold mutex %p %s",
01427 static_cast<const void *>(this),
01428 (mu_events == nullptr ? "" : mu_events->name));
01429 }
01430 }
01431 }
01432 }
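// Try to acquire the write lock on *mu by spinning for a bounded number of
// iterations.  Gives up early if a reader holds the lock or events are
// enabled for this mutex.  Returns true iff the lock was acquired.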
01436 static bool TryAcquireWithSpinning(std::atomic<intptr_t>* mu) {
01437 int c = mutex_globals.spinloop_iterations;
01438 int result = -1;
01439
01440 do {
01441 intptr_t v = mu->load(std::memory_order_relaxed);
01442 if ((v & (kMuReader|kMuEvent)) != 0) {
01443 result = 0;
01444 } else if (((v & kMuWriter) == 0) &&
01445 mu->compare_exchange_strong(v, kMuWriter | v,
01446 std::memory_order_acquire,
01447 std::memory_order_relaxed)) {
01448 result = 1;
01449 }
01450 } while (result == -1 && --c > 0);
01451 return result == 1;
01452 }
01453
01454 ABSL_XRAY_LOG_ARGS(1) void Mutex::Lock() {
01455 ABSL_TSAN_MUTEX_PRE_LOCK(this, 0);
01456 GraphId id = DebugOnlyDeadlockCheck(this);
01457 intptr_t v = mu_.load(std::memory_order_relaxed);
01458
01459 if ((v & (kMuWriter | kMuReader | kMuEvent)) != 0 ||
01460 !mu_.compare_exchange_strong(v, kMuWriter | v,
01461 std::memory_order_acquire,
01462 std::memory_order_relaxed)) {
01463
01464 if (!TryAcquireWithSpinning(&this->mu_)) {
01465 this->LockSlow(kExclusive, nullptr, 0);
01466 }
01467 }
01468 DebugOnlyLockEnter(this, id);
01469 ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
01470 }
01471
01472 ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderLock() {
01473 ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock);
01474 GraphId id = DebugOnlyDeadlockCheck(this);
01475 intptr_t v = mu_.load(std::memory_order_relaxed);
01476
01477 if ((v & (kMuWriter | kMuWait | kMuEvent)) != 0 ||
01478 !mu_.compare_exchange_strong(v, (kMuReader | v) + kMuOne,
01479 std::memory_order_acquire,
01480 std::memory_order_relaxed)) {
01481 this->LockSlow(kShared, nullptr, 0);
01482 }
01483 DebugOnlyLockEnter(this, id);
01484 ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
01485 }
01486
01487 void Mutex::LockWhen(const Condition &cond) {
01488 ABSL_TSAN_MUTEX_PRE_LOCK(this, 0);
01489 GraphId id = DebugOnlyDeadlockCheck(this);
01490 this->LockSlow(kExclusive, &cond, 0);
01491 DebugOnlyLockEnter(this, id);
01492 ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
01493 }
01494
01495 bool Mutex::LockWhenWithTimeout(const Condition &cond, absl::Duration timeout) {
01496 return LockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
01497 }
01498
01499 bool Mutex::LockWhenWithDeadline(const Condition &cond, absl::Time deadline) {
01500 ABSL_TSAN_MUTEX_PRE_LOCK(this, 0);
01501 GraphId id = DebugOnlyDeadlockCheck(this);
01502 bool res = LockSlowWithDeadline(kExclusive, &cond,
01503 KernelTimeout(deadline), 0);
01504 DebugOnlyLockEnter(this, id);
01505 ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
01506 return res;
01507 }
01508
01509 void Mutex::ReaderLockWhen(const Condition &cond) {
01510 ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock);
01511 GraphId id = DebugOnlyDeadlockCheck(this);
01512 this->LockSlow(kShared, &cond, 0);
01513 DebugOnlyLockEnter(this, id);
01514 ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
01515 }
01516
01517 bool Mutex::ReaderLockWhenWithTimeout(const Condition &cond,
01518 absl::Duration timeout) {
01519 return ReaderLockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
01520 }
01521
01522 bool Mutex::ReaderLockWhenWithDeadline(const Condition &cond,
01523 absl::Time deadline) {
01524 ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock);
01525 GraphId id = DebugOnlyDeadlockCheck(this);
01526 bool res = LockSlowWithDeadline(kShared, &cond, KernelTimeout(deadline), 0);
01527 DebugOnlyLockEnter(this, id);
01528 ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
01529 return res;
01530 }
01531
01532 void Mutex::Await(const Condition &cond) {
01533 if (cond.Eval()) {
01534 if (kDebugMode) {
01535 this->AssertReaderHeld();
01536 }
01537 } else {
01538 ABSL_RAW_CHECK(this->AwaitCommon(cond, KernelTimeout::Never()),
01539 "condition untrue on return from Await");
01540 }
01541 }
01542
01543 bool Mutex::AwaitWithTimeout(const Condition &cond, absl::Duration timeout) {
01544 return AwaitWithDeadline(cond, DeadlineFromTimeout(timeout));
01545 }
01546
01547 bool Mutex::AwaitWithDeadline(const Condition &cond, absl::Time deadline) {
01548 if (cond.Eval()) {
01549 if (kDebugMode) {
01550 this->AssertReaderHeld();
01551 }
01552 return true;
01553 }
01554
01555 KernelTimeout t{deadline};
01556 bool res = this->AwaitCommon(cond, t);
01557 ABSL_RAW_CHECK(res || t.has_timeout(),
01558 "condition untrue on return from Await");
01559 return res;
01560 }
01561
01562 bool Mutex::AwaitCommon(const Condition &cond, KernelTimeout t) {
01563 this->AssertReaderHeld();
01564 MuHow how =
01565 (mu_.load(std::memory_order_relaxed) & kMuWriter) ? kExclusive : kShared;
01566 ABSL_TSAN_MUTEX_PRE_UNLOCK(this, TsanFlags(how));
SynchWaitParams waitp(how, &cond, t, nullptr,
                      Synch_GetPerThreadAnnotated(this), nullptr);
01570 int flags = kMuHasBlocked;
01571 if (!Condition::GuaranteedEqual(&cond, nullptr)) {
01572 flags |= kMuIsCond;
01573 }
01574 this->UnlockSlow(&waitp);
01575 this->Block(waitp.thread);
01576 ABSL_TSAN_MUTEX_POST_UNLOCK(this, TsanFlags(how));
01577 ABSL_TSAN_MUTEX_PRE_LOCK(this, TsanFlags(how));
01578 this->LockSlowLoop(&waitp, flags);
01579 bool res = waitp.cond != nullptr ||
01580 EvalConditionAnnotated(&cond, this, true, false, how == kShared);
01581 ABSL_TSAN_MUTEX_POST_LOCK(this, TsanFlags(how), 0);
01582 return res;
01583 }
01584
01585 ABSL_XRAY_LOG_ARGS(1) bool Mutex::TryLock() {
01586 ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_try_lock);
01587 intptr_t v = mu_.load(std::memory_order_relaxed);
01588 if ((v & (kMuWriter | kMuReader | kMuEvent)) == 0 &&
01589 mu_.compare_exchange_strong(v, kMuWriter | v,
01590 std::memory_order_acquire,
01591 std::memory_order_relaxed)) {
01592 DebugOnlyLockEnter(this);
01593 ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_try_lock, 0);
01594 return true;
01595 }
01596 if ((v & kMuEvent) != 0) {
01597 if ((v & kExclusive->slow_need_zero) == 0 &&
01598 mu_.compare_exchange_strong(
01599 v, (kExclusive->fast_or | v) + kExclusive->fast_add,
01600 std::memory_order_acquire, std::memory_order_relaxed)) {
01601 DebugOnlyLockEnter(this);
01602 PostSynchEvent(this, SYNCH_EV_TRYLOCK_SUCCESS);
01603 ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_try_lock, 0);
01604 return true;
01605 } else {
01606 PostSynchEvent(this, SYNCH_EV_TRYLOCK_FAILED);
01607 }
01608 }
01609 ABSL_TSAN_MUTEX_POST_LOCK(
01610 this, __tsan_mutex_try_lock | __tsan_mutex_try_lock_failed, 0);
01611 return false;
01612 }
01613
01614 ABSL_XRAY_LOG_ARGS(1) bool Mutex::ReaderTryLock() {
01615 ABSL_TSAN_MUTEX_PRE_LOCK(this,
01616 __tsan_mutex_read_lock | __tsan_mutex_try_lock);
01617 intptr_t v = mu_.load(std::memory_order_relaxed);
01618
01619
01620
01621 int loop_limit = 5;
01622 while ((v & (kMuWriter|kMuWait|kMuEvent)) == 0 && loop_limit != 0) {
01623 if (mu_.compare_exchange_strong(v, (kMuReader | v) + kMuOne,
01624 std::memory_order_acquire,
01625 std::memory_order_relaxed)) {
01626 DebugOnlyLockEnter(this);
01627 ABSL_TSAN_MUTEX_POST_LOCK(
01628 this, __tsan_mutex_read_lock | __tsan_mutex_try_lock, 0);
01629 return true;
01630 }
01631 loop_limit--;
01632 v = mu_.load(std::memory_order_relaxed);
01633 }
01634 if ((v & kMuEvent) != 0) {
01635 loop_limit = 5;
01636 while ((v & kShared->slow_need_zero) == 0 && loop_limit != 0) {
01637 if (mu_.compare_exchange_strong(v, (kMuReader | v) + kMuOne,
01638 std::memory_order_acquire,
01639 std::memory_order_relaxed)) {
01640 DebugOnlyLockEnter(this);
01641 PostSynchEvent(this, SYNCH_EV_READERTRYLOCK_SUCCESS);
01642 ABSL_TSAN_MUTEX_POST_LOCK(
01643 this, __tsan_mutex_read_lock | __tsan_mutex_try_lock, 0);
01644 return true;
01645 }
01646 loop_limit--;
01647 v = mu_.load(std::memory_order_relaxed);
01648 }
01649 if ((v & kMuEvent) != 0) {
01650 PostSynchEvent(this, SYNCH_EV_READERTRYLOCK_FAILED);
01651 }
01652 }
01653 ABSL_TSAN_MUTEX_POST_LOCK(this,
01654 __tsan_mutex_read_lock | __tsan_mutex_try_lock |
01655 __tsan_mutex_try_lock_failed,
01656 0);
01657 return false;
01658 }
01659
01660 ABSL_XRAY_LOG_ARGS(1) void Mutex::Unlock() {
01661 ABSL_TSAN_MUTEX_PRE_UNLOCK(this, 0);
01662 DebugOnlyLockLeave(this);
01663 intptr_t v = mu_.load(std::memory_order_relaxed);
01664
01665 if (kDebugMode && ((v & (kMuWriter | kMuReader)) != kMuWriter)) {
01666 ABSL_RAW_LOG(FATAL, "Mutex unlocked when destroyed or not locked: v=0x%x",
01667 static_cast<unsigned>(v));
01668 }
01669
01670
01671
01672 bool should_try_cas = ((v & (kMuEvent | kMuWriter)) == kMuWriter &&
01673 (v & (kMuWait | kMuDesig)) != kMuWait);
01674
01675
01676
01677 intptr_t x = (v ^ (kMuWriter | kMuWait)) & (kMuWriter | kMuEvent);
01678 intptr_t y = (v ^ (kMuWriter | kMuWait)) & (kMuWait | kMuDesig);
01679
01680
01681
01682
01683 if (kDebugMode && should_try_cas != (x < y)) {
01684
01685
01686 ABSL_RAW_LOG(FATAL, "internal logic error %llx %llx %llx\n",
01687 static_cast<long long>(v), static_cast<long long>(x),
01688 static_cast<long long>(y));
01689 }
01690 if (x < y &&
01691 mu_.compare_exchange_strong(v, v & ~(kMuWrWait | kMuWriter),
01692 std::memory_order_release,
01693 std::memory_order_relaxed)) {
01694
01695 } else {
this->UnlockSlow(nullptr);
01697 }
01698 ABSL_TSAN_MUTEX_POST_UNLOCK(this, 0);
01699 }
01700
01701
01702 static bool ExactlyOneReader(intptr_t v) {
01703 assert((v & (kMuWriter|kMuReader)) == kMuReader);
01704 assert((v & kMuHigh) != 0);
01705
01706
01707
01708 constexpr intptr_t kMuMultipleWaitersMask = kMuHigh ^ kMuOne;
01709 return (v & kMuMultipleWaitersMask) == 0;
01710 }
01711
01712 ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderUnlock() {
01713 ABSL_TSAN_MUTEX_PRE_UNLOCK(this, __tsan_mutex_read_lock);
01714 DebugOnlyLockLeave(this);
01715 intptr_t v = mu_.load(std::memory_order_relaxed);
01716 assert((v & (kMuWriter|kMuReader)) == kMuReader);
01717 if ((v & (kMuReader|kMuWait|kMuEvent)) == kMuReader) {
01718
01719 intptr_t clear = ExactlyOneReader(v) ? kMuReader|kMuOne : kMuOne;
01720 if (mu_.compare_exchange_strong(v, v - clear,
01721 std::memory_order_release,
01722 std::memory_order_relaxed)) {
01723 ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock);
01724 return;
01725 }
01726 }
this->UnlockSlow(nullptr);
01728 ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock);
01729 }
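// Indexed by "flags & kMuHasBlocked".  A thread that has previously blocked
// on (and been woken from) this mutex is the "designated waker"; when it
// finally acquires the mutex it clears the kMuDesig bit so that another
// waiter can later be designated.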
01734 static const intptr_t zap_desig_waker[] = {
01735 ~static_cast<intptr_t>(0),
~static_cast<intptr_t>(kMuDesig)
01738 };
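// Indexed by "flags & kMuHasBlocked".  A reader that has already blocked once
// may ignore the kMuWrWait ("writer waiting") bit when it retries, so that it
// is not starved by writers that arrived after it.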
01743 static const intptr_t ignore_waiting_writers[] = {
01744 ~static_cast<intptr_t>(0),
~static_cast<intptr_t>(kMuWrWait)
01747 };
01748
01749
01750 void Mutex::LockSlow(MuHow how, const Condition *cond, int flags) {
01751 ABSL_RAW_CHECK(
01752 this->LockSlowWithDeadline(how, cond, KernelTimeout::Never(), flags),
01753 "condition untrue on return from LockSlow");
01754 }
01755
01756
01757 static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu,
01758 bool locking, bool trylock,
01759 bool read_lock) {
01760
01761
01762
01763
01764 bool res = false;
01765 #ifdef THREAD_SANITIZER
01766 const int flags = read_lock ? __tsan_mutex_read_lock : 0;
01767 const int tryflags = flags | (trylock ? __tsan_mutex_try_lock : 0);
01768 #endif
01769 if (locking) {
01770
01771
01772
01773
01774
01775
01776 ABSL_TSAN_MUTEX_POST_LOCK(mu, tryflags, 0);
01777 res = cond->Eval();
01778
01779 ABSL_TSAN_MUTEX_PRE_UNLOCK(mu, flags);
01780 ABSL_TSAN_MUTEX_POST_UNLOCK(mu, flags);
01781 ABSL_TSAN_MUTEX_PRE_LOCK(mu, tryflags);
01782 } else {
01783
01784
01785
01786 ABSL_TSAN_MUTEX_POST_UNLOCK(mu, flags);
01787 ABSL_TSAN_MUTEX_PRE_LOCK(mu, flags);
01788 ABSL_TSAN_MUTEX_POST_LOCK(mu, flags, 0);
01789 res = cond->Eval();
01790 ABSL_TSAN_MUTEX_PRE_UNLOCK(mu, flags);
01791 }
01792
01793 static_cast<void>(mu);
01794 static_cast<void>(trylock);
01795 static_cast<void>(read_lock);
01796 return res;
01797 }
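// Evaluate a waiter's condition from the unlocking thread (as UnlockSlow does
// when deciding which waiters to wake).  The mutex is not formally held here,
// so the reads and writes performed by the condition are hidden from the race
// detector to avoid false positives.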
01807 static inline bool EvalConditionIgnored(Mutex *mu, const Condition *cond) {
01808
01809
01810
01811
01812
01813
01814 ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0);
01815 ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
01816 bool res = cond->Eval();
01817 ANNOTATE_IGNORE_READS_AND_WRITES_END();
01818 ABSL_TSAN_MUTEX_POST_DIVERT(mu, 0);
01819 static_cast<void>(mu);
01820 return res;
01821 }
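// Acquire this mutex in mode "how" and, if "cond" is non-null, wait for it to
// hold, giving up at time "t".  Returns false only if the deadline expired
// and the condition is still false; the mutex is held on return either way.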
01832 bool Mutex::LockSlowWithDeadline(MuHow how, const Condition *cond,
01833 KernelTimeout t, int flags) {
01834 intptr_t v = mu_.load(std::memory_order_relaxed);
01835 bool unlock = false;
01836 if ((v & how->fast_need_zero) == 0 &&
01837 mu_.compare_exchange_strong(
01838 v, (how->fast_or | (v & zap_desig_waker[flags & kMuHasBlocked])) +
01839 how->fast_add,
01840 std::memory_order_acquire, std::memory_order_relaxed)) {
01841 if (cond == nullptr ||
01842 EvalConditionAnnotated(cond, this, true, false, how == kShared)) {
01843 return true;
01844 }
01845 unlock = true;
01846 }
SynchWaitParams waitp(how, cond, t, nullptr,
                      Synch_GetPerThreadAnnotated(this), nullptr);
01850 if (!Condition::GuaranteedEqual(cond, nullptr)) {
01851 flags |= kMuIsCond;
01852 }
01853 if (unlock) {
01854 this->UnlockSlow(&waitp);
01855 this->Block(waitp.thread);
01856 flags |= kMuHasBlocked;
01857 }
01858 this->LockSlowLoop(&waitp, flags);
01859 return waitp.cond != nullptr ||
01860 cond == nullptr ||
01861 EvalConditionAnnotated(cond, this, true, false, how == kShared);
01862 }
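// RAW_CHECK_FMT() takes a printf-style format string and arguments, unlike
// ABSL_RAW_CHECK(), which takes a plain message.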
01867 #define RAW_CHECK_FMT(cond, ...) \
01868 do { \
01869 if (ABSL_PREDICT_FALSE(!(cond))) { \
01870 ABSL_RAW_LOG(FATAL, "Check " #cond " failed: " __VA_ARGS__); \
01871 } \
01872 } while (0)
01873
01874 static void CheckForMutexCorruption(intptr_t v, const char* label) {
01875
01876
01877
01878 const uintptr_t w = v ^ kMuWait;
01879
01880
01881
01882
01883
01884
01885 static_assert(kMuReader << 3 == kMuWriter, "must match");
01886 static_assert(kMuWait << 3 == kMuWrWait, "must match");
01887 if (ABSL_PREDICT_TRUE((w & (w << 3) & (kMuWriter | kMuWrWait)) == 0)) return;
01888 RAW_CHECK_FMT((v & (kMuWriter | kMuReader)) != (kMuWriter | kMuReader),
01889 "%s: Mutex corrupt: both reader and writer lock held: %p",
01890 label, reinterpret_cast<void *>(v));
01891 RAW_CHECK_FMT((v & (kMuWait | kMuWrWait)) != kMuWrWait,
01892 "%s: Mutex corrupt: waiting writer with no waiters: %p",
01893 label, reinterpret_cast<void *>(v));
01894 assert(false);
01895 }
01896
01897 void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
01898 int c = 0;
01899 intptr_t v = mu_.load(std::memory_order_relaxed);
01900 if ((v & kMuEvent) != 0) {
01901 PostSynchEvent(this,
01902 waitp->how == kExclusive? SYNCH_EV_LOCK: SYNCH_EV_READERLOCK);
01903 }
01904 ABSL_RAW_CHECK(
01905 waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors,
01906 "detected illegal recursion into Mutex code");
01907 for (;;) {
01908 v = mu_.load(std::memory_order_relaxed);
01909 CheckForMutexCorruption(v, "Lock");
01910 if ((v & waitp->how->slow_need_zero) == 0) {
01911 if (mu_.compare_exchange_strong(
01912 v, (waitp->how->fast_or |
01913 (v & zap_desig_waker[flags & kMuHasBlocked])) +
01914 waitp->how->fast_add,
01915 std::memory_order_acquire, std::memory_order_relaxed)) {
01916 if (waitp->cond == nullptr ||
01917 EvalConditionAnnotated(waitp->cond, this, true, false,
01918 waitp->how == kShared)) {
01919 break;
01920 }
01921 this->UnlockSlow(waitp);
01922 this->Block(waitp->thread);
01923 flags |= kMuHasBlocked;
01924 c = 0;
01925 }
01926 } else {
01927 bool dowait = false;
01928 if ((v & (kMuSpin|kMuWait)) == 0) {
01929
01930 PerThreadSynch *new_h = Enqueue(nullptr, waitp, v, flags);
01931 intptr_t nv = (v & zap_desig_waker[flags & kMuHasBlocked] & kMuLow) |
01932 kMuWait;
01933 ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to empty list failed");
01934 if (waitp->how == kExclusive && (v & kMuReader) != 0) {
01935 nv |= kMuWrWait;
01936 }
01937 if (mu_.compare_exchange_strong(
01938 v, reinterpret_cast<intptr_t>(new_h) | nv,
01939 std::memory_order_release, std::memory_order_relaxed)) {
01940 dowait = true;
01941 } else {
01942
01943 waitp->thread->waitp = nullptr;
01944 }
01945 } else if ((v & waitp->how->slow_inc_need_zero &
01946 ignore_waiting_writers[flags & kMuHasBlocked]) == 0) {
01947
01948
01949 if (mu_.compare_exchange_strong(
01950 v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin |
01951 kMuReader,
01952 std::memory_order_acquire, std::memory_order_relaxed)) {
01953 PerThreadSynch *h = GetPerThreadSynch(v);
01954 h->readers += kMuOne;
01955 do {
01956 v = mu_.load(std::memory_order_relaxed);
01957 } while (!mu_.compare_exchange_weak(v, (v & ~kMuSpin) | kMuReader,
01958 std::memory_order_release,
01959 std::memory_order_relaxed));
01960 if (waitp->cond == nullptr ||
01961 EvalConditionAnnotated(waitp->cond, this, true, false,
01962 waitp->how == kShared)) {
01963 break;
01964 }
01965 this->UnlockSlow(waitp);
01966 this->Block(waitp->thread);
01967 flags |= kMuHasBlocked;
01968 c = 0;
01969 }
01970 } else if ((v & kMuSpin) == 0 &&
01971 mu_.compare_exchange_strong(
01972 v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin |
01973 kMuWait,
01974 std::memory_order_acquire, std::memory_order_relaxed)) {
01975 PerThreadSynch *h = GetPerThreadSynch(v);
01976 PerThreadSynch *new_h = Enqueue(h, waitp, v, flags);
01977 intptr_t wr_wait = 0;
01978 ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to list failed");
01979 if (waitp->how == kExclusive && (v & kMuReader) != 0) {
01980 wr_wait = kMuWrWait;
01981 }
01982 do {
01983 v = mu_.load(std::memory_order_relaxed);
01984 } while (!mu_.compare_exchange_weak(
01985 v, (v & (kMuLow & ~kMuSpin)) | kMuWait | wr_wait |
01986 reinterpret_cast<intptr_t>(new_h),
01987 std::memory_order_release, std::memory_order_relaxed));
01988 dowait = true;
01989 }
01990 if (dowait) {
01991 this->Block(waitp->thread);
01992 flags |= kMuHasBlocked;
01993 c = 0;
01994 }
01995 }
01996 ABSL_RAW_CHECK(
01997 waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors,
01998 "detected illegal recursion into Mutex code");
01999 c = Delay(c, GENTLE);
02000 }
02001 ABSL_RAW_CHECK(
02002 waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors,
02003 "detected illegal recursion into Mutex code");
02004 if ((v & kMuEvent) != 0) {
02005 PostSynchEvent(this,
02006 waitp->how == kExclusive? SYNCH_EV_LOCK_RETURNING :
02007 SYNCH_EV_READERLOCK_RETURNING);
02008 }
02009 }
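// Slow path of Unlock()/ReaderUnlock().  "waitp" is non-null when the caller
// is releasing the mutex in order to wait (Await or a CondVar wait); in that
// case the calling thread is re-queued as a waiter as part of the release.
// This routine also selects and dequeues the waiters to wake.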
02016 void Mutex::UnlockSlow(SynchWaitParams *waitp) {
02017 intptr_t v = mu_.load(std::memory_order_relaxed);
02018 this->AssertReaderHeld();
02019 CheckForMutexCorruption(v, "Unlock");
02020 if ((v & kMuEvent) != 0) {
02021 PostSynchEvent(this,
02022 (v & kMuWriter) != 0? SYNCH_EV_UNLOCK: SYNCH_EV_READERUNLOCK);
02023 }
02024 int c = 0;
02025
02026 PerThreadSynch *w = nullptr;
02027
02028 PerThreadSynch *pw = nullptr;
02029
02030 PerThreadSynch *old_h = nullptr;
02031
02032 const Condition *known_false = nullptr;
02033 PerThreadSynch *wake_list = kPerThreadSynchNull;
02034 intptr_t wr_wait = 0;
02035
02036
02037 ABSL_RAW_CHECK(waitp == nullptr || waitp->thread->waitp == nullptr ||
02038 waitp->thread->suppress_fatal_errors,
02039 "detected illegal recursion into Mutex code");
02040
02041
02042
02043 for (;;) {
02044 v = mu_.load(std::memory_order_relaxed);
02045 if ((v & kMuWriter) != 0 && (v & (kMuWait | kMuDesig)) != kMuWait &&
02046 waitp == nullptr) {
02047
02048 if (mu_.compare_exchange_strong(v, v & ~(kMuWrWait | kMuWriter),
02049 std::memory_order_release,
02050 std::memory_order_relaxed)) {
02051 return;
02052 }
02053 } else if ((v & (kMuReader | kMuWait)) == kMuReader && waitp == nullptr) {
02054
02055 intptr_t clear = ExactlyOneReader(v) ? kMuReader | kMuOne : kMuOne;
02056 if (mu_.compare_exchange_strong(v, v - clear,
02057 std::memory_order_release,
02058 std::memory_order_relaxed)) {
02059 return;
02060 }
02061 } else if ((v & kMuSpin) == 0 &&
02062 mu_.compare_exchange_strong(v, v | kMuSpin,
02063 std::memory_order_acquire,
02064 std::memory_order_relaxed)) {
02065 if ((v & kMuWait) == 0) {
02066 intptr_t nv;
02067 bool do_enqueue = true;
02068 ABSL_RAW_CHECK(waitp != nullptr,
02069 "UnlockSlow is confused");
02070 do {
02071 v = mu_.load(std::memory_order_relaxed);
02072
02073 intptr_t new_readers = (v >= kMuOne)? v - kMuOne : v;
02074 PerThreadSynch *new_h = nullptr;
02075 if (do_enqueue) {
02076
02077
02078
02079
02080 do_enqueue = (waitp->cv_word == nullptr);
02081 new_h = Enqueue(nullptr, waitp, new_readers, kMuIsCond);
02082 }
02083 intptr_t clear = kMuWrWait | kMuWriter;
02084 if ((v & kMuWriter) == 0 && ExactlyOneReader(v)) {
02085 clear = kMuWrWait | kMuReader;
02086 }
02087 nv = (v & kMuLow & ~clear & ~kMuSpin);
02088 if (new_h != nullptr) {
02089 nv |= kMuWait | reinterpret_cast<intptr_t>(new_h);
02090 } else {
02091
02092
02093
02094 nv |= new_readers & kMuHigh;
02095 }
02096
02097
02098 } while (!mu_.compare_exchange_weak(v, nv,
02099 std::memory_order_release,
02100 std::memory_order_relaxed));
02101 break;
02102 }
02103
02104
02105
02106 PerThreadSynch *h = GetPerThreadSynch(v);
02107 if ((v & kMuReader) != 0 && (h->readers & kMuHigh) > kMuOne) {
02108
02109 h->readers -= kMuOne;
02110 intptr_t nv = v;
02111 if (waitp != nullptr) {
02112 PerThreadSynch *new_h = Enqueue(h, waitp, v, kMuIsCond);
02113 ABSL_RAW_CHECK(new_h != nullptr,
02114 "waiters disappeared during Enqueue()!");
02115 nv &= kMuLow;
02116 nv |= kMuWait | reinterpret_cast<intptr_t>(new_h);
02117 }
02118 mu_.store(nv, std::memory_order_release);
02119
02120 break;
02121 }
02122
02123
02124
02125 ABSL_RAW_CHECK(old_h == nullptr || h->maybe_unlocking,
02126 "Mutex queue changed beneath us");
02127
02128
if (old_h != nullptr && !old_h->may_skip) {
02131 old_h->may_skip = true;
02132 ABSL_RAW_CHECK(old_h->skip == nullptr, "illegal skip from head");
02133 if (h != old_h && MuSameCondition(old_h, old_h->next)) {
02134 old_h->skip = old_h->next;
02135 }
02136 }
02137 if (h->next->waitp->how == kExclusive &&
02138 Condition::GuaranteedEqual(h->next->waitp->cond, nullptr)) {
02139
02140 pw = h;
02141 w = h->next;
02142 w->wake = true;
02143
02144
02145
02146
02147
02148
02149
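        // Record that a writer is waiting (kMuWrWait) so that newly arriving
        // readers do not barge ahead of the writer being woken.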
02150 wr_wait = kMuWrWait;
02151 } else if (w != nullptr && (w->waitp->how == kExclusive || h == old_h)) {
02152
02153
02154
02155 if (pw == nullptr) {
02156 pw = h;
02157 }
02158 } else {
02159
02160
02161
02162
02163 if (old_h == h) {
02164
02165 intptr_t nv = (v & ~(kMuReader|kMuWriter|kMuWrWait));
02166 h->readers = 0;
02167 h->maybe_unlocking = false;
02168 if (waitp != nullptr) {
02169 PerThreadSynch *new_h = Enqueue(h, waitp, v, kMuIsCond);
02170 nv &= kMuLow;
02171 if (new_h != nullptr) {
02172 nv |= kMuWait | reinterpret_cast<intptr_t>(new_h);
02173 }
02174
02175 }
02176
02177
02178 mu_.store(nv, std::memory_order_release);
02179 break;
02180 }
02181
02182
02183 PerThreadSynch *w_walk;
02184 PerThreadSynch *pw_walk;
02185 if (old_h != nullptr) {
02186 pw_walk = old_h;
02187 w_walk = old_h->next;
02188 } else {
          pw_walk = nullptr;
02191 w_walk = h->next;
02192 }
02193
02194 h->may_skip = false;
02195
02196 ABSL_RAW_CHECK(h->skip == nullptr, "illegal skip from head");
02197
02198 h->maybe_unlocking = true;
02199
02200
02201
02202
02203
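        // Release just the spinlock: v was captured before kMuSpin was set,
        // so storing it back clears the spin bit while the lock itself (and
        // the waiter list) remains in place during the condition scan below.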
02204 mu_.store(v, std::memory_order_release);
02205
02206
02207
02208
02209
02210
02211
02212
02213
02214 old_h = h;
02215
02216
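        // Walk the waiters from w_walk (predecessor pw_walk) up to the head
        // h, marking wake==true on those whose conditions evaluate true; the
        // walk stops early once an exclusive waiter has been chosen.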
02217 while (pw_walk != h) {
02218 w_walk->wake = false;
          if (w_walk->waitp->cond == nullptr ||
              (w_walk->waitp->cond != known_false &&
               EvalConditionIgnored(this, w_walk->waitp->cond))) {
02225 if (w == nullptr) {
02226 w_walk->wake = true;
02227 w = w_walk;
02228 pw = pw_walk;
02229 if (w_walk->waitp->how == kExclusive) {
02230 wr_wait = kMuWrWait;
02231 break;
02232 }
02233 } else if (w_walk->waitp->how == kShared) {
02234 w_walk->wake = true;
02235 } else {
02236 wr_wait = kMuWrWait;
02237 }
02238 } else {
02239 known_false = w_walk->waitp->cond;
02240 }
02241 if (w_walk->wake) {
02242 pw_walk = w_walk;
02243 } else {
02244 pw_walk = Skip(w_walk);
02245 }
02246
02247
02248
02249
02250 if (pw_walk != h) {
02251 w_walk = pw_walk->next;
02252 }
02253 }
02254
02255 continue;
02256 }
02257 ABSL_RAW_CHECK(pw->next == w, "pw not w's predecessor");
02258
02259
02260
02261
02262
02263
02264
02265
02266
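      // Detach every waiter marked wake==true (from pw->next through h) onto
      // wake_list; h becomes the new queue head, or nullptr if the queue is
      // now empty.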
02267 h = DequeueAllWakeable(h, pw, &wake_list);
02268
02269 intptr_t nv = (v & kMuEvent) | kMuDesig;
02270
02271
02272
02273 if (waitp != nullptr) {
02274 h = Enqueue(h, waitp, v, kMuIsCond);
02275
02276
02277 }
02278
02279 ABSL_RAW_CHECK(wake_list != kPerThreadSynchNull,
02280 "unexpected empty wake list");
02281
02282 if (h != nullptr) {
02283 h->readers = 0;
02284 h->maybe_unlocking = false;
02285 nv |= wr_wait | kMuWait | reinterpret_cast<intptr_t>(h);
02286 }
02287
02288
02289
02290 mu_.store(nv, std::memory_order_release);
02291 break;
02292 }
02293 c = Delay(c, AGGRESSIVE);
02294 }
02295
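  // Wake the collected threads outside the spin section.  Contention is
  // reported only when the first woken thread was waiting for the lock
  // itself, not for a condition variable or Condition.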
02296 if (wake_list != kPerThreadSynchNull) {
02297 int64_t enqueue_timestamp = wake_list->waitp->contention_start_cycles;
02298 bool cond_waiter = wake_list->cond_waiter;
02299 do {
02300 wake_list = Wakeup(wake_list);
02301 } while (wake_list != kPerThreadSynchNull);
02302 if (!cond_waiter) {
02303
02304
      int64_t wait_cycles =
          base_internal::CycleClock::Now() - enqueue_timestamp;
      mutex_tracer("slow release", this, wait_cycles);
      ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0);
      // The profiling hook is declared to take the wait time in cycles, so
      // pass wait_cycles rather than the raw enqueue timestamp.
      submit_profile_data(wait_cycles);
02309 ABSL_TSAN_MUTEX_POST_DIVERT(this, 0);
02310 }
02311 }
02312 }
02313
02314
02315
02316
02317
02318
02319
02320
02321
02322
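// Used by the CondVar implementation to reacquire the mutex, in the mode
// given by how, after the calling thread has been woken from a
// condition-variable wait.  kMuHasBlocked records that the thread has already
// blocked, and kMuIsCond marks the acquisition as condition-related.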
02323 void Mutex::Trans(MuHow how) {
02324 this->LockSlow(how, nullptr, kMuHasBlocked | kMuIsCond);
02325 }
02326
02327
02328
02329
02330
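// Wake waiter w, which was queued on a condition variable associated with
// this mutex.  w must have no Condition, no timeout, and no pending CondVar
// queueing (checked below).  If no conflicting holder exists, w is made
// runnable immediately via its semaphore; otherwise it is moved onto this
// mutex's waiter queue so that it is woken only when the mutex can actually
// be granted to it.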
02331 void Mutex::Fer(PerThreadSynch *w) {
02332 int c = 0;
02333 ABSL_RAW_CHECK(w->waitp->cond == nullptr,
02334 "Mutex::Fer while waiting on Condition");
02335 ABSL_RAW_CHECK(!w->waitp->timeout.has_timeout(),
02336 "Mutex::Fer while in timed wait");
02337 ABSL_RAW_CHECK(w->waitp->cv_word == nullptr,
02338 "Mutex::Fer with pending CondVar queueing");
02339 for (;;) {
02340 intptr_t v = mu_.load(std::memory_order_relaxed);
02341
02342
02343
02344
02345
02346
02347 const intptr_t conflicting =
02348 kMuWriter | (w->waitp->how == kShared ? 0 : kMuReader);
02349 if ((v & conflicting) == 0) {
02350 w->next = nullptr;
02351 w->state.store(PerThreadSynch::kAvailable, std::memory_order_release);
02352 IncrementSynchSem(this, w);
02353 return;
02354 } else {
02355 if ((v & (kMuSpin|kMuWait)) == 0) {
02356
02357 PerThreadSynch *new_h = Enqueue(nullptr, w->waitp, v, kMuIsCond);
02358 ABSL_RAW_CHECK(new_h != nullptr,
02359 "Enqueue failed");
02360 if (mu_.compare_exchange_strong(
02361 v, reinterpret_cast<intptr_t>(new_h) | (v & kMuLow) | kMuWait,
02362 std::memory_order_release, std::memory_order_relaxed)) {
02363 return;
02364 }
02365 } else if ((v & kMuSpin) == 0 &&
02366 mu_.compare_exchange_strong(v, v | kMuSpin | kMuWait)) {
02367 PerThreadSynch *h = GetPerThreadSynch(v);
02368 PerThreadSynch *new_h = Enqueue(h, w->waitp, v, kMuIsCond);
02369 ABSL_RAW_CHECK(new_h != nullptr,
02370 "Enqueue failed");
02371 do {
02372 v = mu_.load(std::memory_order_relaxed);
02373 } while (!mu_.compare_exchange_weak(
02374 v,
02375 (v & kMuLow & ~kMuSpin) | kMuWait |
02376 reinterpret_cast<intptr_t>(new_h),
02377 std::memory_order_release, std::memory_order_relaxed));
02378 return;
02379 }
02380 }
02381 c = Delay(c, GENTLE);
02382 }
02383 }
02384
02385 void Mutex::AssertHeld() const {
02386 if ((mu_.load(std::memory_order_relaxed) & kMuWriter) == 0) {
02387 SynchEvent *e = GetSynchEvent(this);
02388 ABSL_RAW_LOG(FATAL, "thread should hold write lock on Mutex %p %s",
02389 static_cast<const void *>(this),
02390 (e == nullptr ? "" : e->name));
02391 }
02392 }
02393
02394 void Mutex::AssertReaderHeld() const {
02395 if ((mu_.load(std::memory_order_relaxed) & (kMuReader | kMuWriter)) == 0) {
02396 SynchEvent *e = GetSynchEvent(this);
02397 ABSL_RAW_LOG(
02398 FATAL, "thread should hold at least a read lock on Mutex %p %s",
02399 static_cast<const void *>(this), (e == nullptr ? "" : e->name));
02400 }
02401 }
02402
02403
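// Bits of the CondVar word cv_:
//   kCvSpin  - spinlock protecting the waiter queue
//   kCvEvent - events (debug logging) enabled for this CondVar
// The remaining bits hold a PerThreadSynch* into the circular list of queued
// waiters (nullptr when no thread is waiting); the stored element is the most
// recently enqueued waiter, and its next field points at the first waiter to
// be woken.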
02404 static const intptr_t kCvSpin = 0x0001L;
02405 static const intptr_t kCvEvent = 0x0002L;
02406
02407 static const intptr_t kCvLow = 0x0003L;
02408
02409
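// Expose the kCv* constants as enumerators so they are visible to debuggers.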
02410 enum { kGdbCvSpin = kCvSpin, kGdbCvEvent = kCvEvent, kGdbCvLow = kCvLow, };
02411
02412 static_assert(PerThreadSynch::kAlignment > kCvLow,
02413 "PerThreadSynch::kAlignment must be greater than kCvLow");
02414
02415 void CondVar::EnableDebugLog(const char *name) {
02416 SynchEvent *e = EnsureSynchEvent(&this->cv_, name, kCvEvent, kCvSpin);
02417 e->log = true;
02418 UnrefSynchEvent(e);
02419 }
02420
02421 CondVar::~CondVar() {
02422 if ((cv_.load(std::memory_order_relaxed) & kCvEvent) != 0) {
02423 ForgetSynchEvent(&this->cv_, kCvEvent, kCvSpin);
02424 }
02425 }
02426
02427
02428
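// Remove thread s from this condition variable's waiter queue if it is still
// present (a Signal/SignalAll may already have dequeued it), and in that case
// mark it available so the timed-out waiter can proceed.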
02429 void CondVar::Remove(PerThreadSynch *s) {
02430 intptr_t v;
02431 int c = 0;
02432 for (v = cv_.load(std::memory_order_relaxed);;
02433 v = cv_.load(std::memory_order_relaxed)) {
02434 if ((v & kCvSpin) == 0 &&
02435 cv_.compare_exchange_strong(v, v | kCvSpin,
02436 std::memory_order_acquire,
02437 std::memory_order_relaxed)) {
02438 PerThreadSynch *h = reinterpret_cast<PerThreadSynch *>(v & ~kCvLow);
02439 if (h != nullptr) {
02440 PerThreadSynch *w = h;
02441 while (w->next != s && w->next != h) {
02442 w = w->next;
02443 }
02444 if (w->next == s) {
02445 w->next = s->next;
02446 if (h == s) {
02447 h = (w == s) ? nullptr : w;
02448 }
02449 s->next = nullptr;
02450 s->state.store(PerThreadSynch::kAvailable, std::memory_order_release);
02451 }
02452 }
02453
02454 cv_.store((v & kCvEvent) | reinterpret_cast<intptr_t>(h),
02455 std::memory_order_release);
02456 return;
02457 } else {
02458 c = Delay(c, GENTLE);
02459 }
02460 }
02461 }
02462
02463
02464
02465
02466
02467
02468
02469
02470
02471
02472
02473
02474
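// Queue the thread described by waitp on the condition variable whose word is
// *waitp->cv_word, clearing cv_word so the caller knows the queueing has been
// done.  The cv spinlock (kCvSpin) is held only while the thread is linked
// into the circular waiter list and its state is set to kQueued.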
02475 static void CondVarEnqueue(SynchWaitParams *waitp) {
02476
02477
02478
02479
02480
02481
02482 std::atomic<intptr_t> *cv_word = waitp->cv_word;
02483 waitp->cv_word = nullptr;
02484
02485 intptr_t v = cv_word->load(std::memory_order_relaxed);
02486 int c = 0;
02487 while ((v & kCvSpin) != 0 ||
02488 !cv_word->compare_exchange_weak(v, v | kCvSpin,
02489 std::memory_order_acquire,
02490 std::memory_order_relaxed)) {
02491 c = Delay(c, GENTLE);
02492 v = cv_word->load(std::memory_order_relaxed);
02493 }
02494 ABSL_RAW_CHECK(waitp->thread->waitp == nullptr, "waiting when shouldn't be");
02495 waitp->thread->waitp = waitp;
02496 PerThreadSynch *h = reinterpret_cast<PerThreadSynch *>(v & ~kCvLow);
02497 if (h == nullptr) {
02498 waitp->thread->next = waitp->thread;
02499 } else {
02500 waitp->thread->next = h->next;
02501 h->next = waitp->thread;
02502 }
02503 waitp->thread->state.store(PerThreadSynch::kQueued,
02504 std::memory_order_relaxed);
02505 cv_word->store((v & kCvEvent) | reinterpret_cast<intptr_t>(waitp->thread),
02506 std::memory_order_release);
02507 }
02508
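// Implementation common to Wait(), WaitWithTimeout() and WaitWithDeadline():
// release *mutex (remembering whether it was held for writing or reading),
// block on this condition variable until signalled or until t expires, then
// reacquire the mutex in its original mode.  Returns true if the wait timed
// out.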
02509 bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) {
02510 bool rc = false;
02511
02512 intptr_t mutex_v = mutex->mu_.load(std::memory_order_relaxed);
02513 Mutex::MuHow mutex_how = ((mutex_v & kMuWriter) != 0) ? kExclusive : kShared;
02514 ABSL_TSAN_MUTEX_PRE_UNLOCK(mutex, TsanFlags(mutex_how));
02515
02516
02517 intptr_t v = cv_.load(std::memory_order_relaxed);
02518 cond_var_tracer("Wait", this);
02519 if ((v & kCvEvent) != 0) {
02520 PostSynchEvent(this, SYNCH_EV_WAIT);
02521 }
02522
02523
02524 SynchWaitParams waitp(mutex_how, nullptr, t, mutex,
02525 Synch_GetPerThreadAnnotated(mutex), &cv_);
02526
02527
02528
02529 mutex->UnlockSlow(&waitp);
02530
02531
02532 while (waitp.thread->state.load(std::memory_order_acquire) ==
02533 PerThreadSynch::kQueued) {
02534 if (!Mutex::DecrementSynchSem(mutex, waitp.thread, t)) {
02535 this->Remove(waitp.thread);
02536 rc = true;
02537 }
02538 }
02539
02540 ABSL_RAW_CHECK(waitp.thread->waitp != nullptr, "not waiting when should be");
02541 waitp.thread->waitp = nullptr;
02542
02543
02544 cond_var_tracer("Unwait", this);
02545 if ((v & kCvEvent) != 0) {
02546 PostSynchEvent(this, SYNCH_EV_WAIT_RETURNING);
02547 }
02548
02549
02550
02551
02552
02553 ABSL_TSAN_MUTEX_POST_UNLOCK(mutex, TsanFlags(mutex_how));
02554 ABSL_TSAN_MUTEX_PRE_LOCK(mutex, TsanFlags(mutex_how));
02555 mutex->Trans(mutex_how);
02556 ABSL_TSAN_MUTEX_POST_LOCK(mutex, TsanFlags(mutex_how), 0);
02557 return rc;
02558 }
02559
02560 bool CondVar::WaitWithTimeout(Mutex *mu, absl::Duration timeout) {
02561 return WaitWithDeadline(mu, DeadlineFromTimeout(timeout));
02562 }
02563
02564 bool CondVar::WaitWithDeadline(Mutex *mu, absl::Time deadline) {
02565 return WaitCommon(mu, KernelTimeout(deadline));
02566 }
02567
02568 void CondVar::Wait(Mutex *mu) {
02569 WaitCommon(mu, KernelTimeout::Never());
02570 }
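
// Illustrative use of CondVar::Wait (a sketch, not part of this file; the
// names mu, cv and ready are hypothetical).  The caller must hold the mutex,
// and should re-check its predicate in a loop, since Wait can return without
// the predicate holding:
//
//   absl::Mutex mu;
//   absl::CondVar cv;
//   bool ready = false;  // guarded by mu
//
//   void Consumer() {
//     absl::MutexLock l(&mu);
//     while (!ready) {
//       cv.Wait(&mu);  // releases mu while blocked, reacquires before return
//     }
//     // ... use the protected state ...
//   }
//
//   void Producer() {
//     absl::MutexLock l(&mu);
//     ready = true;
//     cv.Signal();
//   }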
02571
02572
02573
02574
02575
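// Wake waiter w from a condition-variable queue.  If w was in a timed wait,
// or has no associated Mutex, it is made runnable directly via its semaphore;
// otherwise it is transferred to the mutex's waiter queue with Mutex::Fer(),
// which avoids waking a thread only to have it block immediately on the
// mutex.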
02576 void CondVar::Wakeup(PerThreadSynch *w) {
02577 if (w->waitp->timeout.has_timeout() || w->waitp->cvmu == nullptr) {
02578
02579
02580 Mutex *mu = w->waitp->cvmu;
02581 w->next = nullptr;
02582 w->state.store(PerThreadSynch::kAvailable, std::memory_order_release);
02583 Mutex::IncrementSynchSem(mu, w);
02584 } else {
02585 w->waitp->cvmu->Fer(w);
02586 }
02587 }
02588
02589 void CondVar::Signal() {
02590 ABSL_TSAN_MUTEX_PRE_SIGNAL(nullptr, 0);
02591 intptr_t v;
02592 int c = 0;
02593 for (v = cv_.load(std::memory_order_relaxed); v != 0;
02594 v = cv_.load(std::memory_order_relaxed)) {
02595 if ((v & kCvSpin) == 0 &&
02596 cv_.compare_exchange_strong(v, v | kCvSpin,
02597 std::memory_order_acquire,
02598 std::memory_order_relaxed)) {
02599 PerThreadSynch *h = reinterpret_cast<PerThreadSynch *>(v & ~kCvLow);
02600 PerThreadSynch *w = nullptr;
02601 if (h != nullptr) {
02602 w = h->next;
02603 if (w == h) {
02604 h = nullptr;
02605 } else {
02606 h->next = w->next;
02607 }
02608 }
02609
02610 cv_.store((v & kCvEvent) | reinterpret_cast<intptr_t>(h),
02611 std::memory_order_release);
02612 if (w != nullptr) {
02613 CondVar::Wakeup(w);
02614 cond_var_tracer("Signal wakeup", this);
02615 }
02616 if ((v & kCvEvent) != 0) {
02617 PostSynchEvent(this, SYNCH_EV_SIGNAL);
02618 }
02619 ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
02620 return;
02621 } else {
02622 c = Delay(c, GENTLE);
02623 }
02624 }
02625 ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
02626 }
02627
void CondVar::SignalAll() {
02629 ABSL_TSAN_MUTEX_PRE_SIGNAL(nullptr, 0);
02630 intptr_t v;
02631 int c = 0;
02632 for (v = cv_.load(std::memory_order_relaxed); v != 0;
02633 v = cv_.load(std::memory_order_relaxed)) {
02634
02635
02636
02637
02638
02639 if ((v & kCvSpin) == 0 &&
02640 cv_.compare_exchange_strong(v, v & kCvEvent, std::memory_order_acquire,
02641 std::memory_order_relaxed)) {
02642 PerThreadSynch *h = reinterpret_cast<PerThreadSynch *>(v & ~kCvLow);
02643 if (h != nullptr) {
02644 PerThreadSynch *w;
02645 PerThreadSynch *n = h->next;
02646 do {
02647 w = n;
02648 n = n->next;
02649 CondVar::Wakeup(w);
02650 } while (w != h);
02651 cond_var_tracer("SignalAll wakeup", this);
02652 }
02653 if ((v & kCvEvent) != 0) {
02654 PostSynchEvent(this, SYNCH_EV_SIGNALALL);
02655 }
02656 ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
02657 return;
02658 } else {
02659 c = Delay(c, GENTLE);
02660 }
02661 }
02662 ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
02663 }
02664
02665 void ReleasableMutexLock::Release() {
02666 ABSL_RAW_CHECK(this->mu_ != nullptr,
02667 "ReleasableMutexLock::Release may only be called once");
02668 this->mu_->Unlock();
02669 this->mu_ = nullptr;
02670 }
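
// Illustrative use of ReleasableMutexLock (a sketch; mu and the work shown
// are hypothetical):
//
//   absl::Mutex mu;
//   ...
//   {
//     absl::ReleasableMutexLock l(&mu);
//     // ... work that needs the lock ...
//     l.Release();  // unlock early; the destructor then does nothing
//     // ... work that must not hold the lock ...
//   }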
02671
02672 #ifdef THREAD_SANITIZER
02673 extern "C" void __tsan_read1(void *addr);
02674 #else
02675 #define __tsan_read1(addr) // do nothing if TSan not enabled
02676 #endif
02677
02678
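// Reads a bool through a void*; used by the Condition(const bool *)
// constructor below.  The __tsan_read1() annotation reports the read to
// ThreadSanitizer, which would otherwise not see this access to the user's
// variable.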
02679 static bool Dereference(void *arg) {
02680
02681
02682
02683 __tsan_read1(arg);
02684 return *(static_cast<bool *>(arg));
02685 }
02686
02687 Condition::Condition() {}
02688 const Condition Condition::kTrue;
02689
02690 Condition::Condition(bool (*func)(void *), void *arg)
02691 : eval_(&CallVoidPtrFunction),
02692 function_(func),
02693 method_(nullptr),
02694 arg_(arg) {}
02695
02696 bool Condition::CallVoidPtrFunction(const Condition *c) {
02697 return (*c->function_)(c->arg_);
02698 }
02699
02700 Condition::Condition(const bool *cond)
    : eval_(&CallVoidPtrFunction),
02702 function_(Dereference),
02703 method_(nullptr),
02704
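      // The const_cast is safe: Dereference() only reads through the pointer.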
02705 arg_(const_cast<bool *>(cond)) {}
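
// Illustrative use of the bool* form of Condition (a sketch; mu and done are
// hypothetical names):
//
//   absl::Mutex mu;
//   bool done = false;  // guarded by mu
//   ...
//   mu.LockWhen(absl::Condition(&done));  // blocks until done == true
//   ...
//   mu.Unlock();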
02706
02707 bool Condition::Eval() const {
02708
02709 return (this->eval_ == nullptr) || (*this->eval_)(this);
02710 }
02711
02712 bool Condition::GuaranteedEqual(const Condition *a, const Condition *b) {
02713 if (a == nullptr) {
02714 return b == nullptr || b->eval_ == nullptr;
02715 }
02716 if (b == nullptr || b->eval_ == nullptr) {
02717 return a->eval_ == nullptr;
02718 }
02719 return a->eval_ == b->eval_ && a->function_ == b->function_ &&
02720 a->arg_ == b->arg_ && a->method_ == b->method_;
02721 }
02722
}  // namespace absl