diff options
author | TreeHugger Robot <treehugger-gerrit@google.com> | 2020-07-23 23:07:18 +0000 |
---|---|---|
committer | Android (Google) Code Review <android-gerrit@google.com> | 2020-07-23 23:07:18 +0000 |
commit | 9e9b79187bff6d54f8e15db1978c157f8dda9335 (patch) | |
tree | 9ba6a46a7e4cd59e1018b94136f46578efe31f2e /rtc_base | |
parent | 37f9b0ea9ea6a8c490bdb0dc2f44a586b01c8ab2 (diff) | |
parent | 206ccd0b36df69a0d0d0d26ddf7c4ead20202f91 (diff) | |
download | webrtc-9e9b79187bff6d54f8e15db1978c157f8dda9335.tar.gz |
Merge changes Ida3bfe62,I2d596942
* changes:
Merge remote tracking branch 'upstream-master'
Generate new Android.bp file and correct build errors
Diffstat (limited to 'rtc_base')
96 files changed, 1953 insertions, 1300 deletions
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index a61ede4ac9..73bca85efa 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -55,12 +55,13 @@ rtc_library("rtc_base_approved") { ":type_traits", "../api:array_view", "../api:scoped_refptr", + "synchronization:mutex", "system:arch", "system:rtc_export", "system:unused", "third_party/base64", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] public_deps = [] # no-presubmit-check TODO(webrtc:8603) sources = [ @@ -154,6 +155,7 @@ rtc_library("platform_thread_types") { "platform_thread_types.cc", "platform_thread_types.h", ] + deps = [ ":macromagic" ] } rtc_source_set("refcount") { @@ -168,15 +170,15 @@ rtc_source_set("refcount") { rtc_library("criticalsection") { sources = [ - "critical_section.cc", - "critical_section.h", + "deprecated/recursive_critical_section.cc", + "deprecated/recursive_critical_section.h", ] deps = [ ":atomicops", ":checks", ":macromagic", ":platform_thread_types", - "system:rtc_export", + "synchronization:yield", "system:unused", ] } @@ -187,6 +189,7 @@ rtc_library("platform_thread") { ":rtc_task_queue_libevent", ":rtc_task_queue_win", ":rtc_task_queue_stdlib", + "synchronization:mutex", "synchronization:sequence_checker", ] sources = [ @@ -201,8 +204,8 @@ rtc_library("platform_thread") { ":rtc_event", ":thread_checker", ":timeutils", - "//third_party/abseil-cpp/absl/strings", ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } rtc_library("rtc_event") { @@ -225,8 +228,8 @@ rtc_library("rtc_event") { ":checks", "synchronization:yield_policy", "system:warn_current_thread_is_deadlocked", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } } @@ -240,6 +243,9 @@ rtc_library("logging") { ":platform_thread_types", ":stringutils", ":timeutils", + "synchronization:mutex", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/meta:type_traits", "//third_party/abseil-cpp/absl/strings", @@ -264,7 +270,7 @@ rtc_library("logging") { deps += [ "system:inline" ] if (is_mac) { - libs += [ "Foundation.framework" ] + frameworks = [ "Foundation.framework" ] } # logging.h needs the deprecation header while downstream projects are @@ -301,6 +307,8 @@ rtc_library("checks") { ":safe_compare", "system:inline", "system:rtc_export", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/meta:type_traits", "//third_party/abseil-cpp/absl/strings", ] @@ -317,13 +325,14 @@ rtc_library("rate_limiter") { deps = [ ":rtc_base_approved", "../system_wrappers", - "//third_party/abseil-cpp/absl/types:optional", + "synchronization:mutex", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_source_set("sanitizer") { sources = [ "sanitizer.h" ] - deps = [ "//third_party/abseil-cpp/absl/meta:type_traits" ] + absl_deps = [ "//third_party/abseil-cpp/absl/meta:type_traits" ] } rtc_source_set("bounded_inline_vector") { @@ -398,6 +407,8 @@ rtc_library("stringutils") { ":macromagic", ":safe_minmax", "../api:array_view", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", ] @@ -433,8 +444,8 @@ rtc_library("rtc_task_queue") { "../api/task_queue", "system:rtc_export", "task_utils:to_queued_task", - "//third_party/abseil-cpp/absl/memory", ] + absl_deps = [ "//third_party/abseil-cpp/absl/memory" ] } rtc_source_set("rtc_operations_chain") { @@ -469,6 +480,9 @@ if (rtc_enable_libevent) { ":safe_conversions", ":timeutils", "../api/task_queue", + "synchronization:mutex", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/container:inlined_vector", "//third_party/abseil-cpp/absl/strings", ] @@ -489,9 +503,10 @@ if (is_mac || is_ios) { ":checks", ":logging", "../api/task_queue", + "synchronization:mutex", "system:gcd_helpers", - "//third_party/abseil-cpp/absl/strings", ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } } @@ -512,8 +527,9 @@ if (is_win) { ":safe_conversions", ":timeutils", "../api/task_queue", - "//third_party/abseil-cpp/absl/strings", + "synchronization:mutex", ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } } @@ -532,8 +548,9 @@ rtc_library("rtc_task_queue_stdlib") { ":safe_conversions", ":timeutils", "../api/task_queue", - "//third_party/abseil-cpp/absl/strings", + "synchronization:mutex", ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } rtc_library("weak_ptr") { @@ -576,6 +593,8 @@ rtc_library("rtc_numerics") { "../api/units:data_rate", "../api/units:time_delta", "../api/units:timestamp", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/types:optional", ] @@ -760,6 +779,7 @@ rtc_library("rtc_base") { deps = [ ":checks", ":deprecation", + ":rtc_task_queue", ":stringutils", "../api:array_view", "../api:function_view", @@ -767,12 +787,17 @@ rtc_library("rtc_base") { "../api/task_queue", "../system_wrappers:field_trial", "network:sent_packet", + "synchronization:mutex", + "synchronization:sequence_checker", "system:file_wrapper", "system:inline", "system:rtc_export", + "task_utils:pending_task_safety_flag", "task_utils:to_queued_task", "third_party/base64", "third_party/sigslot", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", @@ -801,6 +826,8 @@ rtc_library("rtc_base") { "crypt_string.h", "data_rate_limiter.cc", "data_rate_limiter.h", + "deprecated/signal_thread.cc", + "deprecated/signal_thread.h", "dscp.h", "file_rotating_stream.cc", "file_rotating_stream.h", @@ -853,7 +880,6 @@ rtc_library("rtc_base") { "rtc_certificate.h", "rtc_certificate_generator.cc", "rtc_certificate_generator.h", - "signal_thread.cc", "signal_thread.h", "sigslot_repeater.h", "socket.cc", @@ -942,7 +968,7 @@ rtc_library("rtc_base") { } if (is_ios) { - libs += [ + frameworks = [ "CFNetwork.framework", "Foundation.framework", "Security.framework", @@ -1000,8 +1026,8 @@ rtc_library("gunit_helpers") { ":rtc_base_tests_utils", ":stringutils", "../test:test_support", - "//third_party/abseil-cpp/absl/strings", ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } rtc_library("testclient") { @@ -1017,6 +1043,7 @@ rtc_library("testclient") { ":rtc_base", ":rtc_base_tests_utils", ":timeutils", + "synchronization:mutex", ] } @@ -1065,7 +1092,10 @@ rtc_library("rtc_base_tests_utils") { "../api/units:time_delta", "../api/units:timestamp", "memory:fifo_buffer", + "synchronization:mutex", "third_party/sigslot", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/memory", ] @@ -1087,8 +1117,8 @@ rtc_library("task_queue_for_test") { "../api/task_queue", "../api/task_queue:default_task_queue_factory", "task_utils:to_queued_task", - "//third_party/abseil-cpp/absl/strings", ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } if (rtc_include_tests) { @@ -1100,6 +1130,7 @@ if (rtc_include_tests) { ":rtc_base", ":rtc_base_tests_utils", "../test:test_support", + "synchronization:mutex", "third_party/sigslot", ] } @@ -1128,8 +1159,8 @@ if (rtc_include_tests) { "../test:test_support", "third_party/sigslot", "//testing/gtest", - "//third_party/abseil-cpp/absl/memory", ] + absl_deps = [ "//third_party/abseil-cpp/absl/memory" ] if (is_win) { sources += [ "win32_socket_server_unittest.cc" ] } @@ -1149,7 +1180,7 @@ if (rtc_include_tests) { "byte_order_unittest.cc", "checks_unittest.cc", "copy_on_write_buffer_unittest.cc", - "critical_section_unittest.cc", + "deprecated/recursive_critical_section_unittest.cc", "event_tracer_unittest.cc", "event_unittest.cc", "logging_unittest.cc", @@ -1208,9 +1239,12 @@ if (rtc_include_tests) { "../test:test_main", "../test:test_support", "memory:unittests", + "synchronization:mutex", "task_utils:to_queued_task", "third_party/base64", "third_party/sigslot", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/memory", ] @@ -1228,8 +1262,8 @@ if (rtc_include_tests) { ":task_queue_for_test", "../test:test_main", "../test:test_support", - "//third_party/abseil-cpp/absl/memory", ] + absl_deps = [ "//third_party/abseil-cpp/absl/memory" ] } rtc_library("rtc_operations_chain_unittests") { @@ -1279,8 +1313,8 @@ if (rtc_include_tests) { ":rtc_numerics", "../test:test_main", "../test:test_support", - "//third_party/abseil-cpp/absl/algorithm:container", ] + absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container" ] } rtc_library("rtc_json_unittests") { @@ -1304,6 +1338,7 @@ if (rtc_include_tests) { "callback_unittest.cc", "crc32_unittest.cc", "data_rate_limiter_unittest.cc", + "deprecated/signal_thread_unittest.cc", "fake_clock_unittest.cc", "helpers_unittest.cc", "ip_address_unittest.cc", @@ -1316,7 +1351,6 @@ if (rtc_include_tests) { "rolling_accumulator_unittest.cc", "rtc_certificate_generator_unittest.cc", "rtc_certificate_unittest.cc", - "signal_thread_unittest.cc", "sigslot_tester_unittest.cc", "test_client_unittest.cc", "thread_unittest.cc", @@ -1353,9 +1387,12 @@ if (rtc_include_tests) { "../test:test_main", "../test:test_support", "memory:fifo_buffer", + "synchronization:mutex", "synchronization:synchronization_unittests", "task_utils:to_queued_task", "third_party/sigslot", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", diff --git a/rtc_base/DEPS b/rtc_base/DEPS index 679d06dfc8..c9f7dc5898 100644 --- a/rtc_base/DEPS +++ b/rtc_base/DEPS @@ -1,8 +1,8 @@ include_rules = [ "+base/third_party/libevent", "+json", - "+third_party/jsoncpp", "+system_wrappers", + "+third_party/jsoncpp", ] specific_include_rules = { diff --git a/rtc_base/async_invoker.cc b/rtc_base/async_invoker.cc index 26f8c523ab..8b410a4561 100644 --- a/rtc_base/async_invoker.cc +++ b/rtc_base/async_invoker.cc @@ -101,28 +101,6 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, new ScopedMessageData<AsyncClosure>(std::move(closure))); } -GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { - thread_->SignalQueueDestroyed.connect(this, - &GuardedAsyncInvoker::ThreadDestroyed); -} - -GuardedAsyncInvoker::~GuardedAsyncInvoker() {} - -bool GuardedAsyncInvoker::Flush(uint32_t id) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.Flush(thread_, id); - return true; -} - -void GuardedAsyncInvoker::ThreadDestroyed() { - CritScope cs(&crit_); - // We should never get more than one notification about the thread dying. - RTC_DCHECK(thread_ != nullptr); - thread_ = nullptr; -} - AsyncClosure::AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) { invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed); diff --git a/rtc_base/async_invoker.h b/rtc_base/async_invoker.h index f15955d811..ed2df1cdcb 100644 --- a/rtc_base/async_invoker.h +++ b/rtc_base/async_invoker.h @@ -169,97 +169,6 @@ class AsyncInvoker : public MessageHandler { RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker); }; -// Similar to AsyncInvoker, but guards against the Thread being destroyed while -// there are outstanding dangling pointers to it. It will connect to the current -// thread in the constructor, and will get notified when that thread is -// destroyed. After GuardedAsyncInvoker is constructed, it can be used from -// other threads to post functors to the thread it was constructed on. If that -// thread dies, any further calls to AsyncInvoke() will be safely ignored. -class GuardedAsyncInvoker : public sigslot::has_slots<> { - public: - GuardedAsyncInvoker(); - ~GuardedAsyncInvoker() override; - - // Synchronously execute all outstanding calls we own, and wait for calls to - // complete before returning. Optionally filter by message id. The destructor - // will not wait for outstanding calls, so if that behavior is desired, call - // Flush() first. Returns false if the thread has died. - bool Flush(uint32_t id = MQID_ANY); - - // Call |functor| asynchronously with no callback upon completion. Returns - // immediately. Returns false if the thread has died. - template <class ReturnT, class FunctorT> - bool AsyncInvoke(const Location& posted_from, - FunctorT&& functor, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvoke<ReturnT, FunctorT>( - posted_from, thread_, std::forward<FunctorT>(functor), id); - return true; - } - - // Call |functor| asynchronously with |delay_ms|, with no callback upon - // completion. Returns immediately. Returns false if the thread has died. - template <class ReturnT, class FunctorT> - bool AsyncInvokeDelayed(const Location& posted_from, - FunctorT&& functor, - uint32_t delay_ms, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvokeDelayed<ReturnT, FunctorT>( - posted_from, thread_, std::forward<FunctorT>(functor), delay_ms, id); - return true; - } - - // Call |functor| asynchronously, calling |callback| when done. Returns false - // if the thread has died. - template <class ReturnT, class FunctorT, class HostT> - bool AsyncInvoke(const Location& posted_from, - const Location& callback_posted_from, - FunctorT&& functor, - void (HostT::*callback)(ReturnT), - HostT* callback_host, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>( - posted_from, callback_posted_from, thread_, - std::forward<FunctorT>(functor), callback, callback_host, id); - return true; - } - - // Call |functor| asynchronously calling |callback| when done. Overloaded for - // void return. Returns false if the thread has died. - template <class ReturnT, class FunctorT, class HostT> - bool AsyncInvoke(const Location& posted_from, - const Location& callback_posted_from, - FunctorT&& functor, - void (HostT::*callback)(), - HostT* callback_host, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>( - posted_from, callback_posted_from, thread_, - std::forward<FunctorT>(functor), callback, callback_host, id); - return true; - } - - private: - // Callback when |thread_| is destroyed. - void ThreadDestroyed(); - - CriticalSection crit_; - Thread* thread_ RTC_GUARDED_BY(crit_); - AsyncInvoker invoker_ RTC_GUARDED_BY(crit_); -}; - } // namespace rtc #endif // RTC_BASE_ASYNC_INVOKER_H_ diff --git a/rtc_base/async_invoker_inl.h b/rtc_base/async_invoker_inl.h index bd9b0d1aa1..6307afe220 100644 --- a/rtc_base/async_invoker_inl.h +++ b/rtc_base/async_invoker_inl.h @@ -13,7 +13,6 @@ #include "api/scoped_refptr.h" #include "rtc_base/bind.h" -#include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/message_handler.h" #include "rtc_base/ref_counted_object.h" diff --git a/rtc_base/bit_buffer.cc b/rtc_base/bit_buffer.cc index a6dc1c7ab8..540141fe52 100644 --- a/rtc_base/bit_buffer.cc +++ b/rtc_base/bit_buffer.cc @@ -162,6 +162,12 @@ bool BitBuffer::ConsumeBits(size_t bit_count) { bool BitBuffer::ReadNonSymmetric(uint32_t* val, uint32_t num_values) { RTC_DCHECK_GT(num_values, 0); RTC_DCHECK_LE(num_values, uint32_t{1} << 31); + if (num_values == 1) { + // When there is only one possible value, it requires zero bits to store it. + // But ReadBits doesn't support reading zero bits. + *val = 0; + return true; + } size_t count_bits = CountBits(num_values); uint32_t num_min_bits_values = (uint32_t{1} << count_bits) - num_values; @@ -308,6 +314,11 @@ bool BitBufferWriter::WriteBits(uint64_t val, size_t bit_count) { bool BitBufferWriter::WriteNonSymmetric(uint32_t val, uint32_t num_values) { RTC_DCHECK_LT(val, num_values); RTC_DCHECK_LE(num_values, uint32_t{1} << 31); + if (num_values == 1) { + // When there is only one possible value, it requires zero bits to store it. + // But WriteBits doesn't support writing zero bits. + return true; + } size_t count_bits = CountBits(num_values); uint32_t num_min_bits_values = (uint32_t{1} << count_bits) - num_values; diff --git a/rtc_base/bit_buffer_unittest.cc b/rtc_base/bit_buffer_unittest.cc index b3521b4951..656682c2ef 100644 --- a/rtc_base/bit_buffer_unittest.cc +++ b/rtc_base/bit_buffer_unittest.cc @@ -142,7 +142,7 @@ TEST(BitBufferTest, ReadBits) { EXPECT_FALSE(buffer.ReadBits(&val, 1)); } -TEST(BitBufferTest, SetOffsetValues) { +TEST(BitBufferDeathTest, SetOffsetValues) { uint8_t bytes[4] = {0}; BitBufferWriter buffer(bytes, 4); @@ -254,6 +254,28 @@ TEST(BitBufferWriterTest, NonSymmetricReadsMatchesWrites) { EXPECT_THAT(values, ElementsAre(0, 1, 2, 3, 4, 5)); } +TEST(BitBufferTest, ReadNonSymmetricOnlyValueConsumesNoBits) { + const uint8_t bytes[2] = {}; + BitBuffer reader(bytes, 2); + uint32_t value = 0xFFFFFFFF; + ASSERT_EQ(reader.RemainingBitCount(), 16u); + + EXPECT_TRUE(reader.ReadNonSymmetric(&value, /*num_values=*/1)); + + EXPECT_EQ(value, 0u); + EXPECT_EQ(reader.RemainingBitCount(), 16u); +} + +TEST(BitBufferWriterTest, WriteNonSymmetricOnlyValueConsumesNoBits) { + uint8_t bytes[2] = {}; + BitBufferWriter writer(bytes, 2); + ASSERT_EQ(writer.RemainingBitCount(), 16u); + + EXPECT_TRUE(writer.WriteNonSymmetric(0, /*num_values=*/1)); + + EXPECT_EQ(writer.RemainingBitCount(), 16u); +} + uint64_t GolombEncoded(uint32_t val) { val++; uint32_t bit_counter = val; diff --git a/rtc_base/buffer.h b/rtc_base/buffer.h index 3048b9179f..d1639e2f71 100644 --- a/rtc_base/buffer.h +++ b/rtc_base/buffer.h @@ -370,7 +370,9 @@ class BufferT { : capacity; std::unique_ptr<T[]> new_data(new T[new_capacity]); - std::memcpy(new_data.get(), data_.get(), size_ * sizeof(T)); + if (data_ != nullptr) { + std::memcpy(new_data.get(), data_.get(), size_ * sizeof(T)); + } MaybeZeroCompleteBuffer(); data_ = std::move(new_data); capacity_ = new_capacity; diff --git a/rtc_base/buffer_queue.cc b/rtc_base/buffer_queue.cc index 445045ceea..adad9dda17 100644 --- a/rtc_base/buffer_queue.cc +++ b/rtc_base/buffer_queue.cc @@ -21,7 +21,7 @@ BufferQueue::BufferQueue(size_t capacity, size_t default_size) : capacity_(capacity), default_size_(default_size) {} BufferQueue::~BufferQueue() { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); for (Buffer* buffer : queue_) { delete buffer; @@ -32,12 +32,12 @@ BufferQueue::~BufferQueue() { } size_t BufferQueue::size() const { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); return queue_.size(); } void BufferQueue::Clear() { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); while (!queue_.empty()) { free_list_.push_back(queue_.front()); queue_.pop_front(); @@ -45,7 +45,7 @@ void BufferQueue::Clear() { } bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); if (queue_.empty()) { return false; } @@ -69,7 +69,7 @@ bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) { bool BufferQueue::WriteBack(const void* buffer, size_t bytes, size_t* bytes_written) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); if (queue_.size() == capacity_) { return false; } diff --git a/rtc_base/buffer_queue.h b/rtc_base/buffer_queue.h index 5cb18d0220..29d1a5b136 100644 --- a/rtc_base/buffer_queue.h +++ b/rtc_base/buffer_queue.h @@ -18,7 +18,7 @@ #include "rtc_base/buffer.h" #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" namespace rtc { @@ -52,9 +52,9 @@ class BufferQueue { private: size_t capacity_; size_t default_size_; - CriticalSection crit_; - std::deque<Buffer*> queue_ RTC_GUARDED_BY(crit_); - std::vector<Buffer*> free_list_ RTC_GUARDED_BY(crit_); + mutable webrtc::Mutex mutex_; + std::deque<Buffer*> queue_ RTC_GUARDED_BY(mutex_); + std::vector<Buffer*> free_list_ RTC_GUARDED_BY(mutex_); RTC_DISALLOW_COPY_AND_ASSIGN(BufferQueue); }; diff --git a/rtc_base/buffer_unittest.cc b/rtc_base/buffer_unittest.cc index 3e7396dd2c..8beae43cf9 100644 --- a/rtc_base/buffer_unittest.cc +++ b/rtc_base/buffer_unittest.cc @@ -447,7 +447,7 @@ TEST(BufferTest, TestStruct) { EXPECT_EQ(kObsidian, buf[2].stone); } -TEST(BufferTest, DieOnUseAfterMove) { +TEST(BufferDeathTest, DieOnUseAfterMove) { Buffer buf(17); Buffer buf2 = std::move(buf); EXPECT_EQ(buf2.size(), 17u); diff --git a/rtc_base/checks.h b/rtc_base/checks.h index 2fde3f6640..61c074ac82 100644 --- a/rtc_base/checks.h +++ b/rtc_base/checks.h @@ -69,7 +69,7 @@ RTC_NORETURN void rtc_FatalMessage(const char* file, int line, const char* msg); // the reason that it's better to terminate might simply be that the error // handling code isn't in place yet; in production, the reason might be that // the author of the code truly believes that x will always be true, but that -// she recognizes that if she is wrong, abrupt and unpleasant process +// they recognizes that if they are wrong, abrupt and unpleasant process // termination is still better than carrying on with the assumption violated. // // RTC_CHECK always evaluates its argument, so it's OK for x to have side diff --git a/rtc_base/checks_unittest.cc b/rtc_base/checks_unittest.cc index e6e094e597..91e04cf6a1 100644 --- a/rtc_base/checks_unittest.cc +++ b/rtc_base/checks_unittest.cc @@ -19,7 +19,7 @@ TEST(ChecksTest, ExpressionNotEvaluatedWhenCheckPassing) { } #if GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) -TEST(ChecksTest, Checks) { +TEST(ChecksDeathTest, Checks) { #if RTC_CHECK_MSG_ENABLED EXPECT_DEATH(FATAL() << "message", "\n\n#\n" diff --git a/rtc_base/critical_section.cc b/rtc_base/deprecated/recursive_critical_section.cc index 1969edefa5..068b9aa808 100644 --- a/rtc_base/critical_section.cc +++ b/rtc_base/deprecated/recursive_critical_section.cc @@ -8,17 +8,16 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/critical_section.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include <time.h> #include "rtc_base/atomic_ops.h" #include "rtc_base/checks.h" #include "rtc_base/platform_thread_types.h" +#include "rtc_base/synchronization/yield.h" #include "rtc_base/system/unused.h" -// TODO(tommi): Split this file up to per-platform implementation files. - #if RTC_DCHECK_IS_ON #define RTC_CS_DEBUG_CODE(x) x #else // !RTC_DCHECK_IS_ON @@ -27,7 +26,7 @@ namespace rtc { -CriticalSection::CriticalSection() { +RecursiveCriticalSection::RecursiveCriticalSection() { #if defined(WEBRTC_WIN) InitializeCriticalSection(&crit_); #elif defined(WEBRTC_POSIX) @@ -42,7 +41,7 @@ CriticalSection::CriticalSection() { pthread_mutexattr_settype(&mutex_attribute, PTHREAD_MUTEX_RECURSIVE); #if defined(WEBRTC_MAC) pthread_mutexattr_setpolicy_np(&mutex_attribute, - _PTHREAD_MUTEX_POLICY_FAIRSHARE); + _PTHREAD_MUTEX_POLICY_FIRSTFIT); #endif pthread_mutex_init(&mutex_, &mutex_attribute); pthread_mutexattr_destroy(&mutex_attribute); @@ -56,7 +55,7 @@ CriticalSection::CriticalSection() { #endif } -CriticalSection::~CriticalSection() { +RecursiveCriticalSection::~RecursiveCriticalSection() { #if defined(WEBRTC_WIN) DeleteCriticalSection(&crit_); #elif defined(WEBRTC_POSIX) @@ -70,7 +69,7 @@ CriticalSection::~CriticalSection() { #endif } -void CriticalSection::Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION() { +void RecursiveCriticalSection::Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION() { #if defined(WEBRTC_WIN) EnterCriticalSection(&crit_); #elif defined(WEBRTC_POSIX) @@ -129,7 +128,8 @@ void CriticalSection::Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION() { #endif } -bool CriticalSection::TryEnter() const RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { +bool RecursiveCriticalSection::TryEnter() const + RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { #if defined(WEBRTC_WIN) return TryEnterCriticalSection(&crit_) != FALSE; #elif defined(WEBRTC_POSIX) @@ -162,7 +162,7 @@ bool CriticalSection::TryEnter() const RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { #endif } -void CriticalSection::Leave() const RTC_UNLOCK_FUNCTION() { +void RecursiveCriticalSection::Leave() const RTC_UNLOCK_FUNCTION() { RTC_DCHECK(CurrentThreadIsOwner()); #if defined(WEBRTC_WIN) LeaveCriticalSection(&crit_); @@ -190,7 +190,7 @@ void CriticalSection::Leave() const RTC_UNLOCK_FUNCTION() { #endif } -bool CriticalSection::CurrentThreadIsOwner() const { +bool RecursiveCriticalSection::CurrentThreadIsOwner() const { #if defined(WEBRTC_WIN) // OwningThread has type HANDLE but actually contains the Thread ID: // http://stackoverflow.com/questions/12675301/why-is-the-owningthread-member-of-critical-section-of-type-handle-when-it-is-de @@ -209,41 +209,11 @@ bool CriticalSection::CurrentThreadIsOwner() const { #endif } -CritScope::CritScope(const CriticalSection* cs) : cs_(cs) { +CritScope::CritScope(const RecursiveCriticalSection* cs) : cs_(cs) { cs_->Enter(); } CritScope::~CritScope() { cs_->Leave(); } -void GlobalLock::Lock() { -#if !defined(WEBRTC_WIN) && \ - (!defined(WEBRTC_MAC) || RTC_USE_NATIVE_MUTEX_ON_MAC) - const struct timespec ts_null = {0}; -#endif - - while (AtomicOps::CompareAndSwap(&lock_acquired_, 0, 1)) { -#if defined(WEBRTC_WIN) - ::Sleep(0); -#elif defined(WEBRTC_MAC) && !RTC_USE_NATIVE_MUTEX_ON_MAC - sched_yield(); -#else - nanosleep(&ts_null, nullptr); -#endif - } -} - -void GlobalLock::Unlock() { - int old_value = AtomicOps::CompareAndSwap(&lock_acquired_, 1, 0); - RTC_DCHECK_EQ(1, old_value) << "Unlock called without calling Lock first"; -} - -GlobalLockScope::GlobalLockScope(GlobalLock* lock) : lock_(lock) { - lock_->Lock(); -} - -GlobalLockScope::~GlobalLockScope() { - lock_->Unlock(); -} - } // namespace rtc diff --git a/rtc_base/critical_section.h b/rtc_base/deprecated/recursive_critical_section.h index cf10463bdf..c044c732b9 100644 --- a/rtc_base/critical_section.h +++ b/rtc_base/deprecated/recursive_critical_section.h @@ -8,13 +8,11 @@ * be found in the AUTHORS file in the root of the source tree. */ -#ifndef RTC_BASE_CRITICAL_SECTION_H_ -#define RTC_BASE_CRITICAL_SECTION_H_ +#ifndef RTC_BASE_DEPRECATED_RECURSIVE_CRITICAL_SECTION_H_ +#define RTC_BASE_DEPRECATED_RECURSIVE_CRITICAL_SECTION_H_ -#include "rtc_base/checks.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/platform_thread_types.h" -#include "rtc_base/system/rtc_export.h" #include "rtc_base/thread_annotations.h" #if defined(WEBRTC_WIN) @@ -43,13 +41,18 @@ namespace rtc { +// NOTE: This class is deprecated. Please use webrtc::Mutex instead! +// Search using https://www.google.com/?q=recursive+lock+considered+harmful +// to find the reasons. +// // Locking methods (Enter, TryEnter, Leave)are const to permit protecting -// members inside a const context without requiring mutable CriticalSections -// everywhere. CriticalSection is reentrant lock. -class RTC_LOCKABLE RTC_EXPORT CriticalSection { +// members inside a const context without requiring mutable +// RecursiveCriticalSections everywhere. RecursiveCriticalSection is +// reentrant lock. +class RTC_LOCKABLE RecursiveCriticalSection { public: - CriticalSection(); - ~CriticalSection(); + RecursiveCriticalSection(); + ~RecursiveCriticalSection(); void Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION(); bool TryEnter() const RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true); @@ -87,37 +90,15 @@ class RTC_LOCKABLE RTC_EXPORT CriticalSection { // CritScope, for serializing execution through a scope. class RTC_SCOPED_LOCKABLE CritScope { public: - explicit CritScope(const CriticalSection* cs) RTC_EXCLUSIVE_LOCK_FUNCTION(cs); + explicit CritScope(const RecursiveCriticalSection* cs) + RTC_EXCLUSIVE_LOCK_FUNCTION(cs); ~CritScope() RTC_UNLOCK_FUNCTION(); private: - const CriticalSection* const cs_; + const RecursiveCriticalSection* const cs_; RTC_DISALLOW_COPY_AND_ASSIGN(CritScope); }; -// A lock used to protect global variables. Do NOT use for other purposes. -class RTC_LOCKABLE GlobalLock { - public: - constexpr GlobalLock() : lock_acquired_(0) {} - - void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION(); - void Unlock() RTC_UNLOCK_FUNCTION(); - - private: - volatile int lock_acquired_; -}; - -// GlobalLockScope, for serializing execution through a scope. -class RTC_SCOPED_LOCKABLE GlobalLockScope { - public: - explicit GlobalLockScope(GlobalLock* lock) RTC_EXCLUSIVE_LOCK_FUNCTION(lock); - ~GlobalLockScope() RTC_UNLOCK_FUNCTION(); - - private: - GlobalLock* const lock_; - RTC_DISALLOW_COPY_AND_ASSIGN(GlobalLockScope); -}; - } // namespace rtc -#endif // RTC_BASE_CRITICAL_SECTION_H_ +#endif // RTC_BASE_DEPRECATED_RECURSIVE_CRITICAL_SECTION_H_ diff --git a/rtc_base/critical_section_unittest.cc b/rtc_base/deprecated/recursive_critical_section_unittest.cc index 16aefd2740..22c2655b3d 100644 --- a/rtc_base/critical_section_unittest.cc +++ b/rtc_base/deprecated/recursive_critical_section_unittest.cc @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/critical_section.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include <stddef.h> #include <stdint.h> @@ -124,7 +124,7 @@ class RTC_LOCKABLE CriticalSectionLock { void Unlock() RTC_UNLOCK_FUNCTION() { cs_.Leave(); } private: - CriticalSection cs_; + RecursiveCriticalSection cs_; }; template <class Lock> @@ -183,7 +183,7 @@ class AtomicOpRunner : public RunnerBase { } private: - CriticalSection all_values_crit_; + RecursiveCriticalSection all_values_crit_; Verifier verifier_; }; @@ -282,26 +282,7 @@ TEST(AtomicOpsTest, CompareAndSwap) { EXPECT_EQ(1, runner.shared_value()); } -TEST(GlobalLockTest, CanHaveStaticStorageDuration) { - static_assert(std::is_trivially_destructible<GlobalLock>::value, ""); - ABSL_CONST_INIT static GlobalLock global_lock; - global_lock.Lock(); - global_lock.Unlock(); -} - -TEST(GlobalLockTest, Basic) { - // Create and start lots of threads. - LockRunner<GlobalLock> runner; - std::vector<std::unique_ptr<Thread>> threads; - StartThreads(&threads, &runner); - runner.SetExpectedThreadCount(kNumThreads); - - // Release the hounds! - EXPECT_TRUE(runner.Run()); - EXPECT_EQ(0, runner.shared_value()); -} - -TEST(CriticalSectionTest, Basic) { +TEST(RecursiveCriticalSectionTest, Basic) { // Create and start lots of threads. LockRunner<CriticalSectionLock> runner; std::vector<std::unique_ptr<Thread>> threads; @@ -339,7 +320,7 @@ class PerfTestData { private: uint8_t cache_line_barrier_1_[64]; - CriticalSection lock_; + RecursiveCriticalSection lock_; uint8_t cache_line_barrier_2_[64]; int64_t my_counter_ = 0; const int expected_count_; @@ -391,7 +372,7 @@ class PerfTestThread { // user 1m20.575s // sys 3m48.872s // Unit test output: -// [ OK ] CriticalSectionTest.Performance (294375 ms) +// [ OK ] RecursiveCriticalSectionTest.Performance (294375 ms) // // Native mutex implementation using first fit policy (current macOS default): // Approximate CPU usage: @@ -399,7 +380,7 @@ class PerfTestThread { // user 0m12.738s // sys 0m31.207s // Unit test output: -// [ OK ] CriticalSectionTest.Performance (11444 ms) +// [ OK ] RecursiveCriticalSectionTest.Performance (11444 ms) // // Special partially spin lock based implementation: // Approximate CPU usage: @@ -407,10 +388,10 @@ class PerfTestThread { // user 0m3.014s // sys 0m4.495s // Unit test output: -// [ OK ] CriticalSectionTest.Performance (1885 ms) +// [ OK ] RecursiveCriticalSectionTest.Performance (1885 ms) // // The test is disabled by default to avoid unecessarily loading the bots. -TEST(CriticalSectionTest, DISABLED_Performance) { +TEST(RecursiveCriticalSectionTest, DISABLED_Performance) { PerfTestThread threads[8]; Event event; diff --git a/rtc_base/signal_thread.cc b/rtc_base/deprecated/signal_thread.cc index e100fbe179..96bdd65155 100644 --- a/rtc_base/signal_thread.cc +++ b/rtc_base/deprecated/signal_thread.cc @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/signal_thread.h" +#include "rtc_base/deprecated/signal_thread.h" #include <memory> @@ -23,26 +23,30 @@ namespace rtc { // SignalThread /////////////////////////////////////////////////////////////////////////////// -SignalThread::SignalThread() +DEPRECATED_SignalThread::DEPRECATED_SignalThread() : main_(Thread::Current()), worker_(this), state_(kInit), refcount_(1) { - main_->SignalQueueDestroyed.connect(this, - &SignalThread::OnMainThreadDestroyed); + main_->SignalQueueDestroyed.connect( + this, &DEPRECATED_SignalThread::OnMainThreadDestroyed); worker_.SetName("SignalThread", this); } -SignalThread::~SignalThread() { +DEPRECATED_SignalThread::~DEPRECATED_SignalThread() { + rtc::CritScope lock(&cs_); RTC_DCHECK(refcount_ == 0); } -bool SignalThread::SetName(const std::string& name, const void* obj) { +bool DEPRECATED_SignalThread::SetName(const std::string& name, + const void* obj) { EnterExit ee(this); + RTC_DCHECK(!destroy_called_); RTC_DCHECK(main_->IsCurrent()); RTC_DCHECK(kInit == state_); return worker_.SetName(name, obj); } -void SignalThread::Start() { +void DEPRECATED_SignalThread::Start() { EnterExit ee(this); + RTC_DCHECK(!destroy_called_); RTC_DCHECK(main_->IsCurrent()); if (kInit == state_ || kComplete == state_) { state_ = kRunning; @@ -53,9 +57,13 @@ void SignalThread::Start() { } } -void SignalThread::Destroy(bool wait) { +void DEPRECATED_SignalThread::Destroy(bool wait) { EnterExit ee(this); - RTC_DCHECK(main_->IsCurrent()); + // Sometimes the caller can't guarantee which thread will call Destroy, only + // that it will be the last thing it does. + // RTC_DCHECK(main_->IsCurrent()); + RTC_DCHECK(!destroy_called_); + destroy_called_ = true; if ((kInit == state_) || (kComplete == state_)) { refcount_--; } else if (kRunning == state_ || kReleasing == state_) { @@ -76,8 +84,9 @@ void SignalThread::Destroy(bool wait) { } } -void SignalThread::Release() { +void DEPRECATED_SignalThread::Release() { EnterExit ee(this); + RTC_DCHECK(!destroy_called_); RTC_DCHECK(main_->IsCurrent()); if (kComplete == state_) { refcount_--; @@ -89,13 +98,14 @@ void SignalThread::Release() { } } -bool SignalThread::ContinueWork() { +bool DEPRECATED_SignalThread::ContinueWork() { EnterExit ee(this); + RTC_DCHECK(!destroy_called_); RTC_DCHECK(worker_.IsCurrent()); return worker_.ProcessMessages(0); } -void SignalThread::OnMessage(Message* msg) { +void DEPRECATED_SignalThread::OnMessage(Message* msg) { EnterExit ee(this); if (ST_MSG_WORKER_DONE == msg->message_id) { RTC_DCHECK(main_->IsCurrent()); @@ -126,21 +136,21 @@ void SignalThread::OnMessage(Message* msg) { } } -SignalThread::Worker::Worker(SignalThread* parent) +DEPRECATED_SignalThread::Worker::Worker(DEPRECATED_SignalThread* parent) : Thread(std::make_unique<NullSocketServer>(), /*do_init=*/false), parent_(parent) { DoInit(); } -SignalThread::Worker::~Worker() { +DEPRECATED_SignalThread::Worker::~Worker() { Stop(); } -void SignalThread::Worker::Run() { +void DEPRECATED_SignalThread::Worker::Run() { parent_->Run(); } -void SignalThread::Run() { +void DEPRECATED_SignalThread::Run() { DoWork(); { EnterExit ee(this); @@ -150,12 +160,12 @@ void SignalThread::Run() { } } -void SignalThread::OnMainThreadDestroyed() { +void DEPRECATED_SignalThread::OnMainThreadDestroyed() { EnterExit ee(this); main_ = nullptr; } -bool SignalThread::Worker::IsProcessingMessagesForTesting() { +bool DEPRECATED_SignalThread::Worker::IsProcessingMessagesForTesting() { return false; } diff --git a/rtc_base/deprecated/signal_thread.h b/rtc_base/deprecated/signal_thread.h new file mode 100644 index 0000000000..3612f5a1ca --- /dev/null +++ b/rtc_base/deprecated/signal_thread.h @@ -0,0 +1,166 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_DEPRECATED_SIGNAL_THREAD_H_ +#define RTC_BASE_DEPRECATED_SIGNAL_THREAD_H_ + +#include <string> + +#include "rtc_base/checks.h" +#include "rtc_base/constructor_magic.h" +#include "rtc_base/deprecated/recursive_critical_section.h" +#include "rtc_base/deprecation.h" +#include "rtc_base/message_handler.h" +#include "rtc_base/third_party/sigslot/sigslot.h" +#include "rtc_base/thread.h" +#include "rtc_base/thread_annotations.h" + +namespace rtc { + +/////////////////////////////////////////////////////////////////////////////// +// NOTE: this class has been deprecated. Do not use for new code. New code +// should use factilities exposed by api/task_queue/ instead. +// +// SignalThread - Base class for worker threads. The main thread should call +// Start() to begin work, and then follow one of these models: +// Normal: Wait for SignalWorkDone, and then call Release to destroy. +// Cancellation: Call Release(true), to abort the worker thread. +// Fire-and-forget: Call Release(false), which allows the thread to run to +// completion, and then self-destruct without further notification. +// Periodic tasks: Wait for SignalWorkDone, then eventually call Start() +// again to repeat the task. When the instance isn't needed anymore, +// call Release. DoWork, OnWorkStart and OnWorkStop are called again, +// on a new thread. +// The subclass should override DoWork() to perform the background task. By +// periodically calling ContinueWork(), it can check for cancellation. +// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work +// tasks in the context of the main thread. +/////////////////////////////////////////////////////////////////////////////// + +class DEPRECATED_SignalThread : public sigslot::has_slots<>, + protected MessageHandler { + public: + DEPRECATED_SignalThread(); + + // Context: Main Thread. Call before Start to change the worker's name. + bool SetName(const std::string& name, const void* obj); + + // Context: Main Thread. Call to begin the worker thread. + void Start(); + + // Context: Main Thread. If the worker thread is not running, deletes the + // object immediately. Otherwise, asks the worker thread to abort processing, + // and schedules the object to be deleted once the worker exits. + // SignalWorkDone will not be signalled. If wait is true, does not return + // until the thread is deleted. + void Destroy(bool wait); + + // Context: Main Thread. If the worker thread is complete, deletes the + // object immediately. Otherwise, schedules the object to be deleted once + // the worker thread completes. SignalWorkDone will be signalled. + void Release(); + + // Context: Main Thread. Signalled when work is complete. + sigslot::signal1<DEPRECATED_SignalThread*> SignalWorkDone; + + enum { ST_MSG_WORKER_DONE, ST_MSG_FIRST_AVAILABLE }; + + protected: + ~DEPRECATED_SignalThread() override; + + Thread* worker() { return &worker_; } + + // Context: Main Thread. Subclass should override to do pre-work setup. + virtual void OnWorkStart() {} + + // Context: Worker Thread. Subclass should override to do work. + virtual void DoWork() = 0; + + // Context: Worker Thread. Subclass should call periodically to + // dispatch messages and determine if the thread should terminate. + bool ContinueWork(); + + // Context: Worker Thread. Subclass should override when extra work is + // needed to abort the worker thread. + virtual void OnWorkStop() {} + + // Context: Main Thread. Subclass should override to do post-work cleanup. + virtual void OnWorkDone() {} + + // Context: Any Thread. If subclass overrides, be sure to call the base + // implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE) + void OnMessage(Message* msg) override; + + private: + enum State { + kInit, // Initialized, but not started + kRunning, // Started and doing work + kReleasing, // Same as running, but to be deleted when work is done + kComplete, // Work is done + kStopping, // Work is being interrupted + }; + + class Worker : public Thread { + public: + explicit Worker(DEPRECATED_SignalThread* parent); + ~Worker() override; + void Run() override; + bool IsProcessingMessagesForTesting() override; + + private: + DEPRECATED_SignalThread* parent_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Worker); + }; + + class RTC_SCOPED_LOCKABLE EnterExit { + public: + explicit EnterExit(DEPRECATED_SignalThread* t) + RTC_EXCLUSIVE_LOCK_FUNCTION(t->cs_) + : t_(t) { + t_->cs_.Enter(); + // If refcount_ is zero then the object has already been deleted and we + // will be double-deleting it in ~EnterExit()! (shouldn't happen) + RTC_DCHECK_NE(0, t_->refcount_); + ++t_->refcount_; + } + ~EnterExit() RTC_UNLOCK_FUNCTION() { + bool d = (0 == --t_->refcount_); + t_->cs_.Leave(); + if (d) + delete t_; + } + + private: + DEPRECATED_SignalThread* t_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(EnterExit); + }; + + void Run(); + void OnMainThreadDestroyed(); + + Thread* main_; + Worker worker_; + RecursiveCriticalSection cs_; + State state_ RTC_GUARDED_BY(cs_); + int refcount_ RTC_GUARDED_BY(cs_); + bool destroy_called_ RTC_GUARDED_BY(cs_) = false; + + RTC_DISALLOW_COPY_AND_ASSIGN(DEPRECATED_SignalThread); +}; + +typedef RTC_DEPRECATED DEPRECATED_SignalThread SignalThread; + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace rtc + +#endif // RTC_BASE_DEPRECATED_SIGNAL_THREAD_H_ diff --git a/rtc_base/signal_thread_unittest.cc b/rtc_base/deprecated/signal_thread_unittest.cc index 14761865b8..f5a49aad63 100644 --- a/rtc_base/signal_thread_unittest.cc +++ b/rtc_base/deprecated/signal_thread_unittest.cc @@ -13,9 +13,9 @@ #include <memory> #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" #include "rtc_base/gunit.h" #include "rtc_base/null_socket_server.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" #include "test/gtest.h" @@ -28,9 +28,9 @@ static const int kTimeout = 10000; class SignalThreadTest : public ::testing::Test, public sigslot::has_slots<> { public: - class SlowSignalThread : public SignalThread { + class SlowSignalThread : public DEPRECATED_SignalThread { public: - SlowSignalThread(SignalThreadTest* harness) : harness_(harness) {} + explicit SlowSignalThread(SignalThreadTest* harness) : harness_(harness) {} ~SlowSignalThread() override { EXPECT_EQ(harness_->main_thread_, Thread::Current()); @@ -70,7 +70,7 @@ class SignalThreadTest : public ::testing::Test, public sigslot::has_slots<> { RTC_DISALLOW_COPY_AND_ASSIGN(SlowSignalThread); }; - void OnWorkComplete(rtc::SignalThread* thread) { + void OnWorkComplete(rtc::DEPRECATED_SignalThread* thread) { SlowSignalThread* t = static_cast<SlowSignalThread*>(thread); EXPECT_EQ(t->harness(), this); EXPECT_EQ(main_thread_, Thread::Current()); @@ -148,23 +148,23 @@ class OwnerThread : public Thread, public sigslot::has_slots<> { // Delete |signal_thread|. signal_thread->Destroy(true); { - rtc::CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); has_run_ = true; } } bool has_run() { - rtc::CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); return has_run_; } - void OnWorkDone(SignalThread* /*signal_thread*/) { + void OnWorkDone(DEPRECATED_SignalThread* /*signal_thread*/) { FAIL() << " This shouldn't get called."; } private: - rtc::CriticalSection crit_; + webrtc::Mutex mutex_; SignalThreadTest* harness_; - bool has_run_ RTC_GUARDED_BY(crit_); + bool has_run_ RTC_GUARDED_BY(mutex_); RTC_DISALLOW_COPY_AND_ASSIGN(OwnerThread); }; diff --git a/rtc_base/event_tracer.cc b/rtc_base/event_tracer.cc index d23af21421..3af8183b1f 100644 --- a/rtc_base/event_tracer.cc +++ b/rtc_base/event_tracer.cc @@ -19,11 +19,11 @@ #include "rtc_base/atomic_ops.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" #include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread_types.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/thread_checker.h" #include "rtc_base/time_utils.h" @@ -120,7 +120,7 @@ class EventLogger final { arg.value.as_string = str_copy; } } - rtc::CritScope lock(&crit_); + webrtc::MutexLock lock(&mutex_); trace_events_.push_back( {name, category_enabled, phase, args, timestamp, 1, thread_id}); } @@ -136,7 +136,7 @@ class EventLogger final { bool shutting_down = shutdown_event_.Wait(kLoggingIntervalMs); std::vector<TraceEvent> events; { - rtc::CritScope lock(&crit_); + webrtc::MutexLock lock(&mutex_); trace_events_.swap(events); } std::string args_str; @@ -196,7 +196,7 @@ class EventLogger final { output_file_ = file; output_file_owned_ = owned; { - rtc::CritScope lock(&crit_); + webrtc::MutexLock lock(&mutex_); // Since the atomic fast-path for adding events to the queue can be // bypassed while the logging thread is shutting down there may be some // stale events in the queue, hence the vector needs to be cleared to not @@ -317,8 +317,8 @@ class EventLogger final { return output; } - rtc::CriticalSection crit_; - std::vector<TraceEvent> trace_events_ RTC_GUARDED_BY(crit_); + webrtc::Mutex mutex_; + std::vector<TraceEvent> trace_events_ RTC_GUARDED_BY(mutex_); rtc::PlatformThread logging_thread_; rtc::Event shutdown_event_; rtc::ThreadChecker thread_checker_; diff --git a/rtc_base/event_tracer_unittest.cc b/rtc_base/event_tracer_unittest.cc index 79cc9c0788..f4d41e4e7c 100644 --- a/rtc_base/event_tracer_unittest.cc +++ b/rtc_base/event_tracer_unittest.cc @@ -10,7 +10,7 @@ #include "rtc_base/event_tracer.h" -#include "rtc_base/critical_section.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/trace_event.h" #include "test/gtest.h" @@ -20,17 +20,17 @@ namespace { class TestStatistics { public: void Reset() { - rtc::CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); events_logged_ = 0; } void Increment() { - rtc::CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); ++events_logged_; } int Count() const { - rtc::CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); return events_logged_; } @@ -41,8 +41,8 @@ class TestStatistics { } private: - rtc::CriticalSection crit_; - int events_logged_ RTC_GUARDED_BY(crit_) = 0; + mutable webrtc::Mutex mutex_; + int events_logged_ RTC_GUARDED_BY(mutex_) = 0; }; } // namespace diff --git a/rtc_base/experiments/BUILD.gn b/rtc_base/experiments/BUILD.gn index bb3e0ce8ae..282b5b9270 100644 --- a/rtc_base/experiments/BUILD.gn +++ b/rtc_base/experiments/BUILD.gn @@ -17,8 +17,8 @@ rtc_library("alr_experiment") { "../:rtc_base_approved", "../../api/transport:field_trial_based_config", "../../api/transport:webrtc_key_value_config", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("field_trial_parser") { @@ -40,6 +40,8 @@ rtc_library("field_trial_parser") { "../../rtc_base:logging", "../../rtc_base:safe_conversions", "../../rtc_base:stringutils", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings:strings", "//third_party/abseil-cpp/absl/types:optional", @@ -57,8 +59,8 @@ rtc_library("quality_rampup_experiment") { "../../api/transport:field_trial_based_config", "../../api/transport:webrtc_key_value_config", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("quality_scaler_settings") { @@ -72,8 +74,8 @@ rtc_library("quality_scaler_settings") { "../../api/transport:field_trial_based_config", "../../api/transport:webrtc_key_value_config", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("quality_scaling_experiment") { @@ -85,8 +87,8 @@ rtc_library("quality_scaling_experiment") { "../:rtc_base_approved", "../../api/video_codecs:video_codecs_api", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("normalize_simulcast_size_experiment") { @@ -97,8 +99,8 @@ rtc_library("normalize_simulcast_size_experiment") { deps = [ "../:rtc_base_approved", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("balanced_degradation_settings") { @@ -111,8 +113,8 @@ rtc_library("balanced_degradation_settings") { "../:rtc_base_approved", "../../api/video_codecs:video_codecs_api", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("cpu_speed_experiment") { @@ -123,8 +125,8 @@ rtc_library("cpu_speed_experiment") { deps = [ "../:rtc_base_approved", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("rtt_mult_experiment") { @@ -135,8 +137,8 @@ rtc_library("rtt_mult_experiment") { deps = [ "../:rtc_base_approved", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("jitter_upper_bound_experiment") { @@ -147,8 +149,8 @@ rtc_library("jitter_upper_bound_experiment") { deps = [ "../:rtc_base_approved", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("rate_control_settings") { @@ -164,6 +166,8 @@ rtc_library("rate_control_settings") { "../../api/units:data_size", "../../api/video_codecs:video_codecs_api", "../../system_wrappers:field_trial", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", ] @@ -178,8 +182,8 @@ rtc_library("keyframe_interval_settings_experiment") { ":field_trial_parser", "../../api/transport:field_trial_based_config", "../../api/transport:webrtc_key_value_config", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("stable_target_rate_experiment") { @@ -192,8 +196,8 @@ rtc_library("stable_target_rate_experiment") { ":rate_control_settings", "../../api/transport:field_trial_based_config", "../../api/transport:webrtc_key_value_config", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("min_video_bitrate_experiment") { @@ -208,8 +212,8 @@ rtc_library("min_video_bitrate_experiment") { "../../rtc_base:checks", "../../rtc_base:logging", "../../system_wrappers:field_trial", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } if (rtc_include_tests) { @@ -255,7 +259,7 @@ if (rtc_include_tests) { "../../test:field_trial", "../../test:test_main", "../../test:test_support", - "//third_party/abseil-cpp/absl/types:optional", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } } diff --git a/rtc_base/experiments/quality_rampup_experiment.cc b/rtc_base/experiments/quality_rampup_experiment.cc index caf7e62368..ee6675c924 100644 --- a/rtc_base/experiments/quality_rampup_experiment.cc +++ b/rtc_base/experiments/quality_rampup_experiment.cc @@ -70,4 +70,8 @@ bool QualityRampupExperiment::BwHigh(int64_t now_ms, return (now_ms - *start_ms_) >= min_duration_ms_.Value(); } +bool QualityRampupExperiment::Enabled() const { + return min_pixels_ || min_duration_ms_ || max_bitrate_kbps_; +} + } // namespace webrtc diff --git a/rtc_base/experiments/quality_rampup_experiment.h b/rtc_base/experiments/quality_rampup_experiment.h index ff9d7d38e5..9d46901104 100644 --- a/rtc_base/experiments/quality_rampup_experiment.h +++ b/rtc_base/experiments/quality_rampup_experiment.h @@ -33,6 +33,8 @@ class QualityRampupExperiment final { // (max_bitrate_factor_) above |max_bitrate_kbps_| for |min_duration_ms_|. bool BwHigh(int64_t now_ms, uint32_t available_bw_kbps); + bool Enabled() const; + private: explicit QualityRampupExperiment( const WebRtcKeyValueConfig* const key_value_config); diff --git a/rtc_base/fake_clock.cc b/rtc_base/fake_clock.cc index e242e8e659..652a5afa3a 100644 --- a/rtc_base/fake_clock.cc +++ b/rtc_base/fake_clock.cc @@ -16,18 +16,18 @@ namespace rtc { int64_t FakeClock::TimeNanos() const { - CritScope cs(&lock_); + webrtc::MutexLock lock(&lock_); return time_ns_; } void FakeClock::SetTime(webrtc::Timestamp new_time) { - CritScope cs(&lock_); + webrtc::MutexLock lock(&lock_); RTC_DCHECK(new_time.us() * 1000 >= time_ns_); time_ns_ = new_time.us() * 1000; } void FakeClock::AdvanceTime(webrtc::TimeDelta delta) { - CritScope cs(&lock_); + webrtc::MutexLock lock(&lock_); time_ns_ += delta.ns(); } diff --git a/rtc_base/fake_clock.h b/rtc_base/fake_clock.h index 0ab9a937a8..edb507becb 100644 --- a/rtc_base/fake_clock.h +++ b/rtc_base/fake_clock.h @@ -15,7 +15,7 @@ #include "api/units/time_delta.h" #include "api/units/timestamp.h" -#include "rtc_base/critical_section.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" @@ -43,7 +43,7 @@ class FakeClock : public ClockInterface { void AdvanceTime(webrtc::TimeDelta delta); private: - CriticalSection lock_; + mutable webrtc::Mutex lock_; int64_t time_ns_ RTC_GUARDED_BY(lock_) = 0; }; diff --git a/rtc_base/firewall_socket_server.cc b/rtc_base/firewall_socket_server.cc index fc7917613c..8f44753760 100644 --- a/rtc_base/firewall_socket_server.cc +++ b/rtc_base/firewall_socket_server.cc @@ -163,19 +163,19 @@ void FirewallSocketServer::AddRule(bool allow, r.p = p; r.src = src; r.dst = dst; - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); rules_.push_back(r); } void FirewallSocketServer::ClearRules() { - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); rules_.clear(); } bool FirewallSocketServer::Check(FirewallProtocol p, const SocketAddress& src, const SocketAddress& dst) { - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); for (size_t i = 0; i < rules_.size(); ++i) { const Rule& r = rules_[i]; if ((r.p != p) && (r.p != FP_ANY)) @@ -239,12 +239,12 @@ FirewallManager::~FirewallManager() { } void FirewallManager::AddServer(FirewallSocketServer* server) { - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); servers_.push_back(server); } void FirewallManager::RemoveServer(FirewallSocketServer* server) { - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); servers_.erase(std::remove(servers_.begin(), servers_.end(), server), servers_.end()); } @@ -253,7 +253,7 @@ void FirewallManager::AddRule(bool allow, FirewallProtocol p, FirewallDirection d, const SocketAddress& addr) { - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); for (std::vector<FirewallSocketServer*>::const_iterator it = servers_.begin(); it != servers_.end(); ++it) { (*it)->AddRule(allow, p, d, addr); @@ -261,7 +261,7 @@ void FirewallManager::AddRule(bool allow, } void FirewallManager::ClearRules() { - CritScope scope(&crit_); + webrtc::MutexLock scope(&mutex_); for (std::vector<FirewallSocketServer*>::const_iterator it = servers_.begin(); it != servers_.end(); ++it) { (*it)->ClearRules(); diff --git a/rtc_base/firewall_socket_server.h b/rtc_base/firewall_socket_server.h index d174033e01..23b91d6ad3 100644 --- a/rtc_base/firewall_socket_server.h +++ b/rtc_base/firewall_socket_server.h @@ -14,11 +14,11 @@ #include <vector> #include "rtc_base/async_socket.h" -#include "rtc_base/critical_section.h" #include "rtc_base/ip_address.h" #include "rtc_base/socket.h" #include "rtc_base/socket_address.h" #include "rtc_base/socket_server.h" +#include "rtc_base/synchronization/mutex.h" namespace rtc { @@ -90,7 +90,7 @@ class FirewallSocketServer : public SocketServer { private: SocketServer* server_; FirewallManager* manager_; - CriticalSection crit_; + webrtc::Mutex mutex_; struct Rule { bool allow; FirewallProtocol p; @@ -123,7 +123,7 @@ class FirewallManager { void ClearRules(); private: - CriticalSection crit_; + webrtc::Mutex mutex_; std::vector<FirewallSocketServer*> servers_; }; diff --git a/rtc_base/logging.cc b/rtc_base/logging.cc index ff7369dd5c..d07a7e75e7 100644 --- a/rtc_base/logging.cc +++ b/rtc_base/logging.cc @@ -42,11 +42,11 @@ static const int kMaxLogLineSize = 1024 - 60; #include "absl/base/attributes.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/platform_thread_types.h" #include "rtc_base/string_encode.h" #include "rtc_base/string_utils.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" @@ -72,7 +72,9 @@ const char* FilenameFromPath(const char* file) { } // Global lock for log subsystem, only needed to serialize access to streams_. -CriticalSection g_log_crit; +// TODO(bugs.webrtc.org/11665): this is not currently constant initialized and +// trivially destructible. +webrtc::Mutex g_log_mutex_; } // namespace ///////////////////////////////////////////////////////////////////////////// @@ -85,8 +87,9 @@ bool LogMessage::log_to_stderr_ = true; // Note: we explicitly do not clean this up, because of the uncertain ordering // of destructors at program exit. Let the person who sets the stream trigger // cleanup by setting to null, or let it leak (safe at program exit). -ABSL_CONST_INIT LogSink* LogMessage::streams_ RTC_GUARDED_BY(g_log_crit) = +ABSL_CONST_INIT LogSink* LogMessage::streams_ RTC_GUARDED_BY(g_log_mutex_) = nullptr; +ABSL_CONST_INIT std::atomic<bool> LogMessage::streams_empty_ = {true}; // Boolean options default to false (0) bool LogMessage::thread_, LogMessage::timestamp_; @@ -193,7 +196,7 @@ LogMessage::~LogMessage() { #endif } - CritScope cs(&g_log_crit); + webrtc::MutexLock lock(&g_log_mutex_); for (LogSink* entry = streams_; entry != nullptr; entry = entry->next_) { if (severity_ >= entry->min_severity_) { #if defined(WEBRTC_ANDROID) @@ -242,7 +245,7 @@ void LogMessage::LogTimestamps(bool on) { void LogMessage::LogToDebug(LoggingSeverity min_sev) { g_dbg_sev = min_sev; - CritScope cs(&g_log_crit); + webrtc::MutexLock lock(&g_log_mutex_); UpdateMinLogSeverity(); } @@ -251,7 +254,7 @@ void LogMessage::SetLogToStderr(bool log_to_stderr) { } int LogMessage::GetLogToStream(LogSink* stream) { - CritScope cs(&g_log_crit); + webrtc::MutexLock lock(&g_log_mutex_); LoggingSeverity sev = LS_NONE; for (LogSink* entry = streams_; entry != nullptr; entry = entry->next_) { if (stream == nullptr || stream == entry) { @@ -262,15 +265,16 @@ int LogMessage::GetLogToStream(LogSink* stream) { } void LogMessage::AddLogToStream(LogSink* stream, LoggingSeverity min_sev) { - CritScope cs(&g_log_crit); + webrtc::MutexLock lock(&g_log_mutex_); stream->min_severity_ = min_sev; stream->next_ = streams_; streams_ = stream; + streams_empty_.store(false, std::memory_order_relaxed); UpdateMinLogSeverity(); } void LogMessage::RemoveLogToStream(LogSink* stream) { - CritScope cs(&g_log_crit); + webrtc::MutexLock lock(&g_log_mutex_); for (LogSink** entry = &streams_; *entry != nullptr; entry = &(*entry)->next_) { if (*entry == stream) { @@ -278,6 +282,7 @@ void LogMessage::RemoveLogToStream(LogSink* stream) { break; } } + streams_empty_.store(streams_ == nullptr, std::memory_order_relaxed); UpdateMinLogSeverity(); } @@ -331,7 +336,7 @@ void LogMessage::ConfigureLogging(const char* params) { } void LogMessage::UpdateMinLogSeverity() - RTC_EXCLUSIVE_LOCKS_REQUIRED(g_log_crit) { + RTC_EXCLUSIVE_LOCKS_REQUIRED(g_log_mutex_) { LoggingSeverity min_sev = g_dbg_sev; for (LogSink* entry = streams_; entry != nullptr; entry = entry->next_) { min_sev = std::min(min_sev, entry->min_severity_); @@ -435,12 +440,7 @@ void LogMessage::OutputToDebug(const std::string& str, bool LogMessage::IsNoop(LoggingSeverity severity) { if (severity >= g_dbg_sev || severity >= g_min_sev) return false; - - // TODO(tommi): We're grabbing this lock for every LogMessage instance that - // is going to be logged. This introduces unnecessary synchronization for - // a feature that's mostly used for testing. - CritScope cs(&g_log_crit); - return streams_ == nullptr; + return streams_empty_.load(std::memory_order_relaxed); } void LogMessage::FinishPrintStream() { diff --git a/rtc_base/logging.h b/rtc_base/logging.h index 0aa1e676d1..0852c06182 100644 --- a/rtc_base/logging.h +++ b/rtc_base/logging.h @@ -46,6 +46,7 @@ #include <errno.h> +#include <atomic> #include <sstream> // no-presubmit-check TODO(webrtc:8982) #include <string> #include <utility> @@ -463,9 +464,14 @@ class LogMessage { static void SetLogToStderr(bool log_to_stderr); // Stream: Any non-blocking stream interface. // Installs the |stream| to collect logs with severtiy |min_sev| or higher. - // |stream| must live until deinstalled by RemoveLogToStream + // |stream| must live until deinstalled by RemoveLogToStream. + // If |stream| is the first stream added to the system, we might miss some + // early concurrent log statement happening from another thread happening near + // this instant. static void AddLogToStream(LogSink* stream, LoggingSeverity min_sev); - // Removes the specified stream, without destroying it. + // Removes the specified stream, without destroying it. When the method + // has completed, it's guaranteed that |stream| will receive no more logging + // calls. static void RemoveLogToStream(LogSink* stream); // Returns the severity for the specified stream, of if none is specified, // the minimum stream severity. @@ -557,6 +563,12 @@ class LogMessage { // The output streams and their associated severities static LogSink* streams_; + // Holds true with high probability if |streams_| is empty, false with high + // probability otherwise. Operated on with std::memory_order_relaxed because + // it's ok to lose or log some additional statements near the instant streams + // are added/removed. + static std::atomic<bool> streams_empty_; + // Flags for formatting options static bool thread_, timestamp_; diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn index aa905c6f70..5c3dd0a5d1 100644 --- a/rtc_base/memory/BUILD.gn +++ b/rtc_base/memory/BUILD.gn @@ -31,7 +31,10 @@ rtc_library("fifo_buffer") { "fifo_buffer.cc", "fifo_buffer.h", ] - deps = [ "..:rtc_base" ] + deps = [ + "..:rtc_base", + "../synchronization:mutex", + ] } rtc_library("unittests") { diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc index 44fb032e57..49e926719f 100644 --- a/rtc_base/memory/fifo_buffer.cc +++ b/rtc_base/memory/fifo_buffer.cc @@ -39,13 +39,13 @@ FifoBuffer::FifoBuffer(size_t size, Thread* owner) FifoBuffer::~FifoBuffer() {} bool FifoBuffer::GetBuffered(size_t* size) const { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); *size = data_length_; return true; } bool FifoBuffer::SetCapacity(size_t size) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); if (data_length_ > size) { return false; } @@ -67,7 +67,7 @@ StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, size_t offset, size_t* bytes_read) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); return ReadOffsetLocked(buffer, bytes, offset, bytes_read); } @@ -75,12 +75,12 @@ StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, size_t offset, size_t* bytes_written) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); return WriteOffsetLocked(buffer, bytes, offset, bytes_written); } StreamState FifoBuffer::GetState() const { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); return state_; } @@ -88,7 +88,7 @@ StreamResult FifoBuffer::Read(void* buffer, size_t bytes, size_t* bytes_read, int* error) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); const bool was_writable = data_length_ < buffer_length_; size_t copy = 0; StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); @@ -114,7 +114,7 @@ StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, size_t* bytes_written, int* error) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); const bool was_readable = (data_length_ > 0); size_t copy = 0; @@ -136,12 +136,12 @@ StreamResult FifoBuffer::Write(const void* buffer, } void FifoBuffer::Close() { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); state_ = SS_CLOSED; } const void* FifoBuffer::GetReadData(size_t* size) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); *size = (read_position_ + data_length_ <= buffer_length_) ? data_length_ : buffer_length_ - read_position_; @@ -149,7 +149,7 @@ const void* FifoBuffer::GetReadData(size_t* size) { } void FifoBuffer::ConsumeReadData(size_t size) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); RTC_DCHECK(size <= data_length_); const bool was_writable = data_length_ < buffer_length_; read_position_ = (read_position_ + size) % buffer_length_; @@ -160,7 +160,7 @@ void FifoBuffer::ConsumeReadData(size_t size) { } void* FifoBuffer::GetWriteBuffer(size_t* size) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); if (state_ == SS_CLOSED) { return nullptr; } @@ -180,7 +180,7 @@ void* FifoBuffer::GetWriteBuffer(size_t* size) { } void FifoBuffer::ConsumeWriteBuffer(size_t size) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); RTC_DCHECK(size <= buffer_length_ - data_length_); const bool was_readable = (data_length_ > 0); data_length_ += size; @@ -190,7 +190,7 @@ void FifoBuffer::ConsumeWriteBuffer(size_t size) { } bool FifoBuffer::GetWriteRemaining(size_t* size) const { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); *size = buffer_length_ - data_length_; return true; } diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h index f859815c70..04c4cbf33b 100644 --- a/rtc_base/memory/fifo_buffer.h +++ b/rtc_base/memory/fifo_buffer.h @@ -14,6 +14,7 @@ #include <memory> #include "rtc_base/stream.h" +#include "rtc_base/synchronization/mutex.h" namespace rtc { @@ -103,7 +104,7 @@ class FifoBuffer final : public StreamInterface { size_t bytes, size_t offset, size_t* bytes_read) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Helper method that implements WriteOffset. Caller must acquire a lock // when calling this method. @@ -111,22 +112,22 @@ class FifoBuffer final : public StreamInterface { size_t bytes, size_t offset, size_t* bytes_written) - RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); // keeps the opened/closed state of the stream - StreamState state_ RTC_GUARDED_BY(crit_); + StreamState state_ RTC_GUARDED_BY(mutex_); // the allocated buffer - std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(crit_); + std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(mutex_); // size of the allocated buffer - size_t buffer_length_ RTC_GUARDED_BY(crit_); + size_t buffer_length_ RTC_GUARDED_BY(mutex_); // amount of readable data in the buffer - size_t data_length_ RTC_GUARDED_BY(crit_); + size_t data_length_ RTC_GUARDED_BY(mutex_); // offset to the readable data - size_t read_position_ RTC_GUARDED_BY(crit_); + size_t read_position_ RTC_GUARDED_BY(mutex_); // stream callbacks are dispatched on this thread Thread* owner_; // object lock - CriticalSection crit_; + mutable webrtc::Mutex mutex_; RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer); }; diff --git a/rtc_base/nat_server.cc b/rtc_base/nat_server.cc index 323a787ee0..725a57be9f 100644 --- a/rtc_base/nat_server.cc +++ b/rtc_base/nat_server.cc @@ -174,7 +174,7 @@ void NATServer::OnInternalUDPPacket(AsyncPacketSocket* socket, RTC_DCHECK(iter != int_map_->end()); // Allow the destination to send packets back to the source. - iter->second->WhitelistInsert(dest_addr); + iter->second->AllowlistInsert(dest_addr); // Send the packet to its intended destination. rtc::PacketOptions options; @@ -227,29 +227,29 @@ void NATServer::Translate(const SocketAddressPair& route) { bool NATServer::ShouldFilterOut(TransEntry* entry, const SocketAddress& ext_addr) { - return entry->WhitelistContains(ext_addr); + return entry->AllowlistContains(ext_addr); } NATServer::TransEntry::TransEntry(const SocketAddressPair& r, AsyncUDPSocket* s, NAT* nat) : route(r), socket(s) { - whitelist = new AddressSet(AddrCmp(nat)); + allowlist = new AddressSet(AddrCmp(nat)); } NATServer::TransEntry::~TransEntry() { - delete whitelist; + delete allowlist; delete socket; } -void NATServer::TransEntry::WhitelistInsert(const SocketAddress& addr) { - CritScope cs(&crit_); - whitelist->insert(addr); +void NATServer::TransEntry::AllowlistInsert(const SocketAddress& addr) { + webrtc::MutexLock lock(&mutex_); + allowlist->insert(addr); } -bool NATServer::TransEntry::WhitelistContains(const SocketAddress& ext_addr) { - CritScope cs(&crit_); - return whitelist->find(ext_addr) == whitelist->end(); +bool NATServer::TransEntry::AllowlistContains(const SocketAddress& ext_addr) { + webrtc::MutexLock lock(&mutex_); + return allowlist->find(ext_addr) == allowlist->end(); } } // namespace rtc diff --git a/rtc_base/nat_server.h b/rtc_base/nat_server.h index 46f01e9761..5078fbb2c1 100644 --- a/rtc_base/nat_server.h +++ b/rtc_base/nat_server.h @@ -20,6 +20,7 @@ #include "rtc_base/proxy_server.h" #include "rtc_base/socket_address_pair.h" #include "rtc_base/socket_factory.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread.h" namespace rtc { @@ -96,13 +97,13 @@ class NATServer : public sigslot::has_slots<> { TransEntry(const SocketAddressPair& r, AsyncUDPSocket* s, NAT* nat); ~TransEntry(); - void WhitelistInsert(const SocketAddress& addr); - bool WhitelistContains(const SocketAddress& ext_addr); + void AllowlistInsert(const SocketAddress& addr); + bool AllowlistContains(const SocketAddress& ext_addr); SocketAddressPair route; AsyncUDPSocket* socket; - AddressSet* whitelist; - CriticalSection crit_; + AddressSet* allowlist; + webrtc::Mutex mutex_; }; typedef std::map<SocketAddressPair, TransEntry*, RouteCmp> InternalMap; diff --git a/rtc_base/net_helpers.cc b/rtc_base/net_helpers.cc index 6ff3791738..c6685e2a65 100644 --- a/rtc_base/net_helpers.cc +++ b/rtc_base/net_helpers.cc @@ -10,8 +10,6 @@ #include "rtc_base/net_helpers.h" -#include <memory> - #if defined(WEBRTC_WIN) #include <ws2spi.h> #include <ws2tcpip.h> @@ -26,8 +24,11 @@ #endif #endif // defined(WEBRTC_POSIX) && !defined(__native_client__) +#include "api/task_queue/task_queue_base.h" #include "rtc_base/logging.h" #include "rtc_base/signal_thread.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/sigslot/sigslot.h" // for signal_with_thread... namespace rtc { @@ -83,18 +84,35 @@ int ResolveHostname(const std::string& hostname, #endif // !__native_client__ } -// AsyncResolver -AsyncResolver::AsyncResolver() : SignalThread(), error_(-1) {} +AsyncResolver::AsyncResolver() : error_(-1) {} -AsyncResolver::~AsyncResolver() = default; +AsyncResolver::~AsyncResolver() { + RTC_DCHECK_RUN_ON(&sequence_checker_); +} void AsyncResolver::Start(const SocketAddress& addr) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK(!destroy_called_); addr_ = addr; - // SignalThred Start will kickoff the resolve process. - SignalThread::Start(); + webrtc::TaskQueueBase* current_task_queue = webrtc::TaskQueueBase::Current(); + popup_thread_ = Thread::Create(); + popup_thread_->Start(); + popup_thread_->PostTask(webrtc::ToQueuedTask( + [this, flag = safety_.flag(), addr, current_task_queue] { + std::vector<IPAddress> addresses; + int error = + ResolveHostname(addr.hostname().c_str(), addr.family(), &addresses); + current_task_queue->PostTask(webrtc::ToQueuedTask( + std::move(flag), [this, error, addresses = std::move(addresses)] { + RTC_DCHECK_RUN_ON(&sequence_checker_); + ResolveDone(std::move(addresses), error); + })); + })); } bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK(!destroy_called_); if (error_ != 0 || addresses_.empty()) return false; @@ -109,20 +127,40 @@ bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { } int AsyncResolver::GetError() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK(!destroy_called_); return error_; } void AsyncResolver::Destroy(bool wait) { - SignalThread::Destroy(wait); + // Some callers have trouble guaranteeing that Destroy is called on the + // sequence guarded by |sequence_checker_|. + // RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK(!destroy_called_); + destroy_called_ = true; + MaybeSelfDestruct(); } -void AsyncResolver::DoWork() { - error_ = - ResolveHostname(addr_.hostname().c_str(), addr_.family(), &addresses_); +const std::vector<IPAddress>& AsyncResolver::addresses() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK(!destroy_called_); + return addresses_; } -void AsyncResolver::OnWorkDone() { +void AsyncResolver::ResolveDone(std::vector<IPAddress> addresses, int error) { + addresses_ = addresses; + error_ = error; + recursion_check_ = true; SignalDone(this); + MaybeSelfDestruct(); +} + +void AsyncResolver::MaybeSelfDestruct() { + if (!recursion_check_) { + delete this; + } else { + recursion_check_ = false; + } } const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) { diff --git a/rtc_base/net_helpers.h b/rtc_base/net_helpers.h index 1e06940be7..c6aa4be5b2 100644 --- a/rtc_base/net_helpers.h +++ b/rtc_base/net_helpers.h @@ -21,16 +21,23 @@ #include "rtc_base/async_resolver_interface.h" #include "rtc_base/ip_address.h" -#include "rtc_base/signal_thread.h" #include "rtc_base/socket_address.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/system/rtc_export.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/thread.h" +#include "rtc_base/thread_annotations.h" namespace rtc { // AsyncResolver will perform async DNS resolution, signaling the result on // the SignalDone from AsyncResolverInterface when the operation completes. -class RTC_EXPORT AsyncResolver : public SignalThread, - public AsyncResolverInterface { +// +// This class is thread-compatible, and all methods and destruction needs to +// happen from the same rtc::Thread, except for Destroy which is allowed to +// happen on another context provided it's not happening concurrently to another +// public API call, and is the last access to the object. +class RTC_EXPORT AsyncResolver : public AsyncResolverInterface { public: AsyncResolver(); ~AsyncResolver() override; @@ -40,17 +47,22 @@ class RTC_EXPORT AsyncResolver : public SignalThread, int GetError() const override; void Destroy(bool wait) override; - const std::vector<IPAddress>& addresses() const { return addresses_; } - void set_error(int error) { error_ = error; } - - protected: - void DoWork() override; - void OnWorkDone() override; + const std::vector<IPAddress>& addresses() const; private: - SocketAddress addr_; - std::vector<IPAddress> addresses_; - int error_; + void ResolveDone(std::vector<IPAddress> addresses, int error) + RTC_EXCLUSIVE_LOCKS_REQUIRED(sequence_checker_); + void MaybeSelfDestruct(); + + SocketAddress addr_ RTC_GUARDED_BY(sequence_checker_); + std::vector<IPAddress> addresses_ RTC_GUARDED_BY(sequence_checker_); + int error_ RTC_GUARDED_BY(sequence_checker_); + webrtc::ScopedTaskSafety safety_ RTC_GUARDED_BY(sequence_checker_); + std::unique_ptr<Thread> popup_thread_ RTC_GUARDED_BY(sequence_checker_); + bool recursion_check_ = + false; // Protects against SignalDone calling into Destroy. + bool destroy_called_ = false; + webrtc::SequenceChecker sequence_checker_; }; // rtc namespaced wrappers for inet_ntop and inet_pton so we can avoid diff --git a/rtc_base/network.cc b/rtc_base/network.cc index f30063d991..64aee4bdae 100644 --- a/rtc_base/network.cc +++ b/rtc_base/network.cc @@ -35,6 +35,7 @@ #include "rtc_base/string_utils.h" #include "rtc_base/strings/string_builder.h" #include "rtc_base/thread.h" +#include "system_wrappers/include/field_trial.h" namespace rtc { namespace { @@ -85,7 +86,8 @@ bool SortNetworks(const Network* a, const Network* b) { return a->key() < b->key(); } -uint16_t ComputeNetworkCostByType(int type) { +uint16_t ComputeNetworkCostByType(int type, + bool use_differentiated_cellular_costs) { // TODO(jonaso) : Rollout support for cellular network cost using A/B // experiment to make sure it does not introduce regressions. switch (type) { @@ -95,11 +97,19 @@ uint16_t ComputeNetworkCostByType(int type) { case rtc::ADAPTER_TYPE_WIFI: return kNetworkCostLow; case rtc::ADAPTER_TYPE_CELLULAR: + return kNetworkCostCellular; case rtc::ADAPTER_TYPE_CELLULAR_2G: + return use_differentiated_cellular_costs ? kNetworkCostCellular2G + : kNetworkCostCellular; case rtc::ADAPTER_TYPE_CELLULAR_3G: + return use_differentiated_cellular_costs ? kNetworkCostCellular3G + : kNetworkCostCellular; case rtc::ADAPTER_TYPE_CELLULAR_4G: + return use_differentiated_cellular_costs ? kNetworkCostCellular4G + : kNetworkCostCellular; case rtc::ADAPTER_TYPE_CELLULAR_5G: - return kNetworkCostCellular; + return use_differentiated_cellular_costs ? kNetworkCostCellular5G + : kNetworkCostCellular; case rtc::ADAPTER_TYPE_ANY: // Candidates gathered from the any-address/wildcard ports, as backups, // are given the maximum cost so that if there are other candidates with @@ -930,7 +940,9 @@ Network::Network(const std::string& name, scope_id_(0), ignored_(false), type_(ADAPTER_TYPE_UNKNOWN), - preference_(0) {} + preference_(0), + use_differentiated_cellular_costs_(webrtc::field_trial::IsEnabled( + "WebRTC-UseDifferentiatedCellularCosts")) {} Network::Network(const std::string& name, const std::string& desc, @@ -945,7 +957,9 @@ Network::Network(const std::string& name, scope_id_(0), ignored_(false), type_(type), - preference_(0) {} + preference_(0), + use_differentiated_cellular_costs_(webrtc::field_trial::IsEnabled( + "WebRTC-UseDifferentiatedCellularCosts")) {} Network::Network(const Network&) = default; @@ -1017,7 +1031,7 @@ webrtc::MdnsResponderInterface* Network::GetMdnsResponder() const { uint16_t Network::GetCost() const { AdapterType type = IsVpn() ? underlying_type_for_vpn_ : type_; - return ComputeNetworkCostByType(type); + return ComputeNetworkCostByType(type, use_differentiated_cellular_costs_); } std::string Network::ToString() const { diff --git a/rtc_base/network.h b/rtc_base/network.h index bd05b6ae16..a67d2a2339 100644 --- a/rtc_base/network.h +++ b/rtc_base/network.h @@ -462,6 +462,7 @@ class RTC_EXPORT Network { int preference_; bool active_ = true; uint16_t id_ = 0; + bool use_differentiated_cellular_costs_ = false; friend class NetworkManager; }; diff --git a/rtc_base/network/BUILD.gn b/rtc_base/network/BUILD.gn index 1d06defb3b..35ae3d45f7 100644 --- a/rtc_base/network/BUILD.gn +++ b/rtc_base/network/BUILD.gn @@ -13,8 +13,6 @@ rtc_library("sent_packet") { "sent_packet.cc", "sent_packet.h", ] - deps = [ - "../system:rtc_export", - "//third_party/abseil-cpp/absl/types:optional", - ] + deps = [ "../system:rtc_export" ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } diff --git a/rtc_base/one_time_event.h b/rtc_base/one_time_event.h index c5ccbf6933..d33ddbd587 100644 --- a/rtc_base/one_time_event.h +++ b/rtc_base/one_time_event.h @@ -11,7 +11,7 @@ #ifndef RTC_BASE_ONE_TIME_EVENT_H_ #define RTC_BASE_ONE_TIME_EVENT_H_ -#include "rtc_base/critical_section.h" +#include "rtc_base/synchronization/mutex.h" namespace webrtc { // Provides a simple way to perform an operation (such as logging) one @@ -26,7 +26,7 @@ class OneTimeEvent { public: OneTimeEvent() {} bool operator()() { - rtc::CritScope cs(&critsect_); + MutexLock lock(&mutex_); if (happened_) { return false; } @@ -36,7 +36,7 @@ class OneTimeEvent { private: bool happened_ = false; - rtc::CriticalSection critsect_; + Mutex mutex_; }; // A non-thread-safe, ligher-weight version of the OneTimeEvent class. diff --git a/rtc_base/openssl_adapter_unittest.cc b/rtc_base/openssl_adapter_unittest.cc index b161304d65..4bd87992d4 100644 --- a/rtc_base/openssl_adapter_unittest.cc +++ b/rtc_base/openssl_adapter_unittest.cc @@ -25,28 +25,34 @@ namespace { class MockAsyncSocket : public AsyncSocket { public: virtual ~MockAsyncSocket() = default; - MOCK_METHOD1(Accept, AsyncSocket*(SocketAddress*)); - MOCK_CONST_METHOD0(GetLocalAddress, SocketAddress()); - MOCK_CONST_METHOD0(GetRemoteAddress, SocketAddress()); - MOCK_METHOD1(Bind, int(const SocketAddress&)); - MOCK_METHOD1(Connect, int(const SocketAddress&)); - MOCK_METHOD2(Send, int(const void*, size_t)); - MOCK_METHOD3(SendTo, int(const void*, size_t, const SocketAddress&)); - MOCK_METHOD3(Recv, int(void*, size_t, int64_t*)); - MOCK_METHOD4(RecvFrom, int(void*, size_t, SocketAddress*, int64_t*)); - MOCK_METHOD1(Listen, int(int)); - MOCK_METHOD0(Close, int()); - MOCK_CONST_METHOD0(GetError, int()); - MOCK_METHOD1(SetError, void(int)); - MOCK_CONST_METHOD0(GetState, ConnState()); - MOCK_METHOD2(GetOption, int(Option, int*)); - MOCK_METHOD2(SetOption, int(Option, int)); + MOCK_METHOD(AsyncSocket*, Accept, (SocketAddress*), (override)); + MOCK_METHOD(SocketAddress, GetLocalAddress, (), (const, override)); + MOCK_METHOD(SocketAddress, GetRemoteAddress, (), (const, override)); + MOCK_METHOD(int, Bind, (const SocketAddress&), (override)); + MOCK_METHOD(int, Connect, (const SocketAddress&), (override)); + MOCK_METHOD(int, Send, (const void*, size_t), (override)); + MOCK_METHOD(int, + SendTo, + (const void*, size_t, const SocketAddress&), + (override)); + MOCK_METHOD(int, Recv, (void*, size_t, int64_t*), (override)); + MOCK_METHOD(int, + RecvFrom, + (void*, size_t, SocketAddress*, int64_t*), + (override)); + MOCK_METHOD(int, Listen, (int), (override)); + MOCK_METHOD(int, Close, (), (override)); + MOCK_METHOD(int, GetError, (), (const, override)); + MOCK_METHOD(void, SetError, (int), (override)); + MOCK_METHOD(ConnState, GetState, (), (const, override)); + MOCK_METHOD(int, GetOption, (Option, int*), (override)); + MOCK_METHOD(int, SetOption, (Option, int), (override)); }; class MockCertVerifier : public SSLCertificateVerifier { public: virtual ~MockCertVerifier() = default; - MOCK_METHOD1(Verify, bool(const SSLCertificate&)); + MOCK_METHOD(bool, Verify, (const SSLCertificate&), (override)); }; } // namespace diff --git a/rtc_base/operations_chain_unittest.cc b/rtc_base/operations_chain_unittest.cc index 968f94c060..ed3c924998 100644 --- a/rtc_base/operations_chain_unittest.cc +++ b/rtc_base/operations_chain_unittest.cc @@ -369,14 +369,15 @@ TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) { #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) -TEST(OperationsChainTest, OperationNotInvokingCallbackShouldCrash) { +TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) { scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create(); EXPECT_DEATH( operations_chain->ChainOperation([](std::function<void()> callback) {}), ""); } -TEST(OperationsChainTest, OperationInvokingCallbackMultipleTimesShouldCrash) { +TEST(OperationsChainDeathTest, + OperationInvokingCallbackMultipleTimesShouldCrash) { scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create(); EXPECT_DEATH( operations_chain->ChainOperation([](std::function<void()> callback) { diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc index 080534af2c..05b32557be 100644 --- a/rtc_base/physical_socket_server.cc +++ b/rtc_base/physical_socket_server.cc @@ -24,7 +24,6 @@ // "poll" will be used to wait for the signal dispatcher. #include <poll.h> #endif -#include <signal.h> #include <sys/ioctl.h> #include <sys/select.h> #include <sys/time.h> @@ -956,182 +955,7 @@ class EventDispatcher : public Dispatcher { PhysicalSocketServer* ss_; int afd_[2]; bool fSignaled_; - CriticalSection crit_; -}; - -// These two classes use the self-pipe trick to deliver POSIX signals to our -// select loop. This is the only safe, reliable, cross-platform way to do -// non-trivial things with a POSIX signal in an event-driven program (until -// proper pselect() implementations become ubiquitous). - -class PosixSignalHandler { - public: - // POSIX only specifies 32 signals, but in principle the system might have - // more and the programmer might choose to use them, so we size our array - // for 128. - static constexpr int kNumPosixSignals = 128; - - // There is just a single global instance. (Signal handlers do not get any - // sort of user-defined void * parameter, so they can't access anything that - // isn't global.) - static PosixSignalHandler* Instance() { - static PosixSignalHandler* const instance = new PosixSignalHandler(); - return instance; - } - - // Returns true if the given signal number is set. - bool IsSignalSet(int signum) const { - RTC_DCHECK(signum < static_cast<int>(arraysize(received_signal_))); - if (signum < static_cast<int>(arraysize(received_signal_))) { - return received_signal_[signum]; - } else { - return false; - } - } - - // Clears the given signal number. - void ClearSignal(int signum) { - RTC_DCHECK(signum < static_cast<int>(arraysize(received_signal_))); - if (signum < static_cast<int>(arraysize(received_signal_))) { - received_signal_[signum] = false; - } - } - - // Returns the file descriptor to monitor for signal events. - int GetDescriptor() const { return afd_[0]; } - - // This is called directly from our real signal handler, so it must be - // signal-handler-safe. That means it cannot assume anything about the - // user-level state of the process, since the handler could be executed at any - // time on any thread. - void OnPosixSignalReceived(int signum) { - if (signum >= static_cast<int>(arraysize(received_signal_))) { - // We don't have space in our array for this. - return; - } - // Set a flag saying we've seen this signal. - received_signal_[signum] = true; - // Notify application code that we got a signal. - const uint8_t b[1] = {0}; - if (-1 == write(afd_[1], b, sizeof(b))) { - // Nothing we can do here. If there's an error somehow then there's - // nothing we can safely do from a signal handler. - // No, we can't even safely log it. - // But, we still have to check the return value here. Otherwise, - // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help. - return; - } - } - - private: - PosixSignalHandler() { - if (pipe(afd_) < 0) { - RTC_LOG_ERR(LS_ERROR) << "pipe failed"; - return; - } - if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) { - RTC_LOG_ERR(LS_WARNING) << "fcntl #1 failed"; - } - if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) { - RTC_LOG_ERR(LS_WARNING) << "fcntl #2 failed"; - } - memset(const_cast<void*>(static_cast<volatile void*>(received_signal_)), 0, - sizeof(received_signal_)); - } - - ~PosixSignalHandler() { - int fd1 = afd_[0]; - int fd2 = afd_[1]; - // We clobber the stored file descriptor numbers here or else in principle - // a signal that happens to be delivered during application termination - // could erroneously write a zero byte to an unrelated file handle in - // OnPosixSignalReceived() if some other file happens to be opened later - // during shutdown and happens to be given the same file descriptor number - // as our pipe had. Unfortunately even with this precaution there is still a - // race where that could occur if said signal happens to be handled - // concurrently with this code and happens to have already read the value of - // afd_[1] from memory before we clobber it, but that's unlikely. - afd_[0] = -1; - afd_[1] = -1; - close(fd1); - close(fd2); - } - - int afd_[2]; - // These are boolean flags that will be set in our signal handler and read - // and cleared from Wait(). There is a race involved in this, but it is - // benign. The signal handler sets the flag before signaling the pipe, so - // we'll never end up blocking in select() while a flag is still true. - // However, if two of the same signal arrive close to each other then it's - // possible that the second time the handler may set the flag while it's still - // true, meaning that signal will be missed. But the first occurrence of it - // will still be handled, so this isn't a problem. - // Volatile is not necessary here for correctness, but this data _is_ volatile - // so I've marked it as such. - volatile uint8_t received_signal_[kNumPosixSignals]; -}; - -class PosixSignalDispatcher : public Dispatcher { - public: - PosixSignalDispatcher(PhysicalSocketServer* owner) : owner_(owner) { - owner_->Add(this); - } - - ~PosixSignalDispatcher() override { owner_->Remove(this); } - - uint32_t GetRequestedEvents() override { return DE_READ; } - - void OnPreEvent(uint32_t ff) override { - // Events might get grouped if signals come very fast, so we read out up to - // 16 bytes to make sure we keep the pipe empty. - uint8_t b[16]; - ssize_t ret = read(GetDescriptor(), b, sizeof(b)); - if (ret < 0) { - RTC_LOG_ERR(LS_WARNING) << "Error in read()"; - } else if (ret == 0) { - RTC_LOG(LS_WARNING) << "Should have read at least one byte"; - } - } - - void OnEvent(uint32_t ff, int err) override { - for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals; - ++signum) { - if (PosixSignalHandler::Instance()->IsSignalSet(signum)) { - PosixSignalHandler::Instance()->ClearSignal(signum); - HandlerMap::iterator i = handlers_.find(signum); - if (i == handlers_.end()) { - // This can happen if a signal is delivered to our process at around - // the same time as we unset our handler for it. It is not an error - // condition, but it's unusual enough to be worth logging. - RTC_LOG(LS_INFO) << "Received signal with no handler: " << signum; - } else { - // Otherwise, execute our handler. - (*i->second)(signum); - } - } - } - } - - int GetDescriptor() override { - return PosixSignalHandler::Instance()->GetDescriptor(); - } - - bool IsDescriptorClosed() override { return false; } - - void SetHandler(int signum, void (*handler)(int)) { - handlers_[signum] = handler; - } - - void ClearHandler(int signum) { handlers_.erase(signum); } - - bool HasHandlers() { return !handlers_.empty(); } - - private: - typedef std::map<int, void (*)(int)> HandlerMap; - - HandlerMap handlers_; - // Our owner. - PhysicalSocketServer* owner_; + RecursiveCriticalSection crit_; }; #endif // WEBRTC_POSIX @@ -1205,31 +1029,32 @@ class Signaler : public EventDispatcher { bool* pf_; }; -PhysicalSocketServer::PhysicalSocketServer() : fWait_(false) { +PhysicalSocketServer::PhysicalSocketServer() + : +#if defined(WEBRTC_USE_EPOLL) + // Since Linux 2.6.8, the size argument is ignored, but must be greater + // than zero. Before that the size served as hint to the kernel for the + // amount of space to initially allocate in internal data structures. + epoll_fd_(epoll_create(FD_SETSIZE)), +#endif +#if defined(WEBRTC_WIN) + socket_ev_(WSACreateEvent()), +#endif + fWait_(false) { #if defined(WEBRTC_USE_EPOLL) - // Since Linux 2.6.8, the size argument is ignored, but must be greater than - // zero. Before that the size served as hint to the kernel for the amount of - // space to initially allocate in internal data structures. - epoll_fd_ = epoll_create(FD_SETSIZE); if (epoll_fd_ == -1) { // Not an error, will fall back to "select" below. RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create"; - epoll_fd_ = INVALID_SOCKET; + // Note that -1 == INVALID_SOCKET, the alias used by later checks. } #endif signal_wakeup_ = new Signaler(this, &fWait_); -#if defined(WEBRTC_WIN) - socket_ev_ = WSACreateEvent(); -#endif } PhysicalSocketServer::~PhysicalSocketServer() { #if defined(WEBRTC_WIN) WSACloseEvent(socket_ev_); #endif -#if defined(WEBRTC_POSIX) - signal_dispatcher_.reset(); -#endif delete signal_wakeup_; #if defined(WEBRTC_USE_EPOLL) if (epoll_fd_ != INVALID_SOCKET) { @@ -1540,12 +1365,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { #if defined(WEBRTC_USE_EPOLL) -// Initial number of events to process with one call to "epoll_wait". -static const size_t kInitialEpollEvents = 128; - -// Maximum number of events to process with one call to "epoll_wait". -static const size_t kMaxEpollEvents = 8192; - void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); int fd = pdispatcher->GetDescriptor(); @@ -1612,20 +1431,13 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { tvStop = TimeAfter(cmsWait); } - if (epoll_events_.empty()) { - // The initial space to receive events is created only if epoll is used. - epoll_events_.resize(kInitialEpollEvents); - } - fWait_ = true; - while (fWait_) { // Wait then call handlers as appropriate // < 0 means error // 0 means timeout // > 0 means count of descriptors ready - int n = epoll_wait(epoll_fd_, &epoll_events_[0], - static_cast<int>(epoll_events_.size()), + int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(), static_cast<int>(tvWait)); if (n < 0) { if (errno != EINTR) { @@ -1658,13 +1470,6 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) { } } - if (static_cast<size_t>(n) == epoll_events_.size() && - epoll_events_.size() < kMaxEpollEvents) { - // We used the complete space to receive events, increase size for future - // iterations. - epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); - } - if (cmsWait != kForever) { tvWait = TimeDiff(tvStop, TimeMillis()); if (tvWait < 0) { @@ -1746,62 +1551,6 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { #endif // WEBRTC_USE_EPOLL -static void GlobalSignalHandler(int signum) { - PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); -} - -bool PhysicalSocketServer::SetPosixSignalHandler(int signum, - void (*handler)(int)) { - // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, - // otherwise set one. - if (handler == SIG_IGN || handler == SIG_DFL) { - if (!InstallSignal(signum, handler)) { - return false; - } - if (signal_dispatcher_) { - signal_dispatcher_->ClearHandler(signum); - if (!signal_dispatcher_->HasHandlers()) { - signal_dispatcher_.reset(); - } - } - } else { - if (!signal_dispatcher_) { - signal_dispatcher_.reset(new PosixSignalDispatcher(this)); - } - signal_dispatcher_->SetHandler(signum, handler); - if (!InstallSignal(signum, &GlobalSignalHandler)) { - return false; - } - } - return true; -} - -Dispatcher* PhysicalSocketServer::signal_dispatcher() { - return signal_dispatcher_.get(); -} - -bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) { - struct sigaction act; - // It doesn't really matter what we set this mask to. - if (sigemptyset(&act.sa_mask) != 0) { - RTC_LOG_ERR(LS_ERROR) << "Couldn't set mask"; - return false; - } - act.sa_handler = handler; -#if !defined(__native_client__) - // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it - // and it's a nuisance. Though some syscalls still return EINTR and there's no - // real standard for which ones. :( - act.sa_flags = SA_RESTART; -#else - act.sa_flags = 0; -#endif - if (sigaction(signum, &act, nullptr) != 0) { - RTC_LOG_ERR(LS_ERROR) << "Couldn't set sigaction"; - return false; - } - return true; -} #endif // WEBRTC_POSIX #if defined(WEBRTC_WIN) diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h index a71810f3db..7eaf590e3a 100644 --- a/rtc_base/physical_socket_server.h +++ b/rtc_base/physical_socket_server.h @@ -16,14 +16,16 @@ #define WEBRTC_USE_EPOLL 1 #endif +#include <array> #include <memory> #include <set> #include <vector> -#include "rtc_base/critical_section.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/net_helpers.h" #include "rtc_base/socket_server.h" #include "rtc_base/system/rtc_export.h" +#include "rtc_base/thread_annotations.h" #if defined(WEBRTC_POSIX) typedef int SOCKET; @@ -41,9 +43,6 @@ enum DispatcherEvent { }; class Signaler; -#if defined(WEBRTC_POSIX) -class PosixSignalDispatcher; -#endif class Dispatcher { public: @@ -82,33 +81,16 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { void Remove(Dispatcher* dispatcher); void Update(Dispatcher* dispatcher); -#if defined(WEBRTC_POSIX) - // Sets the function to be executed in response to the specified POSIX signal. - // The function is executed from inside Wait() using the "self-pipe trick"-- - // regardless of which thread receives the signal--and hence can safely - // manipulate user-level data structures. - // "handler" may be SIG_IGN, SIG_DFL, or a user-specified function, just like - // with signal(2). - // Only one PhysicalSocketServer should have user-level signal handlers. - // Dispatching signals on multiple PhysicalSocketServers is not reliable. - // The signal mask is not modified. It is the caller's responsibily to - // maintain it as desired. - virtual bool SetPosixSignalHandler(int signum, void (*handler)(int)); - - protected: - Dispatcher* signal_dispatcher(); -#endif - private: + // The number of events to process with one call to "epoll_wait". + static constexpr size_t kNumEpollEvents = 128; + typedef std::set<Dispatcher*> DispatcherSet; - void AddRemovePendingDispatchers(); + void AddRemovePendingDispatchers() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); #if defined(WEBRTC_POSIX) bool WaitSelect(int cms, bool process_io); - static bool InstallSignal(int signum, void (*handler)(int)); - - std::unique_ptr<PosixSignalDispatcher> signal_dispatcher_; #endif // WEBRTC_POSIX #if defined(WEBRTC_USE_EPOLL) void AddEpoll(Dispatcher* dispatcher); @@ -117,19 +99,23 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer { bool WaitEpoll(int cms); bool WaitPoll(int cms, Dispatcher* dispatcher); - int epoll_fd_ = INVALID_SOCKET; - std::vector<struct epoll_event> epoll_events_; + // This array is accessed in isolation by a thread calling into Wait(). + // It's useless to use a SequenceChecker to guard it because a socket + // server can outlive the thread it's bound to, forcing the Wait call + // to have to reset the sequence checker on Wait calls. + std::array<epoll_event, kNumEpollEvents> epoll_events_; + const int epoll_fd_ = INVALID_SOCKET; #endif // WEBRTC_USE_EPOLL - DispatcherSet dispatchers_; - DispatcherSet pending_add_dispatchers_; - DispatcherSet pending_remove_dispatchers_; - bool processing_dispatchers_ = false; - Signaler* signal_wakeup_; - CriticalSection crit_; - bool fWait_; + DispatcherSet dispatchers_ RTC_GUARDED_BY(crit_); + DispatcherSet pending_add_dispatchers_ RTC_GUARDED_BY(crit_); + DispatcherSet pending_remove_dispatchers_ RTC_GUARDED_BY(crit_); + bool processing_dispatchers_ RTC_GUARDED_BY(crit_) = false; + Signaler* signal_wakeup_; // Assigned in constructor only + RecursiveCriticalSection crit_; #if defined(WEBRTC_WIN) - WSAEVENT socket_ev_; + const WSAEVENT socket_ev_; #endif + bool fWait_; }; class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { @@ -205,7 +191,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { SOCKET s_; bool udp_; int family_ = 0; - CriticalSection crit_; + RecursiveCriticalSection crit_; int error_ RTC_GUARDED_BY(crit_); ConnState state_; AsyncResolver* resolver_; diff --git a/rtc_base/physical_socket_server_unittest.cc b/rtc_base/physical_socket_server_unittest.cc index 5083ca1791..586b9db292 100644 --- a/rtc_base/physical_socket_server_unittest.cc +++ b/rtc_base/physical_socket_server_unittest.cc @@ -501,139 +501,6 @@ TEST_F(PhysicalSocketTest, server_->set_network_binder(nullptr); } -class PosixSignalDeliveryTest : public ::testing::Test { - public: - static void RecordSignal(int signum) { - signals_received_.push_back(signum); - signaled_thread_ = Thread::Current(); - } - - protected: - void SetUp() override { ss_.reset(new PhysicalSocketServer()); } - - void TearDown() override { - ss_.reset(nullptr); - signals_received_.clear(); - signaled_thread_ = nullptr; - } - - bool ExpectSignal(int signum) { - if (signals_received_.empty()) { - RTC_LOG(LS_ERROR) << "ExpectSignal(): No signal received"; - return false; - } - if (signals_received_[0] != signum) { - RTC_LOG(LS_ERROR) << "ExpectSignal(): Received signal " - << signals_received_[0] << ", expected " << signum; - return false; - } - signals_received_.erase(signals_received_.begin()); - return true; - } - - bool ExpectNone() { - bool ret = signals_received_.empty(); - if (!ret) { - RTC_LOG(LS_ERROR) << "ExpectNone(): Received signal " - << signals_received_[0] << ", expected none"; - } - return ret; - } - - static std::vector<int> signals_received_; - static Thread* signaled_thread_; - - std::unique_ptr<PhysicalSocketServer> ss_; -}; - -std::vector<int> PosixSignalDeliveryTest::signals_received_; -Thread* PosixSignalDeliveryTest::signaled_thread_ = nullptr; - -// Test receiving a synchronous signal while not in Wait() and then entering -// Wait() afterwards. -// TODO(webrtc:7864): Fails on real iOS devices -#if defined(WEBRTC_IOS) && defined(WEBRTC_ARCH_ARM_FAMILY) -#define MAYBE_RaiseThenWait DISABLED_RaiseThenWait -#else -#define MAYBE_RaiseThenWait RaiseThenWait -#endif -TEST_F(PosixSignalDeliveryTest, MAYBE_RaiseThenWait) { - ASSERT_TRUE(ss_->SetPosixSignalHandler(SIGTERM, &RecordSignal)); - raise(SIGTERM); - EXPECT_TRUE(ss_->Wait(0, true)); - EXPECT_TRUE(ExpectSignal(SIGTERM)); - EXPECT_TRUE(ExpectNone()); -} - -// Test that we can handle getting tons of repeated signals and that we see all -// the different ones. -// TODO(webrtc:7864): Fails on real iOS devices -#if defined(WEBRTC_IOS) && defined(WEBRTC_ARCH_ARM_FAMILY) -#define MAYBE_InsanelyManySignals DISABLED_InsanelyManySignals -#else -#define MAYBE_InsanelyManySignals InsanelyManySignals -#endif -TEST_F(PosixSignalDeliveryTest, MAYBE_InsanelyManySignals) { - ss_->SetPosixSignalHandler(SIGTERM, &RecordSignal); - ss_->SetPosixSignalHandler(SIGINT, &RecordSignal); - for (int i = 0; i < 10000; ++i) { - raise(SIGTERM); - } - raise(SIGINT); - EXPECT_TRUE(ss_->Wait(0, true)); - // Order will be lowest signal numbers first. - EXPECT_TRUE(ExpectSignal(SIGINT)); - EXPECT_TRUE(ExpectSignal(SIGTERM)); - EXPECT_TRUE(ExpectNone()); -} - -// Test that a signal during a Wait() call is detected. -TEST_F(PosixSignalDeliveryTest, SignalDuringWait) { - ss_->SetPosixSignalHandler(SIGALRM, &RecordSignal); - alarm(1); - EXPECT_TRUE(ss_->Wait(1500, true)); - EXPECT_TRUE(ExpectSignal(SIGALRM)); - EXPECT_TRUE(ExpectNone()); -} - -// Test that it works no matter what thread the kernel chooses to give the -// signal to (since it's not guaranteed to be the one that Wait() runs on). -// TODO(webrtc:7864): Fails on real iOS devices -#if defined(WEBRTC_IOS) && defined(WEBRTC_ARCH_ARM_FAMILY) -#define MAYBE_SignalOnDifferentThread DISABLED_SignalOnDifferentThread -#else -#define MAYBE_SignalOnDifferentThread SignalOnDifferentThread -#endif -TEST_F(PosixSignalDeliveryTest, DISABLED_SignalOnDifferentThread) { - ss_->SetPosixSignalHandler(SIGTERM, &RecordSignal); - // Mask out SIGTERM so that it can't be delivered to this thread. - sigset_t mask; - sigemptyset(&mask); - sigaddset(&mask, SIGTERM); - EXPECT_EQ(0, pthread_sigmask(SIG_SETMASK, &mask, nullptr)); - // Start a new thread that raises it. It will have to be delivered to that - // thread. Our implementation should safely handle it and dispatch - // RecordSignal() on this thread. - std::unique_ptr<Thread> thread(Thread::CreateWithSocketServer()); - thread->Start(); - thread->PostTask(RTC_FROM_HERE, [&thread]() { - thread->socketserver()->Wait(1000, false); - // Allow SIGTERM. This will be the only thread with it not masked so it will - // be delivered to us. - sigset_t mask; - sigemptyset(&mask); - pthread_sigmask(SIG_SETMASK, &mask, nullptr); - - // Raise it. - raise(SIGTERM); - }); - - EXPECT_TRUE(ss_->Wait(1500, true)); - EXPECT_TRUE(ExpectSignal(SIGTERM)); - EXPECT_EQ(Thread::Current(), signaled_thread_); - EXPECT_TRUE(ExpectNone()); -} - #endif } // namespace rtc diff --git a/rtc_base/platform_thread_types.cc b/rtc_base/platform_thread_types.cc index ed4a228262..b0243b41dc 100644 --- a/rtc_base/platform_thread_types.cc +++ b/rtc_base/platform_thread_types.cc @@ -15,6 +15,16 @@ #include <sys/syscall.h> #endif +#if defined(WEBRTC_WIN) +#include "rtc_base/arraysize.h" + +// The SetThreadDescription API was brought in version 1607 of Windows 10. +// For compatibility with various versions of winuser and avoid clashing with +// a potentially defined type, we use the RTC_ prefix. +typedef HRESULT(WINAPI* RTC_SetThreadDescription)(HANDLE hThread, + PCWSTR lpThreadDescription); +#endif + namespace rtc { PlatformThreadId CurrentThreadId() { @@ -58,6 +68,24 @@ bool IsThreadRefEqual(const PlatformThreadRef& a, const PlatformThreadRef& b) { void SetCurrentThreadName(const char* name) { #if defined(WEBRTC_WIN) + // The SetThreadDescription API works even if no debugger is attached. + // The names set with this API also show up in ETW traces. Very handy. + static auto set_thread_description_func = + reinterpret_cast<RTC_SetThreadDescription>(::GetProcAddress( + ::GetModuleHandleA("Kernel32.dll"), "SetThreadDescription")); + if (set_thread_description_func) { + // Convert from ASCII to UTF-16. + wchar_t wide_thread_name[64]; + for (size_t i = 0; i < arraysize(wide_thread_name) - 1; ++i) { + wide_thread_name[i] = name[i]; + if (wide_thread_name[i] == L'\0') + break; + } + // Guarantee null-termination. + wide_thread_name[arraysize(wide_thread_name) - 1] = L'\0'; + set_thread_description_func(::GetCurrentThread(), wide_thread_name); + } + // For details see: // https://docs.microsoft.com/en-us/visualstudio/debugger/how-to-set-a-thread-name-in-native-code #pragma pack(push, 8) diff --git a/rtc_base/rate_limiter.cc b/rtc_base/rate_limiter.cc index 7394c3eb89..0f3f343aed 100644 --- a/rtc_base/rate_limiter.cc +++ b/rtc_base/rate_limiter.cc @@ -31,7 +31,7 @@ RateLimiter::~RateLimiter() {} // calling SetMaxRate() and a timed maintenance thread periodically updating // the RTT. bool RateLimiter::TryUseRate(size_t packet_size_bytes) { - rtc::CritScope cs(&lock_); + MutexLock lock(&lock_); int64_t now_ms = clock_->TimeInMilliseconds(); absl::optional<uint32_t> current_rate = current_rate_.Rate(now_ms); if (current_rate) { @@ -53,14 +53,14 @@ bool RateLimiter::TryUseRate(size_t packet_size_bytes) { } void RateLimiter::SetMaxRate(uint32_t max_rate_bps) { - rtc::CritScope cs(&lock_); + MutexLock lock(&lock_); max_rate_bps_ = max_rate_bps; } // Set the window size over which to measure the current bitrate. // For retransmissions, this is typically the RTT. bool RateLimiter::SetWindowSize(int64_t window_size_ms) { - rtc::CritScope cs(&lock_); + MutexLock lock(&lock_); window_size_ms_ = window_size_ms; return current_rate_.SetWindowSize(window_size_ms, clock_->TimeInMilliseconds()); diff --git a/rtc_base/rate_limiter.h b/rtc_base/rate_limiter.h index 1c956d788b..051ccf6aa6 100644 --- a/rtc_base/rate_limiter.h +++ b/rtc_base/rate_limiter.h @@ -15,8 +15,8 @@ #include <stdint.h> #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" #include "rtc_base/rate_statistics.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -45,7 +45,7 @@ class RateLimiter { private: Clock* const clock_; - rtc::CriticalSection lock_; + Mutex lock_; RateStatistics current_rate_ RTC_GUARDED_BY(lock_); int64_t window_size_ms_ RTC_GUARDED_BY(lock_); uint32_t max_rate_bps_ RTC_GUARDED_BY(lock_); diff --git a/rtc_base/rate_statistics.cc b/rtc_base/rate_statistics.cc index c4c2e78581..85621fa555 100644 --- a/rtc_base/rate_statistics.cc +++ b/rtc_base/rate_statistics.cc @@ -20,29 +20,26 @@ namespace webrtc { +RateStatistics::Bucket::Bucket(int64_t timestamp) + : sum(0), num_samples(0), timestamp(timestamp) {} + RateStatistics::RateStatistics(int64_t window_size_ms, float scale) - : buckets_(new Bucket[window_size_ms]()), - accumulated_count_(0), + : accumulated_count_(0), + first_timestamp_(-1), num_samples_(0), - oldest_time_(-window_size_ms), - oldest_index_(0), scale_(scale), max_window_size_ms_(window_size_ms), current_window_size_ms_(max_window_size_ms_) {} RateStatistics::RateStatistics(const RateStatistics& other) - : accumulated_count_(other.accumulated_count_), + : buckets_(other.buckets_), + accumulated_count_(other.accumulated_count_), + first_timestamp_(other.first_timestamp_), overflow_(other.overflow_), num_samples_(other.num_samples_), - oldest_time_(other.oldest_time_), - oldest_index_(other.oldest_index_), scale_(other.scale_), max_window_size_ms_(other.max_window_size_ms_), - current_window_size_ms_(other.current_window_size_ms_) { - buckets_ = std::make_unique<Bucket[]>(other.max_window_size_ms_); - std::copy(other.buckets_.get(), - other.buckets_.get() + other.max_window_size_ms_, buckets_.get()); -} + current_window_size_ms_(other.current_window_size_ms_) {} RateStatistics::RateStatistics(RateStatistics&& other) = default; @@ -52,33 +49,33 @@ void RateStatistics::Reset() { accumulated_count_ = 0; overflow_ = false; num_samples_ = 0; - oldest_time_ = -max_window_size_ms_; - oldest_index_ = 0; + first_timestamp_ = -1; current_window_size_ms_ = max_window_size_ms_; - for (int64_t i = 0; i < max_window_size_ms_; i++) - buckets_[i] = Bucket(); + buckets_.clear(); } void RateStatistics::Update(int64_t count, int64_t now_ms) { - RTC_DCHECK_LE(0, count); - if (now_ms < oldest_time_) { - // Too old data is ignored. - return; - } + RTC_DCHECK_GE(count, 0); EraseOld(now_ms); + if (first_timestamp_ == -1) { + first_timestamp_ = now_ms; + } + + if (buckets_.empty() || now_ms != buckets_.back().timestamp) { + if (!buckets_.empty() && now_ms < buckets_.back().timestamp) { + RTC_LOG(LS_WARNING) << "Timestamp " << now_ms + << " is before the last added " + "timestamp in the rate window: " + << buckets_.back().timestamp << ", aligning to that."; + now_ms = buckets_.back().timestamp; + } + buckets_.emplace_back(now_ms); + } + Bucket& last_bucket = buckets_.back(); + last_bucket.sum += count; + ++last_bucket.num_samples; - // First ever sample, reset window to start now. - if (!IsInitialized()) - oldest_time_ = now_ms; - - uint32_t now_offset = rtc::dchecked_cast<uint32_t>(now_ms - oldest_time_); - RTC_DCHECK_LT(now_offset, max_window_size_ms_); - uint32_t index = oldest_index_ + now_offset; - if (index >= max_window_size_ms_) - index -= max_window_size_ms_; - buckets_[index].sum += count; - ++buckets_[index].samples; if (std::numeric_limits<int64_t>::max() - accumulated_count_ > count) { accumulated_count_ += count; } else { @@ -92,10 +89,22 @@ absl::optional<int64_t> RateStatistics::Rate(int64_t now_ms) const { // of the members as mutable... const_cast<RateStatistics*>(this)->EraseOld(now_ms); + int active_window_size = 0; + if (first_timestamp_ != -1) { + if (first_timestamp_ <= now_ms - current_window_size_ms_) { + // Count window as full even if no data points currently in view, if the + // data stream started before the window. + active_window_size = current_window_size_ms_; + } else { + // Size of a single bucket is 1ms, so even if now_ms == first_timestmap_ + // the window size should be 1. + active_window_size = now_ms - first_timestamp_ + 1; + } + } + // If window is a single bucket or there is only one sample in a data set that // has not grown to the full window size, or if the accumulator has // overflowed, treat this as rate unavailable. - int active_window_size = now_ms - oldest_time_ + 1; if (num_samples_ == 0 || active_window_size <= 1 || (num_samples_ <= 1 && rtc::SafeLt(active_window_size, current_window_size_ms_)) || @@ -114,43 +123,35 @@ absl::optional<int64_t> RateStatistics::Rate(int64_t now_ms) const { } void RateStatistics::EraseOld(int64_t now_ms) { - if (!IsInitialized()) - return; - // New oldest time that is included in data set. - int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1; - - // New oldest time is older than the current one, no need to cull data. - if (new_oldest_time <= oldest_time_) - return; + const int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1; // Loop over buckets and remove too old data points. - while (num_samples_ > 0 && oldest_time_ < new_oldest_time) { - const Bucket& oldest_bucket = buckets_[oldest_index_]; + while (!buckets_.empty() && buckets_.front().timestamp < new_oldest_time) { + const Bucket& oldest_bucket = buckets_.front(); RTC_DCHECK_GE(accumulated_count_, oldest_bucket.sum); - RTC_DCHECK_GE(num_samples_, oldest_bucket.samples); + RTC_DCHECK_GE(num_samples_, oldest_bucket.num_samples); accumulated_count_ -= oldest_bucket.sum; - num_samples_ -= oldest_bucket.samples; - buckets_[oldest_index_] = Bucket(); - if (++oldest_index_ >= max_window_size_ms_) - oldest_index_ = 0; - ++oldest_time_; + num_samples_ -= oldest_bucket.num_samples; + buckets_.pop_front(); // This does not clear overflow_ even when counter is empty. // TODO(https://bugs.webrtc.org/11247): Consider if overflow_ can be reset. } - oldest_time_ = new_oldest_time; } bool RateStatistics::SetWindowSize(int64_t window_size_ms, int64_t now_ms) { if (window_size_ms <= 0 || window_size_ms > max_window_size_ms_) return false; + if (first_timestamp_ != -1) { + // If the window changes (e.g. decreases - removing data point, then + // increases again) we need to update the first timestamp mark as + // otherwise it indicates the window coveres a region of zeros, suddenly + // under-estimating the rate. + first_timestamp_ = std::max(first_timestamp_, now_ms - window_size_ms + 1); + } current_window_size_ms_ = window_size_ms; EraseOld(now_ms); return true; } -bool RateStatistics::IsInitialized() const { - return oldest_time_ != -max_window_size_ms_; -} - } // namespace webrtc diff --git a/rtc_base/rate_statistics.h b/rtc_base/rate_statistics.h index 11c8cee7af..dc8d7f5272 100644 --- a/rtc_base/rate_statistics.h +++ b/rtc_base/rate_statistics.h @@ -14,6 +14,7 @@ #include <stddef.h> #include <stdint.h> +#include <deque> #include <memory> #include "absl/types/optional.h" @@ -28,6 +29,10 @@ namespace webrtc { // high; for instance, a 20 Mbit/sec video stream can wrap a 32-bit byte // counter in 14 minutes. +// Note that timestamps used in Update(), Rate() and SetWindowSize() must never +// decrease for two consecutive calls. +// TODO(bugs.webrtc.org/11600): Migrate from int64_t to Timestamp. + class RTC_EXPORT RateStatistics { public: static constexpr float kBpsScale = 8000.0f; @@ -65,19 +70,22 @@ class RTC_EXPORT RateStatistics { private: void EraseOld(int64_t now_ms); - bool IsInitialized() const; - // Counters are kept in buckets (circular buffer), with one bucket - // per millisecond. struct Bucket { + explicit Bucket(int64_t timestamp); int64_t sum; // Sum of all samples in this bucket. - int samples; // Number of samples in this bucket. + int num_samples; // Number of samples in this bucket. + const int64_t timestamp; // Timestamp this bucket corresponds to. }; - std::unique_ptr<Bucket[]> buckets_; + // All buckets within the time window, ordered by time. + std::deque<Bucket> buckets_; - // Total count recorded in buckets. + // Total count recorded in all buckets. int64_t accumulated_count_; + // Timestamp of the first data point seen, or -1 of none seen. + int64_t first_timestamp_; + // True if accumulated_count_ has ever grown too large to be // contained in its integer type. bool overflow_ = false; @@ -85,12 +93,6 @@ class RTC_EXPORT RateStatistics { // The total number of samples in the buckets. int num_samples_; - // Oldest time recorded in buckets. - int64_t oldest_time_; - - // Bucket index of oldest counter recorded in buckets. - int64_t oldest_index_; - // To convert counts/ms to desired units const float scale_; diff --git a/rtc_base/signal_thread.h b/rtc_base/signal_thread.h index d9e8ade9b0..b444d54994 100644 --- a/rtc_base/signal_thread.h +++ b/rtc_base/signal_thread.h @@ -1,5 +1,5 @@ /* - * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * Copyright 2020 The WebRTC Project Authors. All rights reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -11,147 +11,9 @@ #ifndef RTC_BASE_SIGNAL_THREAD_H_ #define RTC_BASE_SIGNAL_THREAD_H_ -#include <string> - -#include "rtc_base/checks.h" -#include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" -#include "rtc_base/message_handler.h" -#include "rtc_base/third_party/sigslot/sigslot.h" -#include "rtc_base/thread.h" -#include "rtc_base/thread_annotations.h" - -namespace rtc { - -/////////////////////////////////////////////////////////////////////////////// -// SignalThread - Base class for worker threads. The main thread should call -// Start() to begin work, and then follow one of these models: -// Normal: Wait for SignalWorkDone, and then call Release to destroy. -// Cancellation: Call Release(true), to abort the worker thread. -// Fire-and-forget: Call Release(false), which allows the thread to run to -// completion, and then self-destruct without further notification. -// Periodic tasks: Wait for SignalWorkDone, then eventually call Start() -// again to repeat the task. When the instance isn't needed anymore, -// call Release. DoWork, OnWorkStart and OnWorkStop are called again, -// on a new thread. -// The subclass should override DoWork() to perform the background task. By -// periodically calling ContinueWork(), it can check for cancellation. -// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work -// tasks in the context of the main thread. -/////////////////////////////////////////////////////////////////////////////// - -class SignalThread : public sigslot::has_slots<>, protected MessageHandler { - public: - SignalThread(); - - // Context: Main Thread. Call before Start to change the worker's name. - bool SetName(const std::string& name, const void* obj); - - // Context: Main Thread. Call to begin the worker thread. - void Start(); - - // Context: Main Thread. If the worker thread is not running, deletes the - // object immediately. Otherwise, asks the worker thread to abort processing, - // and schedules the object to be deleted once the worker exits. - // SignalWorkDone will not be signalled. If wait is true, does not return - // until the thread is deleted. - void Destroy(bool wait); - - // Context: Main Thread. If the worker thread is complete, deletes the - // object immediately. Otherwise, schedules the object to be deleted once - // the worker thread completes. SignalWorkDone will be signalled. - void Release(); - - // Context: Main Thread. Signalled when work is complete. - sigslot::signal1<SignalThread*> SignalWorkDone; - - enum { ST_MSG_WORKER_DONE, ST_MSG_FIRST_AVAILABLE }; - - protected: - ~SignalThread() override; - - Thread* worker() { return &worker_; } - - // Context: Main Thread. Subclass should override to do pre-work setup. - virtual void OnWorkStart() {} - - // Context: Worker Thread. Subclass should override to do work. - virtual void DoWork() = 0; - - // Context: Worker Thread. Subclass should call periodically to - // dispatch messages and determine if the thread should terminate. - bool ContinueWork(); - - // Context: Worker Thread. Subclass should override when extra work is - // needed to abort the worker thread. - virtual void OnWorkStop() {} - - // Context: Main Thread. Subclass should override to do post-work cleanup. - virtual void OnWorkDone() {} - - // Context: Any Thread. If subclass overrides, be sure to call the base - // implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE) - void OnMessage(Message* msg) override; - - private: - enum State { - kInit, // Initialized, but not started - kRunning, // Started and doing work - kReleasing, // Same as running, but to be deleted when work is done - kComplete, // Work is done - kStopping, // Work is being interrupted - }; - - class Worker : public Thread { - public: - explicit Worker(SignalThread* parent); - ~Worker() override; - void Run() override; - bool IsProcessingMessagesForTesting() override; - - private: - SignalThread* parent_; - - RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Worker); - }; - - class RTC_SCOPED_LOCKABLE EnterExit { - public: - explicit EnterExit(SignalThread* t) RTC_EXCLUSIVE_LOCK_FUNCTION(t->cs_) - : t_(t) { - t_->cs_.Enter(); - // If refcount_ is zero then the object has already been deleted and we - // will be double-deleting it in ~EnterExit()! (shouldn't happen) - RTC_DCHECK_NE(0, t_->refcount_); - ++t_->refcount_; - } - ~EnterExit() RTC_UNLOCK_FUNCTION() { - bool d = (0 == --t_->refcount_); - t_->cs_.Leave(); - if (d) - delete t_; - } - - private: - SignalThread* t_; - - RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(EnterExit); - }; - - void Run(); - void OnMainThreadDestroyed(); - - Thread* main_; - Worker worker_; - CriticalSection cs_; - State state_; - int refcount_; - - RTC_DISALLOW_COPY_AND_ASSIGN(SignalThread); -}; - -/////////////////////////////////////////////////////////////////////////////// - -} // namespace rtc +// The facilities in this file have been deprecated. Please do not use them +// in new code. New code should use factilities exposed by api/task_queue/ +// instead. +#include "rtc_base/deprecated/signal_thread.h" #endif // RTC_BASE_SIGNAL_THREAD_H_ diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc index 125b4bd50d..498eba312b 100644 --- a/rtc_base/ssl_adapter_unittest.cc +++ b/rtc_base/ssl_adapter_unittest.cc @@ -50,7 +50,7 @@ static std::string GetSSLProtocolName(const rtc::SSLMode& ssl_mode) { class MockCertVerifier : public rtc::SSLCertificateVerifier { public: virtual ~MockCertVerifier() = default; - MOCK_METHOD1(Verify, bool(const rtc::SSLCertificate&)); + MOCK_METHOD(bool, Verify, (const rtc::SSLCertificate&), (override)); }; // TODO(benwright) - Move to using INSTANTIATE_TEST_SUITE_P instead of using diff --git a/rtc_base/stream.h b/rtc_base/stream.h index bfb9dc2c41..dc77a7111c 100644 --- a/rtc_base/stream.h +++ b/rtc_base/stream.h @@ -15,7 +15,6 @@ #include "rtc_base/buffer.h" #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" #include "rtc_base/message_handler.h" #include "rtc_base/system/rtc_export.h" #include "rtc_base/third_party/sigslot/sigslot.h" diff --git a/rtc_base/strings/string_builder_unittest.cc b/rtc_base/strings/string_builder_unittest.cc index 84717ad1d1..99dfd86292 100644 --- a/rtc_base/strings/string_builder_unittest.cc +++ b/rtc_base/strings/string_builder_unittest.cc @@ -59,7 +59,7 @@ TEST(SimpleStringBuilder, StdString) { // off. #if (GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)) || !RTC_DCHECK_IS_ON -TEST(SimpleStringBuilder, BufferOverrunConstCharP) { +TEST(SimpleStringBuilderDeathTest, BufferOverrunConstCharP) { char sb_buf[4]; SimpleStringBuilder sb(sb_buf); const char* const msg = "This is just too much"; @@ -71,7 +71,7 @@ TEST(SimpleStringBuilder, BufferOverrunConstCharP) { #endif } -TEST(SimpleStringBuilder, BufferOverrunStdString) { +TEST(SimpleStringBuilderDeathTest, BufferOverrunStdString) { char sb_buf[4]; SimpleStringBuilder sb(sb_buf); sb << 12; @@ -84,7 +84,7 @@ TEST(SimpleStringBuilder, BufferOverrunStdString) { #endif } -TEST(SimpleStringBuilder, BufferOverrunInt) { +TEST(SimpleStringBuilderDeathTest, BufferOverrunInt) { char sb_buf[4]; SimpleStringBuilder sb(sb_buf); constexpr int num = -12345; @@ -100,7 +100,7 @@ TEST(SimpleStringBuilder, BufferOverrunInt) { #endif } -TEST(SimpleStringBuilder, BufferOverrunDouble) { +TEST(SimpleStringBuilderDeathTest, BufferOverrunDouble) { char sb_buf[5]; SimpleStringBuilder sb(sb_buf); constexpr double num = 123.456; @@ -113,7 +113,7 @@ TEST(SimpleStringBuilder, BufferOverrunDouble) { #endif } -TEST(SimpleStringBuilder, BufferOverrunConstCharPAlreadyFull) { +TEST(SimpleStringBuilderDeathTest, BufferOverrunConstCharPAlreadyFull) { char sb_buf[4]; SimpleStringBuilder sb(sb_buf); sb << 123; @@ -126,7 +126,7 @@ TEST(SimpleStringBuilder, BufferOverrunConstCharPAlreadyFull) { #endif } -TEST(SimpleStringBuilder, BufferOverrunIntAlreadyFull) { +TEST(SimpleStringBuilderDeathTest, BufferOverrunIntAlreadyFull) { char sb_buf[4]; SimpleStringBuilder sb(sb_buf); sb << "xyz"; diff --git a/rtc_base/swap_queue_unittest.cc b/rtc_base/swap_queue_unittest.cc index 199ac6b185..3862d850fa 100644 --- a/rtc_base/swap_queue_unittest.cc +++ b/rtc_base/swap_queue_unittest.cc @@ -135,7 +135,7 @@ TEST(SwapQueueTest, SuccessfulItemVerifyFunctor) { } #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) -TEST(SwapQueueTest, UnsuccessfulItemVerifyFunctor) { +TEST(SwapQueueDeathTest, UnsuccessfulItemVerifyFunctor) { // Queue item verifier for the test. auto minus_2_verifier = [](const int& i) { return i > -2; }; SwapQueue<int, decltype(minus_2_verifier)> queue(2, minus_2_verifier); @@ -148,7 +148,7 @@ TEST(SwapQueueTest, UnsuccessfulItemVerifyFunctor) { EXPECT_DEATH(result = queue.Insert(&invalid_value), ""); } -TEST(SwapQueueTest, UnSuccessfulItemVerifyInsert) { +TEST(SwapQueueDeathTest, UnSuccessfulItemVerifyInsert) { std::vector<int> template_element(kChunkSize); SwapQueue<std::vector<int>, SwapQueueItemVerifier<std::vector<int>, &LengthVerifierFunction>> @@ -158,7 +158,7 @@ TEST(SwapQueueTest, UnSuccessfulItemVerifyInsert) { EXPECT_DEATH(result = queue.Insert(&invalid_chunk), ""); } -TEST(SwapQueueTest, UnSuccessfulItemVerifyRemove) { +TEST(SwapQueueDeathTest, UnSuccessfulItemVerifyRemove) { std::vector<int> template_element(kChunkSize); SwapQueue<std::vector<int>, SwapQueueItemVerifier<std::vector<int>, &LengthVerifierFunction>> diff --git a/rtc_base/synchronization/BUILD.gn b/rtc_base/synchronization/BUILD.gn index 3e7b22d4f9..a79a0486af 100644 --- a/rtc_base/synchronization/BUILD.gn +++ b/rtc_base/synchronization/BUILD.gn @@ -12,6 +12,38 @@ if (is_android) { import("//build/config/android/rules.gni") } +rtc_library("yield") { + sources = [ + "yield.cc", + "yield.h", + ] + deps = [] +} + +rtc_library("mutex") { + sources = [ + "mutex.cc", + "mutex.h", + "mutex_critical_section.h", + "mutex_pthread.h", + ] + if (rtc_use_absl_mutex) { + sources += [ "mutex_abseil.h" ] + } + + deps = [ + ":yield", + "..:checks", + "..:macromagic", + "..:platform_thread_types", + "../system:unused", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers" ] + if (rtc_use_absl_mutex) { + absl_deps += [ "//third_party/abseil-cpp/absl/synchronization" ] + } +} + rtc_library("rw_lock_wrapper") { public = [ "rw_lock_wrapper.h" ] sources = [ "rw_lock_wrapper.cc" ] @@ -36,10 +68,12 @@ rtc_library("sequence_checker") { "sequence_checker.h", ] deps = [ + ":mutex", "..:checks", "..:criticalsection", "..:macromagic", "..:platform_thread_types", + "..:stringutils", "../../api/task_queue", "../system:rtc_export", ] @@ -50,8 +84,8 @@ rtc_library("yield_policy") { "yield_policy.cc", "yield_policy.h", ] - deps = [ - "..:checks", + deps = [ "..:checks" ] + absl_deps = [ "//third_party/abseil-cpp/absl/base:config", "//third_party/abseil-cpp/absl/base:core_headers", ] @@ -60,11 +94,30 @@ rtc_library("yield_policy") { if (rtc_include_tests) { rtc_library("synchronization_unittests") { testonly = true - sources = [ "yield_policy_unittest.cc" ] + sources = [ + "mutex_unittest.cc", + "yield_policy_unittest.cc", + ] deps = [ + ":mutex", + ":yield", ":yield_policy", + "..:checks", + "..:macromagic", + "..:rtc_base", "..:rtc_event", "../../test:test_support", + "//third_party/google_benchmark", + ] + } + + rtc_library("mutex_benchmark") { + testonly = true + sources = [ "mutex_benchmark.cc" ] + deps = [ + ":mutex", + "../system:unused", + "//third_party/google_benchmark", ] } diff --git a/rtc_base/synchronization/DEPS b/rtc_base/synchronization/DEPS new file mode 100644 index 0000000000..4ed1f2444b --- /dev/null +++ b/rtc_base/synchronization/DEPS @@ -0,0 +1,11 @@ +specific_include_rules = { + "mutex_abseil\.h": [ + "+absl/synchronization" + ], + ".*_benchmark\.cc": [ + "+benchmark", + ], + ".*_unittest\.cc": [ + "+benchmark", + ] +} diff --git a/rtc_base/synchronization/mutex.cc b/rtc_base/synchronization/mutex.cc new file mode 100644 index 0000000000..6c2d6ff7f0 --- /dev/null +++ b/rtc_base/synchronization/mutex.cc @@ -0,0 +1,39 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/synchronization/mutex.h" + +#include "rtc_base/checks.h" +#include "rtc_base/synchronization/yield.h" + +namespace webrtc { + +#if !defined(WEBRTC_ABSL_MUTEX) +void GlobalMutex::Lock() { + while (mutex_locked_.exchange(1)) { + YieldCurrentThread(); + } +} + +void GlobalMutex::Unlock() { + int old = mutex_locked_.exchange(0); + RTC_DCHECK_EQ(old, 1) << "Unlock called without calling Lock first"; +} + +GlobalMutexLock::GlobalMutexLock(GlobalMutex* mutex) : mutex_(mutex) { + mutex_->Lock(); +} + +GlobalMutexLock::~GlobalMutexLock() { + mutex_->Unlock(); +} +#endif // #if !defined(WEBRTC_ABSL_MUTEX) + +} // namespace webrtc diff --git a/rtc_base/synchronization/mutex.h b/rtc_base/synchronization/mutex.h new file mode 100644 index 0000000000..1ccbbdcbd5 --- /dev/null +++ b/rtc_base/synchronization/mutex.h @@ -0,0 +1,146 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_H_ +#define RTC_BASE_SYNCHRONIZATION_MUTEX_H_ + +#include <atomic> + +#include "absl/base/const_init.h" +#include "rtc_base/checks.h" +#include "rtc_base/platform_thread_types.h" +#include "rtc_base/system/unused.h" +#include "rtc_base/thread_annotations.h" + +#if defined(WEBRTC_ABSL_MUTEX) +#include "rtc_base/synchronization/mutex_abseil.h" // nogncheck +#elif defined(WEBRTC_WIN) +#include "rtc_base/synchronization/mutex_critical_section.h" +#elif defined(WEBRTC_POSIX) +#include "rtc_base/synchronization/mutex_pthread.h" +#else +#error Unsupported platform. +#endif + +namespace webrtc { + +// The Mutex guarantees exclusive access and aims to follow Abseil semantics +// (i.e. non-reentrant etc). +class RTC_LOCKABLE Mutex final { + public: + Mutex() = default; + Mutex(const Mutex&) = delete; + Mutex& operator=(const Mutex&) = delete; + + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { + rtc::PlatformThreadRef current = CurrentThreadRefAssertingNotBeingHolder(); + impl_.Lock(); + // |holder_| changes from 0 to CurrentThreadRef(). + holder_.store(current, std::memory_order_relaxed); + } + RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { + rtc::PlatformThreadRef current = CurrentThreadRefAssertingNotBeingHolder(); + if (impl_.TryLock()) { + // |holder_| changes from 0 to CurrentThreadRef(). + holder_.store(current, std::memory_order_relaxed); + return true; + } + return false; + } + void Unlock() RTC_UNLOCK_FUNCTION() { + // |holder_| changes from CurrentThreadRef() to 0. If something else than + // CurrentThreadRef() is stored in |holder_|, the Unlock results in + // undefined behavior as mutexes can't be unlocked from another thread than + // the one that locked it, or called while not being locked. + holder_.store(0, std::memory_order_relaxed); + impl_.Unlock(); + } + + private: + rtc::PlatformThreadRef CurrentThreadRefAssertingNotBeingHolder() { + rtc::PlatformThreadRef holder = holder_.load(std::memory_order_relaxed); + rtc::PlatformThreadRef current = rtc::CurrentThreadRef(); + // TODO(bugs.webrtc.org/11567): remove this temporary check after migrating + // fully to Mutex. + RTC_CHECK_NE(holder, current); + return current; + } + + MutexImpl impl_; + // TODO(bugs.webrtc.org/11567): remove |holder_| after migrating fully to + // Mutex. + // |holder_| contains the PlatformThreadRef of the thread currently holding + // the lock, or 0. + // Remarks on the used memory orders: the atomic load in + // CurrentThreadRefAssertingNotBeingHolder() observes either of two things: + // 1. our own previous write to holder_ with our thread ID. + // 2. another thread (with ID y) writing y and then 0 from an initial value of + // 0. If we're observing case 1, our own stores are obviously ordered before + // the load, and hit the CHECK. If we're observing case 2, the value observed + // w.r.t |impl_| being locked depends on the memory order. Since we only care + // that it's different from CurrentThreadRef()), we use the more performant + // option, memory_order_relaxed. + std::atomic<rtc::PlatformThreadRef> holder_ = {0}; +}; + +// MutexLock, for serializing execution through a scope. +class RTC_SCOPED_LOCKABLE MutexLock final { + public: + MutexLock(const MutexLock&) = delete; + MutexLock& operator=(const MutexLock&) = delete; + + explicit MutexLock(Mutex* mutex) RTC_EXCLUSIVE_LOCK_FUNCTION(mutex) + : mutex_(mutex) { + mutex->Lock(); + } + ~MutexLock() RTC_UNLOCK_FUNCTION() { mutex_->Unlock(); } + + private: + Mutex* mutex_; +}; + +// A mutex used to protect global variables. Do NOT use for other purposes. +#if defined(WEBRTC_ABSL_MUTEX) +using GlobalMutex = absl::Mutex; +using GlobalMutexLock = absl::MutexLock; +#else +class RTC_LOCKABLE GlobalMutex final { + public: + GlobalMutex(const GlobalMutex&) = delete; + GlobalMutex& operator=(const GlobalMutex&) = delete; + + constexpr explicit GlobalMutex(absl::ConstInitType /*unused*/) + : mutex_locked_(0) {} + + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION(); + void Unlock() RTC_UNLOCK_FUNCTION(); + + private: + std::atomic<int> mutex_locked_; // 0 means lock not taken, 1 means taken. +}; + +// GlobalMutexLock, for serializing execution through a scope. +class RTC_SCOPED_LOCKABLE GlobalMutexLock final { + public: + GlobalMutexLock(const GlobalMutexLock&) = delete; + GlobalMutexLock& operator=(const GlobalMutexLock&) = delete; + + explicit GlobalMutexLock(GlobalMutex* mutex) + RTC_EXCLUSIVE_LOCK_FUNCTION(mutex_); + ~GlobalMutexLock() RTC_UNLOCK_FUNCTION(); + + private: + GlobalMutex* mutex_; +}; +#endif // if defined(WEBRTC_ABSL_MUTEX) + +} // namespace webrtc + +#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_H_ diff --git a/rtc_base/synchronization/mutex_abseil.h b/rtc_base/synchronization/mutex_abseil.h new file mode 100644 index 0000000000..4ad1d07eef --- /dev/null +++ b/rtc_base/synchronization/mutex_abseil.h @@ -0,0 +1,37 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_ABSEIL_H_ +#define RTC_BASE_SYNCHRONIZATION_MUTEX_ABSEIL_H_ + +#include "absl/synchronization/mutex.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +class RTC_LOCKABLE MutexImpl final { + public: + MutexImpl() = default; + MutexImpl(const MutexImpl&) = delete; + MutexImpl& operator=(const MutexImpl&) = delete; + + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { mutex_.Lock(); } + RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { + return mutex_.TryLock(); + } + void Unlock() RTC_UNLOCK_FUNCTION() { mutex_.Unlock(); } + + private: + absl::Mutex mutex_; +}; + +} // namespace webrtc + +#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_ABSEIL_H_ diff --git a/rtc_base/synchronization/mutex_benchmark.cc b/rtc_base/synchronization/mutex_benchmark.cc new file mode 100644 index 0000000000..40adca65d8 --- /dev/null +++ b/rtc_base/synchronization/mutex_benchmark.cc @@ -0,0 +1,95 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "benchmark/benchmark.h" +#include "rtc_base/synchronization/mutex.h" +#include "rtc_base/system/unused.h" + +namespace webrtc { + +class PerfTestData { + public: + PerfTestData() : cache_line_barrier_1_(), cache_line_barrier_2_() { + cache_line_barrier_1_[0]++; // Avoid 'is not used'. + cache_line_barrier_2_[0]++; // Avoid 'is not used'. + } + + int AddToCounter(int add) { + MutexLock mu(&mu_); + my_counter_ += add; + return 0; + } + + private: + uint8_t cache_line_barrier_1_[64]; + Mutex mu_; + uint8_t cache_line_barrier_2_[64]; + int64_t my_counter_ = 0; +}; + +void BM_LockWithMutex(benchmark::State& state) { + static PerfTestData test_data; + for (auto s : state) { + RTC_UNUSED(s); + benchmark::DoNotOptimize(test_data.AddToCounter(2)); + } +} + +BENCHMARK(BM_LockWithMutex)->Threads(1); +BENCHMARK(BM_LockWithMutex)->Threads(2); +BENCHMARK(BM_LockWithMutex)->Threads(4); +BENCHMARK(BM_LockWithMutex)->ThreadPerCpu(); + +} // namespace webrtc + +/* + +Results: + +NB when reproducing: Remember to turn of power management features such as CPU +scaling before running! + +pthreads (Linux): +---------------------------------------------------------------------- +Run on (12 X 4500 MHz CPU s) +CPU Caches: + L1 Data 32 KiB (x6) + L1 Instruction 32 KiB (x6) + L2 Unified 1024 KiB (x6) + L3 Unified 8448 KiB (x1) +Load Average: 0.26, 0.28, 0.44 +---------------------------------------------------------------------- +Benchmark Time CPU Iterations +---------------------------------------------------------------------- +BM_LockWithMutex/threads:1 13.4 ns 13.4 ns 52192906 +BM_LockWithMutex/threads:2 44.2 ns 88.4 ns 8189944 +BM_LockWithMutex/threads:4 52.0 ns 198 ns 3743244 +BM_LockWithMutex/threads:12 84.9 ns 944 ns 733524 + +std::mutex performs like the pthread implementation (Linux). + +Abseil (Linux): +---------------------------------------------------------------------- +Run on (12 X 4500 MHz CPU s) +CPU Caches: + L1 Data 32 KiB (x6) + L1 Instruction 32 KiB (x6) + L2 Unified 1024 KiB (x6) + L3 Unified 8448 KiB (x1) +Load Average: 0.27, 0.24, 0.37 +---------------------------------------------------------------------- +Benchmark Time CPU Iterations +---------------------------------------------------------------------- +BM_LockWithMutex/threads:1 15.0 ns 15.0 ns 46550231 +BM_LockWithMutex/threads:2 91.1 ns 182 ns 4059212 +BM_LockWithMutex/threads:4 40.8 ns 131 ns 5496560 +BM_LockWithMutex/threads:12 37.0 ns 130 ns 5377668 + +*/ diff --git a/rtc_base/synchronization/mutex_critical_section.h b/rtc_base/synchronization/mutex_critical_section.h new file mode 100644 index 0000000000..d206794988 --- /dev/null +++ b/rtc_base/synchronization/mutex_critical_section.h @@ -0,0 +1,54 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_CRITICAL_SECTION_H_ +#define RTC_BASE_SYNCHRONIZATION_MUTEX_CRITICAL_SECTION_H_ + +#if defined(WEBRTC_WIN) +// clang-format off +// clang formating would change include order. + +// Include winsock2.h before including <windows.h> to maintain consistency with +// win32.h. To include win32.h directly, it must be broken out into its own +// build target. +#include <winsock2.h> +#include <windows.h> +#include <sal.h> // must come after windows headers. +// clang-format on + +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +class RTC_LOCKABLE MutexImpl final { + public: + MutexImpl() { InitializeCriticalSection(&critical_section_); } + MutexImpl(const MutexImpl&) = delete; + MutexImpl& operator=(const MutexImpl&) = delete; + ~MutexImpl() { DeleteCriticalSection(&critical_section_); } + + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { + EnterCriticalSection(&critical_section_); + } + RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { + return TryEnterCriticalSection(&critical_section_) != FALSE; + } + void Unlock() RTC_UNLOCK_FUNCTION() { + LeaveCriticalSection(&critical_section_); + } + + private: + CRITICAL_SECTION critical_section_; +}; + +} // namespace webrtc + +#endif // #if defined(WEBRTC_WIN) +#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_CRITICAL_SECTION_H_ diff --git a/rtc_base/synchronization/mutex_pthread.h b/rtc_base/synchronization/mutex_pthread.h new file mode 100644 index 0000000000..c9496e72c9 --- /dev/null +++ b/rtc_base/synchronization/mutex_pthread.h @@ -0,0 +1,53 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_PTHREAD_H_ +#define RTC_BASE_SYNCHRONIZATION_MUTEX_PTHREAD_H_ + +#if defined(WEBRTC_POSIX) + +#include <pthread.h> +#if defined(WEBRTC_MAC) +#include <pthread_spis.h> +#endif + +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +class RTC_LOCKABLE MutexImpl final { + public: + MutexImpl() { + pthread_mutexattr_t mutex_attribute; + pthread_mutexattr_init(&mutex_attribute); +#if defined(WEBRTC_MAC) + pthread_mutexattr_setpolicy_np(&mutex_attribute, + _PTHREAD_MUTEX_POLICY_FIRSTFIT); +#endif + pthread_mutex_init(&mutex_, &mutex_attribute); + pthread_mutexattr_destroy(&mutex_attribute); + } + MutexImpl(const MutexImpl&) = delete; + MutexImpl& operator=(const MutexImpl&) = delete; + ~MutexImpl() { pthread_mutex_destroy(&mutex_); } + + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { pthread_mutex_lock(&mutex_); } + RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) { + return pthread_mutex_trylock(&mutex_) == 0; + } + void Unlock() RTC_UNLOCK_FUNCTION() { pthread_mutex_unlock(&mutex_); } + + private: + pthread_mutex_t mutex_; +}; + +} // namespace webrtc +#endif // #if defined(WEBRTC_POSIX) +#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_PTHREAD_H_ diff --git a/rtc_base/synchronization/mutex_unittest.cc b/rtc_base/synchronization/mutex_unittest.cc new file mode 100644 index 0000000000..6a930bc042 --- /dev/null +++ b/rtc_base/synchronization/mutex_unittest.cc @@ -0,0 +1,206 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/synchronization/mutex.h" + +#include <stddef.h> +#include <stdint.h> + +#include <atomic> +#include <memory> +#include <type_traits> +#include <utility> +#include <vector> + +#include "benchmark/benchmark.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/location.h" +#include "rtc_base/message_handler.h" +#include "rtc_base/platform_thread.h" +#include "rtc_base/synchronization/yield.h" +#include "rtc_base/thread.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +using ::rtc::Event; +using ::rtc::Message; +using ::rtc::MessageHandler; +using ::rtc::Thread; + +constexpr int kNumThreads = 16; + +template <class MutexType> +class RTC_LOCKABLE RawMutexLocker { + public: + explicit RawMutexLocker(MutexType& mutex) : mutex_(mutex) {} + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { mutex_.Lock(); } + void Unlock() RTC_UNLOCK_FUNCTION() { mutex_.Unlock(); } + + private: + MutexType& mutex_; +}; + +class RTC_LOCKABLE RawMutexTryLocker { + public: + explicit RawMutexTryLocker(Mutex& mutex) : mutex_(mutex) {} + void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { + while (!mutex_.TryLock()) { + YieldCurrentThread(); + } + } + void Unlock() RTC_UNLOCK_FUNCTION() { mutex_.Unlock(); } + + private: + Mutex& mutex_; +}; + +template <class MutexType, class MutexLockType> +class MutexLockLocker { + public: + explicit MutexLockLocker(MutexType& mutex) : mutex_(mutex) {} + void Lock() { lock_ = std::make_unique<MutexLockType>(&mutex_); } + void Unlock() { lock_ = nullptr; } + + private: + MutexType& mutex_; + std::unique_ptr<MutexLockType> lock_; +}; + +template <class MutexType, class MutexLocker> +class LockRunner : public MessageHandler { + public: + template <typename... Args> + explicit LockRunner(Args... args) + : threads_active_(0), + start_event_(true, false), + done_event_(true, false), + shared_value_(0), + mutex_(args...), + locker_(mutex_) {} + + bool Run() { + // Signal all threads to start. + start_event_.Set(); + + // Wait for all threads to finish. + return done_event_.Wait(kLongTime); + } + + void SetExpectedThreadCount(int count) { threads_active_ = count; } + + int shared_value() { + int shared_value; + locker_.Lock(); + shared_value = shared_value_; + locker_.Unlock(); + return shared_value_; + } + + void OnMessage(Message* msg) override { + ASSERT_TRUE(start_event_.Wait(kLongTime)); + locker_.Lock(); + + EXPECT_EQ(0, shared_value_); + int old = shared_value_; + + // Use a loop to increase the chance of race. If the |locker_| + // implementation is faulty, it would be improbable that the error slips + // through. + for (int i = 0; i < kOperationsToRun; ++i) { + benchmark::DoNotOptimize(++shared_value_); + } + EXPECT_EQ(old + kOperationsToRun, shared_value_); + shared_value_ = 0; + + locker_.Unlock(); + if (threads_active_.fetch_sub(1) == 1) { + done_event_.Set(); + } + } + + private: + static constexpr int kLongTime = 10000; // 10 seconds + static constexpr int kOperationsToRun = 1000; + + std::atomic<int> threads_active_; + Event start_event_; + Event done_event_; + int shared_value_; + MutexType mutex_; + MutexLocker locker_; +}; + +void StartThreads(std::vector<std::unique_ptr<Thread>>& threads, + MessageHandler* handler) { + for (int i = 0; i < kNumThreads; ++i) { + std::unique_ptr<Thread> thread(Thread::Create()); + thread->Start(); + thread->Post(RTC_FROM_HERE, handler); + threads.push_back(std::move(thread)); + } +} + +TEST(MutexTest, ProtectsSharedResourceWithMutexAndRawMutexLocker) { + std::vector<std::unique_ptr<Thread>> threads; + LockRunner<Mutex, RawMutexLocker<Mutex>> runner; + StartThreads(threads, &runner); + runner.SetExpectedThreadCount(kNumThreads); + EXPECT_TRUE(runner.Run()); + EXPECT_EQ(0, runner.shared_value()); +} + +TEST(MutexTest, ProtectsSharedResourceWithMutexAndRawMutexTryLocker) { + std::vector<std::unique_ptr<Thread>> threads; + LockRunner<Mutex, RawMutexTryLocker> runner; + StartThreads(threads, &runner); + runner.SetExpectedThreadCount(kNumThreads); + EXPECT_TRUE(runner.Run()); + EXPECT_EQ(0, runner.shared_value()); +} + +TEST(MutexTest, ProtectsSharedResourceWithMutexAndMutexLocker) { + std::vector<std::unique_ptr<Thread>> threads; + LockRunner<Mutex, MutexLockLocker<Mutex, MutexLock>> runner; + StartThreads(threads, &runner); + runner.SetExpectedThreadCount(kNumThreads); + EXPECT_TRUE(runner.Run()); + EXPECT_EQ(0, runner.shared_value()); +} + +TEST(MutexTest, ProtectsSharedResourceWithGlobalMutexAndRawMutexLocker) { + std::vector<std::unique_ptr<Thread>> threads; + LockRunner<GlobalMutex, RawMutexLocker<GlobalMutex>> runner(absl::kConstInit); + StartThreads(threads, &runner); + runner.SetExpectedThreadCount(kNumThreads); + EXPECT_TRUE(runner.Run()); + EXPECT_EQ(0, runner.shared_value()); +} + +TEST(MutexTest, ProtectsSharedResourceWithGlobalMutexAndMutexLocker) { + std::vector<std::unique_ptr<Thread>> threads; + LockRunner<GlobalMutex, MutexLockLocker<GlobalMutex, GlobalMutexLock>> runner( + absl::kConstInit); + StartThreads(threads, &runner); + runner.SetExpectedThreadCount(kNumThreads); + EXPECT_TRUE(runner.Run()); + EXPECT_EQ(0, runner.shared_value()); +} + +TEST(MutexTest, GlobalMutexCanHaveStaticStorageDuration) { + ABSL_CONST_INIT static GlobalMutex global_lock(absl::kConstInit); + global_lock.Lock(); + global_lock.Unlock(); +} + +} // namespace +} // namespace webrtc diff --git a/rtc_base/synchronization/sequence_checker.cc b/rtc_base/synchronization/sequence_checker.cc index d64f32a616..1de26cf0fe 100644 --- a/rtc_base/synchronization/sequence_checker.cc +++ b/rtc_base/synchronization/sequence_checker.cc @@ -13,6 +13,8 @@ #include <dispatch/dispatch.h> #endif +#include "rtc_base/strings/string_builder.h" + namespace webrtc { namespace { // On Mac, returns the label of the current dispatch queue; elsewhere, return @@ -24,8 +26,16 @@ const void* GetSystemQueueRef() { return nullptr; #endif } + } // namespace +std::string ExpectationToString(const webrtc::SequenceChecker* checker) { +#if RTC_DCHECK_IS_ON + return checker->ExpectationToString(); +#endif + return std::string(); +} + SequenceCheckerImpl::SequenceCheckerImpl() : attached_(true), valid_thread_(rtc::CurrentThreadRef()), @@ -38,7 +48,7 @@ bool SequenceCheckerImpl::IsCurrent() const { const TaskQueueBase* const current_queue = TaskQueueBase::Current(); const rtc::PlatformThreadRef current_thread = rtc::CurrentThreadRef(); const void* const current_system_queue = GetSystemQueueRef(); - rtc::CritScope scoped_lock(&lock_); + MutexLock scoped_lock(&lock_); if (!attached_) { // Previously detached. attached_ = true; valid_thread_ = current_thread; @@ -56,10 +66,47 @@ bool SequenceCheckerImpl::IsCurrent() const { } void SequenceCheckerImpl::Detach() { - rtc::CritScope scoped_lock(&lock_); + MutexLock scoped_lock(&lock_); attached_ = false; // We don't need to touch the other members here, they will be // reset on the next call to IsCurrent(). } +#if RTC_DCHECK_IS_ON +std::string SequenceCheckerImpl::ExpectationToString() const { + const TaskQueueBase* const current_queue = TaskQueueBase::Current(); + const rtc::PlatformThreadRef current_thread = rtc::CurrentThreadRef(); + const void* const current_system_queue = GetSystemQueueRef(); + MutexLock scoped_lock(&lock_); + if (!attached_) + return "Checker currently not attached."; + + // The format of the string is meant to compliment the one we have inside of + // FatalLog() (checks.cc). Example: + // + // # Expected: TQ: 0x0 SysQ: 0x7fff69541330 Thread: 0x11dcf6dc0 + // # Actual: TQ: 0x7fa8f0604190 SysQ: 0x7fa8f0604a30 Thread: 0x700006f1a000 + // TaskQueue doesn't match + + rtc::StringBuilder message; + message.AppendFormat( + "# Expected: TQ: %p SysQ: %p Thread: %p\n" + "# Actual: TQ: %p SysQ: %p Thread: %p\n", + valid_queue_, valid_system_queue_, + reinterpret_cast<const void*>(valid_thread_), current_queue, + current_system_queue, reinterpret_cast<const void*>(current_thread)); + + if ((valid_queue_ || current_queue) && valid_queue_ != current_queue) { + message << "TaskQueue doesn't match\n"; + } else if (valid_system_queue_ && + valid_system_queue_ != current_system_queue) { + message << "System queue doesn't match\n"; + } else if (!rtc::IsThreadRefEqual(valid_thread_, current_thread)) { + message << "Threads don't match\n"; + } + + return message.Release(); +} +#endif // RTC_DCHECK_IS_ON + } // namespace webrtc diff --git a/rtc_base/synchronization/sequence_checker.h b/rtc_base/synchronization/sequence_checker.h index fe644fa14e..ecf8490cec 100644 --- a/rtc_base/synchronization/sequence_checker.h +++ b/rtc_base/synchronization/sequence_checker.h @@ -10,9 +10,11 @@ #ifndef RTC_BASE_SYNCHRONIZATION_SEQUENCE_CHECKER_H_ #define RTC_BASE_SYNCHRONIZATION_SEQUENCE_CHECKER_H_ +#include <type_traits> + #include "api/task_queue/task_queue_base.h" -#include "rtc_base/critical_section.h" #include "rtc_base/platform_thread_types.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/rtc_export.h" #include "rtc_base/thread_annotations.h" @@ -34,8 +36,13 @@ class RTC_EXPORT SequenceCheckerImpl { // used exclusively on another thread. void Detach(); + // Returns a string that is formatted to match with the error string printed + // by RTC_CHECK() when a condition is not met. + // This is used in conjunction with the RTC_DCHECK_RUN_ON() macro. + std::string ExpectationToString() const; + private: - rtc::CriticalSection lock_; + mutable Mutex lock_; // These are mutable so that IsCurrent can set them. mutable bool attached_ RTC_GUARDED_BY(lock_); mutable rtc::PlatformThreadRef valid_thread_ RTC_GUARDED_BY(lock_); @@ -162,8 +169,19 @@ class RTC_SCOPED_LOCKABLE SequenceCheckerScope { #define RTC_RUN_ON(x) \ RTC_THREAD_ANNOTATION_ATTRIBUTE__(exclusive_locks_required(x)) +namespace webrtc { +std::string ExpectationToString(const webrtc::SequenceChecker* checker); + +// Catch-all implementation for types other than explicitly supported above. +template <typename ThreadLikeObject> +std::string ExpectationToString(const ThreadLikeObject*) { + return std::string(); +} + +} // namespace webrtc + #define RTC_DCHECK_RUN_ON(x) \ webrtc::webrtc_seq_check_impl::SequenceCheckerScope seq_check_scope(x); \ - RTC_DCHECK((x)->IsCurrent()) + RTC_DCHECK((x)->IsCurrent()) << webrtc::ExpectationToString(x) #endif // RTC_BASE_SYNCHRONIZATION_SEQUENCE_CHECKER_H_ diff --git a/rtc_base/synchronization/sequence_checker_unittest.cc b/rtc_base/synchronization/sequence_checker_unittest.cc index 1e62e9759b..6fcb522c54 100644 --- a/rtc_base/synchronization/sequence_checker_unittest.cc +++ b/rtc_base/synchronization/sequence_checker_unittest.cc @@ -31,7 +31,7 @@ class CompileTimeTestForGuardedBy { int CalledOnSequence() RTC_RUN_ON(sequence_checker_) { return guarded_; } void CallMeFromSequence() { - RTC_DCHECK_RUN_ON(&sequence_checker_) << "Should be called on sequence"; + RTC_DCHECK_RUN_ON(&sequence_checker_); guarded_ = 41; } @@ -158,7 +158,12 @@ void TestAnnotationsOnWrongQueue() { } #if RTC_DCHECK_IS_ON -TEST(SequenceCheckerTest, TestAnnotationsOnWrongQueueDebug) { +// Note: Ending the test suite name with 'DeathTest' is important as it causes +// gtest to order this test before any other non-death-tests, to avoid potential +// global process state pollution such as shared worker threads being started +// (e.g. a side effect of calling InitCocoaMultiThreading() on Mac causes one or +// two additional threads to be created). +TEST(SequenceCheckerDeathTest, TestAnnotationsOnWrongQueueDebug) { ASSERT_DEATH({ TestAnnotationsOnWrongQueue(); }, ""); } #else diff --git a/rtc_base/synchronization/yield.cc b/rtc_base/synchronization/yield.cc new file mode 100644 index 0000000000..cbb58d12ab --- /dev/null +++ b/rtc_base/synchronization/yield.cc @@ -0,0 +1,36 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/synchronization/yield.h" + +#if defined(WEBRTC_WIN) +#include <windows.h> +#else +#include <sched.h> +#include <time.h> +#endif + +namespace webrtc { + +void YieldCurrentThread() { + // TODO(bugs.webrtc.org/11634): use dedicated OS functionality instead of + // sleep for yielding. +#if defined(WEBRTC_WIN) + ::Sleep(0); +#elif defined(WEBRTC_MAC) && defined(RTC_USE_NATIVE_MUTEX_ON_MAC) && \ + !RTC_USE_NATIVE_MUTEX_ON_MAC + sched_yield(); +#else + static const struct timespec ts_null = {0}; + nanosleep(&ts_null, nullptr); +#endif +} + +} // namespace webrtc diff --git a/rtc_base/synchronization/yield.h b/rtc_base/synchronization/yield.h new file mode 100644 index 0000000000..d4f5f99f37 --- /dev/null +++ b/rtc_base/synchronization/yield.h @@ -0,0 +1,20 @@ +/* + * Copyright 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef RTC_BASE_SYNCHRONIZATION_YIELD_H_ +#define RTC_BASE_SYNCHRONIZATION_YIELD_H_ + +namespace webrtc { + +// Request rescheduling of threads. +void YieldCurrentThread(); + +} // namespace webrtc + +#endif // RTC_BASE_SYNCHRONIZATION_YIELD_H_ diff --git a/rtc_base/synchronization/yield_policy_unittest.cc b/rtc_base/synchronization/yield_policy_unittest.cc index e0c622510a..0bf38f4537 100644 --- a/rtc_base/synchronization/yield_policy_unittest.cc +++ b/rtc_base/synchronization/yield_policy_unittest.cc @@ -20,7 +20,7 @@ namespace rtc { namespace { class MockYieldHandler : public YieldInterface { public: - MOCK_METHOD0(YieldExecution, void()); + MOCK_METHOD(void, YieldExecution, (), (override)); }; } // namespace TEST(YieldPolicyTest, HandlerReceivesYieldSignalWhenSet) { diff --git a/rtc_base/system/BUILD.gn b/rtc_base/system/BUILD.gn index 79cb301038..fdb3f96e00 100644 --- a/rtc_base/system/BUILD.gn +++ b/rtc_base/system/BUILD.gn @@ -58,7 +58,7 @@ if (is_mac || is_ios) { "cocoa_threading.mm", ] deps = [ "..:checks" ] - libs = [ "Foundation.framework" ] + frameworks = [ "Foundation.framework" ] } rtc_library("gcd_helpers") { @@ -72,13 +72,14 @@ if (is_mac || is_ios) { rtc_source_set("thread_registry") { sources = [ "thread_registry.h" ] - deps = [ "..:rtc_base_approved" ] + deps = [ + "..:rtc_base_approved", + "../synchronization:mutex", + ] if (is_android && !build_with_chromium) { sources += [ "thread_registry.cc" ] - deps += [ - "../../sdk/android:native_api_stacktrace", - "//third_party/abseil-cpp/absl/base:core_headers", - ] + deps += [ "../../sdk/android:native_api_stacktrace" ] + absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers" ] } } diff --git a/rtc_base/system/file_wrapper.h b/rtc_base/system/file_wrapper.h index 24c333a6c3..42c463cb15 100644 --- a/rtc_base/system/file_wrapper.h +++ b/rtc_base/system/file_wrapper.h @@ -14,7 +14,7 @@ #include <stddef.h> #include <stdio.h> -#include "rtc_base/critical_section.h" +#include <string> // Implementation that can read (exclusive) or write from/to a file. diff --git a/rtc_base/system/thread_registry.cc b/rtc_base/system/thread_registry.cc index 86605446c7..b0e83ca1e9 100644 --- a/rtc_base/system/thread_registry.cc +++ b/rtc_base/system/thread_registry.cc @@ -14,9 +14,9 @@ #include <utility> #include "absl/base/attributes.h" -#include "rtc_base/critical_section.h" #include "rtc_base/logging.h" #include "rtc_base/platform_thread_types.h" +#include "rtc_base/synchronization/mutex.h" #include "sdk/android/native_api/stacktrace/stacktrace.h" namespace webrtc { @@ -30,7 +30,7 @@ struct ThreadData { // The map of registered threads, and the lock that protects it. We create the // map on first use, and never destroy it. -ABSL_CONST_INIT rtc::GlobalLock g_thread_registry_lock; +ABSL_CONST_INIT GlobalMutex g_thread_registry_lock(absl::kConstInit); ABSL_CONST_INIT std::map<const ScopedRegisterThreadForDebugging*, ThreadData>* g_registered_threads = nullptr; @@ -38,7 +38,7 @@ ABSL_CONST_INIT std::map<const ScopedRegisterThreadForDebugging*, ThreadData>* ScopedRegisterThreadForDebugging::ScopedRegisterThreadForDebugging( rtc::Location location) { - rtc::GlobalLockScope gls(&g_thread_registry_lock); + GlobalMutexLock gls(&g_thread_registry_lock); if (g_registered_threads == nullptr) { g_registered_threads = new std::map<const ScopedRegisterThreadForDebugging*, ThreadData>(); @@ -49,14 +49,14 @@ ScopedRegisterThreadForDebugging::ScopedRegisterThreadForDebugging( } ScopedRegisterThreadForDebugging::~ScopedRegisterThreadForDebugging() { - rtc::GlobalLockScope gls(&g_thread_registry_lock); + GlobalMutexLock gls(&g_thread_registry_lock); RTC_DCHECK(g_registered_threads != nullptr); const int num_erased = g_registered_threads->erase(this); RTC_DCHECK_EQ(num_erased, 1); } void PrintStackTracesOfRegisteredThreads() { - rtc::GlobalLockScope gls(&g_thread_registry_lock); + GlobalMutexLock gls(&g_thread_registry_lock); if (g_registered_threads == nullptr) { return; } diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 349a5f21fc..38660cd5a2 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -29,11 +29,11 @@ #include "api/task_queue/task_queue_base.h" #include "base/third_party/libevent/event.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/logging.h" #include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread_types.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" @@ -130,7 +130,7 @@ class TaskQueueLibevent final : public TaskQueueBase { event_base* event_base_; event wakeup_event_; rtc::PlatformThread thread_; - rtc::CriticalSection pending_lock_; + Mutex pending_lock_; absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_ RTC_GUARDED_BY(pending_lock_); // Holds a list of events pending timers for cleanup when the loop exits. @@ -216,7 +216,7 @@ void TaskQueueLibevent::Delete() { void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) { { - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); bool had_pending_tasks = !pending_.empty(); pending_.push_back(std::move(task)); @@ -282,7 +282,7 @@ void TaskQueueLibevent::OnWakeup(int socket, case kRunTasks: { absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks; { - rtc::CritScope lock(&me->pending_lock_); + MutexLock lock(&me->pending_lock_); tasks.swap(me->pending_); } RTC_DCHECK(!tasks.empty()); diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc index 7052f7c6db..5de634512e 100644 --- a/rtc_base/task_queue_stdlib.cc +++ b/rtc_base/task_queue_stdlib.cc @@ -22,10 +22,10 @@ #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" #include "rtc_base/platform_thread.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" @@ -97,7 +97,7 @@ class TaskQueueStdlib final : public TaskQueueBase { // tasks (including delayed tasks). rtc::PlatformThread thread_; - rtc::CriticalSection pending_lock_; + Mutex pending_lock_; // Indicates if the worker thread needs to shutdown now. bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_){false}; @@ -135,7 +135,7 @@ void TaskQueueStdlib::Delete() { RTC_DCHECK(!IsCurrent()); { - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); thread_should_quit_ = true; } @@ -148,7 +148,7 @@ void TaskQueueStdlib::Delete() { void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) { { - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); OrderId order = thread_posting_order_++; pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>( @@ -166,7 +166,7 @@ void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task, delay.next_fire_at_ms_ = fire_at; { - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); delay.order_ = ++thread_posting_order_; delayed_queue_[delay] = std::move(task); } @@ -179,7 +179,7 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() { auto tick = rtc::TimeMillis(); - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); if (thread_should_quit_) { result.final_task_ = true; diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc index 8c11b8764a..5eb3776cea 100644 --- a/rtc_base/task_queue_win.cc +++ b/rtc_base/task_queue_win.cc @@ -33,12 +33,12 @@ #include "api/task_queue/task_queue_base.h" #include "rtc_base/arraysize.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" #include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/platform_thread.h" #include "rtc_base/time_utils.h" +#include "rtc_base/synchronization/mutex.h" namespace webrtc { namespace { @@ -205,7 +205,7 @@ class TaskQueueWin : public TaskQueueBase { timer_tasks_; UINT_PTR timer_id_ = 0; WorkerThread thread_; - rtc::CriticalSection pending_lock_; + Mutex pending_lock_; std::queue<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_); HANDLE in_queue_; @@ -235,7 +235,7 @@ void TaskQueueWin::Delete() { } void TaskQueueWin::PostTask(std::unique_ptr<QueuedTask> task) { - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); pending_.push(std::move(task)); ::SetEvent(in_queue_); } @@ -262,7 +262,7 @@ void TaskQueueWin::RunPendingTasks() { while (true) { std::unique_ptr<QueuedTask> task; { - rtc::CritScope lock(&pending_lock_); + MutexLock lock(&pending_lock_); if (pending_.empty()) break; task = std::move(pending_.front()); diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 1882cd9ee8..54f9a048f0 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -21,9 +21,10 @@ rtc_library("repeating_task") { "../../api/task_queue", "../../api/units:time_delta", "../../api/units:timestamp", + "../../system_wrappers:system_wrappers", "../synchronization:sequence_checker", - "//third_party/abseil-cpp/absl/memory", ] + absl_deps = [ "//third_party/abseil-cpp/absl/memory" ] } rtc_library("pending_task_safety_flag") { @@ -81,7 +82,7 @@ if (rtc_include_tests) { ":to_queued_task", "../../api/task_queue", "../../test:test_support", - "//third_party/abseil-cpp/absl/memory", ] + absl_deps = [ "//third_party/abseil-cpp/absl/memory" ] } } diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc index 307d2d594c..4be2131f3f 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.cc +++ b/rtc_base/task_utils/pending_task_safety_flag.cc @@ -15,7 +15,7 @@ namespace webrtc { // static -PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() { +rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::Create() { return new rtc::RefCountedObject<PendingTaskSafetyFlag>(); } diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h index 1b301c8034..580fb3f912 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.h +++ b/rtc_base/task_utils/pending_task_safety_flag.h @@ -36,12 +36,17 @@ namespace webrtc { // MyMethod(); // })); // +// Or implicitly by letting ToQueuedTask do the checking: +// +// // Running outside of the main thread. +// my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_, +// [this]() { MyMethod(); })); +// // Note that checking the state only works on the construction/destruction // thread of the ReceiveStatisticsProxy instance. class PendingTaskSafetyFlag : public rtc::RefCountInterface { public: - using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>; - static Pointer Create(); + static rtc::scoped_refptr<PendingTaskSafetyFlag> Create(); ~PendingTaskSafetyFlag() = default; @@ -56,6 +61,25 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface { SequenceChecker main_sequence_; }; +// Makes using PendingTaskSafetyFlag very simple. Automatic PTSF creation +// and signalling of destruction when the ScopedTaskSafety instance goes out +// of scope. +// Should be used by the class that wants tasks dropped after destruction. +// Requirements are that the instance be constructed and destructed on +// the same thread as the potentially dropped tasks would be running on. +class ScopedTaskSafety { + public: + ScopedTaskSafety() = default; + ~ScopedTaskSafety() { flag_->SetNotAlive(); } + + // Returns a new reference to the safety flag. + rtc::scoped_refptr<PendingTaskSafetyFlag> flag() const { return flag_; } + + private: + rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ = + PendingTaskSafetyFlag::Create(); +}; + } // namespace webrtc #endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc index 0c1c3c8e52..6df2fe2ffb 100644 --- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc +++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc @@ -29,7 +29,7 @@ using ::testing::Return; } // namespace TEST(PendingTaskSafetyFlagTest, Basic) { - PendingTaskSafetyFlag::Pointer safety_flag; + rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag; { // Scope for the |owner| instance. class Owner { @@ -37,12 +37,27 @@ TEST(PendingTaskSafetyFlagTest, Basic) { Owner() = default; ~Owner() { flag_->SetNotAlive(); } - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ = + PendingTaskSafetyFlag::Create(); } owner; EXPECT_TRUE(owner.flag_->alive()); safety_flag = owner.flag_; EXPECT_TRUE(safety_flag->alive()); } + // |owner| now out of scope. + EXPECT_FALSE(safety_flag->alive()); +} + +TEST(PendingTaskSafetyFlagTest, BasicScoped) { + rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag; + { + struct Owner { + ScopedTaskSafety safety; + } owner; + safety_flag = owner.safety.flag(); + EXPECT_TRUE(safety_flag->alive()); + } + // |owner| now out of scope. EXPECT_FALSE(safety_flag->alive()); } @@ -72,7 +87,8 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) { private: TaskQueueBase* const tq_main_; bool stuff_done_ = false; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + rtc::scoped_refptr<PendingTaskSafetyFlag> flag_{ + PendingTaskSafetyFlag::Create()}; }; std::unique_ptr<Owner> owner; @@ -106,22 +122,18 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) { } ~Owner() { RTC_DCHECK(tq_main_->IsCurrent()); - flag_->SetNotAlive(); } void DoStuff() { RTC_DCHECK(!tq_main_->IsCurrent()); - tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { - if (!safe->alive()) - return; - *stuff_done_ = true; - })); + tq_main_->PostTask( + ToQueuedTask(safety_, [this]() { *stuff_done_ = true; })); } private: TaskQueueBase* const tq_main_; bool* const stuff_done_; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + ScopedTaskSafety safety_; }; std::unique_ptr<Owner> owner; diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc index 4e460bb082..574e6331f1 100644 --- a/rtc_base/task_utils/repeating_task.cc +++ b/rtc_base/task_utils/repeating_task.cc @@ -17,10 +17,13 @@ namespace webrtc { namespace webrtc_repeating_task_impl { + RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue, - TimeDelta first_delay) + TimeDelta first_delay, + Clock* clock) : task_queue_(task_queue), - next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {} + clock_(clock), + next_run_time_(clock_->CurrentTime() + first_delay) {} RepeatingTaskBase::~RepeatingTaskBase() = default; @@ -38,7 +41,7 @@ bool RepeatingTaskBase::Run() { return true; RTC_DCHECK(delay.IsFinite()); - TimeDelta lost_time = Timestamp::Micros(rtc::TimeMicros()) - next_run_time_; + TimeDelta lost_time = clock_->CurrentTime() - next_run_time_; next_run_time_ += delay; delay -= lost_time; delay = std::max(delay, TimeDelta::Zero()); @@ -51,6 +54,7 @@ bool RepeatingTaskBase::Run() { } void RepeatingTaskBase::Stop() { + RTC_DCHECK_RUN_ON(task_queue_); RTC_DCHECK(next_run_time_.IsFinite()); next_run_time_ = Timestamp::PlusInfinity(); } @@ -75,7 +79,6 @@ RepeatingTaskHandle::RepeatingTaskHandle( void RepeatingTaskHandle::Stop() { if (repeating_task_) { - RTC_DCHECK_RUN_ON(repeating_task_->task_queue_); repeating_task_->Stop(); repeating_task_ = nullptr; } diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h index 1545d6f757..487b7d19d4 100644 --- a/rtc_base/task_utils/repeating_task.h +++ b/rtc_base/task_utils/repeating_task.h @@ -19,8 +19,7 @@ #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" -#include "rtc_base/synchronization/sequence_checker.h" -#include "rtc_base/thread_checker.h" +#include "system_wrappers/include/clock.h" namespace webrtc { @@ -29,17 +28,20 @@ class RepeatingTaskHandle; namespace webrtc_repeating_task_impl { class RepeatingTaskBase : public QueuedTask { public: - RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay); + RepeatingTaskBase(TaskQueueBase* task_queue, + TimeDelta first_delay, + Clock* clock); ~RepeatingTaskBase() override; - virtual TimeDelta RunClosure() = 0; + + void Stop(); private: - friend class ::webrtc::RepeatingTaskHandle; + virtual TimeDelta RunClosure() = 0; bool Run() final; - void Stop() RTC_RUN_ON(task_queue_); TaskQueueBase* const task_queue_; + Clock* const clock_; // This is always finite, except for the special case where it's PlusInfinity // to signal that the task should stop. Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); @@ -51,8 +53,9 @@ class RepeatingTaskImpl final : public RepeatingTaskBase { public: RepeatingTaskImpl(TaskQueueBase* task_queue, TimeDelta first_delay, - Closure&& closure) - : RepeatingTaskBase(task_queue, first_delay), + Closure&& closure, + Clock* clock) + : RepeatingTaskBase(task_queue, first_delay, clock), closure_(std::forward<Closure>(closure)) { static_assert( std::is_same<TimeDelta, @@ -61,9 +64,9 @@ class RepeatingTaskImpl final : public RepeatingTaskBase { ""); } + private: TimeDelta RunClosure() override { return closure_(); } - private: typename std::remove_const< typename std::remove_reference<Closure>::type>::type closure_; }; @@ -92,10 +95,11 @@ class RepeatingTaskHandle { // repeated task is owned by the TaskQueue. template <class Closure> static RepeatingTaskHandle Start(TaskQueueBase* task_queue, - Closure&& closure) { + Closure&& closure, + Clock* clock = Clock::GetRealTimeClock()) { auto repeating_task = std::make_unique< webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>( - task_queue, TimeDelta::Zero(), std::forward<Closure>(closure)); + task_queue, TimeDelta::Zero(), std::forward<Closure>(closure), clock); auto* repeating_task_ptr = repeating_task.get(); task_queue->PostTask(std::move(repeating_task)); return RepeatingTaskHandle(repeating_task_ptr); @@ -104,12 +108,14 @@ class RepeatingTaskHandle { // DelayedStart is equivalent to Start except that the first invocation of the // closure will be delayed by the given amount. template <class Closure> - static RepeatingTaskHandle DelayedStart(TaskQueueBase* task_queue, - TimeDelta first_delay, - Closure&& closure) { + static RepeatingTaskHandle DelayedStart( + TaskQueueBase* task_queue, + TimeDelta first_delay, + Closure&& closure, + Clock* clock = Clock::GetRealTimeClock()) { auto repeating_task = std::make_unique< webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>( - task_queue, first_delay, std::forward<Closure>(closure)); + task_queue, first_delay, std::forward<Closure>(closure), clock); auto* repeating_task_ptr = repeating_task.get(); task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms()); return RepeatingTaskHandle(repeating_task_ptr); diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc index 83efb29209..2fb15d1e5a 100644 --- a/rtc_base/task_utils/repeating_task_unittest.cc +++ b/rtc_base/task_utils/repeating_task_unittest.cc @@ -40,8 +40,23 @@ void Sleep(TimeDelta time_delta) { class MockClosure { public: - MOCK_METHOD0(Call, TimeDelta()); - MOCK_METHOD0(Delete, void()); + MOCK_METHOD(TimeDelta, Call, ()); + MOCK_METHOD(void, Delete, ()); +}; + +class MockTaskQueue : public TaskQueueBase { + public: + MockTaskQueue() : task_queue_setter_(this) {} + + MOCK_METHOD(void, Delete, (), (override)); + MOCK_METHOD(void, PostTask, (std::unique_ptr<QueuedTask> task), (override)); + MOCK_METHOD(void, + PostDelayedTask, + (std::unique_ptr<QueuedTask> task, uint32_t milliseconds), + (override)); + + private: + CurrentTaskQueueSetter task_queue_setter_; }; class MoveOnlyClosure { @@ -228,4 +243,37 @@ TEST(RepeatingTaskTest, Example) { // task queue destruction and running the desctructor closure. } +TEST(RepeatingTaskTest, ClockIntegration) { + std::unique_ptr<QueuedTask> delayed_task; + uint32_t expected_ms = 0; + SimulatedClock clock(Timestamp::Millis(0)); + + NiceMock<MockTaskQueue> task_queue; + ON_CALL(task_queue, PostDelayedTask) + .WillByDefault( + Invoke([&delayed_task, &expected_ms](std::unique_ptr<QueuedTask> task, + uint32_t milliseconds) { + EXPECT_EQ(milliseconds, expected_ms); + delayed_task = std::move(task); + })); + + expected_ms = 100; + RepeatingTaskHandle handle = RepeatingTaskHandle::DelayedStart( + &task_queue, TimeDelta::Millis(100), + [&clock]() { + EXPECT_EQ(Timestamp::Millis(100), clock.CurrentTime()); + // Simulate work happening for 10ms. + clock.AdvanceTimeMilliseconds(10); + return TimeDelta::Millis(100); + }, + &clock); + + clock.AdvanceTimeMilliseconds(100); + QueuedTask* task_to_run = delayed_task.release(); + expected_ms = 90; + EXPECT_FALSE(task_to_run->Run()); + EXPECT_NE(nullptr, delayed_task.get()); + handle.Stop(); +} + } // namespace webrtc diff --git a/rtc_base/task_utils/to_queued_task.h b/rtc_base/task_utils/to_queued_task.h index cc9325ebd6..07ab0ebe26 100644 --- a/rtc_base/task_utils/to_queued_task.h +++ b/rtc_base/task_utils/to_queued_task.h @@ -39,7 +39,7 @@ class ClosureTask : public QueuedTask { template <typename Closure> class SafetyClosureTask : public QueuedTask { public: - explicit SafetyClosureTask(PendingTaskSafetyFlag::Pointer safety, + explicit SafetyClosureTask(rtc::scoped_refptr<PendingTaskSafetyFlag> safety, Closure&& closure) : closure_(std::forward<Closure>(closure)), safety_flag_(std::move(safety)) {} @@ -52,7 +52,7 @@ class SafetyClosureTask : public QueuedTask { } typename std::decay<Closure>::type closure_; - PendingTaskSafetyFlag::Pointer safety_flag_; + rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag_; }; // Extends ClosureTask to also allow specifying cleanup code. @@ -81,13 +81,25 @@ std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure) { } template <typename Closure> -std::unique_ptr<QueuedTask> ToQueuedTask(PendingTaskSafetyFlag::Pointer safety, - Closure&& closure) { +std::unique_ptr<QueuedTask> ToQueuedTask( + rtc::scoped_refptr<PendingTaskSafetyFlag> safety, + Closure&& closure) { return std::make_unique<webrtc_new_closure_impl::SafetyClosureTask<Closure>>( std::move(safety), std::forward<Closure>(closure)); } -template <typename Closure, typename Cleanup> +template <typename Closure> +std::unique_ptr<QueuedTask> ToQueuedTask(const ScopedTaskSafety& safety, + Closure&& closure) { + return ToQueuedTask(safety.flag(), std::forward<Closure>(closure)); +} + +template <typename Closure, + typename Cleanup, + typename std::enable_if<!std::is_same< + typename std::remove_const< + typename std::remove_reference<Closure>::type>::type, + ScopedTaskSafety>::value>::type* = nullptr> std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure, Cleanup&& cleanup) { return std::make_unique< webrtc_new_closure_impl::ClosureTaskWithCleanup<Closure, Cleanup>>( diff --git a/rtc_base/task_utils/to_queued_task_unittest.cc b/rtc_base/task_utils/to_queued_task_unittest.cc index e98c81e9ce..261b9e891b 100644 --- a/rtc_base/task_utils/to_queued_task_unittest.cc +++ b/rtc_base/task_utils/to_queued_task_unittest.cc @@ -127,7 +127,8 @@ TEST(ToQueuedTaskTest, AcceptsMoveOnlyCleanup) { } TEST(ToQueuedTaskTest, PendingTaskSafetyFlag) { - PendingTaskSafetyFlag::Pointer flag(PendingTaskSafetyFlag::Create()); + rtc::scoped_refptr<PendingTaskSafetyFlag> flag = + PendingTaskSafetyFlag::Create(); int count = 0; // Create two identical tasks that increment the |count|. diff --git a/rtc_base/test_client.cc b/rtc_base/test_client.cc index e5aa9d7987..f23ac2aec0 100644 --- a/rtc_base/test_client.cc +++ b/rtc_base/test_client.cc @@ -75,7 +75,7 @@ std::unique_ptr<TestClient::Packet> TestClient::NextPacket(int timeout_ms) { int64_t end = TimeAfter(timeout_ms); while (TimeUntil(end) > 0) { { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); if (packets_.size() != 0) { break; } @@ -85,7 +85,7 @@ std::unique_ptr<TestClient::Packet> TestClient::NextPacket(int timeout_ms) { // Return the first packet placed in the queue. std::unique_ptr<Packet> packet; - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); if (packets_.size() > 0) { packet = std::move(packets_.front()); packets_.erase(packets_.begin()); @@ -149,7 +149,7 @@ void TestClient::OnPacket(AsyncPacketSocket* socket, size_t size, const SocketAddress& remote_addr, const int64_t& packet_time_us) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); packets_.push_back( std::make_unique<Packet>(remote_addr, buf, size, packet_time_us)); } diff --git a/rtc_base/test_client.h b/rtc_base/test_client.h index b45cf005bb..6989fe1d57 100644 --- a/rtc_base/test_client.h +++ b/rtc_base/test_client.h @@ -16,8 +16,8 @@ #include "rtc_base/async_udp_socket.h" #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" #include "rtc_base/fake_clock.h" +#include "rtc_base/synchronization/mutex.h" namespace rtc { @@ -105,7 +105,7 @@ class TestClient : public sigslot::has_slots<> { void AdvanceTime(int ms); ThreadProcessingFakeClock* fake_clock_ = nullptr; - CriticalSection crit_; + webrtc::Mutex mutex_; std::unique_ptr<AsyncPacketSocket> socket_; std::vector<std::unique_ptr<Packet>> packets_; int ready_to_send_count_ = 0; diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 0fb2e813e0..2882f50da3 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -31,9 +31,10 @@ #include "absl/algorithm/container.h" #include "rtc_base/atomic_ops.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/logging.h" #include "rtc_base/null_socket_server.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" @@ -87,8 +88,8 @@ class MessageHandlerWithTask final : public MessageHandler { class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { public: - MarkProcessingCritScope(const CriticalSection* cs, size_t* processing) - RTC_EXCLUSIVE_LOCK_FUNCTION(cs) + MarkProcessingCritScope(const RecursiveCriticalSection* cs, + size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs) : cs_(cs), processing_(processing) { cs_->Enter(); *processing_ += 1; @@ -100,7 +101,7 @@ class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { } private: - const CriticalSection* const cs_; + const RecursiveCriticalSection* const cs_; size_t* processing_; RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); @@ -168,8 +169,8 @@ void ThreadManager::RegisterSendAndCheckForCycles(Thread* source, // We check the pre-existing who-sends-to-who graph for any path from target // to source. This loop is guaranteed to terminate because per the send graph // invariant, there are no cycles in the graph. - for (auto it = all_targets.begin(); it != all_targets.end(); ++it) { - const auto& targets = send_graph_[*it]; + for (size_t i = 0; i < all_targets.size(); i++) { + const auto& targets = send_graph_[all_targets[i]]; all_targets.insert(all_targets.end(), targets.begin(), targets.end()); } RTC_CHECK_EQ(absl::c_count(all_targets, source), 0) @@ -296,6 +297,21 @@ void ThreadManager::SetCurrentThread(Thread* thread) { RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?"; } #endif // RTC_DLOG_IS_ON + + if (thread) { + thread->EnsureIsCurrentTaskQueue(); + } else { + Thread* current = CurrentThread(); + if (current) { + // The current thread is being cleared, e.g. as a result of + // UnwrapCurrent() being called or when a thread is being stopped + // (see PreRun()). This signals that the Thread instance is being detached + // from the thread, which also means that TaskQueue::Current() must not + // return a pointer to the Thread instance. + current->ClearCurrentTaskQueue(); + } + } + SetCurrentThreadInternal(thread); } @@ -824,7 +840,6 @@ void* Thread::PreRun(void* pv) { Thread* thread = static_cast<Thread*>(pv); ThreadManager::Instance()->SetCurrentThread(thread); rtc::SetCurrentThreadName(thread->name_.c_str()); - CurrentTaskQueueSetter set_current_task_queue(thread); #if defined(WEBRTC_MAC) ScopedAutoReleasePool pool; #endif @@ -878,6 +893,7 @@ void Thread::Send(const Location& posted_from, AutoThread thread; Thread* current_thread = Thread::Current(); RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this + RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); #if RTC_DCHECK_IS_ON ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, this); @@ -935,6 +951,17 @@ void Thread::InvokeInternal(const Location& posted_from, Send(posted_from, &handler); } +// Called by the ThreadManager when being set as the current thread. +void Thread::EnsureIsCurrentTaskQueue() { + task_queue_registration_ = + std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this); +} + +// Called by the ThreadManager when being set as the current thread. +void Thread::ClearCurrentTaskQueue() { + task_queue_registration_.reset(); +} + void Thread::QueuedTaskHandler::OnMessage(Message* msg) { RTC_DCHECK(msg); auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata); @@ -949,6 +976,50 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) { task.release(); } +void Thread::AllowInvokesToThread(Thread* thread) { +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + if (!IsCurrent()) { + PostTask(webrtc::ToQueuedTask( + [thread, this]() { AllowInvokesToThread(thread); })); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.push_back(thread); + invoke_policy_enabled_ = true; +#endif +} + +void Thread::DisallowAllInvokes() { +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + if (!IsCurrent()) { + PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); })); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.clear(); + invoke_policy_enabled_ = true; +#endif +} + +// Returns true if no policies added or if there is at least one policy +// that permits invocation to |target| thread. +bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) { +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + RTC_DCHECK_RUN_ON(this); + if (!invoke_policy_enabled_) { + return true; + } + for (const auto* thread : allowed_threads_) { + if (thread == target) { + return true; + } + } + return false; +#else + return true; +#endif +} + void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) { // Though Post takes MessageData by raw pointer (last parameter), it still // takes it with ownership. diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 74aab623c8..27a5b7b510 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -29,7 +29,7 @@ #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/location.h" #include "rtc_base/message_handler.h" #include "rtc_base/platform_thread_types.h" @@ -140,7 +140,7 @@ class RTC_EXPORT ThreadManager { // Methods that don't modify the list of message queues may be called in a // re-entrant fashion. "processing_" keeps track of the depth of re-entrant // calls. - CriticalSection crit_; + RecursiveCriticalSection crit_; size_t processing_ RTC_GUARDED_BY(crit_) = 0; #if RTC_DCHECK_IS_ON // Represents all thread seand actions by storing all send targets per thread. @@ -338,6 +338,18 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { InvokeInternal(posted_from, functor); } + // Allows invoke to specified |thread|. Thread never will be dereferenced and + // will be used only for reference-based comparison, so instance can be safely + // deleted. If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing. + void AllowInvokesToThread(Thread* thread); + // If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing. + void DisallowAllInvokes(); + // Returns true if |target| was allowed by AllowInvokesToThread() or if no + // calls were made to AllowInvokesToThread and DisallowAllInvokes. Otherwise + // returns false. + // If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined always returns true. + bool IsInvokeToThreadAllowed(rtc::Thread* target); + // Posts a task to invoke the functor on |this| thread asynchronously, i.e. // without blocking the thread that invoked PostTask(). Ownership of |functor| // is passed and (usually, see below) destroyed on |this| thread after it is @@ -519,7 +531,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { friend class ScopedDisallowBlockingCalls; - CriticalSection* CritForTest() { return &crit_; } + RecursiveCriticalSection* CritForTest() { return &crit_; } private: class QueuedTaskHandler final : public MessageHandler { @@ -551,6 +563,12 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { void InvokeInternal(const Location& posted_from, rtc::FunctionView<void()> functor); + // Called by the ThreadManager when being set as the current thread. + void EnsureIsCurrentTaskQueue(); + + // Called by the ThreadManager when being unset as the current thread. + void ClearCurrentTaskQueue(); + // Returns a static-lifetime MessageHandler which runs message with // MessageLikeTask payload data. static MessageHandler* GetPostTaskMessageHandler(); @@ -560,7 +578,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { MessageList messages_ RTC_GUARDED_BY(crit_); PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); - CriticalSection crit_; +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this); + bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false; +#endif + RecursiveCriticalSection crit_; bool fInitialized_; bool fDestroyed_; @@ -595,6 +617,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Runs webrtc::QueuedTask posted to the Thread. QueuedTaskHandler queued_task_handler_; + std::unique_ptr<TaskQueueBase::CurrentTaskQueueSetter> + task_queue_registration_; friend class ThreadManager; diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index d53a387914..d3cae34dfa 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -22,12 +22,14 @@ #include "rtc_base/null_socket_server.h" #include "rtc_base/physical_socket_server.h" #include "rtc_base/socket_address.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "test/testsupport/rtc_expect_death.h" #if defined(WEBRTC_WIN) #include <comdef.h> // NOLINT + #endif namespace rtc { @@ -161,17 +163,17 @@ class AtomicBool { public: explicit AtomicBool(bool value = false) : flag_(value) {} AtomicBool& operator=(bool value) { - CritScope scoped_lock(&cs_); + webrtc::MutexLock scoped_lock(&mutex_); flag_ = value; return *this; } bool get() const { - CritScope scoped_lock(&cs_); + webrtc::MutexLock scoped_lock(&mutex_); return flag_; } private: - CriticalSection cs_; + mutable webrtc::Mutex mutex_; bool flag_; }; @@ -288,6 +290,63 @@ TEST(ThreadTest, Wrap) { ThreadManager::Instance()->SetCurrentThread(current_thread); } +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + + thread1->PostTask(ToQueuedTask( + [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} + +TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + auto thread3 = Thread::CreateWithSocketServer(); + auto thread4 = Thread::CreateWithSocketServer(); + + thread1->AllowInvokesToThread(thread2.get()); + thread1->AllowInvokesToThread(thread3.get()); + + thread1->PostTask(ToQueuedTask([&]() { + EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); + EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get())); + EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get())); + })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} + +TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + + thread1->DisallowAllInvokes(); + + thread1->PostTask(ToQueuedTask([&]() { + EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); + })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} +#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + +TEST(ThreadTest, InvokesAllowedByDefault) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + + thread1->PostTask(ToQueuedTask( + [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} + TEST(ThreadTest, Invoke) { // Create and start the thread. auto thread = Thread::CreateWithSocketServer(); @@ -356,18 +415,18 @@ TEST(ThreadTest, ThreeThreadsInvoke) { explicit LockedBool(bool value) : value_(value) {} void Set(bool value) { - CritScope lock(&crit_); + webrtc::MutexLock lock(&mutex_); value_ = value; } bool Get() { - CritScope lock(&crit_); + webrtc::MutexLock lock(&mutex_); return value_; } private: - CriticalSection crit_; - bool value_ RTC_GUARDED_BY(crit_); + webrtc::Mutex mutex_; + bool value_ RTC_GUARDED_BY(mutex_); }; struct LocalFuncs { @@ -390,7 +449,6 @@ TEST(ThreadTest, ThreeThreadsInvoke) { Thread* thread1, Thread* thread2, LockedBool* out) { - CriticalSection crit; LockedBool async_invoked(false); invoker->AsyncInvoke<void>( @@ -780,105 +838,6 @@ TEST_F(AsyncInvokeTest, FlushWithIds) { EXPECT_TRUE(flag2.get()); } -class GuardedAsyncInvokeTest : public ::testing::Test { - public: - void IntCallback(int value) { - EXPECT_EQ(expected_thread_, Thread::Current()); - int_value_ = value; - } - void SetExpectedThreadForIntCallback(Thread* thread) { - expected_thread_ = thread; - } - - protected: - constexpr static int kWaitTimeout = 1000; - GuardedAsyncInvokeTest() : int_value_(0), expected_thread_(nullptr) {} - - int int_value_; - Thread* expected_thread_; -}; - -// Functor for creating an invoker. -struct CreateInvoker { - CreateInvoker(std::unique_ptr<GuardedAsyncInvoker>* invoker) - : invoker_(invoker) {} - void operator()() { invoker_->reset(new GuardedAsyncInvoker()); } - std::unique_ptr<GuardedAsyncInvoker>* invoker_; -}; - -// Test that we can call AsyncInvoke<void>() after the thread died. -TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) { - // Create and start the thread. - std::unique_ptr<Thread> thread(Thread::Create()); - thread->Start(); - std::unique_ptr<GuardedAsyncInvoker> invoker; - // Create the invoker on |thread|. - thread->Invoke<void>(RTC_FROM_HERE, CreateInvoker(&invoker)); - // Kill |thread|. - thread = nullptr; - // Try calling functor. - AtomicBool called; - EXPECT_FALSE(invoker->AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&called))); - // With thread gone, nothing should happen. - WAIT(called.get(), kWaitTimeout); - EXPECT_FALSE(called.get()); -} - -// The remaining tests check that GuardedAsyncInvoker behaves as AsyncInvoker -// when Thread is still alive. -TEST_F(GuardedAsyncInvokeTest, FireAndForget) { - GuardedAsyncInvoker invoker; - // Try calling functor. - AtomicBool called; - EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&called))); - EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); -} - -TEST_F(GuardedAsyncInvokeTest, NonCopyableFunctor) { - GuardedAsyncInvoker invoker; - // Try calling functor. - AtomicBool called; - EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorD(&called))); - EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); -} - -TEST_F(GuardedAsyncInvokeTest, Flush) { - GuardedAsyncInvoker invoker; - AtomicBool flag1; - AtomicBool flag2; - // Queue two async calls to the current thread. - EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag1))); - EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag2))); - // Because we haven't pumped messages, these should not have run yet. - EXPECT_FALSE(flag1.get()); - EXPECT_FALSE(flag2.get()); - // Force them to run now. - EXPECT_TRUE(invoker.Flush()); - EXPECT_TRUE(flag1.get()); - EXPECT_TRUE(flag2.get()); -} - -TEST_F(GuardedAsyncInvokeTest, FlushWithIds) { - GuardedAsyncInvoker invoker; - AtomicBool flag1; - AtomicBool flag2; - // Queue two async calls to the current thread, one with a message id. - EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag1), 5)); - EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag2))); - // Because we haven't pumped messages, these should not have run yet. - EXPECT_FALSE(flag1.get()); - EXPECT_FALSE(flag2.get()); - // Execute pending calls with id == 5. - EXPECT_TRUE(invoker.Flush(5)); - EXPECT_TRUE(flag1.get()); - EXPECT_FALSE(flag2.get()); - flag1 = false; - // Execute all pending calls. The id == 5 call should not execute again. - EXPECT_TRUE(invoker.Flush()); - EXPECT_FALSE(flag1.get()); - EXPECT_TRUE(flag2.get()); -} - void ThreadIsCurrent(Thread* thread, bool* result, Event* event) { *result = thread->IsCurrent(); event->Set(); @@ -1148,6 +1107,18 @@ TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) { EXPECT_TRUE(fourth.Wait(0)); } +TEST(ThreadPostDelayedTaskTest, IsCurrentTaskQueue) { + auto current_tq = webrtc::TaskQueueBase::Current(); + { + std::unique_ptr<rtc::Thread> thread(rtc::Thread::Create()); + thread->WrapCurrent(); + EXPECT_EQ(webrtc::TaskQueueBase::Current(), + static_cast<webrtc::TaskQueueBase*>(thread.get())); + thread->UnwrapCurrent(); + } + EXPECT_EQ(webrtc::TaskQueueBase::Current(), current_tq); +} + class ThreadFactory : public webrtc::TaskQueueFactory { public: std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> diff --git a/rtc_base/time_utils.cc b/rtc_base/time_utils.cc index 8d919262d3..11c9d5a47f 100644 --- a/rtc_base/time_utils.cc +++ b/rtc_base/time_utils.cc @@ -247,7 +247,7 @@ int64_t TimestampWrapAroundHandler::Unwrap(uint32_t ts) { ++num_wrap_; } else if ((ts - last_ts_) > 0xf0000000) { // Backwards wrap. Unwrap with last wrap count and don't update last_ts_. - return ts + ((num_wrap_ - 1) << 32); + return ts + (num_wrap_ - 1) * (int64_t{1} << 32); } last_ts_ = ts; diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc index d42873e18b..3d412d66cc 100644 --- a/rtc_base/virtual_socket_server.cc +++ b/rtc_base/virtual_socket_server.cc @@ -19,6 +19,7 @@ #include "absl/algorithm/container.h" #include "rtc_base/checks.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/fake_clock.h" #include "rtc_base/logging.h" #include "rtc_base/physical_socket_server.h" diff --git a/rtc_base/virtual_socket_server.h b/rtc_base/virtual_socket_server.h index f45fabf0af..84f8fb1bdc 100644 --- a/rtc_base/virtual_socket_server.h +++ b/rtc_base/virtual_socket_server.h @@ -17,6 +17,7 @@ #include "rtc_base/checks.h" #include "rtc_base/constructor_magic.h" +#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/event.h" #include "rtc_base/fake_clock.h" #include "rtc_base/message_handler.h" @@ -294,7 +295,7 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { std::map<rtc::IPAddress, rtc::IPAddress> alternative_address_mapping_; std::unique_ptr<Function> delay_dist_; - CriticalSection delay_crit_; + RecursiveCriticalSection delay_crit_; double drop_prob_; bool sending_blocked_ = false; @@ -379,7 +380,7 @@ class VirtualSocket : public AsyncSocket, bool ready_to_send_ = true; // Critical section to protect the recv_buffer and queue_ - CriticalSection crit_; + RecursiveCriticalSection crit_; // Network model that enforces bandwidth and capacity constraints NetworkQueue network_; diff --git a/rtc_base/win32_socket_server.cc b/rtc_base/win32_socket_server.cc index 8a5b93a608..cfe21a3630 100644 --- a/rtc_base/win32_socket_server.cc +++ b/rtc_base/win32_socket_server.cc @@ -733,7 +733,7 @@ bool Win32SocketServer::Wait(int cms, bool process_io) { MSG msg; b = GetMessage(&msg, nullptr, s_wm_wakeup_id, s_wm_wakeup_id); { - CritScope scope(&cs_); + webrtc::MutexLock lock(&mutex_); posted_ = false; } } else { @@ -747,7 +747,7 @@ void Win32SocketServer::WakeUp() { if (wnd_.handle()) { // Set the "message pending" flag, if not already set. { - CritScope scope(&cs_); + webrtc::MutexLock lock(&mutex_); if (posted_) return; posted_ = true; @@ -760,7 +760,7 @@ void Win32SocketServer::WakeUp() { void Win32SocketServer::Pump() { // Clear the "message pending" flag. { - CritScope scope(&cs_); + webrtc::MutexLock lock(&mutex_); posted_ = false; } diff --git a/rtc_base/win32_socket_server.h b/rtc_base/win32_socket_server.h index 92fd68cd83..317acce0d2 100644 --- a/rtc_base/win32_socket_server.h +++ b/rtc_base/win32_socket_server.h @@ -13,10 +13,10 @@ #if defined(WEBRTC_WIN) #include "rtc_base/async_socket.h" -#include "rtc_base/critical_section.h" #include "rtc_base/socket.h" #include "rtc_base/socket_factory.h" #include "rtc_base/socket_server.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread.h" #include "rtc_base/win32_window.h" @@ -123,7 +123,7 @@ class Win32SocketServer : public SocketServer { static const wchar_t kWindowName[]; Thread* message_queue_; MessageWindow wnd_; - CriticalSection cs_; + webrtc::Mutex mutex_; bool posted_; HWND hdlg_; }; |