// Copyright 2020 The Abseil Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "absl/flags/internal/sequence_lock.h" #include #include #include // NOLINT(build/c++11) #include #include #include "gtest/gtest.h" #include "absl/base/internal/sysinfo.h" #include "absl/container/fixed_array.h" #include "absl/time/clock.h" namespace { namespace flags = absl::flags_internal; class ConcurrentSequenceLockTest : public testing::TestWithParam> { public: ConcurrentSequenceLockTest() : buf_bytes_(std::get<0>(GetParam())), num_threads_(std::get<1>(GetParam())) {} protected: const int buf_bytes_; const int num_threads_; }; TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) { const int buf_words = flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t); // The buffer that will be protected by the SequenceLock. absl::FixedArray> protected_buf(buf_words); for (auto& v : protected_buf) v = -1; flags::SequenceLock seq_lock; std::atomic stop{false}; std::atomic bad_reads{0}; std::atomic good_reads{0}; std::atomic unsuccessful_reads{0}; // Start a bunch of threads which read 'protected_buf' under the sequence // lock. The main thread will concurrently update 'protected_buf'. The updates // always consist of an array of identical integers. The reader ensures that // any data it reads matches that pattern (i.e. the reads are not "torn"). std::vector threads; for (int i = 0; i < num_threads_; i++) { threads.emplace_back([&]() { absl::FixedArray local_buf(buf_bytes_); while (!stop.load(std::memory_order_relaxed)) { if (seq_lock.TryRead(local_buf.data(), protected_buf.data(), buf_bytes_)) { bool good = true; for (const auto& v : local_buf) { if (v != local_buf[0]) good = false; } if (good) { good_reads.fetch_add(1, std::memory_order_relaxed); } else { bad_reads.fetch_add(1, std::memory_order_relaxed); } } else { unsuccessful_reads.fetch_add(1, std::memory_order_relaxed); } } }); } while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) { absl::SleepFor(absl::Milliseconds(1)); } seq_lock.MarkInitialized(); // Run a maximum of 5 seconds. On Windows, the scheduler behavior seems // somewhat unfair and without an explicit timeout for this loop, the tests // can run a long time. absl::Time deadline = absl::Now() + absl::Seconds(5); for (int i = 0; i < 100 && absl::Now() < deadline; i++) { absl::FixedArray writer_buf(buf_bytes_); for (auto& v : writer_buf) v = i; seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_); absl::SleepFor(absl::Microseconds(10)); } stop.store(true, std::memory_order_relaxed); for (auto& t : threads) t.join(); ASSERT_GE(good_reads, 0); ASSERT_EQ(bad_reads, 0); } // Simple helper for generating a range of thread counts. // Generates [low, low*scale, low*scale^2, ...high) // (even if high is between low*scale^k and low*scale^(k+1)). std::vector MultiplicativeRange(int low, int high, int scale) { std::vector result; for (int current = low; current < high; current *= scale) { result.push_back(current); } result.push_back(high); return result; } #ifndef ABSL_HAVE_THREAD_SANITIZER const int kMaxThreads = absl::base_internal::NumCPUs(); #else // With TSAN, a lot of threads contending for atomic access on the sequence // lock make this test run too slowly. const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4); #endif // Return all of the interesting buffer sizes worth testing: // powers of two and adjacent values. std::vector InterestingBufferSizes() { std::vector ret; for (int v : MultiplicativeRange(1, 128, 2)) { ret.push_back(v); if (v > 1) { ret.push_back(v - 1); } ret.push_back(v + 1); } return ret; } INSTANTIATE_TEST_SUITE_P( TestManyByteSizes, ConcurrentSequenceLockTest, testing::Combine( // Buffer size (bytes). testing::ValuesIn(InterestingBufferSizes()), // Number of reader threads. testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2)))); // Simple single-threaded test, parameterized by the size of the buffer to be // protected. class SequenceLockTest : public testing::TestWithParam {}; TEST_P(SequenceLockTest, SingleThreaded) { const int size = GetParam(); absl::FixedArray> protected_buf( flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t)); flags::SequenceLock seq_lock; seq_lock.MarkInitialized(); std::vector src_buf(size, 'x'); seq_lock.Write(protected_buf.data(), src_buf.data(), size); std::vector dst_buf(size, '0'); ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size)); ASSERT_EQ(src_buf, dst_buf); } INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest, // Buffer size (bytes). testing::Range(1, 128)); } // namespace