15 #include "absl/synchronization/mutex.h"
29 #include <type_traits>
32 #include "gtest/gtest.h"
33 #include "absl/base/attributes.h"
34 #include "absl/base/config.h"
35 #include "absl/base/internal/raw_logging.h"
36 #include "absl/base/internal/sysinfo.h"
37 #include "absl/memory/memory.h"
38 #include "absl/synchronization/internal/thread_pool.h"
39 #include "absl/time/clock.h"
40 #include "absl/time/time.h"
45 static constexpr
bool kExtendedTest =
false;
47 std::unique_ptr<absl::synchronization_internal::ThreadPool> CreatePool(
49 return absl::make_unique<absl::synchronization_internal::ThreadPool>(
threads);
52 std::unique_ptr<absl::synchronization_internal::ThreadPool>
54 return CreatePool(kExtendedTest ? 32 : 10);
78 static std::atomic<bool> invariant_checked;
80 static bool GetInvariantChecked() {
81 return invariant_checked.load(std::memory_order_relaxed);
84 static void SetInvariantChecked(
bool new_value) {
85 invariant_checked.store(new_value, std::memory_order_relaxed);
88 static void CheckSumG0G1(
void *
v) {
91 SetInvariantChecked(
true);
95 for (
int i = 0;
i != cxt->iterations;
i++) {
104 for (
int i = 0;
i != cxt->iterations;
i++) {
106 std::this_thread::yield();
107 }
while (!cxt->mu.TryLock());
116 for (
int i = 0;
i != cxt->iterations;
i++) {
119 cxt->mu.AssertReaderHeld();
125 for (
int i = 0;
i != cxt->iterations;
i++) {
129 cxt->mu.AssertHeld();
130 cxt->mu.AssertReaderHeld();
133 for (
int i = 0;
i != cxt->iterations;
i++) {
136 cxt->mu.AssertReaderHeld();
147 bool MyContext::MyTurn() {
149 return cxt->g0 == this->
target || cxt->g0 == cxt->iterations;
157 cxt->mu.AssertHeld();
158 while (cxt->g0 < cxt->iterations) {
161 cxt->mu.AssertHeld();
162 if (cxt->g0 < cxt->iterations) {
165 mc.target += cxt->threads;
173 cxt->mu.AssertHeld();
174 while (cxt->g0 < cxt->iterations) {
175 while (cxt->g0 !=
target && cxt->g0 != cxt->iterations) {
176 cxt->cv.Wait(&cxt->mu);
178 if (cxt->g0 < cxt->iterations) {
188 ABSL_RAW_CHECK(cxt->threads == 2,
"TestSignal should use 2 threads");
191 cxt->mu.AssertHeld();
192 while (cxt->g0 < cxt->iterations) {
193 while (cxt->g0 !=
target && cxt->g0 != cxt->iterations) {
194 cxt->cv.Wait(&cxt->mu);
196 if (cxt->g0 < cxt->iterations) {
208 cxt->mu.AssertHeld();
209 while (cxt->g0 < cxt->iterations) {
210 while (cxt->g0 !=
target && cxt->g0 != cxt->iterations) {
213 if (cxt->g0 < cxt->iterations) {
222 static bool G0GE2(
TestContext *cxt) {
return cxt->g0 >= 2; }
224 static void TestTime(
TestContext *cxt,
int c,
bool use_cv) {
225 ABSL_RAW_CHECK(cxt->iterations == 1,
"TestTime should only use 1 iteration");
226 ABSL_RAW_CHECK(cxt->threads > 2,
"TestTime should use more than 2 threads");
227 const bool kFalse =
false;
320 while (cxt->g0 < 2) {
331 while (cxt->g0 < 2) {
332 cxt->cv.Wait(&cxt->mu);
335 cxt->mu.Await(g0ge2);
341 static void TestMuTime(
TestContext *cxt,
int c) { TestTime(cxt,
c,
false); }
343 static void TestCVTime(
TestContext *cxt,
int c) { TestTime(cxt,
c,
true); }
358 int threads,
int iterations,
int operations) {
365 cxt->iterations = iterations;
369 tp.
Schedule(std::bind(&EndTest, &c0, &
c1, &mu2, &cv2,
371 std::bind(
test, cxt, std::placeholders::_1))));
383 int iterations,
int operations) {
385 return RunTestCommon(&cxt,
test,
threads, iterations, operations);
392 #if !defined(ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED)
393 static int RunTestWithInvariantDebugging(
void (*
test)(
TestContext *cxt,
int),
398 SetInvariantChecked(
false);
400 cxt.mu.EnableInvariantDebugging(
invariant, &cxt);
401 int ret = RunTestCommon(&cxt,
test,
threads, iterations, operations);
410 struct TimeoutBugStruct {
416 static void WaitForA(TimeoutBugStruct *
x) {
422 static bool NoAWaiters(TimeoutBugStruct *
x) {
return x->a_waiter_count == 0; }
437 auto pool = CreateDefaultPool();
442 state.release_mu.Lock();
444 state.barrier_mu.Lock();
445 state.barrier =
true;
446 state.barrier_mu.Unlock();
449 state.released_cv.Signal();
450 state.release_mu.Unlock();
454 state.barrier_mu.Unlock();
455 state.release_mu.Lock();
460 state.release =
true;
462 state.release_mu.Unlock();
467 TEST(
Mutex, CondVarWaitWithTimeoutSignalsAwait) {
478 auto pool = CreateDefaultPool();
483 state.release_mu.Lock();
485 state.barrier_mu.Lock();
486 state.barrier =
true;
487 state.barrier_mu.Unlock();
490 state.released_cv.Signal();
491 state.release_mu.Unlock();
495 state.barrier_mu.Unlock();
496 state.release_mu.Lock();
501 state.release =
true;
504 <<
"; Unrecoverable test failure: CondVar::WaitWithTimeout did not "
505 "unblock the absl::Mutex::Await call in another thread.";
507 state.release_mu.Unlock();
512 auto tp = CreateDefaultPool();
516 x.a_waiter_count = 2;
525 bool always_false =
false;
543 bool signal_unlocked;
545 CondVarWaitDeadlock() {
546 read_lock1 = GetParam() & (1 << 0);
547 read_lock2 = GetParam() & (1 << 1);
548 signal_unlocked = GetParam() & (1 << 2);
588 auto waiter1 = CreatePool(1);
589 auto waiter2 = CreatePool(1);
590 waiter1->Schedule([
this] { this->Waiter1(); });
591 waiter2->Schedule([
this] { this->Waiter2(); });
599 if (signal_unlocked) {
625 struct DequeueAllWakeableBugStruct {
628 int unfinished_count;
635 static void AcquireAsReader(DequeueAllWakeableBugStruct *
x) {
638 x->unfinished_count--;
639 x->done1 = (
x->unfinished_count == 0);
643 x->mu.ReaderUnlock();
647 x->done2 = (
x->finished_count == 0);
653 auto tp = CreateDefaultPool();
655 DequeueAllWakeableBugStruct
x;
656 x.unfinished_count = 2;
658 x.finished_count = 2;
662 tp->
Schedule(std::bind(&AcquireAsReader, &
x));
663 tp->
Schedule(std::bind(&AcquireAsReader, &
x));
677 struct LockWhenTestStruct {
682 bool waiting =
false;
685 static bool LockWhenTestIsCond(LockWhenTestStruct* s) {
692 static void LockWhenTestWaitForIsCond(LockWhenTestStruct* s) {
698 LockWhenTestStruct
s;
717 bool (*cond_eq_10)(
int *) = [](
int *
p) {
return *
p == 10; };
718 bool (*cond_lt_10)(
int *) = [](
int *
p) {
return *
p < 10; };
748 #if !defined(ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE)
759 struct ReaderDecrementBugStruct {
764 bool waiting_on_cond;
765 bool have_reader_lock;
771 static bool IsCond(
void *
v) {
772 ReaderDecrementBugStruct *
x =
reinterpret_cast<ReaderDecrementBugStruct *
>(
v);
774 x->waiting_on_cond =
true;
780 static bool AllDone(
void *
v) {
781 ReaderDecrementBugStruct *
x =
reinterpret_cast<ReaderDecrementBugStruct *
>(
v);
786 static void WaitForCond(ReaderDecrementBugStruct *
x) {
795 static void GetReadLock(ReaderDecrementBugStruct *
x) {
798 x->have_reader_lock =
true;
801 x->mu.ReaderUnlock();
810 ReaderDecrementBugStruct
x;
812 x.waiting_on_cond =
false;
813 x.have_reader_lock =
false;
833 x.mu.AssertReaderHeld();
850 #endif // !ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE
854 #ifdef ABSL_HAVE_THREAD_SANITIZER
860 for (
int i = 0;
i != 10;
i++) {
862 const int kNumLocks = 10;
863 auto mu = absl::make_unique<absl::Mutex[]>(kNumLocks);
864 for (
int j = 0;
j != kNumLocks;
j++) {
875 template <
class...
Args>
876 bool operator()(
Args...)
const {
881 struct DerivedTrue :
True {};
896 auto is_zero = [&
value] {
return value == 0; };
905 auto is_positive = std::bind(std::less<int>(), 0, std::cref(
value));
935 std::random_device dev;
936 std::mt19937
gen(dev());
937 std::uniform_int_distribution<int> random_millis(0, 15);
949 static bool IntIsZero(
int *
x) {
return *
x == 0; }
954 auto tp = CreateDefaultPool();
968 struct AcquireFromConditionStruct {
977 static bool ConditionWithAcquire(AcquireFromConditionStruct *
x) {
980 if (
x->value == 2 ||
x->value == 3) {
985 bool always_false =
false;
993 return x->value == 2 ||
x->value == 3;
996 static void WaitForCond2(AcquireFromConditionStruct *
x) {
1005 auto tp = CreateDefaultPool();
1007 AcquireFromConditionStruct
x;
1011 std::bind(&WaitForCond2, &
x));
1068 class ScopedDisableBazelTestWarnings {
1070 ScopedDisableBazelTestWarnings() {
1072 char file[MAX_PATH];
1073 if (GetEnvironmentVariableA(kVarName,
file,
sizeof(
file)) <
sizeof(
file)) {
1074 warnings_output_file_ =
file;
1075 SetEnvironmentVariableA(kVarName,
nullptr);
1079 if (
file !=
nullptr) {
1080 warnings_output_file_ =
file;
1086 ~ScopedDisableBazelTestWarnings() {
1087 if (!warnings_output_file_.empty()) {
1089 SetEnvironmentVariableA(kVarName, warnings_output_file_.c_str());
1091 setenv(kVarName, warnings_output_file_.c_str(), 0);
1097 static const char kVarName[];
1100 const char ScopedDisableBazelTestWarnings::kVarName[] =
1101 "TEST_WARNINGS_OUTPUT_FILE";
1103 #ifdef ABSL_HAVE_THREAD_SANITIZER
1105 TEST(
Mutex, DISABLED_DeadlockDetectorBazelWarning) {
1107 TEST(
Mutex, DeadlockDetectorBazelWarning) {
1113 ScopedDisableBazelTestWarnings disable_bazel_test_warnings;
1141 const int n_locks = 1 << 17;
1142 auto array_of_locks = absl::make_unique<absl::Mutex[]>(n_locks);
1143 for (
int i = 0;
i < n_locks;
i++) {
1146 for (
int j =
i;
j <
end;
j++) {
1147 array_of_locks[
j].Lock();
1149 for (
int j =
i;
j <
end;
j++) {
1150 array_of_locks[
j].Unlock();
1155 #ifdef ABSL_HAVE_THREAD_SANITIZER
1217 if (actual_delay < expected_delay) {
1219 "Actual delay %s was too short, expected %s (difference %s)",
1231 : TimeoutTestAllowedSchedulingDelay();
1232 if (actual_delay > expected_delay + tolerance) {
1234 "Actual delay %s was too long, expected %s (difference %s)",
1244 struct TimeoutTestParam {
1246 const char *from_file;
1251 bool use_absolute_deadline;
1267 bool expected_result;
1276 std::ostream &
operator<<(std::ostream &os,
const TimeoutTestParam ¶m) {
1277 return os <<
"from: " << param.from_file <<
":" << param.from_line
1278 <<
" use_absolute_deadline: "
1279 << (param.use_absolute_deadline ?
"true" :
"false")
1280 <<
" wait_timeout: " << param.wait_timeout
1281 <<
" satisfy_condition_delay: " << param.satisfy_condition_delay
1282 <<
" expected_result: "
1283 << (param.expected_result ?
"true" :
"false")
1284 <<
" expected_delay: " << param.expected_delay;
1288 std::ostringstream os;
1310 std::vector<TimeoutTestParam> MakeTimeoutTestParamValues() {
1314 const absl::Duration finite = 3 * TimeoutTestAllowedSchedulingDelay();
1321 std::vector<TimeoutTestParam>
values;
1322 for (
bool use_absolute_deadline : {
false,
true}) {
1327 values.push_back(TimeoutTestParam{
1328 __FILE__, __LINE__, use_absolute_deadline,
1336 values.push_back(TimeoutTestParam{
1337 __FILE__, __LINE__, use_absolute_deadline,
1345 values.push_back(TimeoutTestParam{
1346 __FILE__, __LINE__, use_absolute_deadline,
1357 values.push_back(TimeoutTestParam{
1358 __FILE__, __LINE__, use_absolute_deadline,
1366 values.push_back(TimeoutTestParam{
1367 __FILE__, __LINE__, use_absolute_deadline,
1378 values.push_back(TimeoutTestParam{
1379 __FILE__, __LINE__, use_absolute_deadline,
1387 values.push_back(TimeoutTestParam{
1388 __FILE__, __LINE__, use_absolute_deadline,
1396 values.push_back(TimeoutTestParam{
1397 __FILE__, __LINE__, use_absolute_deadline,
1405 values.push_back(TimeoutTestParam{
1406 __FILE__, __LINE__, use_absolute_deadline,
1420 TEST_P(TimeoutTest, Await) {
1421 const TimeoutTestParam params = GetParam();
1427 for (
int attempt = 1;; ++attempt) {
1433 std::unique_ptr<absl::synchronization_internal::ThreadPool>
pool =
1434 CreateDefaultPool();
1435 RunAfterDelay(params.satisfy_condition_delay,
pool.get(), [&] {
1436 absl::MutexLock l(&mu);
1444 params.use_absolute_deadline
1446 :
mu.AwaitWithTimeout(
cond, params.wait_timeout);
1454 TEST_P(TimeoutTest, LockWhen) {
1455 const TimeoutTestParam params = GetParam();
1461 for (
int attempt = 1;; ++attempt) {
1467 std::unique_ptr<absl::synchronization_internal::ThreadPool>
pool =
1468 CreateDefaultPool();
1469 RunAfterDelay(params.satisfy_condition_delay,
pool.get(), [&] {
1470 absl::MutexLock l(&mu);
1477 params.use_absolute_deadline
1479 :
mu.LockWhenWithTimeout(
cond, params.wait_timeout);
1489 TEST_P(TimeoutTest, ReaderLockWhen) {
1490 const TimeoutTestParam params = GetParam();
1496 for (
int attempt = 0;; ++attempt) {
1502 std::unique_ptr<absl::synchronization_internal::ThreadPool>
pool =
1503 CreateDefaultPool();
1504 RunAfterDelay(params.satisfy_condition_delay,
pool.get(), [&] {
1505 absl::MutexLock l(&mu);
1511 params.use_absolute_deadline
1515 params.wait_timeout);
1525 TEST_P(TimeoutTest, Wait) {
1526 const TimeoutTestParam params = GetParam();
1532 for (
int attempt = 0;; ++attempt) {
1539 std::unique_ptr<absl::synchronization_internal::ThreadPool>
pool =
1540 CreateDefaultPool();
1541 RunAfterDelay(params.satisfy_condition_delay,
pool.get(), [&] {
1542 absl::MutexLock l(&mu);
1552 if (params.use_absolute_deadline ?
cv.WaitWithDeadline(&
mu, deadline)
1573 logged_mutex.
Lock();
1578 logged_mutex.
Lock();
1587 static std::vector<int> AllThreadCountValues() {
1588 if (kExtendedTest) {
1589 return {2, 4, 8, 10, 16, 20, 24, 30, 32};
1604 static int ScaleIterations(
int x) {
1608 #if defined(ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE)
1617 int iterations = ScaleIterations(10000000) /
threads;
1618 int operations =
threads * iterations;
1620 #if !defined(ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED)
1621 iterations =
std::min(iterations, 10);
1622 operations =
threads * iterations;
1624 operations, CheckSumG0G1),
1629 TEST_P(MutexVariableThreadCountTest, Try) {
1631 int iterations = 1000000 /
threads;
1632 int operations = iterations *
threads;
1634 #if !defined(ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED)
1635 iterations =
std::min(iterations, 10);
1636 operations =
threads * iterations;
1638 operations, CheckSumG0G1),
1643 TEST_P(MutexVariableThreadCountTest, R20ms) {
1645 int iterations = 100;
1646 int operations = iterations *
threads;
1650 TEST_P(MutexVariableThreadCountTest, RW) {
1652 int iterations = ScaleIterations(20000000) /
threads;
1653 int operations = iterations *
threads;
1655 #if !defined(ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED)
1656 iterations =
std::min(iterations, 10);
1657 operations =
threads * iterations;
1659 operations, CheckSumG0G1),
1664 TEST_P(MutexVariableThreadCountTest, Await) {
1666 int iterations = ScaleIterations(500000);
1667 int operations = iterations;
1671 TEST_P(MutexVariableThreadCountTest, SignalAll) {
1673 int iterations = 200000 /
threads;
1674 int operations = iterations;
1681 int iterations = 200000;
1682 int operations = iterations;
1688 int iterations = 1000;
1689 int operations = iterations;
1710 #if defined(__wasm__) || defined(__asmjs__)
1711 constexpr
int kThreads = 1;
1713 constexpr
int kThreads = 100;
1715 std::vector<std::thread>
top;
1716 for (
unsigned i = 0;
i < 2 * std::thread::hardware_concurrency();
i++) {
1717 top.emplace_back([&]() {
1718 for (
int i = 0;
i < kThreads;
i++) {
1730 for (
auto &th :
top) th.join();