aboutsummaryrefslogtreecommitdiff
path: root/rtc_base
diff options
context:
space:
mode:
authorTreeHugger Robot <treehugger-gerrit@google.com>2020-07-23 23:07:18 +0000
committerAndroid (Google) Code Review <android-gerrit@google.com>2020-07-23 23:07:18 +0000
commit9e9b79187bff6d54f8e15db1978c157f8dda9335 (patch)
tree9ba6a46a7e4cd59e1018b94136f46578efe31f2e /rtc_base
parent37f9b0ea9ea6a8c490bdb0dc2f44a586b01c8ab2 (diff)
parent206ccd0b36df69a0d0d0d26ddf7c4ead20202f91 (diff)
downloadwebrtc-9e9b79187bff6d54f8e15db1978c157f8dda9335.tar.gz
Merge changes Ida3bfe62,I2d596942
* changes: Merge remote tracking branch 'upstream-master' Generate new Android.bp file and correct build errors
Diffstat (limited to 'rtc_base')
-rw-r--r--rtc_base/BUILD.gn81
-rw-r--r--rtc_base/DEPS2
-rw-r--r--rtc_base/async_invoker.cc22
-rw-r--r--rtc_base/async_invoker.h91
-rw-r--r--rtc_base/async_invoker_inl.h1
-rw-r--r--rtc_base/bit_buffer.cc11
-rw-r--r--rtc_base/bit_buffer_unittest.cc24
-rw-r--r--rtc_base/buffer.h4
-rw-r--r--rtc_base/buffer_queue.cc10
-rw-r--r--rtc_base/buffer_queue.h8
-rw-r--r--rtc_base/buffer_unittest.cc2
-rw-r--r--rtc_base/checks.h2
-rw-r--r--rtc_base/checks_unittest.cc2
-rw-r--r--rtc_base/deprecated/recursive_critical_section.cc (renamed from rtc_base/critical_section.cc)52
-rw-r--r--rtc_base/deprecated/recursive_critical_section.h (renamed from rtc_base/critical_section.h)51
-rw-r--r--rtc_base/deprecated/recursive_critical_section_unittest.cc (renamed from rtc_base/critical_section_unittest.cc)37
-rw-r--r--rtc_base/deprecated/signal_thread.cc (renamed from rtc_base/signal_thread.cc)46
-rw-r--r--rtc_base/deprecated/signal_thread.h166
-rw-r--r--rtc_base/deprecated/signal_thread_unittest.cc (renamed from rtc_base/signal_thread_unittest.cc)18
-rw-r--r--rtc_base/event_tracer.cc12
-rw-r--r--rtc_base/event_tracer_unittest.cc12
-rw-r--r--rtc_base/experiments/BUILD.gn30
-rw-r--r--rtc_base/experiments/quality_rampup_experiment.cc4
-rw-r--r--rtc_base/experiments/quality_rampup_experiment.h2
-rw-r--r--rtc_base/fake_clock.cc6
-rw-r--r--rtc_base/fake_clock.h4
-rw-r--r--rtc_base/firewall_socket_server.cc14
-rw-r--r--rtc_base/firewall_socket_server.h6
-rw-r--r--rtc_base/logging.cc30
-rw-r--r--rtc_base/logging.h16
-rw-r--r--rtc_base/memory/BUILD.gn5
-rw-r--r--rtc_base/memory/fifo_buffer.cc26
-rw-r--r--rtc_base/memory/fifo_buffer.h17
-rw-r--r--rtc_base/nat_server.cc20
-rw-r--r--rtc_base/nat_server.h9
-rw-r--r--rtc_base/net_helpers.cc62
-rw-r--r--rtc_base/net_helpers.h36
-rw-r--r--rtc_base/network.cc24
-rw-r--r--rtc_base/network.h1
-rw-r--r--rtc_base/network/BUILD.gn6
-rw-r--r--rtc_base/one_time_event.h6
-rw-r--r--rtc_base/openssl_adapter_unittest.cc40
-rw-r--r--rtc_base/operations_chain_unittest.cc5
-rw-r--r--rtc_base/physical_socket_server.cc281
-rw-r--r--rtc_base/physical_socket_server.h58
-rw-r--r--rtc_base/physical_socket_server_unittest.cc133
-rw-r--r--rtc_base/platform_thread_types.cc28
-rw-r--r--rtc_base/rate_limiter.cc6
-rw-r--r--rtc_base/rate_limiter.h4
-rw-r--r--rtc_base/rate_statistics.cc109
-rw-r--r--rtc_base/rate_statistics.h26
-rw-r--r--rtc_base/signal_thread.h148
-rw-r--r--rtc_base/ssl_adapter_unittest.cc2
-rw-r--r--rtc_base/stream.h1
-rw-r--r--rtc_base/strings/string_builder_unittest.cc12
-rw-r--r--rtc_base/swap_queue_unittest.cc6
-rw-r--r--rtc_base/synchronization/BUILD.gn59
-rw-r--r--rtc_base/synchronization/DEPS11
-rw-r--r--rtc_base/synchronization/mutex.cc39
-rw-r--r--rtc_base/synchronization/mutex.h146
-rw-r--r--rtc_base/synchronization/mutex_abseil.h37
-rw-r--r--rtc_base/synchronization/mutex_benchmark.cc95
-rw-r--r--rtc_base/synchronization/mutex_critical_section.h54
-rw-r--r--rtc_base/synchronization/mutex_pthread.h53
-rw-r--r--rtc_base/synchronization/mutex_unittest.cc206
-rw-r--r--rtc_base/synchronization/sequence_checker.cc51
-rw-r--r--rtc_base/synchronization/sequence_checker.h24
-rw-r--r--rtc_base/synchronization/sequence_checker_unittest.cc9
-rw-r--r--rtc_base/synchronization/yield.cc36
-rw-r--r--rtc_base/synchronization/yield.h20
-rw-r--r--rtc_base/synchronization/yield_policy_unittest.cc2
-rw-r--r--rtc_base/system/BUILD.gn13
-rw-r--r--rtc_base/system/file_wrapper.h2
-rw-r--r--rtc_base/system/thread_registry.cc10
-rw-r--r--rtc_base/task_queue_libevent.cc8
-rw-r--r--rtc_base/task_queue_stdlib.cc12
-rw-r--r--rtc_base/task_queue_win.cc8
-rw-r--r--rtc_base/task_utils/BUILD.gn5
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag.cc2
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag.h28
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag_unittest.cc32
-rw-r--r--rtc_base/task_utils/repeating_task.cc11
-rw-r--r--rtc_base/task_utils/repeating_task.h36
-rw-r--r--rtc_base/task_utils/repeating_task_unittest.cc52
-rw-r--r--rtc_base/task_utils/to_queued_task.h22
-rw-r--r--rtc_base/task_utils/to_queued_task_unittest.cc3
-rw-r--r--rtc_base/test_client.cc6
-rw-r--r--rtc_base/test_client.h4
-rw-r--r--rtc_base/thread.cc85
-rw-r--r--rtc_base/thread.h32
-rw-r--r--rtc_base/thread_unittest.cc185
-rw-r--r--rtc_base/time_utils.cc2
-rw-r--r--rtc_base/virtual_socket_server.cc1
-rw-r--r--rtc_base/virtual_socket_server.h5
-rw-r--r--rtc_base/win32_socket_server.cc6
-rw-r--r--rtc_base/win32_socket_server.h4
96 files changed, 1953 insertions, 1300 deletions
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index a61ede4ac9..73bca85efa 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -55,12 +55,13 @@ rtc_library("rtc_base_approved") {
":type_traits",
"../api:array_view",
"../api:scoped_refptr",
+ "synchronization:mutex",
"system:arch",
"system:rtc_export",
"system:unused",
"third_party/base64",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
public_deps = [] # no-presubmit-check TODO(webrtc:8603)
sources = [
@@ -154,6 +155,7 @@ rtc_library("platform_thread_types") {
"platform_thread_types.cc",
"platform_thread_types.h",
]
+ deps = [ ":macromagic" ]
}
rtc_source_set("refcount") {
@@ -168,15 +170,15 @@ rtc_source_set("refcount") {
rtc_library("criticalsection") {
sources = [
- "critical_section.cc",
- "critical_section.h",
+ "deprecated/recursive_critical_section.cc",
+ "deprecated/recursive_critical_section.h",
]
deps = [
":atomicops",
":checks",
":macromagic",
":platform_thread_types",
- "system:rtc_export",
+ "synchronization:yield",
"system:unused",
]
}
@@ -187,6 +189,7 @@ rtc_library("platform_thread") {
":rtc_task_queue_libevent",
":rtc_task_queue_win",
":rtc_task_queue_stdlib",
+ "synchronization:mutex",
"synchronization:sequence_checker",
]
sources = [
@@ -201,8 +204,8 @@ rtc_library("platform_thread") {
":rtc_event",
":thread_checker",
":timeutils",
- "//third_party/abseil-cpp/absl/strings",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
}
rtc_library("rtc_event") {
@@ -225,8 +228,8 @@ rtc_library("rtc_event") {
":checks",
"synchronization:yield_policy",
"system:warn_current_thread_is_deadlocked",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
}
@@ -240,6 +243,9 @@ rtc_library("logging") {
":platform_thread_types",
":stringutils",
":timeutils",
+ "synchronization:mutex",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/meta:type_traits",
"//third_party/abseil-cpp/absl/strings",
@@ -264,7 +270,7 @@ rtc_library("logging") {
deps += [ "system:inline" ]
if (is_mac) {
- libs += [ "Foundation.framework" ]
+ frameworks = [ "Foundation.framework" ]
}
# logging.h needs the deprecation header while downstream projects are
@@ -301,6 +307,8 @@ rtc_library("checks") {
":safe_compare",
"system:inline",
"system:rtc_export",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/meta:type_traits",
"//third_party/abseil-cpp/absl/strings",
]
@@ -317,13 +325,14 @@ rtc_library("rate_limiter") {
deps = [
":rtc_base_approved",
"../system_wrappers",
- "//third_party/abseil-cpp/absl/types:optional",
+ "synchronization:mutex",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_source_set("sanitizer") {
sources = [ "sanitizer.h" ]
- deps = [ "//third_party/abseil-cpp/absl/meta:type_traits" ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/meta:type_traits" ]
}
rtc_source_set("bounded_inline_vector") {
@@ -398,6 +407,8 @@ rtc_library("stringutils") {
":macromagic",
":safe_minmax",
"../api:array_view",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
@@ -433,8 +444,8 @@ rtc_library("rtc_task_queue") {
"../api/task_queue",
"system:rtc_export",
"task_utils:to_queued_task",
- "//third_party/abseil-cpp/absl/memory",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/memory" ]
}
rtc_source_set("rtc_operations_chain") {
@@ -469,6 +480,9 @@ if (rtc_enable_libevent) {
":safe_conversions",
":timeutils",
"../api/task_queue",
+ "synchronization:mutex",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/container:inlined_vector",
"//third_party/abseil-cpp/absl/strings",
]
@@ -489,9 +503,10 @@ if (is_mac || is_ios) {
":checks",
":logging",
"../api/task_queue",
+ "synchronization:mutex",
"system:gcd_helpers",
- "//third_party/abseil-cpp/absl/strings",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
}
}
@@ -512,8 +527,9 @@ if (is_win) {
":safe_conversions",
":timeutils",
"../api/task_queue",
- "//third_party/abseil-cpp/absl/strings",
+ "synchronization:mutex",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
}
}
@@ -532,8 +548,9 @@ rtc_library("rtc_task_queue_stdlib") {
":safe_conversions",
":timeutils",
"../api/task_queue",
- "//third_party/abseil-cpp/absl/strings",
+ "synchronization:mutex",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
}
rtc_library("weak_ptr") {
@@ -576,6 +593,8 @@ rtc_library("rtc_numerics") {
"../api/units:data_rate",
"../api/units:time_delta",
"../api/units:timestamp",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/types:optional",
]
@@ -760,6 +779,7 @@ rtc_library("rtc_base") {
deps = [
":checks",
":deprecation",
+ ":rtc_task_queue",
":stringutils",
"../api:array_view",
"../api:function_view",
@@ -767,12 +787,17 @@ rtc_library("rtc_base") {
"../api/task_queue",
"../system_wrappers:field_trial",
"network:sent_packet",
+ "synchronization:mutex",
+ "synchronization:sequence_checker",
"system:file_wrapper",
"system:inline",
"system:rtc_export",
+ "task_utils:pending_task_safety_flag",
"task_utils:to_queued_task",
"third_party/base64",
"third_party/sigslot",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
@@ -801,6 +826,8 @@ rtc_library("rtc_base") {
"crypt_string.h",
"data_rate_limiter.cc",
"data_rate_limiter.h",
+ "deprecated/signal_thread.cc",
+ "deprecated/signal_thread.h",
"dscp.h",
"file_rotating_stream.cc",
"file_rotating_stream.h",
@@ -853,7 +880,6 @@ rtc_library("rtc_base") {
"rtc_certificate.h",
"rtc_certificate_generator.cc",
"rtc_certificate_generator.h",
- "signal_thread.cc",
"signal_thread.h",
"sigslot_repeater.h",
"socket.cc",
@@ -942,7 +968,7 @@ rtc_library("rtc_base") {
}
if (is_ios) {
- libs += [
+ frameworks = [
"CFNetwork.framework",
"Foundation.framework",
"Security.framework",
@@ -1000,8 +1026,8 @@ rtc_library("gunit_helpers") {
":rtc_base_tests_utils",
":stringutils",
"../test:test_support",
- "//third_party/abseil-cpp/absl/strings",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
}
rtc_library("testclient") {
@@ -1017,6 +1043,7 @@ rtc_library("testclient") {
":rtc_base",
":rtc_base_tests_utils",
":timeutils",
+ "synchronization:mutex",
]
}
@@ -1065,7 +1092,10 @@ rtc_library("rtc_base_tests_utils") {
"../api/units:time_delta",
"../api/units:timestamp",
"memory:fifo_buffer",
+ "synchronization:mutex",
"third_party/sigslot",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/memory",
]
@@ -1087,8 +1117,8 @@ rtc_library("task_queue_for_test") {
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
"task_utils:to_queued_task",
- "//third_party/abseil-cpp/absl/strings",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
}
if (rtc_include_tests) {
@@ -1100,6 +1130,7 @@ if (rtc_include_tests) {
":rtc_base",
":rtc_base_tests_utils",
"../test:test_support",
+ "synchronization:mutex",
"third_party/sigslot",
]
}
@@ -1128,8 +1159,8 @@ if (rtc_include_tests) {
"../test:test_support",
"third_party/sigslot",
"//testing/gtest",
- "//third_party/abseil-cpp/absl/memory",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/memory" ]
if (is_win) {
sources += [ "win32_socket_server_unittest.cc" ]
}
@@ -1149,7 +1180,7 @@ if (rtc_include_tests) {
"byte_order_unittest.cc",
"checks_unittest.cc",
"copy_on_write_buffer_unittest.cc",
- "critical_section_unittest.cc",
+ "deprecated/recursive_critical_section_unittest.cc",
"event_tracer_unittest.cc",
"event_unittest.cc",
"logging_unittest.cc",
@@ -1208,9 +1239,12 @@ if (rtc_include_tests) {
"../test:test_main",
"../test:test_support",
"memory:unittests",
+ "synchronization:mutex",
"task_utils:to_queued_task",
"third_party/base64",
"third_party/sigslot",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/memory",
]
@@ -1228,8 +1262,8 @@ if (rtc_include_tests) {
":task_queue_for_test",
"../test:test_main",
"../test:test_support",
- "//third_party/abseil-cpp/absl/memory",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/memory" ]
}
rtc_library("rtc_operations_chain_unittests") {
@@ -1279,8 +1313,8 @@ if (rtc_include_tests) {
":rtc_numerics",
"../test:test_main",
"../test:test_support",
- "//third_party/abseil-cpp/absl/algorithm:container",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container" ]
}
rtc_library("rtc_json_unittests") {
@@ -1304,6 +1338,7 @@ if (rtc_include_tests) {
"callback_unittest.cc",
"crc32_unittest.cc",
"data_rate_limiter_unittest.cc",
+ "deprecated/signal_thread_unittest.cc",
"fake_clock_unittest.cc",
"helpers_unittest.cc",
"ip_address_unittest.cc",
@@ -1316,7 +1351,6 @@ if (rtc_include_tests) {
"rolling_accumulator_unittest.cc",
"rtc_certificate_generator_unittest.cc",
"rtc_certificate_unittest.cc",
- "signal_thread_unittest.cc",
"sigslot_tester_unittest.cc",
"test_client_unittest.cc",
"thread_unittest.cc",
@@ -1353,9 +1387,12 @@ if (rtc_include_tests) {
"../test:test_main",
"../test:test_support",
"memory:fifo_buffer",
+ "synchronization:mutex",
"synchronization:synchronization_unittests",
"task_utils:to_queued_task",
"third_party/sigslot",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
diff --git a/rtc_base/DEPS b/rtc_base/DEPS
index 679d06dfc8..c9f7dc5898 100644
--- a/rtc_base/DEPS
+++ b/rtc_base/DEPS
@@ -1,8 +1,8 @@
include_rules = [
"+base/third_party/libevent",
"+json",
- "+third_party/jsoncpp",
"+system_wrappers",
+ "+third_party/jsoncpp",
]
specific_include_rules = {
diff --git a/rtc_base/async_invoker.cc b/rtc_base/async_invoker.cc
index 26f8c523ab..8b410a4561 100644
--- a/rtc_base/async_invoker.cc
+++ b/rtc_base/async_invoker.cc
@@ -101,28 +101,6 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
new ScopedMessageData<AsyncClosure>(std::move(closure)));
}
-GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) {
- thread_->SignalQueueDestroyed.connect(this,
- &GuardedAsyncInvoker::ThreadDestroyed);
-}
-
-GuardedAsyncInvoker::~GuardedAsyncInvoker() {}
-
-bool GuardedAsyncInvoker::Flush(uint32_t id) {
- CritScope cs(&crit_);
- if (thread_ == nullptr)
- return false;
- invoker_.Flush(thread_, id);
- return true;
-}
-
-void GuardedAsyncInvoker::ThreadDestroyed() {
- CritScope cs(&crit_);
- // We should never get more than one notification about the thread dying.
- RTC_DCHECK(thread_ != nullptr);
- thread_ = nullptr;
-}
-
AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
: invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
diff --git a/rtc_base/async_invoker.h b/rtc_base/async_invoker.h
index f15955d811..ed2df1cdcb 100644
--- a/rtc_base/async_invoker.h
+++ b/rtc_base/async_invoker.h
@@ -169,97 +169,6 @@ class AsyncInvoker : public MessageHandler {
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker);
};
-// Similar to AsyncInvoker, but guards against the Thread being destroyed while
-// there are outstanding dangling pointers to it. It will connect to the current
-// thread in the constructor, and will get notified when that thread is
-// destroyed. After GuardedAsyncInvoker is constructed, it can be used from
-// other threads to post functors to the thread it was constructed on. If that
-// thread dies, any further calls to AsyncInvoke() will be safely ignored.
-class GuardedAsyncInvoker : public sigslot::has_slots<> {
- public:
- GuardedAsyncInvoker();
- ~GuardedAsyncInvoker() override;
-
- // Synchronously execute all outstanding calls we own, and wait for calls to
- // complete before returning. Optionally filter by message id. The destructor
- // will not wait for outstanding calls, so if that behavior is desired, call
- // Flush() first. Returns false if the thread has died.
- bool Flush(uint32_t id = MQID_ANY);
-
- // Call |functor| asynchronously with no callback upon completion. Returns
- // immediately. Returns false if the thread has died.
- template <class ReturnT, class FunctorT>
- bool AsyncInvoke(const Location& posted_from,
- FunctorT&& functor,
- uint32_t id = 0) {
- CritScope cs(&crit_);
- if (thread_ == nullptr)
- return false;
- invoker_.AsyncInvoke<ReturnT, FunctorT>(
- posted_from, thread_, std::forward<FunctorT>(functor), id);
- return true;
- }
-
- // Call |functor| asynchronously with |delay_ms|, with no callback upon
- // completion. Returns immediately. Returns false if the thread has died.
- template <class ReturnT, class FunctorT>
- bool AsyncInvokeDelayed(const Location& posted_from,
- FunctorT&& functor,
- uint32_t delay_ms,
- uint32_t id = 0) {
- CritScope cs(&crit_);
- if (thread_ == nullptr)
- return false;
- invoker_.AsyncInvokeDelayed<ReturnT, FunctorT>(
- posted_from, thread_, std::forward<FunctorT>(functor), delay_ms, id);
- return true;
- }
-
- // Call |functor| asynchronously, calling |callback| when done. Returns false
- // if the thread has died.
- template <class ReturnT, class FunctorT, class HostT>
- bool AsyncInvoke(const Location& posted_from,
- const Location& callback_posted_from,
- FunctorT&& functor,
- void (HostT::*callback)(ReturnT),
- HostT* callback_host,
- uint32_t id = 0) {
- CritScope cs(&crit_);
- if (thread_ == nullptr)
- return false;
- invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>(
- posted_from, callback_posted_from, thread_,
- std::forward<FunctorT>(functor), callback, callback_host, id);
- return true;
- }
-
- // Call |functor| asynchronously calling |callback| when done. Overloaded for
- // void return. Returns false if the thread has died.
- template <class ReturnT, class FunctorT, class HostT>
- bool AsyncInvoke(const Location& posted_from,
- const Location& callback_posted_from,
- FunctorT&& functor,
- void (HostT::*callback)(),
- HostT* callback_host,
- uint32_t id = 0) {
- CritScope cs(&crit_);
- if (thread_ == nullptr)
- return false;
- invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>(
- posted_from, callback_posted_from, thread_,
- std::forward<FunctorT>(functor), callback, callback_host, id);
- return true;
- }
-
- private:
- // Callback when |thread_| is destroyed.
- void ThreadDestroyed();
-
- CriticalSection crit_;
- Thread* thread_ RTC_GUARDED_BY(crit_);
- AsyncInvoker invoker_ RTC_GUARDED_BY(crit_);
-};
-
} // namespace rtc
#endif // RTC_BASE_ASYNC_INVOKER_H_
diff --git a/rtc_base/async_invoker_inl.h b/rtc_base/async_invoker_inl.h
index bd9b0d1aa1..6307afe220 100644
--- a/rtc_base/async_invoker_inl.h
+++ b/rtc_base/async_invoker_inl.h
@@ -13,7 +13,6 @@
#include "api/scoped_refptr.h"
#include "rtc_base/bind.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/event.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/ref_counted_object.h"
diff --git a/rtc_base/bit_buffer.cc b/rtc_base/bit_buffer.cc
index a6dc1c7ab8..540141fe52 100644
--- a/rtc_base/bit_buffer.cc
+++ b/rtc_base/bit_buffer.cc
@@ -162,6 +162,12 @@ bool BitBuffer::ConsumeBits(size_t bit_count) {
bool BitBuffer::ReadNonSymmetric(uint32_t* val, uint32_t num_values) {
RTC_DCHECK_GT(num_values, 0);
RTC_DCHECK_LE(num_values, uint32_t{1} << 31);
+ if (num_values == 1) {
+ // When there is only one possible value, it requires zero bits to store it.
+ // But ReadBits doesn't support reading zero bits.
+ *val = 0;
+ return true;
+ }
size_t count_bits = CountBits(num_values);
uint32_t num_min_bits_values = (uint32_t{1} << count_bits) - num_values;
@@ -308,6 +314,11 @@ bool BitBufferWriter::WriteBits(uint64_t val, size_t bit_count) {
bool BitBufferWriter::WriteNonSymmetric(uint32_t val, uint32_t num_values) {
RTC_DCHECK_LT(val, num_values);
RTC_DCHECK_LE(num_values, uint32_t{1} << 31);
+ if (num_values == 1) {
+ // When there is only one possible value, it requires zero bits to store it.
+ // But WriteBits doesn't support writing zero bits.
+ return true;
+ }
size_t count_bits = CountBits(num_values);
uint32_t num_min_bits_values = (uint32_t{1} << count_bits) - num_values;
diff --git a/rtc_base/bit_buffer_unittest.cc b/rtc_base/bit_buffer_unittest.cc
index b3521b4951..656682c2ef 100644
--- a/rtc_base/bit_buffer_unittest.cc
+++ b/rtc_base/bit_buffer_unittest.cc
@@ -142,7 +142,7 @@ TEST(BitBufferTest, ReadBits) {
EXPECT_FALSE(buffer.ReadBits(&val, 1));
}
-TEST(BitBufferTest, SetOffsetValues) {
+TEST(BitBufferDeathTest, SetOffsetValues) {
uint8_t bytes[4] = {0};
BitBufferWriter buffer(bytes, 4);
@@ -254,6 +254,28 @@ TEST(BitBufferWriterTest, NonSymmetricReadsMatchesWrites) {
EXPECT_THAT(values, ElementsAre(0, 1, 2, 3, 4, 5));
}
+TEST(BitBufferTest, ReadNonSymmetricOnlyValueConsumesNoBits) {
+ const uint8_t bytes[2] = {};
+ BitBuffer reader(bytes, 2);
+ uint32_t value = 0xFFFFFFFF;
+ ASSERT_EQ(reader.RemainingBitCount(), 16u);
+
+ EXPECT_TRUE(reader.ReadNonSymmetric(&value, /*num_values=*/1));
+
+ EXPECT_EQ(value, 0u);
+ EXPECT_EQ(reader.RemainingBitCount(), 16u);
+}
+
+TEST(BitBufferWriterTest, WriteNonSymmetricOnlyValueConsumesNoBits) {
+ uint8_t bytes[2] = {};
+ BitBufferWriter writer(bytes, 2);
+ ASSERT_EQ(writer.RemainingBitCount(), 16u);
+
+ EXPECT_TRUE(writer.WriteNonSymmetric(0, /*num_values=*/1));
+
+ EXPECT_EQ(writer.RemainingBitCount(), 16u);
+}
+
uint64_t GolombEncoded(uint32_t val) {
val++;
uint32_t bit_counter = val;
diff --git a/rtc_base/buffer.h b/rtc_base/buffer.h
index 3048b9179f..d1639e2f71 100644
--- a/rtc_base/buffer.h
+++ b/rtc_base/buffer.h
@@ -370,7 +370,9 @@ class BufferT {
: capacity;
std::unique_ptr<T[]> new_data(new T[new_capacity]);
- std::memcpy(new_data.get(), data_.get(), size_ * sizeof(T));
+ if (data_ != nullptr) {
+ std::memcpy(new_data.get(), data_.get(), size_ * sizeof(T));
+ }
MaybeZeroCompleteBuffer();
data_ = std::move(new_data);
capacity_ = new_capacity;
diff --git a/rtc_base/buffer_queue.cc b/rtc_base/buffer_queue.cc
index 445045ceea..adad9dda17 100644
--- a/rtc_base/buffer_queue.cc
+++ b/rtc_base/buffer_queue.cc
@@ -21,7 +21,7 @@ BufferQueue::BufferQueue(size_t capacity, size_t default_size)
: capacity_(capacity), default_size_(default_size) {}
BufferQueue::~BufferQueue() {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
for (Buffer* buffer : queue_) {
delete buffer;
@@ -32,12 +32,12 @@ BufferQueue::~BufferQueue() {
}
size_t BufferQueue::size() const {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return queue_.size();
}
void BufferQueue::Clear() {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
while (!queue_.empty()) {
free_list_.push_back(queue_.front());
queue_.pop_front();
@@ -45,7 +45,7 @@ void BufferQueue::Clear() {
}
bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
if (queue_.empty()) {
return false;
}
@@ -69,7 +69,7 @@ bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) {
bool BufferQueue::WriteBack(const void* buffer,
size_t bytes,
size_t* bytes_written) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
if (queue_.size() == capacity_) {
return false;
}
diff --git a/rtc_base/buffer_queue.h b/rtc_base/buffer_queue.h
index 5cb18d0220..29d1a5b136 100644
--- a/rtc_base/buffer_queue.h
+++ b/rtc_base/buffer_queue.h
@@ -18,7 +18,7 @@
#include "rtc_base/buffer.h"
#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
namespace rtc {
@@ -52,9 +52,9 @@ class BufferQueue {
private:
size_t capacity_;
size_t default_size_;
- CriticalSection crit_;
- std::deque<Buffer*> queue_ RTC_GUARDED_BY(crit_);
- std::vector<Buffer*> free_list_ RTC_GUARDED_BY(crit_);
+ mutable webrtc::Mutex mutex_;
+ std::deque<Buffer*> queue_ RTC_GUARDED_BY(mutex_);
+ std::vector<Buffer*> free_list_ RTC_GUARDED_BY(mutex_);
RTC_DISALLOW_COPY_AND_ASSIGN(BufferQueue);
};
diff --git a/rtc_base/buffer_unittest.cc b/rtc_base/buffer_unittest.cc
index 3e7396dd2c..8beae43cf9 100644
--- a/rtc_base/buffer_unittest.cc
+++ b/rtc_base/buffer_unittest.cc
@@ -447,7 +447,7 @@ TEST(BufferTest, TestStruct) {
EXPECT_EQ(kObsidian, buf[2].stone);
}
-TEST(BufferTest, DieOnUseAfterMove) {
+TEST(BufferDeathTest, DieOnUseAfterMove) {
Buffer buf(17);
Buffer buf2 = std::move(buf);
EXPECT_EQ(buf2.size(), 17u);
diff --git a/rtc_base/checks.h b/rtc_base/checks.h
index 2fde3f6640..61c074ac82 100644
--- a/rtc_base/checks.h
+++ b/rtc_base/checks.h
@@ -69,7 +69,7 @@ RTC_NORETURN void rtc_FatalMessage(const char* file, int line, const char* msg);
// the reason that it's better to terminate might simply be that the error
// handling code isn't in place yet; in production, the reason might be that
// the author of the code truly believes that x will always be true, but that
-// she recognizes that if she is wrong, abrupt and unpleasant process
+// they recognizes that if they are wrong, abrupt and unpleasant process
// termination is still better than carrying on with the assumption violated.
//
// RTC_CHECK always evaluates its argument, so it's OK for x to have side
diff --git a/rtc_base/checks_unittest.cc b/rtc_base/checks_unittest.cc
index e6e094e597..91e04cf6a1 100644
--- a/rtc_base/checks_unittest.cc
+++ b/rtc_base/checks_unittest.cc
@@ -19,7 +19,7 @@ TEST(ChecksTest, ExpressionNotEvaluatedWhenCheckPassing) {
}
#if GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
-TEST(ChecksTest, Checks) {
+TEST(ChecksDeathTest, Checks) {
#if RTC_CHECK_MSG_ENABLED
EXPECT_DEATH(FATAL() << "message",
"\n\n#\n"
diff --git a/rtc_base/critical_section.cc b/rtc_base/deprecated/recursive_critical_section.cc
index 1969edefa5..068b9aa808 100644
--- a/rtc_base/critical_section.cc
+++ b/rtc_base/deprecated/recursive_critical_section.cc
@@ -8,17 +8,16 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "rtc_base/critical_section.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include <time.h>
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/synchronization/yield.h"
#include "rtc_base/system/unused.h"
-// TODO(tommi): Split this file up to per-platform implementation files.
-
#if RTC_DCHECK_IS_ON
#define RTC_CS_DEBUG_CODE(x) x
#else // !RTC_DCHECK_IS_ON
@@ -27,7 +26,7 @@
namespace rtc {
-CriticalSection::CriticalSection() {
+RecursiveCriticalSection::RecursiveCriticalSection() {
#if defined(WEBRTC_WIN)
InitializeCriticalSection(&crit_);
#elif defined(WEBRTC_POSIX)
@@ -42,7 +41,7 @@ CriticalSection::CriticalSection() {
pthread_mutexattr_settype(&mutex_attribute, PTHREAD_MUTEX_RECURSIVE);
#if defined(WEBRTC_MAC)
pthread_mutexattr_setpolicy_np(&mutex_attribute,
- _PTHREAD_MUTEX_POLICY_FAIRSHARE);
+ _PTHREAD_MUTEX_POLICY_FIRSTFIT);
#endif
pthread_mutex_init(&mutex_, &mutex_attribute);
pthread_mutexattr_destroy(&mutex_attribute);
@@ -56,7 +55,7 @@ CriticalSection::CriticalSection() {
#endif
}
-CriticalSection::~CriticalSection() {
+RecursiveCriticalSection::~RecursiveCriticalSection() {
#if defined(WEBRTC_WIN)
DeleteCriticalSection(&crit_);
#elif defined(WEBRTC_POSIX)
@@ -70,7 +69,7 @@ CriticalSection::~CriticalSection() {
#endif
}
-void CriticalSection::Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION() {
+void RecursiveCriticalSection::Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION() {
#if defined(WEBRTC_WIN)
EnterCriticalSection(&crit_);
#elif defined(WEBRTC_POSIX)
@@ -129,7 +128,8 @@ void CriticalSection::Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION() {
#endif
}
-bool CriticalSection::TryEnter() const RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
+bool RecursiveCriticalSection::TryEnter() const
+ RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
#if defined(WEBRTC_WIN)
return TryEnterCriticalSection(&crit_) != FALSE;
#elif defined(WEBRTC_POSIX)
@@ -162,7 +162,7 @@ bool CriticalSection::TryEnter() const RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
#endif
}
-void CriticalSection::Leave() const RTC_UNLOCK_FUNCTION() {
+void RecursiveCriticalSection::Leave() const RTC_UNLOCK_FUNCTION() {
RTC_DCHECK(CurrentThreadIsOwner());
#if defined(WEBRTC_WIN)
LeaveCriticalSection(&crit_);
@@ -190,7 +190,7 @@ void CriticalSection::Leave() const RTC_UNLOCK_FUNCTION() {
#endif
}
-bool CriticalSection::CurrentThreadIsOwner() const {
+bool RecursiveCriticalSection::CurrentThreadIsOwner() const {
#if defined(WEBRTC_WIN)
// OwningThread has type HANDLE but actually contains the Thread ID:
// http://stackoverflow.com/questions/12675301/why-is-the-owningthread-member-of-critical-section-of-type-handle-when-it-is-de
@@ -209,41 +209,11 @@ bool CriticalSection::CurrentThreadIsOwner() const {
#endif
}
-CritScope::CritScope(const CriticalSection* cs) : cs_(cs) {
+CritScope::CritScope(const RecursiveCriticalSection* cs) : cs_(cs) {
cs_->Enter();
}
CritScope::~CritScope() {
cs_->Leave();
}
-void GlobalLock::Lock() {
-#if !defined(WEBRTC_WIN) && \
- (!defined(WEBRTC_MAC) || RTC_USE_NATIVE_MUTEX_ON_MAC)
- const struct timespec ts_null = {0};
-#endif
-
- while (AtomicOps::CompareAndSwap(&lock_acquired_, 0, 1)) {
-#if defined(WEBRTC_WIN)
- ::Sleep(0);
-#elif defined(WEBRTC_MAC) && !RTC_USE_NATIVE_MUTEX_ON_MAC
- sched_yield();
-#else
- nanosleep(&ts_null, nullptr);
-#endif
- }
-}
-
-void GlobalLock::Unlock() {
- int old_value = AtomicOps::CompareAndSwap(&lock_acquired_, 1, 0);
- RTC_DCHECK_EQ(1, old_value) << "Unlock called without calling Lock first";
-}
-
-GlobalLockScope::GlobalLockScope(GlobalLock* lock) : lock_(lock) {
- lock_->Lock();
-}
-
-GlobalLockScope::~GlobalLockScope() {
- lock_->Unlock();
-}
-
} // namespace rtc
diff --git a/rtc_base/critical_section.h b/rtc_base/deprecated/recursive_critical_section.h
index cf10463bdf..c044c732b9 100644
--- a/rtc_base/critical_section.h
+++ b/rtc_base/deprecated/recursive_critical_section.h
@@ -8,13 +8,11 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#ifndef RTC_BASE_CRITICAL_SECTION_H_
-#define RTC_BASE_CRITICAL_SECTION_H_
+#ifndef RTC_BASE_DEPRECATED_RECURSIVE_CRITICAL_SECTION_H_
+#define RTC_BASE_DEPRECATED_RECURSIVE_CRITICAL_SECTION_H_
-#include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/platform_thread_types.h"
-#include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread_annotations.h"
#if defined(WEBRTC_WIN)
@@ -43,13 +41,18 @@
namespace rtc {
+// NOTE: This class is deprecated. Please use webrtc::Mutex instead!
+// Search using https://www.google.com/?q=recursive+lock+considered+harmful
+// to find the reasons.
+//
// Locking methods (Enter, TryEnter, Leave)are const to permit protecting
-// members inside a const context without requiring mutable CriticalSections
-// everywhere. CriticalSection is reentrant lock.
-class RTC_LOCKABLE RTC_EXPORT CriticalSection {
+// members inside a const context without requiring mutable
+// RecursiveCriticalSections everywhere. RecursiveCriticalSection is
+// reentrant lock.
+class RTC_LOCKABLE RecursiveCriticalSection {
public:
- CriticalSection();
- ~CriticalSection();
+ RecursiveCriticalSection();
+ ~RecursiveCriticalSection();
void Enter() const RTC_EXCLUSIVE_LOCK_FUNCTION();
bool TryEnter() const RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true);
@@ -87,37 +90,15 @@ class RTC_LOCKABLE RTC_EXPORT CriticalSection {
// CritScope, for serializing execution through a scope.
class RTC_SCOPED_LOCKABLE CritScope {
public:
- explicit CritScope(const CriticalSection* cs) RTC_EXCLUSIVE_LOCK_FUNCTION(cs);
+ explicit CritScope(const RecursiveCriticalSection* cs)
+ RTC_EXCLUSIVE_LOCK_FUNCTION(cs);
~CritScope() RTC_UNLOCK_FUNCTION();
private:
- const CriticalSection* const cs_;
+ const RecursiveCriticalSection* const cs_;
RTC_DISALLOW_COPY_AND_ASSIGN(CritScope);
};
-// A lock used to protect global variables. Do NOT use for other purposes.
-class RTC_LOCKABLE GlobalLock {
- public:
- constexpr GlobalLock() : lock_acquired_(0) {}
-
- void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION();
- void Unlock() RTC_UNLOCK_FUNCTION();
-
- private:
- volatile int lock_acquired_;
-};
-
-// GlobalLockScope, for serializing execution through a scope.
-class RTC_SCOPED_LOCKABLE GlobalLockScope {
- public:
- explicit GlobalLockScope(GlobalLock* lock) RTC_EXCLUSIVE_LOCK_FUNCTION(lock);
- ~GlobalLockScope() RTC_UNLOCK_FUNCTION();
-
- private:
- GlobalLock* const lock_;
- RTC_DISALLOW_COPY_AND_ASSIGN(GlobalLockScope);
-};
-
} // namespace rtc
-#endif // RTC_BASE_CRITICAL_SECTION_H_
+#endif // RTC_BASE_DEPRECATED_RECURSIVE_CRITICAL_SECTION_H_
diff --git a/rtc_base/critical_section_unittest.cc b/rtc_base/deprecated/recursive_critical_section_unittest.cc
index 16aefd2740..22c2655b3d 100644
--- a/rtc_base/critical_section_unittest.cc
+++ b/rtc_base/deprecated/recursive_critical_section_unittest.cc
@@ -8,7 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "rtc_base/critical_section.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include <stddef.h>
#include <stdint.h>
@@ -124,7 +124,7 @@ class RTC_LOCKABLE CriticalSectionLock {
void Unlock() RTC_UNLOCK_FUNCTION() { cs_.Leave(); }
private:
- CriticalSection cs_;
+ RecursiveCriticalSection cs_;
};
template <class Lock>
@@ -183,7 +183,7 @@ class AtomicOpRunner : public RunnerBase {
}
private:
- CriticalSection all_values_crit_;
+ RecursiveCriticalSection all_values_crit_;
Verifier verifier_;
};
@@ -282,26 +282,7 @@ TEST(AtomicOpsTest, CompareAndSwap) {
EXPECT_EQ(1, runner.shared_value());
}
-TEST(GlobalLockTest, CanHaveStaticStorageDuration) {
- static_assert(std::is_trivially_destructible<GlobalLock>::value, "");
- ABSL_CONST_INIT static GlobalLock global_lock;
- global_lock.Lock();
- global_lock.Unlock();
-}
-
-TEST(GlobalLockTest, Basic) {
- // Create and start lots of threads.
- LockRunner<GlobalLock> runner;
- std::vector<std::unique_ptr<Thread>> threads;
- StartThreads(&threads, &runner);
- runner.SetExpectedThreadCount(kNumThreads);
-
- // Release the hounds!
- EXPECT_TRUE(runner.Run());
- EXPECT_EQ(0, runner.shared_value());
-}
-
-TEST(CriticalSectionTest, Basic) {
+TEST(RecursiveCriticalSectionTest, Basic) {
// Create and start lots of threads.
LockRunner<CriticalSectionLock> runner;
std::vector<std::unique_ptr<Thread>> threads;
@@ -339,7 +320,7 @@ class PerfTestData {
private:
uint8_t cache_line_barrier_1_[64];
- CriticalSection lock_;
+ RecursiveCriticalSection lock_;
uint8_t cache_line_barrier_2_[64];
int64_t my_counter_ = 0;
const int expected_count_;
@@ -391,7 +372,7 @@ class PerfTestThread {
// user 1m20.575s
// sys 3m48.872s
// Unit test output:
-// [ OK ] CriticalSectionTest.Performance (294375 ms)
+// [ OK ] RecursiveCriticalSectionTest.Performance (294375 ms)
//
// Native mutex implementation using first fit policy (current macOS default):
// Approximate CPU usage:
@@ -399,7 +380,7 @@ class PerfTestThread {
// user 0m12.738s
// sys 0m31.207s
// Unit test output:
-// [ OK ] CriticalSectionTest.Performance (11444 ms)
+// [ OK ] RecursiveCriticalSectionTest.Performance (11444 ms)
//
// Special partially spin lock based implementation:
// Approximate CPU usage:
@@ -407,10 +388,10 @@ class PerfTestThread {
// user 0m3.014s
// sys 0m4.495s
// Unit test output:
-// [ OK ] CriticalSectionTest.Performance (1885 ms)
+// [ OK ] RecursiveCriticalSectionTest.Performance (1885 ms)
//
// The test is disabled by default to avoid unecessarily loading the bots.
-TEST(CriticalSectionTest, DISABLED_Performance) {
+TEST(RecursiveCriticalSectionTest, DISABLED_Performance) {
PerfTestThread threads[8];
Event event;
diff --git a/rtc_base/signal_thread.cc b/rtc_base/deprecated/signal_thread.cc
index e100fbe179..96bdd65155 100644
--- a/rtc_base/signal_thread.cc
+++ b/rtc_base/deprecated/signal_thread.cc
@@ -8,7 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "rtc_base/signal_thread.h"
+#include "rtc_base/deprecated/signal_thread.h"
#include <memory>
@@ -23,26 +23,30 @@ namespace rtc {
// SignalThread
///////////////////////////////////////////////////////////////////////////////
-SignalThread::SignalThread()
+DEPRECATED_SignalThread::DEPRECATED_SignalThread()
: main_(Thread::Current()), worker_(this), state_(kInit), refcount_(1) {
- main_->SignalQueueDestroyed.connect(this,
- &SignalThread::OnMainThreadDestroyed);
+ main_->SignalQueueDestroyed.connect(
+ this, &DEPRECATED_SignalThread::OnMainThreadDestroyed);
worker_.SetName("SignalThread", this);
}
-SignalThread::~SignalThread() {
+DEPRECATED_SignalThread::~DEPRECATED_SignalThread() {
+ rtc::CritScope lock(&cs_);
RTC_DCHECK(refcount_ == 0);
}
-bool SignalThread::SetName(const std::string& name, const void* obj) {
+bool DEPRECATED_SignalThread::SetName(const std::string& name,
+ const void* obj) {
EnterExit ee(this);
+ RTC_DCHECK(!destroy_called_);
RTC_DCHECK(main_->IsCurrent());
RTC_DCHECK(kInit == state_);
return worker_.SetName(name, obj);
}
-void SignalThread::Start() {
+void DEPRECATED_SignalThread::Start() {
EnterExit ee(this);
+ RTC_DCHECK(!destroy_called_);
RTC_DCHECK(main_->IsCurrent());
if (kInit == state_ || kComplete == state_) {
state_ = kRunning;
@@ -53,9 +57,13 @@ void SignalThread::Start() {
}
}
-void SignalThread::Destroy(bool wait) {
+void DEPRECATED_SignalThread::Destroy(bool wait) {
EnterExit ee(this);
- RTC_DCHECK(main_->IsCurrent());
+ // Sometimes the caller can't guarantee which thread will call Destroy, only
+ // that it will be the last thing it does.
+ // RTC_DCHECK(main_->IsCurrent());
+ RTC_DCHECK(!destroy_called_);
+ destroy_called_ = true;
if ((kInit == state_) || (kComplete == state_)) {
refcount_--;
} else if (kRunning == state_ || kReleasing == state_) {
@@ -76,8 +84,9 @@ void SignalThread::Destroy(bool wait) {
}
}
-void SignalThread::Release() {
+void DEPRECATED_SignalThread::Release() {
EnterExit ee(this);
+ RTC_DCHECK(!destroy_called_);
RTC_DCHECK(main_->IsCurrent());
if (kComplete == state_) {
refcount_--;
@@ -89,13 +98,14 @@ void SignalThread::Release() {
}
}
-bool SignalThread::ContinueWork() {
+bool DEPRECATED_SignalThread::ContinueWork() {
EnterExit ee(this);
+ RTC_DCHECK(!destroy_called_);
RTC_DCHECK(worker_.IsCurrent());
return worker_.ProcessMessages(0);
}
-void SignalThread::OnMessage(Message* msg) {
+void DEPRECATED_SignalThread::OnMessage(Message* msg) {
EnterExit ee(this);
if (ST_MSG_WORKER_DONE == msg->message_id) {
RTC_DCHECK(main_->IsCurrent());
@@ -126,21 +136,21 @@ void SignalThread::OnMessage(Message* msg) {
}
}
-SignalThread::Worker::Worker(SignalThread* parent)
+DEPRECATED_SignalThread::Worker::Worker(DEPRECATED_SignalThread* parent)
: Thread(std::make_unique<NullSocketServer>(), /*do_init=*/false),
parent_(parent) {
DoInit();
}
-SignalThread::Worker::~Worker() {
+DEPRECATED_SignalThread::Worker::~Worker() {
Stop();
}
-void SignalThread::Worker::Run() {
+void DEPRECATED_SignalThread::Worker::Run() {
parent_->Run();
}
-void SignalThread::Run() {
+void DEPRECATED_SignalThread::Run() {
DoWork();
{
EnterExit ee(this);
@@ -150,12 +160,12 @@ void SignalThread::Run() {
}
}
-void SignalThread::OnMainThreadDestroyed() {
+void DEPRECATED_SignalThread::OnMainThreadDestroyed() {
EnterExit ee(this);
main_ = nullptr;
}
-bool SignalThread::Worker::IsProcessingMessagesForTesting() {
+bool DEPRECATED_SignalThread::Worker::IsProcessingMessagesForTesting() {
return false;
}
diff --git a/rtc_base/deprecated/signal_thread.h b/rtc_base/deprecated/signal_thread.h
new file mode 100644
index 0000000000..3612f5a1ca
--- /dev/null
+++ b/rtc_base/deprecated/signal_thread.h
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2004 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_DEPRECATED_SIGNAL_THREAD_H_
+#define RTC_BASE_DEPRECATED_SIGNAL_THREAD_H_
+
+#include <string>
+
+#include "rtc_base/checks.h"
+#include "rtc_base/constructor_magic.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
+#include "rtc_base/deprecation.h"
+#include "rtc_base/message_handler.h"
+#include "rtc_base/third_party/sigslot/sigslot.h"
+#include "rtc_base/thread.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace rtc {
+
+///////////////////////////////////////////////////////////////////////////////
+// NOTE: this class has been deprecated. Do not use for new code. New code
+// should use factilities exposed by api/task_queue/ instead.
+//
+// SignalThread - Base class for worker threads. The main thread should call
+// Start() to begin work, and then follow one of these models:
+// Normal: Wait for SignalWorkDone, and then call Release to destroy.
+// Cancellation: Call Release(true), to abort the worker thread.
+// Fire-and-forget: Call Release(false), which allows the thread to run to
+// completion, and then self-destruct without further notification.
+// Periodic tasks: Wait for SignalWorkDone, then eventually call Start()
+// again to repeat the task. When the instance isn't needed anymore,
+// call Release. DoWork, OnWorkStart and OnWorkStop are called again,
+// on a new thread.
+// The subclass should override DoWork() to perform the background task. By
+// periodically calling ContinueWork(), it can check for cancellation.
+// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work
+// tasks in the context of the main thread.
+///////////////////////////////////////////////////////////////////////////////
+
+class DEPRECATED_SignalThread : public sigslot::has_slots<>,
+ protected MessageHandler {
+ public:
+ DEPRECATED_SignalThread();
+
+ // Context: Main Thread. Call before Start to change the worker's name.
+ bool SetName(const std::string& name, const void* obj);
+
+ // Context: Main Thread. Call to begin the worker thread.
+ void Start();
+
+ // Context: Main Thread. If the worker thread is not running, deletes the
+ // object immediately. Otherwise, asks the worker thread to abort processing,
+ // and schedules the object to be deleted once the worker exits.
+ // SignalWorkDone will not be signalled. If wait is true, does not return
+ // until the thread is deleted.
+ void Destroy(bool wait);
+
+ // Context: Main Thread. If the worker thread is complete, deletes the
+ // object immediately. Otherwise, schedules the object to be deleted once
+ // the worker thread completes. SignalWorkDone will be signalled.
+ void Release();
+
+ // Context: Main Thread. Signalled when work is complete.
+ sigslot::signal1<DEPRECATED_SignalThread*> SignalWorkDone;
+
+ enum { ST_MSG_WORKER_DONE, ST_MSG_FIRST_AVAILABLE };
+
+ protected:
+ ~DEPRECATED_SignalThread() override;
+
+ Thread* worker() { return &worker_; }
+
+ // Context: Main Thread. Subclass should override to do pre-work setup.
+ virtual void OnWorkStart() {}
+
+ // Context: Worker Thread. Subclass should override to do work.
+ virtual void DoWork() = 0;
+
+ // Context: Worker Thread. Subclass should call periodically to
+ // dispatch messages and determine if the thread should terminate.
+ bool ContinueWork();
+
+ // Context: Worker Thread. Subclass should override when extra work is
+ // needed to abort the worker thread.
+ virtual void OnWorkStop() {}
+
+ // Context: Main Thread. Subclass should override to do post-work cleanup.
+ virtual void OnWorkDone() {}
+
+ // Context: Any Thread. If subclass overrides, be sure to call the base
+ // implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE)
+ void OnMessage(Message* msg) override;
+
+ private:
+ enum State {
+ kInit, // Initialized, but not started
+ kRunning, // Started and doing work
+ kReleasing, // Same as running, but to be deleted when work is done
+ kComplete, // Work is done
+ kStopping, // Work is being interrupted
+ };
+
+ class Worker : public Thread {
+ public:
+ explicit Worker(DEPRECATED_SignalThread* parent);
+ ~Worker() override;
+ void Run() override;
+ bool IsProcessingMessagesForTesting() override;
+
+ private:
+ DEPRECATED_SignalThread* parent_;
+
+ RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Worker);
+ };
+
+ class RTC_SCOPED_LOCKABLE EnterExit {
+ public:
+ explicit EnterExit(DEPRECATED_SignalThread* t)
+ RTC_EXCLUSIVE_LOCK_FUNCTION(t->cs_)
+ : t_(t) {
+ t_->cs_.Enter();
+ // If refcount_ is zero then the object has already been deleted and we
+ // will be double-deleting it in ~EnterExit()! (shouldn't happen)
+ RTC_DCHECK_NE(0, t_->refcount_);
+ ++t_->refcount_;
+ }
+ ~EnterExit() RTC_UNLOCK_FUNCTION() {
+ bool d = (0 == --t_->refcount_);
+ t_->cs_.Leave();
+ if (d)
+ delete t_;
+ }
+
+ private:
+ DEPRECATED_SignalThread* t_;
+
+ RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(EnterExit);
+ };
+
+ void Run();
+ void OnMainThreadDestroyed();
+
+ Thread* main_;
+ Worker worker_;
+ RecursiveCriticalSection cs_;
+ State state_ RTC_GUARDED_BY(cs_);
+ int refcount_ RTC_GUARDED_BY(cs_);
+ bool destroy_called_ RTC_GUARDED_BY(cs_) = false;
+
+ RTC_DISALLOW_COPY_AND_ASSIGN(DEPRECATED_SignalThread);
+};
+
+typedef RTC_DEPRECATED DEPRECATED_SignalThread SignalThread;
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace rtc
+
+#endif // RTC_BASE_DEPRECATED_SIGNAL_THREAD_H_
diff --git a/rtc_base/signal_thread_unittest.cc b/rtc_base/deprecated/signal_thread_unittest.cc
index 14761865b8..f5a49aad63 100644
--- a/rtc_base/signal_thread_unittest.cc
+++ b/rtc_base/deprecated/signal_thread_unittest.cc
@@ -13,9 +13,9 @@
#include <memory>
#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/gunit.h"
#include "rtc_base/null_socket_server.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
#include "test/gtest.h"
@@ -28,9 +28,9 @@ static const int kTimeout = 10000;
class SignalThreadTest : public ::testing::Test, public sigslot::has_slots<> {
public:
- class SlowSignalThread : public SignalThread {
+ class SlowSignalThread : public DEPRECATED_SignalThread {
public:
- SlowSignalThread(SignalThreadTest* harness) : harness_(harness) {}
+ explicit SlowSignalThread(SignalThreadTest* harness) : harness_(harness) {}
~SlowSignalThread() override {
EXPECT_EQ(harness_->main_thread_, Thread::Current());
@@ -70,7 +70,7 @@ class SignalThreadTest : public ::testing::Test, public sigslot::has_slots<> {
RTC_DISALLOW_COPY_AND_ASSIGN(SlowSignalThread);
};
- void OnWorkComplete(rtc::SignalThread* thread) {
+ void OnWorkComplete(rtc::DEPRECATED_SignalThread* thread) {
SlowSignalThread* t = static_cast<SlowSignalThread*>(thread);
EXPECT_EQ(t->harness(), this);
EXPECT_EQ(main_thread_, Thread::Current());
@@ -148,23 +148,23 @@ class OwnerThread : public Thread, public sigslot::has_slots<> {
// Delete |signal_thread|.
signal_thread->Destroy(true);
{
- rtc::CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
has_run_ = true;
}
}
bool has_run() {
- rtc::CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return has_run_;
}
- void OnWorkDone(SignalThread* /*signal_thread*/) {
+ void OnWorkDone(DEPRECATED_SignalThread* /*signal_thread*/) {
FAIL() << " This shouldn't get called.";
}
private:
- rtc::CriticalSection crit_;
+ webrtc::Mutex mutex_;
SignalThreadTest* harness_;
- bool has_run_ RTC_GUARDED_BY(crit_);
+ bool has_run_ RTC_GUARDED_BY(mutex_);
RTC_DISALLOW_COPY_AND_ASSIGN(OwnerThread);
};
diff --git a/rtc_base/event_tracer.cc b/rtc_base/event_tracer.cc
index d23af21421..3af8183b1f 100644
--- a/rtc_base/event_tracer.cc
+++ b/rtc_base/event_tracer.cc
@@ -19,11 +19,11 @@
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/thread_checker.h"
#include "rtc_base/time_utils.h"
@@ -120,7 +120,7 @@ class EventLogger final {
arg.value.as_string = str_copy;
}
}
- rtc::CritScope lock(&crit_);
+ webrtc::MutexLock lock(&mutex_);
trace_events_.push_back(
{name, category_enabled, phase, args, timestamp, 1, thread_id});
}
@@ -136,7 +136,7 @@ class EventLogger final {
bool shutting_down = shutdown_event_.Wait(kLoggingIntervalMs);
std::vector<TraceEvent> events;
{
- rtc::CritScope lock(&crit_);
+ webrtc::MutexLock lock(&mutex_);
trace_events_.swap(events);
}
std::string args_str;
@@ -196,7 +196,7 @@ class EventLogger final {
output_file_ = file;
output_file_owned_ = owned;
{
- rtc::CritScope lock(&crit_);
+ webrtc::MutexLock lock(&mutex_);
// Since the atomic fast-path for adding events to the queue can be
// bypassed while the logging thread is shutting down there may be some
// stale events in the queue, hence the vector needs to be cleared to not
@@ -317,8 +317,8 @@ class EventLogger final {
return output;
}
- rtc::CriticalSection crit_;
- std::vector<TraceEvent> trace_events_ RTC_GUARDED_BY(crit_);
+ webrtc::Mutex mutex_;
+ std::vector<TraceEvent> trace_events_ RTC_GUARDED_BY(mutex_);
rtc::PlatformThread logging_thread_;
rtc::Event shutdown_event_;
rtc::ThreadChecker thread_checker_;
diff --git a/rtc_base/event_tracer_unittest.cc b/rtc_base/event_tracer_unittest.cc
index 79cc9c0788..f4d41e4e7c 100644
--- a/rtc_base/event_tracer_unittest.cc
+++ b/rtc_base/event_tracer_unittest.cc
@@ -10,7 +10,7 @@
#include "rtc_base/event_tracer.h"
-#include "rtc_base/critical_section.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/trace_event.h"
#include "test/gtest.h"
@@ -20,17 +20,17 @@ namespace {
class TestStatistics {
public:
void Reset() {
- rtc::CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
events_logged_ = 0;
}
void Increment() {
- rtc::CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
++events_logged_;
}
int Count() const {
- rtc::CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return events_logged_;
}
@@ -41,8 +41,8 @@ class TestStatistics {
}
private:
- rtc::CriticalSection crit_;
- int events_logged_ RTC_GUARDED_BY(crit_) = 0;
+ mutable webrtc::Mutex mutex_;
+ int events_logged_ RTC_GUARDED_BY(mutex_) = 0;
};
} // namespace
diff --git a/rtc_base/experiments/BUILD.gn b/rtc_base/experiments/BUILD.gn
index bb3e0ce8ae..282b5b9270 100644
--- a/rtc_base/experiments/BUILD.gn
+++ b/rtc_base/experiments/BUILD.gn
@@ -17,8 +17,8 @@ rtc_library("alr_experiment") {
"../:rtc_base_approved",
"../../api/transport:field_trial_based_config",
"../../api/transport:webrtc_key_value_config",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("field_trial_parser") {
@@ -40,6 +40,8 @@ rtc_library("field_trial_parser") {
"../../rtc_base:logging",
"../../rtc_base:safe_conversions",
"../../rtc_base:stringutils",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings:strings",
"//third_party/abseil-cpp/absl/types:optional",
@@ -57,8 +59,8 @@ rtc_library("quality_rampup_experiment") {
"../../api/transport:field_trial_based_config",
"../../api/transport:webrtc_key_value_config",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("quality_scaler_settings") {
@@ -72,8 +74,8 @@ rtc_library("quality_scaler_settings") {
"../../api/transport:field_trial_based_config",
"../../api/transport:webrtc_key_value_config",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("quality_scaling_experiment") {
@@ -85,8 +87,8 @@ rtc_library("quality_scaling_experiment") {
"../:rtc_base_approved",
"../../api/video_codecs:video_codecs_api",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("normalize_simulcast_size_experiment") {
@@ -97,8 +99,8 @@ rtc_library("normalize_simulcast_size_experiment") {
deps = [
"../:rtc_base_approved",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("balanced_degradation_settings") {
@@ -111,8 +113,8 @@ rtc_library("balanced_degradation_settings") {
"../:rtc_base_approved",
"../../api/video_codecs:video_codecs_api",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("cpu_speed_experiment") {
@@ -123,8 +125,8 @@ rtc_library("cpu_speed_experiment") {
deps = [
"../:rtc_base_approved",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("rtt_mult_experiment") {
@@ -135,8 +137,8 @@ rtc_library("rtt_mult_experiment") {
deps = [
"../:rtc_base_approved",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("jitter_upper_bound_experiment") {
@@ -147,8 +149,8 @@ rtc_library("jitter_upper_bound_experiment") {
deps = [
"../:rtc_base_approved",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("rate_control_settings") {
@@ -164,6 +166,8 @@ rtc_library("rate_control_settings") {
"../../api/units:data_size",
"../../api/video_codecs:video_codecs_api",
"../../system_wrappers:field_trial",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
@@ -178,8 +182,8 @@ rtc_library("keyframe_interval_settings_experiment") {
":field_trial_parser",
"../../api/transport:field_trial_based_config",
"../../api/transport:webrtc_key_value_config",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("stable_target_rate_experiment") {
@@ -192,8 +196,8 @@ rtc_library("stable_target_rate_experiment") {
":rate_control_settings",
"../../api/transport:field_trial_based_config",
"../../api/transport:webrtc_key_value_config",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("min_video_bitrate_experiment") {
@@ -208,8 +212,8 @@ rtc_library("min_video_bitrate_experiment") {
"../../rtc_base:checks",
"../../rtc_base:logging",
"../../system_wrappers:field_trial",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
if (rtc_include_tests) {
@@ -255,7 +259,7 @@ if (rtc_include_tests) {
"../../test:field_trial",
"../../test:test_main",
"../../test:test_support",
- "//third_party/abseil-cpp/absl/types:optional",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
}
diff --git a/rtc_base/experiments/quality_rampup_experiment.cc b/rtc_base/experiments/quality_rampup_experiment.cc
index caf7e62368..ee6675c924 100644
--- a/rtc_base/experiments/quality_rampup_experiment.cc
+++ b/rtc_base/experiments/quality_rampup_experiment.cc
@@ -70,4 +70,8 @@ bool QualityRampupExperiment::BwHigh(int64_t now_ms,
return (now_ms - *start_ms_) >= min_duration_ms_.Value();
}
+bool QualityRampupExperiment::Enabled() const {
+ return min_pixels_ || min_duration_ms_ || max_bitrate_kbps_;
+}
+
} // namespace webrtc
diff --git a/rtc_base/experiments/quality_rampup_experiment.h b/rtc_base/experiments/quality_rampup_experiment.h
index ff9d7d38e5..9d46901104 100644
--- a/rtc_base/experiments/quality_rampup_experiment.h
+++ b/rtc_base/experiments/quality_rampup_experiment.h
@@ -33,6 +33,8 @@ class QualityRampupExperiment final {
// (max_bitrate_factor_) above |max_bitrate_kbps_| for |min_duration_ms_|.
bool BwHigh(int64_t now_ms, uint32_t available_bw_kbps);
+ bool Enabled() const;
+
private:
explicit QualityRampupExperiment(
const WebRtcKeyValueConfig* const key_value_config);
diff --git a/rtc_base/fake_clock.cc b/rtc_base/fake_clock.cc
index e242e8e659..652a5afa3a 100644
--- a/rtc_base/fake_clock.cc
+++ b/rtc_base/fake_clock.cc
@@ -16,18 +16,18 @@
namespace rtc {
int64_t FakeClock::TimeNanos() const {
- CritScope cs(&lock_);
+ webrtc::MutexLock lock(&lock_);
return time_ns_;
}
void FakeClock::SetTime(webrtc::Timestamp new_time) {
- CritScope cs(&lock_);
+ webrtc::MutexLock lock(&lock_);
RTC_DCHECK(new_time.us() * 1000 >= time_ns_);
time_ns_ = new_time.us() * 1000;
}
void FakeClock::AdvanceTime(webrtc::TimeDelta delta) {
- CritScope cs(&lock_);
+ webrtc::MutexLock lock(&lock_);
time_ns_ += delta.ns();
}
diff --git a/rtc_base/fake_clock.h b/rtc_base/fake_clock.h
index 0ab9a937a8..edb507becb 100644
--- a/rtc_base/fake_clock.h
+++ b/rtc_base/fake_clock.h
@@ -15,7 +15,7 @@
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
-#include "rtc_base/critical_section.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
@@ -43,7 +43,7 @@ class FakeClock : public ClockInterface {
void AdvanceTime(webrtc::TimeDelta delta);
private:
- CriticalSection lock_;
+ mutable webrtc::Mutex lock_;
int64_t time_ns_ RTC_GUARDED_BY(lock_) = 0;
};
diff --git a/rtc_base/firewall_socket_server.cc b/rtc_base/firewall_socket_server.cc
index fc7917613c..8f44753760 100644
--- a/rtc_base/firewall_socket_server.cc
+++ b/rtc_base/firewall_socket_server.cc
@@ -163,19 +163,19 @@ void FirewallSocketServer::AddRule(bool allow,
r.p = p;
r.src = src;
r.dst = dst;
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
rules_.push_back(r);
}
void FirewallSocketServer::ClearRules() {
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
rules_.clear();
}
bool FirewallSocketServer::Check(FirewallProtocol p,
const SocketAddress& src,
const SocketAddress& dst) {
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
for (size_t i = 0; i < rules_.size(); ++i) {
const Rule& r = rules_[i];
if ((r.p != p) && (r.p != FP_ANY))
@@ -239,12 +239,12 @@ FirewallManager::~FirewallManager() {
}
void FirewallManager::AddServer(FirewallSocketServer* server) {
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
servers_.push_back(server);
}
void FirewallManager::RemoveServer(FirewallSocketServer* server) {
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
servers_.erase(std::remove(servers_.begin(), servers_.end(), server),
servers_.end());
}
@@ -253,7 +253,7 @@ void FirewallManager::AddRule(bool allow,
FirewallProtocol p,
FirewallDirection d,
const SocketAddress& addr) {
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
for (std::vector<FirewallSocketServer*>::const_iterator it = servers_.begin();
it != servers_.end(); ++it) {
(*it)->AddRule(allow, p, d, addr);
@@ -261,7 +261,7 @@ void FirewallManager::AddRule(bool allow,
}
void FirewallManager::ClearRules() {
- CritScope scope(&crit_);
+ webrtc::MutexLock scope(&mutex_);
for (std::vector<FirewallSocketServer*>::const_iterator it = servers_.begin();
it != servers_.end(); ++it) {
(*it)->ClearRules();
diff --git a/rtc_base/firewall_socket_server.h b/rtc_base/firewall_socket_server.h
index d174033e01..23b91d6ad3 100644
--- a/rtc_base/firewall_socket_server.h
+++ b/rtc_base/firewall_socket_server.h
@@ -14,11 +14,11 @@
#include <vector>
#include "rtc_base/async_socket.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/ip_address.h"
#include "rtc_base/socket.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/socket_server.h"
+#include "rtc_base/synchronization/mutex.h"
namespace rtc {
@@ -90,7 +90,7 @@ class FirewallSocketServer : public SocketServer {
private:
SocketServer* server_;
FirewallManager* manager_;
- CriticalSection crit_;
+ webrtc::Mutex mutex_;
struct Rule {
bool allow;
FirewallProtocol p;
@@ -123,7 +123,7 @@ class FirewallManager {
void ClearRules();
private:
- CriticalSection crit_;
+ webrtc::Mutex mutex_;
std::vector<FirewallSocketServer*> servers_;
};
diff --git a/rtc_base/logging.cc b/rtc_base/logging.cc
index ff7369dd5c..d07a7e75e7 100644
--- a/rtc_base/logging.cc
+++ b/rtc_base/logging.cc
@@ -42,11 +42,11 @@ static const int kMaxLogLineSize = 1024 - 60;
#include "absl/base/attributes.h"
#include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/platform_thread_types.h"
#include "rtc_base/string_encode.h"
#include "rtc_base/string_utils.h"
#include "rtc_base/strings/string_builder.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
@@ -72,7 +72,9 @@ const char* FilenameFromPath(const char* file) {
}
// Global lock for log subsystem, only needed to serialize access to streams_.
-CriticalSection g_log_crit;
+// TODO(bugs.webrtc.org/11665): this is not currently constant initialized and
+// trivially destructible.
+webrtc::Mutex g_log_mutex_;
} // namespace
/////////////////////////////////////////////////////////////////////////////
@@ -85,8 +87,9 @@ bool LogMessage::log_to_stderr_ = true;
// Note: we explicitly do not clean this up, because of the uncertain ordering
// of destructors at program exit. Let the person who sets the stream trigger
// cleanup by setting to null, or let it leak (safe at program exit).
-ABSL_CONST_INIT LogSink* LogMessage::streams_ RTC_GUARDED_BY(g_log_crit) =
+ABSL_CONST_INIT LogSink* LogMessage::streams_ RTC_GUARDED_BY(g_log_mutex_) =
nullptr;
+ABSL_CONST_INIT std::atomic<bool> LogMessage::streams_empty_ = {true};
// Boolean options default to false (0)
bool LogMessage::thread_, LogMessage::timestamp_;
@@ -193,7 +196,7 @@ LogMessage::~LogMessage() {
#endif
}
- CritScope cs(&g_log_crit);
+ webrtc::MutexLock lock(&g_log_mutex_);
for (LogSink* entry = streams_; entry != nullptr; entry = entry->next_) {
if (severity_ >= entry->min_severity_) {
#if defined(WEBRTC_ANDROID)
@@ -242,7 +245,7 @@ void LogMessage::LogTimestamps(bool on) {
void LogMessage::LogToDebug(LoggingSeverity min_sev) {
g_dbg_sev = min_sev;
- CritScope cs(&g_log_crit);
+ webrtc::MutexLock lock(&g_log_mutex_);
UpdateMinLogSeverity();
}
@@ -251,7 +254,7 @@ void LogMessage::SetLogToStderr(bool log_to_stderr) {
}
int LogMessage::GetLogToStream(LogSink* stream) {
- CritScope cs(&g_log_crit);
+ webrtc::MutexLock lock(&g_log_mutex_);
LoggingSeverity sev = LS_NONE;
for (LogSink* entry = streams_; entry != nullptr; entry = entry->next_) {
if (stream == nullptr || stream == entry) {
@@ -262,15 +265,16 @@ int LogMessage::GetLogToStream(LogSink* stream) {
}
void LogMessage::AddLogToStream(LogSink* stream, LoggingSeverity min_sev) {
- CritScope cs(&g_log_crit);
+ webrtc::MutexLock lock(&g_log_mutex_);
stream->min_severity_ = min_sev;
stream->next_ = streams_;
streams_ = stream;
+ streams_empty_.store(false, std::memory_order_relaxed);
UpdateMinLogSeverity();
}
void LogMessage::RemoveLogToStream(LogSink* stream) {
- CritScope cs(&g_log_crit);
+ webrtc::MutexLock lock(&g_log_mutex_);
for (LogSink** entry = &streams_; *entry != nullptr;
entry = &(*entry)->next_) {
if (*entry == stream) {
@@ -278,6 +282,7 @@ void LogMessage::RemoveLogToStream(LogSink* stream) {
break;
}
}
+ streams_empty_.store(streams_ == nullptr, std::memory_order_relaxed);
UpdateMinLogSeverity();
}
@@ -331,7 +336,7 @@ void LogMessage::ConfigureLogging(const char* params) {
}
void LogMessage::UpdateMinLogSeverity()
- RTC_EXCLUSIVE_LOCKS_REQUIRED(g_log_crit) {
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(g_log_mutex_) {
LoggingSeverity min_sev = g_dbg_sev;
for (LogSink* entry = streams_; entry != nullptr; entry = entry->next_) {
min_sev = std::min(min_sev, entry->min_severity_);
@@ -435,12 +440,7 @@ void LogMessage::OutputToDebug(const std::string& str,
bool LogMessage::IsNoop(LoggingSeverity severity) {
if (severity >= g_dbg_sev || severity >= g_min_sev)
return false;
-
- // TODO(tommi): We're grabbing this lock for every LogMessage instance that
- // is going to be logged. This introduces unnecessary synchronization for
- // a feature that's mostly used for testing.
- CritScope cs(&g_log_crit);
- return streams_ == nullptr;
+ return streams_empty_.load(std::memory_order_relaxed);
}
void LogMessage::FinishPrintStream() {
diff --git a/rtc_base/logging.h b/rtc_base/logging.h
index 0aa1e676d1..0852c06182 100644
--- a/rtc_base/logging.h
+++ b/rtc_base/logging.h
@@ -46,6 +46,7 @@
#include <errno.h>
+#include <atomic>
#include <sstream> // no-presubmit-check TODO(webrtc:8982)
#include <string>
#include <utility>
@@ -463,9 +464,14 @@ class LogMessage {
static void SetLogToStderr(bool log_to_stderr);
// Stream: Any non-blocking stream interface.
// Installs the |stream| to collect logs with severtiy |min_sev| or higher.
- // |stream| must live until deinstalled by RemoveLogToStream
+ // |stream| must live until deinstalled by RemoveLogToStream.
+ // If |stream| is the first stream added to the system, we might miss some
+ // early concurrent log statement happening from another thread happening near
+ // this instant.
static void AddLogToStream(LogSink* stream, LoggingSeverity min_sev);
- // Removes the specified stream, without destroying it.
+ // Removes the specified stream, without destroying it. When the method
+ // has completed, it's guaranteed that |stream| will receive no more logging
+ // calls.
static void RemoveLogToStream(LogSink* stream);
// Returns the severity for the specified stream, of if none is specified,
// the minimum stream severity.
@@ -557,6 +563,12 @@ class LogMessage {
// The output streams and their associated severities
static LogSink* streams_;
+ // Holds true with high probability if |streams_| is empty, false with high
+ // probability otherwise. Operated on with std::memory_order_relaxed because
+ // it's ok to lose or log some additional statements near the instant streams
+ // are added/removed.
+ static std::atomic<bool> streams_empty_;
+
// Flags for formatting options
static bool thread_, timestamp_;
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index aa905c6f70..5c3dd0a5d1 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -31,7 +31,10 @@ rtc_library("fifo_buffer") {
"fifo_buffer.cc",
"fifo_buffer.h",
]
- deps = [ "..:rtc_base" ]
+ deps = [
+ "..:rtc_base",
+ "../synchronization:mutex",
+ ]
}
rtc_library("unittests") {
diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc
index 44fb032e57..49e926719f 100644
--- a/rtc_base/memory/fifo_buffer.cc
+++ b/rtc_base/memory/fifo_buffer.cc
@@ -39,13 +39,13 @@ FifoBuffer::FifoBuffer(size_t size, Thread* owner)
FifoBuffer::~FifoBuffer() {}
bool FifoBuffer::GetBuffered(size_t* size) const {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
*size = data_length_;
return true;
}
bool FifoBuffer::SetCapacity(size_t size) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
if (data_length_ > size) {
return false;
}
@@ -67,7 +67,7 @@ StreamResult FifoBuffer::ReadOffset(void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_read) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
}
@@ -75,12 +75,12 @@ StreamResult FifoBuffer::WriteOffset(const void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_written) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
}
StreamState FifoBuffer::GetState() const {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return state_;
}
@@ -88,7 +88,7 @@ StreamResult FifoBuffer::Read(void* buffer,
size_t bytes,
size_t* bytes_read,
int* error) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
const bool was_writable = data_length_ < buffer_length_;
size_t copy = 0;
StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
@@ -114,7 +114,7 @@ StreamResult FifoBuffer::Write(const void* buffer,
size_t bytes,
size_t* bytes_written,
int* error) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
const bool was_readable = (data_length_ > 0);
size_t copy = 0;
@@ -136,12 +136,12 @@ StreamResult FifoBuffer::Write(const void* buffer,
}
void FifoBuffer::Close() {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
state_ = SS_CLOSED;
}
const void* FifoBuffer::GetReadData(size_t* size) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
*size = (read_position_ + data_length_ <= buffer_length_)
? data_length_
: buffer_length_ - read_position_;
@@ -149,7 +149,7 @@ const void* FifoBuffer::GetReadData(size_t* size) {
}
void FifoBuffer::ConsumeReadData(size_t size) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(size <= data_length_);
const bool was_writable = data_length_ < buffer_length_;
read_position_ = (read_position_ + size) % buffer_length_;
@@ -160,7 +160,7 @@ void FifoBuffer::ConsumeReadData(size_t size) {
}
void* FifoBuffer::GetWriteBuffer(size_t* size) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
if (state_ == SS_CLOSED) {
return nullptr;
}
@@ -180,7 +180,7 @@ void* FifoBuffer::GetWriteBuffer(size_t* size) {
}
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(size <= buffer_length_ - data_length_);
const bool was_readable = (data_length_ > 0);
data_length_ += size;
@@ -190,7 +190,7 @@ void FifoBuffer::ConsumeWriteBuffer(size_t size) {
}
bool FifoBuffer::GetWriteRemaining(size_t* size) const {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
*size = buffer_length_ - data_length_;
return true;
}
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index f859815c70..04c4cbf33b 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -14,6 +14,7 @@
#include <memory>
#include "rtc_base/stream.h"
+#include "rtc_base/synchronization/mutex.h"
namespace rtc {
@@ -103,7 +104,7 @@ class FifoBuffer final : public StreamInterface {
size_t bytes,
size_t offset,
size_t* bytes_read)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Helper method that implements WriteOffset. Caller must acquire a lock
// when calling this method.
@@ -111,22 +112,22 @@ class FifoBuffer final : public StreamInterface {
size_t bytes,
size_t offset,
size_t* bytes_written)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// keeps the opened/closed state of the stream
- StreamState state_ RTC_GUARDED_BY(crit_);
+ StreamState state_ RTC_GUARDED_BY(mutex_);
// the allocated buffer
- std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(crit_);
+ std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(mutex_);
// size of the allocated buffer
- size_t buffer_length_ RTC_GUARDED_BY(crit_);
+ size_t buffer_length_ RTC_GUARDED_BY(mutex_);
// amount of readable data in the buffer
- size_t data_length_ RTC_GUARDED_BY(crit_);
+ size_t data_length_ RTC_GUARDED_BY(mutex_);
// offset to the readable data
- size_t read_position_ RTC_GUARDED_BY(crit_);
+ size_t read_position_ RTC_GUARDED_BY(mutex_);
// stream callbacks are dispatched on this thread
Thread* owner_;
// object lock
- CriticalSection crit_;
+ mutable webrtc::Mutex mutex_;
RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
};
diff --git a/rtc_base/nat_server.cc b/rtc_base/nat_server.cc
index 323a787ee0..725a57be9f 100644
--- a/rtc_base/nat_server.cc
+++ b/rtc_base/nat_server.cc
@@ -174,7 +174,7 @@ void NATServer::OnInternalUDPPacket(AsyncPacketSocket* socket,
RTC_DCHECK(iter != int_map_->end());
// Allow the destination to send packets back to the source.
- iter->second->WhitelistInsert(dest_addr);
+ iter->second->AllowlistInsert(dest_addr);
// Send the packet to its intended destination.
rtc::PacketOptions options;
@@ -227,29 +227,29 @@ void NATServer::Translate(const SocketAddressPair& route) {
bool NATServer::ShouldFilterOut(TransEntry* entry,
const SocketAddress& ext_addr) {
- return entry->WhitelistContains(ext_addr);
+ return entry->AllowlistContains(ext_addr);
}
NATServer::TransEntry::TransEntry(const SocketAddressPair& r,
AsyncUDPSocket* s,
NAT* nat)
: route(r), socket(s) {
- whitelist = new AddressSet(AddrCmp(nat));
+ allowlist = new AddressSet(AddrCmp(nat));
}
NATServer::TransEntry::~TransEntry() {
- delete whitelist;
+ delete allowlist;
delete socket;
}
-void NATServer::TransEntry::WhitelistInsert(const SocketAddress& addr) {
- CritScope cs(&crit_);
- whitelist->insert(addr);
+void NATServer::TransEntry::AllowlistInsert(const SocketAddress& addr) {
+ webrtc::MutexLock lock(&mutex_);
+ allowlist->insert(addr);
}
-bool NATServer::TransEntry::WhitelistContains(const SocketAddress& ext_addr) {
- CritScope cs(&crit_);
- return whitelist->find(ext_addr) == whitelist->end();
+bool NATServer::TransEntry::AllowlistContains(const SocketAddress& ext_addr) {
+ webrtc::MutexLock lock(&mutex_);
+ return allowlist->find(ext_addr) == allowlist->end();
}
} // namespace rtc
diff --git a/rtc_base/nat_server.h b/rtc_base/nat_server.h
index 46f01e9761..5078fbb2c1 100644
--- a/rtc_base/nat_server.h
+++ b/rtc_base/nat_server.h
@@ -20,6 +20,7 @@
#include "rtc_base/proxy_server.h"
#include "rtc_base/socket_address_pair.h"
#include "rtc_base/socket_factory.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread.h"
namespace rtc {
@@ -96,13 +97,13 @@ class NATServer : public sigslot::has_slots<> {
TransEntry(const SocketAddressPair& r, AsyncUDPSocket* s, NAT* nat);
~TransEntry();
- void WhitelistInsert(const SocketAddress& addr);
- bool WhitelistContains(const SocketAddress& ext_addr);
+ void AllowlistInsert(const SocketAddress& addr);
+ bool AllowlistContains(const SocketAddress& ext_addr);
SocketAddressPair route;
AsyncUDPSocket* socket;
- AddressSet* whitelist;
- CriticalSection crit_;
+ AddressSet* allowlist;
+ webrtc::Mutex mutex_;
};
typedef std::map<SocketAddressPair, TransEntry*, RouteCmp> InternalMap;
diff --git a/rtc_base/net_helpers.cc b/rtc_base/net_helpers.cc
index 6ff3791738..c6685e2a65 100644
--- a/rtc_base/net_helpers.cc
+++ b/rtc_base/net_helpers.cc
@@ -10,8 +10,6 @@
#include "rtc_base/net_helpers.h"
-#include <memory>
-
#if defined(WEBRTC_WIN)
#include <ws2spi.h>
#include <ws2tcpip.h>
@@ -26,8 +24,11 @@
#endif
#endif // defined(WEBRTC_POSIX) && !defined(__native_client__)
+#include "api/task_queue/task_queue_base.h"
#include "rtc_base/logging.h"
#include "rtc_base/signal_thread.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/third_party/sigslot/sigslot.h" // for signal_with_thread...
namespace rtc {
@@ -83,18 +84,35 @@ int ResolveHostname(const std::string& hostname,
#endif // !__native_client__
}
-// AsyncResolver
-AsyncResolver::AsyncResolver() : SignalThread(), error_(-1) {}
+AsyncResolver::AsyncResolver() : error_(-1) {}
-AsyncResolver::~AsyncResolver() = default;
+AsyncResolver::~AsyncResolver() {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+}
void AsyncResolver::Start(const SocketAddress& addr) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK(!destroy_called_);
addr_ = addr;
- // SignalThred Start will kickoff the resolve process.
- SignalThread::Start();
+ webrtc::TaskQueueBase* current_task_queue = webrtc::TaskQueueBase::Current();
+ popup_thread_ = Thread::Create();
+ popup_thread_->Start();
+ popup_thread_->PostTask(webrtc::ToQueuedTask(
+ [this, flag = safety_.flag(), addr, current_task_queue] {
+ std::vector<IPAddress> addresses;
+ int error =
+ ResolveHostname(addr.hostname().c_str(), addr.family(), &addresses);
+ current_task_queue->PostTask(webrtc::ToQueuedTask(
+ std::move(flag), [this, error, addresses = std::move(addresses)] {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ ResolveDone(std::move(addresses), error);
+ }));
+ }));
}
bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK(!destroy_called_);
if (error_ != 0 || addresses_.empty())
return false;
@@ -109,20 +127,40 @@ bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const {
}
int AsyncResolver::GetError() const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK(!destroy_called_);
return error_;
}
void AsyncResolver::Destroy(bool wait) {
- SignalThread::Destroy(wait);
+ // Some callers have trouble guaranteeing that Destroy is called on the
+ // sequence guarded by |sequence_checker_|.
+ // RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK(!destroy_called_);
+ destroy_called_ = true;
+ MaybeSelfDestruct();
}
-void AsyncResolver::DoWork() {
- error_ =
- ResolveHostname(addr_.hostname().c_str(), addr_.family(), &addresses_);
+const std::vector<IPAddress>& AsyncResolver::addresses() const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK(!destroy_called_);
+ return addresses_;
}
-void AsyncResolver::OnWorkDone() {
+void AsyncResolver::ResolveDone(std::vector<IPAddress> addresses, int error) {
+ addresses_ = addresses;
+ error_ = error;
+ recursion_check_ = true;
SignalDone(this);
+ MaybeSelfDestruct();
+}
+
+void AsyncResolver::MaybeSelfDestruct() {
+ if (!recursion_check_) {
+ delete this;
+ } else {
+ recursion_check_ = false;
+ }
}
const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
diff --git a/rtc_base/net_helpers.h b/rtc_base/net_helpers.h
index 1e06940be7..c6aa4be5b2 100644
--- a/rtc_base/net_helpers.h
+++ b/rtc_base/net_helpers.h
@@ -21,16 +21,23 @@
#include "rtc_base/async_resolver_interface.h"
#include "rtc_base/ip_address.h"
-#include "rtc_base/signal_thread.h"
#include "rtc_base/socket_address.h"
+#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/system/rtc_export.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/thread.h"
+#include "rtc_base/thread_annotations.h"
namespace rtc {
// AsyncResolver will perform async DNS resolution, signaling the result on
// the SignalDone from AsyncResolverInterface when the operation completes.
-class RTC_EXPORT AsyncResolver : public SignalThread,
- public AsyncResolverInterface {
+//
+// This class is thread-compatible, and all methods and destruction needs to
+// happen from the same rtc::Thread, except for Destroy which is allowed to
+// happen on another context provided it's not happening concurrently to another
+// public API call, and is the last access to the object.
+class RTC_EXPORT AsyncResolver : public AsyncResolverInterface {
public:
AsyncResolver();
~AsyncResolver() override;
@@ -40,17 +47,22 @@ class RTC_EXPORT AsyncResolver : public SignalThread,
int GetError() const override;
void Destroy(bool wait) override;
- const std::vector<IPAddress>& addresses() const { return addresses_; }
- void set_error(int error) { error_ = error; }
-
- protected:
- void DoWork() override;
- void OnWorkDone() override;
+ const std::vector<IPAddress>& addresses() const;
private:
- SocketAddress addr_;
- std::vector<IPAddress> addresses_;
- int error_;
+ void ResolveDone(std::vector<IPAddress> addresses, int error)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(sequence_checker_);
+ void MaybeSelfDestruct();
+
+ SocketAddress addr_ RTC_GUARDED_BY(sequence_checker_);
+ std::vector<IPAddress> addresses_ RTC_GUARDED_BY(sequence_checker_);
+ int error_ RTC_GUARDED_BY(sequence_checker_);
+ webrtc::ScopedTaskSafety safety_ RTC_GUARDED_BY(sequence_checker_);
+ std::unique_ptr<Thread> popup_thread_ RTC_GUARDED_BY(sequence_checker_);
+ bool recursion_check_ =
+ false; // Protects against SignalDone calling into Destroy.
+ bool destroy_called_ = false;
+ webrtc::SequenceChecker sequence_checker_;
};
// rtc namespaced wrappers for inet_ntop and inet_pton so we can avoid
diff --git a/rtc_base/network.cc b/rtc_base/network.cc
index f30063d991..64aee4bdae 100644
--- a/rtc_base/network.cc
+++ b/rtc_base/network.cc
@@ -35,6 +35,7 @@
#include "rtc_base/string_utils.h"
#include "rtc_base/strings/string_builder.h"
#include "rtc_base/thread.h"
+#include "system_wrappers/include/field_trial.h"
namespace rtc {
namespace {
@@ -85,7 +86,8 @@ bool SortNetworks(const Network* a, const Network* b) {
return a->key() < b->key();
}
-uint16_t ComputeNetworkCostByType(int type) {
+uint16_t ComputeNetworkCostByType(int type,
+ bool use_differentiated_cellular_costs) {
// TODO(jonaso) : Rollout support for cellular network cost using A/B
// experiment to make sure it does not introduce regressions.
switch (type) {
@@ -95,11 +97,19 @@ uint16_t ComputeNetworkCostByType(int type) {
case rtc::ADAPTER_TYPE_WIFI:
return kNetworkCostLow;
case rtc::ADAPTER_TYPE_CELLULAR:
+ return kNetworkCostCellular;
case rtc::ADAPTER_TYPE_CELLULAR_2G:
+ return use_differentiated_cellular_costs ? kNetworkCostCellular2G
+ : kNetworkCostCellular;
case rtc::ADAPTER_TYPE_CELLULAR_3G:
+ return use_differentiated_cellular_costs ? kNetworkCostCellular3G
+ : kNetworkCostCellular;
case rtc::ADAPTER_TYPE_CELLULAR_4G:
+ return use_differentiated_cellular_costs ? kNetworkCostCellular4G
+ : kNetworkCostCellular;
case rtc::ADAPTER_TYPE_CELLULAR_5G:
- return kNetworkCostCellular;
+ return use_differentiated_cellular_costs ? kNetworkCostCellular5G
+ : kNetworkCostCellular;
case rtc::ADAPTER_TYPE_ANY:
// Candidates gathered from the any-address/wildcard ports, as backups,
// are given the maximum cost so that if there are other candidates with
@@ -930,7 +940,9 @@ Network::Network(const std::string& name,
scope_id_(0),
ignored_(false),
type_(ADAPTER_TYPE_UNKNOWN),
- preference_(0) {}
+ preference_(0),
+ use_differentiated_cellular_costs_(webrtc::field_trial::IsEnabled(
+ "WebRTC-UseDifferentiatedCellularCosts")) {}
Network::Network(const std::string& name,
const std::string& desc,
@@ -945,7 +957,9 @@ Network::Network(const std::string& name,
scope_id_(0),
ignored_(false),
type_(type),
- preference_(0) {}
+ preference_(0),
+ use_differentiated_cellular_costs_(webrtc::field_trial::IsEnabled(
+ "WebRTC-UseDifferentiatedCellularCosts")) {}
Network::Network(const Network&) = default;
@@ -1017,7 +1031,7 @@ webrtc::MdnsResponderInterface* Network::GetMdnsResponder() const {
uint16_t Network::GetCost() const {
AdapterType type = IsVpn() ? underlying_type_for_vpn_ : type_;
- return ComputeNetworkCostByType(type);
+ return ComputeNetworkCostByType(type, use_differentiated_cellular_costs_);
}
std::string Network::ToString() const {
diff --git a/rtc_base/network.h b/rtc_base/network.h
index bd05b6ae16..a67d2a2339 100644
--- a/rtc_base/network.h
+++ b/rtc_base/network.h
@@ -462,6 +462,7 @@ class RTC_EXPORT Network {
int preference_;
bool active_ = true;
uint16_t id_ = 0;
+ bool use_differentiated_cellular_costs_ = false;
friend class NetworkManager;
};
diff --git a/rtc_base/network/BUILD.gn b/rtc_base/network/BUILD.gn
index 1d06defb3b..35ae3d45f7 100644
--- a/rtc_base/network/BUILD.gn
+++ b/rtc_base/network/BUILD.gn
@@ -13,8 +13,6 @@ rtc_library("sent_packet") {
"sent_packet.cc",
"sent_packet.h",
]
- deps = [
- "../system:rtc_export",
- "//third_party/abseil-cpp/absl/types:optional",
- ]
+ deps = [ "../system:rtc_export" ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
diff --git a/rtc_base/one_time_event.h b/rtc_base/one_time_event.h
index c5ccbf6933..d33ddbd587 100644
--- a/rtc_base/one_time_event.h
+++ b/rtc_base/one_time_event.h
@@ -11,7 +11,7 @@
#ifndef RTC_BASE_ONE_TIME_EVENT_H_
#define RTC_BASE_ONE_TIME_EVENT_H_
-#include "rtc_base/critical_section.h"
+#include "rtc_base/synchronization/mutex.h"
namespace webrtc {
// Provides a simple way to perform an operation (such as logging) one
@@ -26,7 +26,7 @@ class OneTimeEvent {
public:
OneTimeEvent() {}
bool operator()() {
- rtc::CritScope cs(&critsect_);
+ MutexLock lock(&mutex_);
if (happened_) {
return false;
}
@@ -36,7 +36,7 @@ class OneTimeEvent {
private:
bool happened_ = false;
- rtc::CriticalSection critsect_;
+ Mutex mutex_;
};
// A non-thread-safe, ligher-weight version of the OneTimeEvent class.
diff --git a/rtc_base/openssl_adapter_unittest.cc b/rtc_base/openssl_adapter_unittest.cc
index b161304d65..4bd87992d4 100644
--- a/rtc_base/openssl_adapter_unittest.cc
+++ b/rtc_base/openssl_adapter_unittest.cc
@@ -25,28 +25,34 @@ namespace {
class MockAsyncSocket : public AsyncSocket {
public:
virtual ~MockAsyncSocket() = default;
- MOCK_METHOD1(Accept, AsyncSocket*(SocketAddress*));
- MOCK_CONST_METHOD0(GetLocalAddress, SocketAddress());
- MOCK_CONST_METHOD0(GetRemoteAddress, SocketAddress());
- MOCK_METHOD1(Bind, int(const SocketAddress&));
- MOCK_METHOD1(Connect, int(const SocketAddress&));
- MOCK_METHOD2(Send, int(const void*, size_t));
- MOCK_METHOD3(SendTo, int(const void*, size_t, const SocketAddress&));
- MOCK_METHOD3(Recv, int(void*, size_t, int64_t*));
- MOCK_METHOD4(RecvFrom, int(void*, size_t, SocketAddress*, int64_t*));
- MOCK_METHOD1(Listen, int(int));
- MOCK_METHOD0(Close, int());
- MOCK_CONST_METHOD0(GetError, int());
- MOCK_METHOD1(SetError, void(int));
- MOCK_CONST_METHOD0(GetState, ConnState());
- MOCK_METHOD2(GetOption, int(Option, int*));
- MOCK_METHOD2(SetOption, int(Option, int));
+ MOCK_METHOD(AsyncSocket*, Accept, (SocketAddress*), (override));
+ MOCK_METHOD(SocketAddress, GetLocalAddress, (), (const, override));
+ MOCK_METHOD(SocketAddress, GetRemoteAddress, (), (const, override));
+ MOCK_METHOD(int, Bind, (const SocketAddress&), (override));
+ MOCK_METHOD(int, Connect, (const SocketAddress&), (override));
+ MOCK_METHOD(int, Send, (const void*, size_t), (override));
+ MOCK_METHOD(int,
+ SendTo,
+ (const void*, size_t, const SocketAddress&),
+ (override));
+ MOCK_METHOD(int, Recv, (void*, size_t, int64_t*), (override));
+ MOCK_METHOD(int,
+ RecvFrom,
+ (void*, size_t, SocketAddress*, int64_t*),
+ (override));
+ MOCK_METHOD(int, Listen, (int), (override));
+ MOCK_METHOD(int, Close, (), (override));
+ MOCK_METHOD(int, GetError, (), (const, override));
+ MOCK_METHOD(void, SetError, (int), (override));
+ MOCK_METHOD(ConnState, GetState, (), (const, override));
+ MOCK_METHOD(int, GetOption, (Option, int*), (override));
+ MOCK_METHOD(int, SetOption, (Option, int), (override));
};
class MockCertVerifier : public SSLCertificateVerifier {
public:
virtual ~MockCertVerifier() = default;
- MOCK_METHOD1(Verify, bool(const SSLCertificate&));
+ MOCK_METHOD(bool, Verify, (const SSLCertificate&), (override));
};
} // namespace
diff --git a/rtc_base/operations_chain_unittest.cc b/rtc_base/operations_chain_unittest.cc
index 968f94c060..ed3c924998 100644
--- a/rtc_base/operations_chain_unittest.cc
+++ b/rtc_base/operations_chain_unittest.cc
@@ -369,14 +369,15 @@ TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
-TEST(OperationsChainTest, OperationNotInvokingCallbackShouldCrash) {
+TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) {
scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
EXPECT_DEATH(
operations_chain->ChainOperation([](std::function<void()> callback) {}),
"");
}
-TEST(OperationsChainTest, OperationInvokingCallbackMultipleTimesShouldCrash) {
+TEST(OperationsChainDeathTest,
+ OperationInvokingCallbackMultipleTimesShouldCrash) {
scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
EXPECT_DEATH(
operations_chain->ChainOperation([](std::function<void()> callback) {
diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc
index 080534af2c..05b32557be 100644
--- a/rtc_base/physical_socket_server.cc
+++ b/rtc_base/physical_socket_server.cc
@@ -24,7 +24,6 @@
// "poll" will be used to wait for the signal dispatcher.
#include <poll.h>
#endif
-#include <signal.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/time.h>
@@ -956,182 +955,7 @@ class EventDispatcher : public Dispatcher {
PhysicalSocketServer* ss_;
int afd_[2];
bool fSignaled_;
- CriticalSection crit_;
-};
-
-// These two classes use the self-pipe trick to deliver POSIX signals to our
-// select loop. This is the only safe, reliable, cross-platform way to do
-// non-trivial things with a POSIX signal in an event-driven program (until
-// proper pselect() implementations become ubiquitous).
-
-class PosixSignalHandler {
- public:
- // POSIX only specifies 32 signals, but in principle the system might have
- // more and the programmer might choose to use them, so we size our array
- // for 128.
- static constexpr int kNumPosixSignals = 128;
-
- // There is just a single global instance. (Signal handlers do not get any
- // sort of user-defined void * parameter, so they can't access anything that
- // isn't global.)
- static PosixSignalHandler* Instance() {
- static PosixSignalHandler* const instance = new PosixSignalHandler();
- return instance;
- }
-
- // Returns true if the given signal number is set.
- bool IsSignalSet(int signum) const {
- RTC_DCHECK(signum < static_cast<int>(arraysize(received_signal_)));
- if (signum < static_cast<int>(arraysize(received_signal_))) {
- return received_signal_[signum];
- } else {
- return false;
- }
- }
-
- // Clears the given signal number.
- void ClearSignal(int signum) {
- RTC_DCHECK(signum < static_cast<int>(arraysize(received_signal_)));
- if (signum < static_cast<int>(arraysize(received_signal_))) {
- received_signal_[signum] = false;
- }
- }
-
- // Returns the file descriptor to monitor for signal events.
- int GetDescriptor() const { return afd_[0]; }
-
- // This is called directly from our real signal handler, so it must be
- // signal-handler-safe. That means it cannot assume anything about the
- // user-level state of the process, since the handler could be executed at any
- // time on any thread.
- void OnPosixSignalReceived(int signum) {
- if (signum >= static_cast<int>(arraysize(received_signal_))) {
- // We don't have space in our array for this.
- return;
- }
- // Set a flag saying we've seen this signal.
- received_signal_[signum] = true;
- // Notify application code that we got a signal.
- const uint8_t b[1] = {0};
- if (-1 == write(afd_[1], b, sizeof(b))) {
- // Nothing we can do here. If there's an error somehow then there's
- // nothing we can safely do from a signal handler.
- // No, we can't even safely log it.
- // But, we still have to check the return value here. Otherwise,
- // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
- return;
- }
- }
-
- private:
- PosixSignalHandler() {
- if (pipe(afd_) < 0) {
- RTC_LOG_ERR(LS_ERROR) << "pipe failed";
- return;
- }
- if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
- RTC_LOG_ERR(LS_WARNING) << "fcntl #1 failed";
- }
- if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
- RTC_LOG_ERR(LS_WARNING) << "fcntl #2 failed";
- }
- memset(const_cast<void*>(static_cast<volatile void*>(received_signal_)), 0,
- sizeof(received_signal_));
- }
-
- ~PosixSignalHandler() {
- int fd1 = afd_[0];
- int fd2 = afd_[1];
- // We clobber the stored file descriptor numbers here or else in principle
- // a signal that happens to be delivered during application termination
- // could erroneously write a zero byte to an unrelated file handle in
- // OnPosixSignalReceived() if some other file happens to be opened later
- // during shutdown and happens to be given the same file descriptor number
- // as our pipe had. Unfortunately even with this precaution there is still a
- // race where that could occur if said signal happens to be handled
- // concurrently with this code and happens to have already read the value of
- // afd_[1] from memory before we clobber it, but that's unlikely.
- afd_[0] = -1;
- afd_[1] = -1;
- close(fd1);
- close(fd2);
- }
-
- int afd_[2];
- // These are boolean flags that will be set in our signal handler and read
- // and cleared from Wait(). There is a race involved in this, but it is
- // benign. The signal handler sets the flag before signaling the pipe, so
- // we'll never end up blocking in select() while a flag is still true.
- // However, if two of the same signal arrive close to each other then it's
- // possible that the second time the handler may set the flag while it's still
- // true, meaning that signal will be missed. But the first occurrence of it
- // will still be handled, so this isn't a problem.
- // Volatile is not necessary here for correctness, but this data _is_ volatile
- // so I've marked it as such.
- volatile uint8_t received_signal_[kNumPosixSignals];
-};
-
-class PosixSignalDispatcher : public Dispatcher {
- public:
- PosixSignalDispatcher(PhysicalSocketServer* owner) : owner_(owner) {
- owner_->Add(this);
- }
-
- ~PosixSignalDispatcher() override { owner_->Remove(this); }
-
- uint32_t GetRequestedEvents() override { return DE_READ; }
-
- void OnPreEvent(uint32_t ff) override {
- // Events might get grouped if signals come very fast, so we read out up to
- // 16 bytes to make sure we keep the pipe empty.
- uint8_t b[16];
- ssize_t ret = read(GetDescriptor(), b, sizeof(b));
- if (ret < 0) {
- RTC_LOG_ERR(LS_WARNING) << "Error in read()";
- } else if (ret == 0) {
- RTC_LOG(LS_WARNING) << "Should have read at least one byte";
- }
- }
-
- void OnEvent(uint32_t ff, int err) override {
- for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
- ++signum) {
- if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
- PosixSignalHandler::Instance()->ClearSignal(signum);
- HandlerMap::iterator i = handlers_.find(signum);
- if (i == handlers_.end()) {
- // This can happen if a signal is delivered to our process at around
- // the same time as we unset our handler for it. It is not an error
- // condition, but it's unusual enough to be worth logging.
- RTC_LOG(LS_INFO) << "Received signal with no handler: " << signum;
- } else {
- // Otherwise, execute our handler.
- (*i->second)(signum);
- }
- }
- }
- }
-
- int GetDescriptor() override {
- return PosixSignalHandler::Instance()->GetDescriptor();
- }
-
- bool IsDescriptorClosed() override { return false; }
-
- void SetHandler(int signum, void (*handler)(int)) {
- handlers_[signum] = handler;
- }
-
- void ClearHandler(int signum) { handlers_.erase(signum); }
-
- bool HasHandlers() { return !handlers_.empty(); }
-
- private:
- typedef std::map<int, void (*)(int)> HandlerMap;
-
- HandlerMap handlers_;
- // Our owner.
- PhysicalSocketServer* owner_;
+ RecursiveCriticalSection crit_;
};
#endif // WEBRTC_POSIX
@@ -1205,31 +1029,32 @@ class Signaler : public EventDispatcher {
bool* pf_;
};
-PhysicalSocketServer::PhysicalSocketServer() : fWait_(false) {
+PhysicalSocketServer::PhysicalSocketServer()
+ :
+#if defined(WEBRTC_USE_EPOLL)
+ // Since Linux 2.6.8, the size argument is ignored, but must be greater
+ // than zero. Before that the size served as hint to the kernel for the
+ // amount of space to initially allocate in internal data structures.
+ epoll_fd_(epoll_create(FD_SETSIZE)),
+#endif
+#if defined(WEBRTC_WIN)
+ socket_ev_(WSACreateEvent()),
+#endif
+ fWait_(false) {
#if defined(WEBRTC_USE_EPOLL)
- // Since Linux 2.6.8, the size argument is ignored, but must be greater than
- // zero. Before that the size served as hint to the kernel for the amount of
- // space to initially allocate in internal data structures.
- epoll_fd_ = epoll_create(FD_SETSIZE);
if (epoll_fd_ == -1) {
// Not an error, will fall back to "select" below.
RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create";
- epoll_fd_ = INVALID_SOCKET;
+ // Note that -1 == INVALID_SOCKET, the alias used by later checks.
}
#endif
signal_wakeup_ = new Signaler(this, &fWait_);
-#if defined(WEBRTC_WIN)
- socket_ev_ = WSACreateEvent();
-#endif
}
PhysicalSocketServer::~PhysicalSocketServer() {
#if defined(WEBRTC_WIN)
WSACloseEvent(socket_ev_);
#endif
-#if defined(WEBRTC_POSIX)
- signal_dispatcher_.reset();
-#endif
delete signal_wakeup_;
#if defined(WEBRTC_USE_EPOLL)
if (epoll_fd_ != INVALID_SOCKET) {
@@ -1540,12 +1365,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
#if defined(WEBRTC_USE_EPOLL)
-// Initial number of events to process with one call to "epoll_wait".
-static const size_t kInitialEpollEvents = 128;
-
-// Maximum number of events to process with one call to "epoll_wait".
-static const size_t kMaxEpollEvents = 8192;
-
void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
int fd = pdispatcher->GetDescriptor();
@@ -1612,20 +1431,13 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
tvStop = TimeAfter(cmsWait);
}
- if (epoll_events_.empty()) {
- // The initial space to receive events is created only if epoll is used.
- epoll_events_.resize(kInitialEpollEvents);
- }
-
fWait_ = true;
-
while (fWait_) {
// Wait then call handlers as appropriate
// < 0 means error
// 0 means timeout
// > 0 means count of descriptors ready
- int n = epoll_wait(epoll_fd_, &epoll_events_[0],
- static_cast<int>(epoll_events_.size()),
+ int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
static_cast<int>(tvWait));
if (n < 0) {
if (errno != EINTR) {
@@ -1658,13 +1470,6 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
}
}
- if (static_cast<size_t>(n) == epoll_events_.size() &&
- epoll_events_.size() < kMaxEpollEvents) {
- // We used the complete space to receive events, increase size for future
- // iterations.
- epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
- }
-
if (cmsWait != kForever) {
tvWait = TimeDiff(tvStop, TimeMillis());
if (tvWait < 0) {
@@ -1746,62 +1551,6 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
#endif // WEBRTC_USE_EPOLL
-static void GlobalSignalHandler(int signum) {
- PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
-}
-
-bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
- void (*handler)(int)) {
- // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
- // otherwise set one.
- if (handler == SIG_IGN || handler == SIG_DFL) {
- if (!InstallSignal(signum, handler)) {
- return false;
- }
- if (signal_dispatcher_) {
- signal_dispatcher_->ClearHandler(signum);
- if (!signal_dispatcher_->HasHandlers()) {
- signal_dispatcher_.reset();
- }
- }
- } else {
- if (!signal_dispatcher_) {
- signal_dispatcher_.reset(new PosixSignalDispatcher(this));
- }
- signal_dispatcher_->SetHandler(signum, handler);
- if (!InstallSignal(signum, &GlobalSignalHandler)) {
- return false;
- }
- }
- return true;
-}
-
-Dispatcher* PhysicalSocketServer::signal_dispatcher() {
- return signal_dispatcher_.get();
-}
-
-bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
- struct sigaction act;
- // It doesn't really matter what we set this mask to.
- if (sigemptyset(&act.sa_mask) != 0) {
- RTC_LOG_ERR(LS_ERROR) << "Couldn't set mask";
- return false;
- }
- act.sa_handler = handler;
-#if !defined(__native_client__)
- // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
- // and it's a nuisance. Though some syscalls still return EINTR and there's no
- // real standard for which ones. :(
- act.sa_flags = SA_RESTART;
-#else
- act.sa_flags = 0;
-#endif
- if (sigaction(signum, &act, nullptr) != 0) {
- RTC_LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
- return false;
- }
- return true;
-}
#endif // WEBRTC_POSIX
#if defined(WEBRTC_WIN)
diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h
index a71810f3db..7eaf590e3a 100644
--- a/rtc_base/physical_socket_server.h
+++ b/rtc_base/physical_socket_server.h
@@ -16,14 +16,16 @@
#define WEBRTC_USE_EPOLL 1
#endif
+#include <array>
#include <memory>
#include <set>
#include <vector>
-#include "rtc_base/critical_section.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/net_helpers.h"
#include "rtc_base/socket_server.h"
#include "rtc_base/system/rtc_export.h"
+#include "rtc_base/thread_annotations.h"
#if defined(WEBRTC_POSIX)
typedef int SOCKET;
@@ -41,9 +43,6 @@ enum DispatcherEvent {
};
class Signaler;
-#if defined(WEBRTC_POSIX)
-class PosixSignalDispatcher;
-#endif
class Dispatcher {
public:
@@ -82,33 +81,16 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
void Remove(Dispatcher* dispatcher);
void Update(Dispatcher* dispatcher);
-#if defined(WEBRTC_POSIX)
- // Sets the function to be executed in response to the specified POSIX signal.
- // The function is executed from inside Wait() using the "self-pipe trick"--
- // regardless of which thread receives the signal--and hence can safely
- // manipulate user-level data structures.
- // "handler" may be SIG_IGN, SIG_DFL, or a user-specified function, just like
- // with signal(2).
- // Only one PhysicalSocketServer should have user-level signal handlers.
- // Dispatching signals on multiple PhysicalSocketServers is not reliable.
- // The signal mask is not modified. It is the caller's responsibily to
- // maintain it as desired.
- virtual bool SetPosixSignalHandler(int signum, void (*handler)(int));
-
- protected:
- Dispatcher* signal_dispatcher();
-#endif
-
private:
+ // The number of events to process with one call to "epoll_wait".
+ static constexpr size_t kNumEpollEvents = 128;
+
typedef std::set<Dispatcher*> DispatcherSet;
- void AddRemovePendingDispatchers();
+ void AddRemovePendingDispatchers() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
#if defined(WEBRTC_POSIX)
bool WaitSelect(int cms, bool process_io);
- static bool InstallSignal(int signum, void (*handler)(int));
-
- std::unique_ptr<PosixSignalDispatcher> signal_dispatcher_;
#endif // WEBRTC_POSIX
#if defined(WEBRTC_USE_EPOLL)
void AddEpoll(Dispatcher* dispatcher);
@@ -117,19 +99,23 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
bool WaitEpoll(int cms);
bool WaitPoll(int cms, Dispatcher* dispatcher);
- int epoll_fd_ = INVALID_SOCKET;
- std::vector<struct epoll_event> epoll_events_;
+ // This array is accessed in isolation by a thread calling into Wait().
+ // It's useless to use a SequenceChecker to guard it because a socket
+ // server can outlive the thread it's bound to, forcing the Wait call
+ // to have to reset the sequence checker on Wait calls.
+ std::array<epoll_event, kNumEpollEvents> epoll_events_;
+ const int epoll_fd_ = INVALID_SOCKET;
#endif // WEBRTC_USE_EPOLL
- DispatcherSet dispatchers_;
- DispatcherSet pending_add_dispatchers_;
- DispatcherSet pending_remove_dispatchers_;
- bool processing_dispatchers_ = false;
- Signaler* signal_wakeup_;
- CriticalSection crit_;
- bool fWait_;
+ DispatcherSet dispatchers_ RTC_GUARDED_BY(crit_);
+ DispatcherSet pending_add_dispatchers_ RTC_GUARDED_BY(crit_);
+ DispatcherSet pending_remove_dispatchers_ RTC_GUARDED_BY(crit_);
+ bool processing_dispatchers_ RTC_GUARDED_BY(crit_) = false;
+ Signaler* signal_wakeup_; // Assigned in constructor only
+ RecursiveCriticalSection crit_;
#if defined(WEBRTC_WIN)
- WSAEVENT socket_ev_;
+ const WSAEVENT socket_ev_;
#endif
+ bool fWait_;
};
class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
@@ -205,7 +191,7 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
SOCKET s_;
bool udp_;
int family_ = 0;
- CriticalSection crit_;
+ RecursiveCriticalSection crit_;
int error_ RTC_GUARDED_BY(crit_);
ConnState state_;
AsyncResolver* resolver_;
diff --git a/rtc_base/physical_socket_server_unittest.cc b/rtc_base/physical_socket_server_unittest.cc
index 5083ca1791..586b9db292 100644
--- a/rtc_base/physical_socket_server_unittest.cc
+++ b/rtc_base/physical_socket_server_unittest.cc
@@ -501,139 +501,6 @@ TEST_F(PhysicalSocketTest,
server_->set_network_binder(nullptr);
}
-class PosixSignalDeliveryTest : public ::testing::Test {
- public:
- static void RecordSignal(int signum) {
- signals_received_.push_back(signum);
- signaled_thread_ = Thread::Current();
- }
-
- protected:
- void SetUp() override { ss_.reset(new PhysicalSocketServer()); }
-
- void TearDown() override {
- ss_.reset(nullptr);
- signals_received_.clear();
- signaled_thread_ = nullptr;
- }
-
- bool ExpectSignal(int signum) {
- if (signals_received_.empty()) {
- RTC_LOG(LS_ERROR) << "ExpectSignal(): No signal received";
- return false;
- }
- if (signals_received_[0] != signum) {
- RTC_LOG(LS_ERROR) << "ExpectSignal(): Received signal "
- << signals_received_[0] << ", expected " << signum;
- return false;
- }
- signals_received_.erase(signals_received_.begin());
- return true;
- }
-
- bool ExpectNone() {
- bool ret = signals_received_.empty();
- if (!ret) {
- RTC_LOG(LS_ERROR) << "ExpectNone(): Received signal "
- << signals_received_[0] << ", expected none";
- }
- return ret;
- }
-
- static std::vector<int> signals_received_;
- static Thread* signaled_thread_;
-
- std::unique_ptr<PhysicalSocketServer> ss_;
-};
-
-std::vector<int> PosixSignalDeliveryTest::signals_received_;
-Thread* PosixSignalDeliveryTest::signaled_thread_ = nullptr;
-
-// Test receiving a synchronous signal while not in Wait() and then entering
-// Wait() afterwards.
-// TODO(webrtc:7864): Fails on real iOS devices
-#if defined(WEBRTC_IOS) && defined(WEBRTC_ARCH_ARM_FAMILY)
-#define MAYBE_RaiseThenWait DISABLED_RaiseThenWait
-#else
-#define MAYBE_RaiseThenWait RaiseThenWait
-#endif
-TEST_F(PosixSignalDeliveryTest, MAYBE_RaiseThenWait) {
- ASSERT_TRUE(ss_->SetPosixSignalHandler(SIGTERM, &RecordSignal));
- raise(SIGTERM);
- EXPECT_TRUE(ss_->Wait(0, true));
- EXPECT_TRUE(ExpectSignal(SIGTERM));
- EXPECT_TRUE(ExpectNone());
-}
-
-// Test that we can handle getting tons of repeated signals and that we see all
-// the different ones.
-// TODO(webrtc:7864): Fails on real iOS devices
-#if defined(WEBRTC_IOS) && defined(WEBRTC_ARCH_ARM_FAMILY)
-#define MAYBE_InsanelyManySignals DISABLED_InsanelyManySignals
-#else
-#define MAYBE_InsanelyManySignals InsanelyManySignals
-#endif
-TEST_F(PosixSignalDeliveryTest, MAYBE_InsanelyManySignals) {
- ss_->SetPosixSignalHandler(SIGTERM, &RecordSignal);
- ss_->SetPosixSignalHandler(SIGINT, &RecordSignal);
- for (int i = 0; i < 10000; ++i) {
- raise(SIGTERM);
- }
- raise(SIGINT);
- EXPECT_TRUE(ss_->Wait(0, true));
- // Order will be lowest signal numbers first.
- EXPECT_TRUE(ExpectSignal(SIGINT));
- EXPECT_TRUE(ExpectSignal(SIGTERM));
- EXPECT_TRUE(ExpectNone());
-}
-
-// Test that a signal during a Wait() call is detected.
-TEST_F(PosixSignalDeliveryTest, SignalDuringWait) {
- ss_->SetPosixSignalHandler(SIGALRM, &RecordSignal);
- alarm(1);
- EXPECT_TRUE(ss_->Wait(1500, true));
- EXPECT_TRUE(ExpectSignal(SIGALRM));
- EXPECT_TRUE(ExpectNone());
-}
-
-// Test that it works no matter what thread the kernel chooses to give the
-// signal to (since it's not guaranteed to be the one that Wait() runs on).
-// TODO(webrtc:7864): Fails on real iOS devices
-#if defined(WEBRTC_IOS) && defined(WEBRTC_ARCH_ARM_FAMILY)
-#define MAYBE_SignalOnDifferentThread DISABLED_SignalOnDifferentThread
-#else
-#define MAYBE_SignalOnDifferentThread SignalOnDifferentThread
-#endif
-TEST_F(PosixSignalDeliveryTest, DISABLED_SignalOnDifferentThread) {
- ss_->SetPosixSignalHandler(SIGTERM, &RecordSignal);
- // Mask out SIGTERM so that it can't be delivered to this thread.
- sigset_t mask;
- sigemptyset(&mask);
- sigaddset(&mask, SIGTERM);
- EXPECT_EQ(0, pthread_sigmask(SIG_SETMASK, &mask, nullptr));
- // Start a new thread that raises it. It will have to be delivered to that
- // thread. Our implementation should safely handle it and dispatch
- // RecordSignal() on this thread.
- std::unique_ptr<Thread> thread(Thread::CreateWithSocketServer());
- thread->Start();
- thread->PostTask(RTC_FROM_HERE, [&thread]() {
- thread->socketserver()->Wait(1000, false);
- // Allow SIGTERM. This will be the only thread with it not masked so it will
- // be delivered to us.
- sigset_t mask;
- sigemptyset(&mask);
- pthread_sigmask(SIG_SETMASK, &mask, nullptr);
-
- // Raise it.
- raise(SIGTERM);
- });
-
- EXPECT_TRUE(ss_->Wait(1500, true));
- EXPECT_TRUE(ExpectSignal(SIGTERM));
- EXPECT_EQ(Thread::Current(), signaled_thread_);
- EXPECT_TRUE(ExpectNone());
-}
-
#endif
} // namespace rtc
diff --git a/rtc_base/platform_thread_types.cc b/rtc_base/platform_thread_types.cc
index ed4a228262..b0243b41dc 100644
--- a/rtc_base/platform_thread_types.cc
+++ b/rtc_base/platform_thread_types.cc
@@ -15,6 +15,16 @@
#include <sys/syscall.h>
#endif
+#if defined(WEBRTC_WIN)
+#include "rtc_base/arraysize.h"
+
+// The SetThreadDescription API was brought in version 1607 of Windows 10.
+// For compatibility with various versions of winuser and avoid clashing with
+// a potentially defined type, we use the RTC_ prefix.
+typedef HRESULT(WINAPI* RTC_SetThreadDescription)(HANDLE hThread,
+ PCWSTR lpThreadDescription);
+#endif
+
namespace rtc {
PlatformThreadId CurrentThreadId() {
@@ -58,6 +68,24 @@ bool IsThreadRefEqual(const PlatformThreadRef& a, const PlatformThreadRef& b) {
void SetCurrentThreadName(const char* name) {
#if defined(WEBRTC_WIN)
+ // The SetThreadDescription API works even if no debugger is attached.
+ // The names set with this API also show up in ETW traces. Very handy.
+ static auto set_thread_description_func =
+ reinterpret_cast<RTC_SetThreadDescription>(::GetProcAddress(
+ ::GetModuleHandleA("Kernel32.dll"), "SetThreadDescription"));
+ if (set_thread_description_func) {
+ // Convert from ASCII to UTF-16.
+ wchar_t wide_thread_name[64];
+ for (size_t i = 0; i < arraysize(wide_thread_name) - 1; ++i) {
+ wide_thread_name[i] = name[i];
+ if (wide_thread_name[i] == L'\0')
+ break;
+ }
+ // Guarantee null-termination.
+ wide_thread_name[arraysize(wide_thread_name) - 1] = L'\0';
+ set_thread_description_func(::GetCurrentThread(), wide_thread_name);
+ }
+
// For details see:
// https://docs.microsoft.com/en-us/visualstudio/debugger/how-to-set-a-thread-name-in-native-code
#pragma pack(push, 8)
diff --git a/rtc_base/rate_limiter.cc b/rtc_base/rate_limiter.cc
index 7394c3eb89..0f3f343aed 100644
--- a/rtc_base/rate_limiter.cc
+++ b/rtc_base/rate_limiter.cc
@@ -31,7 +31,7 @@ RateLimiter::~RateLimiter() {}
// calling SetMaxRate() and a timed maintenance thread periodically updating
// the RTT.
bool RateLimiter::TryUseRate(size_t packet_size_bytes) {
- rtc::CritScope cs(&lock_);
+ MutexLock lock(&lock_);
int64_t now_ms = clock_->TimeInMilliseconds();
absl::optional<uint32_t> current_rate = current_rate_.Rate(now_ms);
if (current_rate) {
@@ -53,14 +53,14 @@ bool RateLimiter::TryUseRate(size_t packet_size_bytes) {
}
void RateLimiter::SetMaxRate(uint32_t max_rate_bps) {
- rtc::CritScope cs(&lock_);
+ MutexLock lock(&lock_);
max_rate_bps_ = max_rate_bps;
}
// Set the window size over which to measure the current bitrate.
// For retransmissions, this is typically the RTT.
bool RateLimiter::SetWindowSize(int64_t window_size_ms) {
- rtc::CritScope cs(&lock_);
+ MutexLock lock(&lock_);
window_size_ms_ = window_size_ms;
return current_rate_.SetWindowSize(window_size_ms,
clock_->TimeInMilliseconds());
diff --git a/rtc_base/rate_limiter.h b/rtc_base/rate_limiter.h
index 1c956d788b..051ccf6aa6 100644
--- a/rtc_base/rate_limiter.h
+++ b/rtc_base/rate_limiter.h
@@ -15,8 +15,8 @@
#include <stdint.h>
#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/rate_statistics.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@@ -45,7 +45,7 @@ class RateLimiter {
private:
Clock* const clock_;
- rtc::CriticalSection lock_;
+ Mutex lock_;
RateStatistics current_rate_ RTC_GUARDED_BY(lock_);
int64_t window_size_ms_ RTC_GUARDED_BY(lock_);
uint32_t max_rate_bps_ RTC_GUARDED_BY(lock_);
diff --git a/rtc_base/rate_statistics.cc b/rtc_base/rate_statistics.cc
index c4c2e78581..85621fa555 100644
--- a/rtc_base/rate_statistics.cc
+++ b/rtc_base/rate_statistics.cc
@@ -20,29 +20,26 @@
namespace webrtc {
+RateStatistics::Bucket::Bucket(int64_t timestamp)
+ : sum(0), num_samples(0), timestamp(timestamp) {}
+
RateStatistics::RateStatistics(int64_t window_size_ms, float scale)
- : buckets_(new Bucket[window_size_ms]()),
- accumulated_count_(0),
+ : accumulated_count_(0),
+ first_timestamp_(-1),
num_samples_(0),
- oldest_time_(-window_size_ms),
- oldest_index_(0),
scale_(scale),
max_window_size_ms_(window_size_ms),
current_window_size_ms_(max_window_size_ms_) {}
RateStatistics::RateStatistics(const RateStatistics& other)
- : accumulated_count_(other.accumulated_count_),
+ : buckets_(other.buckets_),
+ accumulated_count_(other.accumulated_count_),
+ first_timestamp_(other.first_timestamp_),
overflow_(other.overflow_),
num_samples_(other.num_samples_),
- oldest_time_(other.oldest_time_),
- oldest_index_(other.oldest_index_),
scale_(other.scale_),
max_window_size_ms_(other.max_window_size_ms_),
- current_window_size_ms_(other.current_window_size_ms_) {
- buckets_ = std::make_unique<Bucket[]>(other.max_window_size_ms_);
- std::copy(other.buckets_.get(),
- other.buckets_.get() + other.max_window_size_ms_, buckets_.get());
-}
+ current_window_size_ms_(other.current_window_size_ms_) {}
RateStatistics::RateStatistics(RateStatistics&& other) = default;
@@ -52,33 +49,33 @@ void RateStatistics::Reset() {
accumulated_count_ = 0;
overflow_ = false;
num_samples_ = 0;
- oldest_time_ = -max_window_size_ms_;
- oldest_index_ = 0;
+ first_timestamp_ = -1;
current_window_size_ms_ = max_window_size_ms_;
- for (int64_t i = 0; i < max_window_size_ms_; i++)
- buckets_[i] = Bucket();
+ buckets_.clear();
}
void RateStatistics::Update(int64_t count, int64_t now_ms) {
- RTC_DCHECK_LE(0, count);
- if (now_ms < oldest_time_) {
- // Too old data is ignored.
- return;
- }
+ RTC_DCHECK_GE(count, 0);
EraseOld(now_ms);
+ if (first_timestamp_ == -1) {
+ first_timestamp_ = now_ms;
+ }
+
+ if (buckets_.empty() || now_ms != buckets_.back().timestamp) {
+ if (!buckets_.empty() && now_ms < buckets_.back().timestamp) {
+ RTC_LOG(LS_WARNING) << "Timestamp " << now_ms
+ << " is before the last added "
+ "timestamp in the rate window: "
+ << buckets_.back().timestamp << ", aligning to that.";
+ now_ms = buckets_.back().timestamp;
+ }
+ buckets_.emplace_back(now_ms);
+ }
+ Bucket& last_bucket = buckets_.back();
+ last_bucket.sum += count;
+ ++last_bucket.num_samples;
- // First ever sample, reset window to start now.
- if (!IsInitialized())
- oldest_time_ = now_ms;
-
- uint32_t now_offset = rtc::dchecked_cast<uint32_t>(now_ms - oldest_time_);
- RTC_DCHECK_LT(now_offset, max_window_size_ms_);
- uint32_t index = oldest_index_ + now_offset;
- if (index >= max_window_size_ms_)
- index -= max_window_size_ms_;
- buckets_[index].sum += count;
- ++buckets_[index].samples;
if (std::numeric_limits<int64_t>::max() - accumulated_count_ > count) {
accumulated_count_ += count;
} else {
@@ -92,10 +89,22 @@ absl::optional<int64_t> RateStatistics::Rate(int64_t now_ms) const {
// of the members as mutable...
const_cast<RateStatistics*>(this)->EraseOld(now_ms);
+ int active_window_size = 0;
+ if (first_timestamp_ != -1) {
+ if (first_timestamp_ <= now_ms - current_window_size_ms_) {
+ // Count window as full even if no data points currently in view, if the
+ // data stream started before the window.
+ active_window_size = current_window_size_ms_;
+ } else {
+ // Size of a single bucket is 1ms, so even if now_ms == first_timestmap_
+ // the window size should be 1.
+ active_window_size = now_ms - first_timestamp_ + 1;
+ }
+ }
+
// If window is a single bucket or there is only one sample in a data set that
// has not grown to the full window size, or if the accumulator has
// overflowed, treat this as rate unavailable.
- int active_window_size = now_ms - oldest_time_ + 1;
if (num_samples_ == 0 || active_window_size <= 1 ||
(num_samples_ <= 1 &&
rtc::SafeLt(active_window_size, current_window_size_ms_)) ||
@@ -114,43 +123,35 @@ absl::optional<int64_t> RateStatistics::Rate(int64_t now_ms) const {
}
void RateStatistics::EraseOld(int64_t now_ms) {
- if (!IsInitialized())
- return;
-
// New oldest time that is included in data set.
- int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
-
- // New oldest time is older than the current one, no need to cull data.
- if (new_oldest_time <= oldest_time_)
- return;
+ const int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
// Loop over buckets and remove too old data points.
- while (num_samples_ > 0 && oldest_time_ < new_oldest_time) {
- const Bucket& oldest_bucket = buckets_[oldest_index_];
+ while (!buckets_.empty() && buckets_.front().timestamp < new_oldest_time) {
+ const Bucket& oldest_bucket = buckets_.front();
RTC_DCHECK_GE(accumulated_count_, oldest_bucket.sum);
- RTC_DCHECK_GE(num_samples_, oldest_bucket.samples);
+ RTC_DCHECK_GE(num_samples_, oldest_bucket.num_samples);
accumulated_count_ -= oldest_bucket.sum;
- num_samples_ -= oldest_bucket.samples;
- buckets_[oldest_index_] = Bucket();
- if (++oldest_index_ >= max_window_size_ms_)
- oldest_index_ = 0;
- ++oldest_time_;
+ num_samples_ -= oldest_bucket.num_samples;
+ buckets_.pop_front();
// This does not clear overflow_ even when counter is empty.
// TODO(https://bugs.webrtc.org/11247): Consider if overflow_ can be reset.
}
- oldest_time_ = new_oldest_time;
}
bool RateStatistics::SetWindowSize(int64_t window_size_ms, int64_t now_ms) {
if (window_size_ms <= 0 || window_size_ms > max_window_size_ms_)
return false;
+ if (first_timestamp_ != -1) {
+ // If the window changes (e.g. decreases - removing data point, then
+ // increases again) we need to update the first timestamp mark as
+ // otherwise it indicates the window coveres a region of zeros, suddenly
+ // under-estimating the rate.
+ first_timestamp_ = std::max(first_timestamp_, now_ms - window_size_ms + 1);
+ }
current_window_size_ms_ = window_size_ms;
EraseOld(now_ms);
return true;
}
-bool RateStatistics::IsInitialized() const {
- return oldest_time_ != -max_window_size_ms_;
-}
-
} // namespace webrtc
diff --git a/rtc_base/rate_statistics.h b/rtc_base/rate_statistics.h
index 11c8cee7af..dc8d7f5272 100644
--- a/rtc_base/rate_statistics.h
+++ b/rtc_base/rate_statistics.h
@@ -14,6 +14,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <deque>
#include <memory>
#include "absl/types/optional.h"
@@ -28,6 +29,10 @@ namespace webrtc {
// high; for instance, a 20 Mbit/sec video stream can wrap a 32-bit byte
// counter in 14 minutes.
+// Note that timestamps used in Update(), Rate() and SetWindowSize() must never
+// decrease for two consecutive calls.
+// TODO(bugs.webrtc.org/11600): Migrate from int64_t to Timestamp.
+
class RTC_EXPORT RateStatistics {
public:
static constexpr float kBpsScale = 8000.0f;
@@ -65,19 +70,22 @@ class RTC_EXPORT RateStatistics {
private:
void EraseOld(int64_t now_ms);
- bool IsInitialized() const;
- // Counters are kept in buckets (circular buffer), with one bucket
- // per millisecond.
struct Bucket {
+ explicit Bucket(int64_t timestamp);
int64_t sum; // Sum of all samples in this bucket.
- int samples; // Number of samples in this bucket.
+ int num_samples; // Number of samples in this bucket.
+ const int64_t timestamp; // Timestamp this bucket corresponds to.
};
- std::unique_ptr<Bucket[]> buckets_;
+ // All buckets within the time window, ordered by time.
+ std::deque<Bucket> buckets_;
- // Total count recorded in buckets.
+ // Total count recorded in all buckets.
int64_t accumulated_count_;
+ // Timestamp of the first data point seen, or -1 of none seen.
+ int64_t first_timestamp_;
+
// True if accumulated_count_ has ever grown too large to be
// contained in its integer type.
bool overflow_ = false;
@@ -85,12 +93,6 @@ class RTC_EXPORT RateStatistics {
// The total number of samples in the buckets.
int num_samples_;
- // Oldest time recorded in buckets.
- int64_t oldest_time_;
-
- // Bucket index of oldest counter recorded in buckets.
- int64_t oldest_index_;
-
// To convert counts/ms to desired units
const float scale_;
diff --git a/rtc_base/signal_thread.h b/rtc_base/signal_thread.h
index d9e8ade9b0..b444d54994 100644
--- a/rtc_base/signal_thread.h
+++ b/rtc_base/signal_thread.h
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The WebRTC Project Authors. All rights reserved.
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
@@ -11,147 +11,9 @@
#ifndef RTC_BASE_SIGNAL_THREAD_H_
#define RTC_BASE_SIGNAL_THREAD_H_
-#include <string>
-
-#include "rtc_base/checks.h"
-#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
-#include "rtc_base/message_handler.h"
-#include "rtc_base/third_party/sigslot/sigslot.h"
-#include "rtc_base/thread.h"
-#include "rtc_base/thread_annotations.h"
-
-namespace rtc {
-
-///////////////////////////////////////////////////////////////////////////////
-// SignalThread - Base class for worker threads. The main thread should call
-// Start() to begin work, and then follow one of these models:
-// Normal: Wait for SignalWorkDone, and then call Release to destroy.
-// Cancellation: Call Release(true), to abort the worker thread.
-// Fire-and-forget: Call Release(false), which allows the thread to run to
-// completion, and then self-destruct without further notification.
-// Periodic tasks: Wait for SignalWorkDone, then eventually call Start()
-// again to repeat the task. When the instance isn't needed anymore,
-// call Release. DoWork, OnWorkStart and OnWorkStop are called again,
-// on a new thread.
-// The subclass should override DoWork() to perform the background task. By
-// periodically calling ContinueWork(), it can check for cancellation.
-// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work
-// tasks in the context of the main thread.
-///////////////////////////////////////////////////////////////////////////////
-
-class SignalThread : public sigslot::has_slots<>, protected MessageHandler {
- public:
- SignalThread();
-
- // Context: Main Thread. Call before Start to change the worker's name.
- bool SetName(const std::string& name, const void* obj);
-
- // Context: Main Thread. Call to begin the worker thread.
- void Start();
-
- // Context: Main Thread. If the worker thread is not running, deletes the
- // object immediately. Otherwise, asks the worker thread to abort processing,
- // and schedules the object to be deleted once the worker exits.
- // SignalWorkDone will not be signalled. If wait is true, does not return
- // until the thread is deleted.
- void Destroy(bool wait);
-
- // Context: Main Thread. If the worker thread is complete, deletes the
- // object immediately. Otherwise, schedules the object to be deleted once
- // the worker thread completes. SignalWorkDone will be signalled.
- void Release();
-
- // Context: Main Thread. Signalled when work is complete.
- sigslot::signal1<SignalThread*> SignalWorkDone;
-
- enum { ST_MSG_WORKER_DONE, ST_MSG_FIRST_AVAILABLE };
-
- protected:
- ~SignalThread() override;
-
- Thread* worker() { return &worker_; }
-
- // Context: Main Thread. Subclass should override to do pre-work setup.
- virtual void OnWorkStart() {}
-
- // Context: Worker Thread. Subclass should override to do work.
- virtual void DoWork() = 0;
-
- // Context: Worker Thread. Subclass should call periodically to
- // dispatch messages and determine if the thread should terminate.
- bool ContinueWork();
-
- // Context: Worker Thread. Subclass should override when extra work is
- // needed to abort the worker thread.
- virtual void OnWorkStop() {}
-
- // Context: Main Thread. Subclass should override to do post-work cleanup.
- virtual void OnWorkDone() {}
-
- // Context: Any Thread. If subclass overrides, be sure to call the base
- // implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE)
- void OnMessage(Message* msg) override;
-
- private:
- enum State {
- kInit, // Initialized, but not started
- kRunning, // Started and doing work
- kReleasing, // Same as running, but to be deleted when work is done
- kComplete, // Work is done
- kStopping, // Work is being interrupted
- };
-
- class Worker : public Thread {
- public:
- explicit Worker(SignalThread* parent);
- ~Worker() override;
- void Run() override;
- bool IsProcessingMessagesForTesting() override;
-
- private:
- SignalThread* parent_;
-
- RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Worker);
- };
-
- class RTC_SCOPED_LOCKABLE EnterExit {
- public:
- explicit EnterExit(SignalThread* t) RTC_EXCLUSIVE_LOCK_FUNCTION(t->cs_)
- : t_(t) {
- t_->cs_.Enter();
- // If refcount_ is zero then the object has already been deleted and we
- // will be double-deleting it in ~EnterExit()! (shouldn't happen)
- RTC_DCHECK_NE(0, t_->refcount_);
- ++t_->refcount_;
- }
- ~EnterExit() RTC_UNLOCK_FUNCTION() {
- bool d = (0 == --t_->refcount_);
- t_->cs_.Leave();
- if (d)
- delete t_;
- }
-
- private:
- SignalThread* t_;
-
- RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(EnterExit);
- };
-
- void Run();
- void OnMainThreadDestroyed();
-
- Thread* main_;
- Worker worker_;
- CriticalSection cs_;
- State state_;
- int refcount_;
-
- RTC_DISALLOW_COPY_AND_ASSIGN(SignalThread);
-};
-
-///////////////////////////////////////////////////////////////////////////////
-
-} // namespace rtc
+// The facilities in this file have been deprecated. Please do not use them
+// in new code. New code should use factilities exposed by api/task_queue/
+// instead.
+#include "rtc_base/deprecated/signal_thread.h"
#endif // RTC_BASE_SIGNAL_THREAD_H_
diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc
index 125b4bd50d..498eba312b 100644
--- a/rtc_base/ssl_adapter_unittest.cc
+++ b/rtc_base/ssl_adapter_unittest.cc
@@ -50,7 +50,7 @@ static std::string GetSSLProtocolName(const rtc::SSLMode& ssl_mode) {
class MockCertVerifier : public rtc::SSLCertificateVerifier {
public:
virtual ~MockCertVerifier() = default;
- MOCK_METHOD1(Verify, bool(const rtc::SSLCertificate&));
+ MOCK_METHOD(bool, Verify, (const rtc::SSLCertificate&), (override));
};
// TODO(benwright) - Move to using INSTANTIATE_TEST_SUITE_P instead of using
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index bfb9dc2c41..dc77a7111c 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -15,7 +15,6 @@
#include "rtc_base/buffer.h"
#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
diff --git a/rtc_base/strings/string_builder_unittest.cc b/rtc_base/strings/string_builder_unittest.cc
index 84717ad1d1..99dfd86292 100644
--- a/rtc_base/strings/string_builder_unittest.cc
+++ b/rtc_base/strings/string_builder_unittest.cc
@@ -59,7 +59,7 @@ TEST(SimpleStringBuilder, StdString) {
// off.
#if (GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)) || !RTC_DCHECK_IS_ON
-TEST(SimpleStringBuilder, BufferOverrunConstCharP) {
+TEST(SimpleStringBuilderDeathTest, BufferOverrunConstCharP) {
char sb_buf[4];
SimpleStringBuilder sb(sb_buf);
const char* const msg = "This is just too much";
@@ -71,7 +71,7 @@ TEST(SimpleStringBuilder, BufferOverrunConstCharP) {
#endif
}
-TEST(SimpleStringBuilder, BufferOverrunStdString) {
+TEST(SimpleStringBuilderDeathTest, BufferOverrunStdString) {
char sb_buf[4];
SimpleStringBuilder sb(sb_buf);
sb << 12;
@@ -84,7 +84,7 @@ TEST(SimpleStringBuilder, BufferOverrunStdString) {
#endif
}
-TEST(SimpleStringBuilder, BufferOverrunInt) {
+TEST(SimpleStringBuilderDeathTest, BufferOverrunInt) {
char sb_buf[4];
SimpleStringBuilder sb(sb_buf);
constexpr int num = -12345;
@@ -100,7 +100,7 @@ TEST(SimpleStringBuilder, BufferOverrunInt) {
#endif
}
-TEST(SimpleStringBuilder, BufferOverrunDouble) {
+TEST(SimpleStringBuilderDeathTest, BufferOverrunDouble) {
char sb_buf[5];
SimpleStringBuilder sb(sb_buf);
constexpr double num = 123.456;
@@ -113,7 +113,7 @@ TEST(SimpleStringBuilder, BufferOverrunDouble) {
#endif
}
-TEST(SimpleStringBuilder, BufferOverrunConstCharPAlreadyFull) {
+TEST(SimpleStringBuilderDeathTest, BufferOverrunConstCharPAlreadyFull) {
char sb_buf[4];
SimpleStringBuilder sb(sb_buf);
sb << 123;
@@ -126,7 +126,7 @@ TEST(SimpleStringBuilder, BufferOverrunConstCharPAlreadyFull) {
#endif
}
-TEST(SimpleStringBuilder, BufferOverrunIntAlreadyFull) {
+TEST(SimpleStringBuilderDeathTest, BufferOverrunIntAlreadyFull) {
char sb_buf[4];
SimpleStringBuilder sb(sb_buf);
sb << "xyz";
diff --git a/rtc_base/swap_queue_unittest.cc b/rtc_base/swap_queue_unittest.cc
index 199ac6b185..3862d850fa 100644
--- a/rtc_base/swap_queue_unittest.cc
+++ b/rtc_base/swap_queue_unittest.cc
@@ -135,7 +135,7 @@ TEST(SwapQueueTest, SuccessfulItemVerifyFunctor) {
}
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
-TEST(SwapQueueTest, UnsuccessfulItemVerifyFunctor) {
+TEST(SwapQueueDeathTest, UnsuccessfulItemVerifyFunctor) {
// Queue item verifier for the test.
auto minus_2_verifier = [](const int& i) { return i > -2; };
SwapQueue<int, decltype(minus_2_verifier)> queue(2, minus_2_verifier);
@@ -148,7 +148,7 @@ TEST(SwapQueueTest, UnsuccessfulItemVerifyFunctor) {
EXPECT_DEATH(result = queue.Insert(&invalid_value), "");
}
-TEST(SwapQueueTest, UnSuccessfulItemVerifyInsert) {
+TEST(SwapQueueDeathTest, UnSuccessfulItemVerifyInsert) {
std::vector<int> template_element(kChunkSize);
SwapQueue<std::vector<int>,
SwapQueueItemVerifier<std::vector<int>, &LengthVerifierFunction>>
@@ -158,7 +158,7 @@ TEST(SwapQueueTest, UnSuccessfulItemVerifyInsert) {
EXPECT_DEATH(result = queue.Insert(&invalid_chunk), "");
}
-TEST(SwapQueueTest, UnSuccessfulItemVerifyRemove) {
+TEST(SwapQueueDeathTest, UnSuccessfulItemVerifyRemove) {
std::vector<int> template_element(kChunkSize);
SwapQueue<std::vector<int>,
SwapQueueItemVerifier<std::vector<int>, &LengthVerifierFunction>>
diff --git a/rtc_base/synchronization/BUILD.gn b/rtc_base/synchronization/BUILD.gn
index 3e7b22d4f9..a79a0486af 100644
--- a/rtc_base/synchronization/BUILD.gn
+++ b/rtc_base/synchronization/BUILD.gn
@@ -12,6 +12,38 @@ if (is_android) {
import("//build/config/android/rules.gni")
}
+rtc_library("yield") {
+ sources = [
+ "yield.cc",
+ "yield.h",
+ ]
+ deps = []
+}
+
+rtc_library("mutex") {
+ sources = [
+ "mutex.cc",
+ "mutex.h",
+ "mutex_critical_section.h",
+ "mutex_pthread.h",
+ ]
+ if (rtc_use_absl_mutex) {
+ sources += [ "mutex_abseil.h" ]
+ }
+
+ deps = [
+ ":yield",
+ "..:checks",
+ "..:macromagic",
+ "..:platform_thread_types",
+ "../system:unused",
+ ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers" ]
+ if (rtc_use_absl_mutex) {
+ absl_deps += [ "//third_party/abseil-cpp/absl/synchronization" ]
+ }
+}
+
rtc_library("rw_lock_wrapper") {
public = [ "rw_lock_wrapper.h" ]
sources = [ "rw_lock_wrapper.cc" ]
@@ -36,10 +68,12 @@ rtc_library("sequence_checker") {
"sequence_checker.h",
]
deps = [
+ ":mutex",
"..:checks",
"..:criticalsection",
"..:macromagic",
"..:platform_thread_types",
+ "..:stringutils",
"../../api/task_queue",
"../system:rtc_export",
]
@@ -50,8 +84,8 @@ rtc_library("yield_policy") {
"yield_policy.cc",
"yield_policy.h",
]
- deps = [
- "..:checks",
+ deps = [ "..:checks" ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/base:config",
"//third_party/abseil-cpp/absl/base:core_headers",
]
@@ -60,11 +94,30 @@ rtc_library("yield_policy") {
if (rtc_include_tests) {
rtc_library("synchronization_unittests") {
testonly = true
- sources = [ "yield_policy_unittest.cc" ]
+ sources = [
+ "mutex_unittest.cc",
+ "yield_policy_unittest.cc",
+ ]
deps = [
+ ":mutex",
+ ":yield",
":yield_policy",
+ "..:checks",
+ "..:macromagic",
+ "..:rtc_base",
"..:rtc_event",
"../../test:test_support",
+ "//third_party/google_benchmark",
+ ]
+ }
+
+ rtc_library("mutex_benchmark") {
+ testonly = true
+ sources = [ "mutex_benchmark.cc" ]
+ deps = [
+ ":mutex",
+ "../system:unused",
+ "//third_party/google_benchmark",
]
}
diff --git a/rtc_base/synchronization/DEPS b/rtc_base/synchronization/DEPS
new file mode 100644
index 0000000000..4ed1f2444b
--- /dev/null
+++ b/rtc_base/synchronization/DEPS
@@ -0,0 +1,11 @@
+specific_include_rules = {
+ "mutex_abseil\.h": [
+ "+absl/synchronization"
+ ],
+ ".*_benchmark\.cc": [
+ "+benchmark",
+ ],
+ ".*_unittest\.cc": [
+ "+benchmark",
+ ]
+}
diff --git a/rtc_base/synchronization/mutex.cc b/rtc_base/synchronization/mutex.cc
new file mode 100644
index 0000000000..6c2d6ff7f0
--- /dev/null
+++ b/rtc_base/synchronization/mutex.cc
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/synchronization/mutex.h"
+
+#include "rtc_base/checks.h"
+#include "rtc_base/synchronization/yield.h"
+
+namespace webrtc {
+
+#if !defined(WEBRTC_ABSL_MUTEX)
+void GlobalMutex::Lock() {
+ while (mutex_locked_.exchange(1)) {
+ YieldCurrentThread();
+ }
+}
+
+void GlobalMutex::Unlock() {
+ int old = mutex_locked_.exchange(0);
+ RTC_DCHECK_EQ(old, 1) << "Unlock called without calling Lock first";
+}
+
+GlobalMutexLock::GlobalMutexLock(GlobalMutex* mutex) : mutex_(mutex) {
+ mutex_->Lock();
+}
+
+GlobalMutexLock::~GlobalMutexLock() {
+ mutex_->Unlock();
+}
+#endif // #if !defined(WEBRTC_ABSL_MUTEX)
+
+} // namespace webrtc
diff --git a/rtc_base/synchronization/mutex.h b/rtc_base/synchronization/mutex.h
new file mode 100644
index 0000000000..1ccbbdcbd5
--- /dev/null
+++ b/rtc_base/synchronization/mutex.h
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_H_
+#define RTC_BASE_SYNCHRONIZATION_MUTEX_H_
+
+#include <atomic>
+
+#include "absl/base/const_init.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/system/unused.h"
+#include "rtc_base/thread_annotations.h"
+
+#if defined(WEBRTC_ABSL_MUTEX)
+#include "rtc_base/synchronization/mutex_abseil.h" // nogncheck
+#elif defined(WEBRTC_WIN)
+#include "rtc_base/synchronization/mutex_critical_section.h"
+#elif defined(WEBRTC_POSIX)
+#include "rtc_base/synchronization/mutex_pthread.h"
+#else
+#error Unsupported platform.
+#endif
+
+namespace webrtc {
+
+// The Mutex guarantees exclusive access and aims to follow Abseil semantics
+// (i.e. non-reentrant etc).
+class RTC_LOCKABLE Mutex final {
+ public:
+ Mutex() = default;
+ Mutex(const Mutex&) = delete;
+ Mutex& operator=(const Mutex&) = delete;
+
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() {
+ rtc::PlatformThreadRef current = CurrentThreadRefAssertingNotBeingHolder();
+ impl_.Lock();
+ // |holder_| changes from 0 to CurrentThreadRef().
+ holder_.store(current, std::memory_order_relaxed);
+ }
+ RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
+ rtc::PlatformThreadRef current = CurrentThreadRefAssertingNotBeingHolder();
+ if (impl_.TryLock()) {
+ // |holder_| changes from 0 to CurrentThreadRef().
+ holder_.store(current, std::memory_order_relaxed);
+ return true;
+ }
+ return false;
+ }
+ void Unlock() RTC_UNLOCK_FUNCTION() {
+ // |holder_| changes from CurrentThreadRef() to 0. If something else than
+ // CurrentThreadRef() is stored in |holder_|, the Unlock results in
+ // undefined behavior as mutexes can't be unlocked from another thread than
+ // the one that locked it, or called while not being locked.
+ holder_.store(0, std::memory_order_relaxed);
+ impl_.Unlock();
+ }
+
+ private:
+ rtc::PlatformThreadRef CurrentThreadRefAssertingNotBeingHolder() {
+ rtc::PlatformThreadRef holder = holder_.load(std::memory_order_relaxed);
+ rtc::PlatformThreadRef current = rtc::CurrentThreadRef();
+ // TODO(bugs.webrtc.org/11567): remove this temporary check after migrating
+ // fully to Mutex.
+ RTC_CHECK_NE(holder, current);
+ return current;
+ }
+
+ MutexImpl impl_;
+ // TODO(bugs.webrtc.org/11567): remove |holder_| after migrating fully to
+ // Mutex.
+ // |holder_| contains the PlatformThreadRef of the thread currently holding
+ // the lock, or 0.
+ // Remarks on the used memory orders: the atomic load in
+ // CurrentThreadRefAssertingNotBeingHolder() observes either of two things:
+ // 1. our own previous write to holder_ with our thread ID.
+ // 2. another thread (with ID y) writing y and then 0 from an initial value of
+ // 0. If we're observing case 1, our own stores are obviously ordered before
+ // the load, and hit the CHECK. If we're observing case 2, the value observed
+ // w.r.t |impl_| being locked depends on the memory order. Since we only care
+ // that it's different from CurrentThreadRef()), we use the more performant
+ // option, memory_order_relaxed.
+ std::atomic<rtc::PlatformThreadRef> holder_ = {0};
+};
+
+// MutexLock, for serializing execution through a scope.
+class RTC_SCOPED_LOCKABLE MutexLock final {
+ public:
+ MutexLock(const MutexLock&) = delete;
+ MutexLock& operator=(const MutexLock&) = delete;
+
+ explicit MutexLock(Mutex* mutex) RTC_EXCLUSIVE_LOCK_FUNCTION(mutex)
+ : mutex_(mutex) {
+ mutex->Lock();
+ }
+ ~MutexLock() RTC_UNLOCK_FUNCTION() { mutex_->Unlock(); }
+
+ private:
+ Mutex* mutex_;
+};
+
+// A mutex used to protect global variables. Do NOT use for other purposes.
+#if defined(WEBRTC_ABSL_MUTEX)
+using GlobalMutex = absl::Mutex;
+using GlobalMutexLock = absl::MutexLock;
+#else
+class RTC_LOCKABLE GlobalMutex final {
+ public:
+ GlobalMutex(const GlobalMutex&) = delete;
+ GlobalMutex& operator=(const GlobalMutex&) = delete;
+
+ constexpr explicit GlobalMutex(absl::ConstInitType /*unused*/)
+ : mutex_locked_(0) {}
+
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION();
+ void Unlock() RTC_UNLOCK_FUNCTION();
+
+ private:
+ std::atomic<int> mutex_locked_; // 0 means lock not taken, 1 means taken.
+};
+
+// GlobalMutexLock, for serializing execution through a scope.
+class RTC_SCOPED_LOCKABLE GlobalMutexLock final {
+ public:
+ GlobalMutexLock(const GlobalMutexLock&) = delete;
+ GlobalMutexLock& operator=(const GlobalMutexLock&) = delete;
+
+ explicit GlobalMutexLock(GlobalMutex* mutex)
+ RTC_EXCLUSIVE_LOCK_FUNCTION(mutex_);
+ ~GlobalMutexLock() RTC_UNLOCK_FUNCTION();
+
+ private:
+ GlobalMutex* mutex_;
+};
+#endif // if defined(WEBRTC_ABSL_MUTEX)
+
+} // namespace webrtc
+
+#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_H_
diff --git a/rtc_base/synchronization/mutex_abseil.h b/rtc_base/synchronization/mutex_abseil.h
new file mode 100644
index 0000000000..4ad1d07eef
--- /dev/null
+++ b/rtc_base/synchronization/mutex_abseil.h
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_ABSEIL_H_
+#define RTC_BASE_SYNCHRONIZATION_MUTEX_ABSEIL_H_
+
+#include "absl/synchronization/mutex.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc {
+
+class RTC_LOCKABLE MutexImpl final {
+ public:
+ MutexImpl() = default;
+ MutexImpl(const MutexImpl&) = delete;
+ MutexImpl& operator=(const MutexImpl&) = delete;
+
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { mutex_.Lock(); }
+ RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
+ return mutex_.TryLock();
+ }
+ void Unlock() RTC_UNLOCK_FUNCTION() { mutex_.Unlock(); }
+
+ private:
+ absl::Mutex mutex_;
+};
+
+} // namespace webrtc
+
+#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_ABSEIL_H_
diff --git a/rtc_base/synchronization/mutex_benchmark.cc b/rtc_base/synchronization/mutex_benchmark.cc
new file mode 100644
index 0000000000..40adca65d8
--- /dev/null
+++ b/rtc_base/synchronization/mutex_benchmark.cc
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "benchmark/benchmark.h"
+#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/system/unused.h"
+
+namespace webrtc {
+
+class PerfTestData {
+ public:
+ PerfTestData() : cache_line_barrier_1_(), cache_line_barrier_2_() {
+ cache_line_barrier_1_[0]++; // Avoid 'is not used'.
+ cache_line_barrier_2_[0]++; // Avoid 'is not used'.
+ }
+
+ int AddToCounter(int add) {
+ MutexLock mu(&mu_);
+ my_counter_ += add;
+ return 0;
+ }
+
+ private:
+ uint8_t cache_line_barrier_1_[64];
+ Mutex mu_;
+ uint8_t cache_line_barrier_2_[64];
+ int64_t my_counter_ = 0;
+};
+
+void BM_LockWithMutex(benchmark::State& state) {
+ static PerfTestData test_data;
+ for (auto s : state) {
+ RTC_UNUSED(s);
+ benchmark::DoNotOptimize(test_data.AddToCounter(2));
+ }
+}
+
+BENCHMARK(BM_LockWithMutex)->Threads(1);
+BENCHMARK(BM_LockWithMutex)->Threads(2);
+BENCHMARK(BM_LockWithMutex)->Threads(4);
+BENCHMARK(BM_LockWithMutex)->ThreadPerCpu();
+
+} // namespace webrtc
+
+/*
+
+Results:
+
+NB when reproducing: Remember to turn of power management features such as CPU
+scaling before running!
+
+pthreads (Linux):
+----------------------------------------------------------------------
+Run on (12 X 4500 MHz CPU s)
+CPU Caches:
+ L1 Data 32 KiB (x6)
+ L1 Instruction 32 KiB (x6)
+ L2 Unified 1024 KiB (x6)
+ L3 Unified 8448 KiB (x1)
+Load Average: 0.26, 0.28, 0.44
+----------------------------------------------------------------------
+Benchmark Time CPU Iterations
+----------------------------------------------------------------------
+BM_LockWithMutex/threads:1 13.4 ns 13.4 ns 52192906
+BM_LockWithMutex/threads:2 44.2 ns 88.4 ns 8189944
+BM_LockWithMutex/threads:4 52.0 ns 198 ns 3743244
+BM_LockWithMutex/threads:12 84.9 ns 944 ns 733524
+
+std::mutex performs like the pthread implementation (Linux).
+
+Abseil (Linux):
+----------------------------------------------------------------------
+Run on (12 X 4500 MHz CPU s)
+CPU Caches:
+ L1 Data 32 KiB (x6)
+ L1 Instruction 32 KiB (x6)
+ L2 Unified 1024 KiB (x6)
+ L3 Unified 8448 KiB (x1)
+Load Average: 0.27, 0.24, 0.37
+----------------------------------------------------------------------
+Benchmark Time CPU Iterations
+----------------------------------------------------------------------
+BM_LockWithMutex/threads:1 15.0 ns 15.0 ns 46550231
+BM_LockWithMutex/threads:2 91.1 ns 182 ns 4059212
+BM_LockWithMutex/threads:4 40.8 ns 131 ns 5496560
+BM_LockWithMutex/threads:12 37.0 ns 130 ns 5377668
+
+*/
diff --git a/rtc_base/synchronization/mutex_critical_section.h b/rtc_base/synchronization/mutex_critical_section.h
new file mode 100644
index 0000000000..d206794988
--- /dev/null
+++ b/rtc_base/synchronization/mutex_critical_section.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_CRITICAL_SECTION_H_
+#define RTC_BASE_SYNCHRONIZATION_MUTEX_CRITICAL_SECTION_H_
+
+#if defined(WEBRTC_WIN)
+// clang-format off
+// clang formating would change include order.
+
+// Include winsock2.h before including <windows.h> to maintain consistency with
+// win32.h. To include win32.h directly, it must be broken out into its own
+// build target.
+#include <winsock2.h>
+#include <windows.h>
+#include <sal.h> // must come after windows headers.
+// clang-format on
+
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc {
+
+class RTC_LOCKABLE MutexImpl final {
+ public:
+ MutexImpl() { InitializeCriticalSection(&critical_section_); }
+ MutexImpl(const MutexImpl&) = delete;
+ MutexImpl& operator=(const MutexImpl&) = delete;
+ ~MutexImpl() { DeleteCriticalSection(&critical_section_); }
+
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() {
+ EnterCriticalSection(&critical_section_);
+ }
+ RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
+ return TryEnterCriticalSection(&critical_section_) != FALSE;
+ }
+ void Unlock() RTC_UNLOCK_FUNCTION() {
+ LeaveCriticalSection(&critical_section_);
+ }
+
+ private:
+ CRITICAL_SECTION critical_section_;
+};
+
+} // namespace webrtc
+
+#endif // #if defined(WEBRTC_WIN)
+#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_CRITICAL_SECTION_H_
diff --git a/rtc_base/synchronization/mutex_pthread.h b/rtc_base/synchronization/mutex_pthread.h
new file mode 100644
index 0000000000..c9496e72c9
--- /dev/null
+++ b/rtc_base/synchronization/mutex_pthread.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_SYNCHRONIZATION_MUTEX_PTHREAD_H_
+#define RTC_BASE_SYNCHRONIZATION_MUTEX_PTHREAD_H_
+
+#if defined(WEBRTC_POSIX)
+
+#include <pthread.h>
+#if defined(WEBRTC_MAC)
+#include <pthread_spis.h>
+#endif
+
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc {
+
+class RTC_LOCKABLE MutexImpl final {
+ public:
+ MutexImpl() {
+ pthread_mutexattr_t mutex_attribute;
+ pthread_mutexattr_init(&mutex_attribute);
+#if defined(WEBRTC_MAC)
+ pthread_mutexattr_setpolicy_np(&mutex_attribute,
+ _PTHREAD_MUTEX_POLICY_FIRSTFIT);
+#endif
+ pthread_mutex_init(&mutex_, &mutex_attribute);
+ pthread_mutexattr_destroy(&mutex_attribute);
+ }
+ MutexImpl(const MutexImpl&) = delete;
+ MutexImpl& operator=(const MutexImpl&) = delete;
+ ~MutexImpl() { pthread_mutex_destroy(&mutex_); }
+
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { pthread_mutex_lock(&mutex_); }
+ RTC_WARN_UNUSED_RESULT bool TryLock() RTC_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
+ return pthread_mutex_trylock(&mutex_) == 0;
+ }
+ void Unlock() RTC_UNLOCK_FUNCTION() { pthread_mutex_unlock(&mutex_); }
+
+ private:
+ pthread_mutex_t mutex_;
+};
+
+} // namespace webrtc
+#endif // #if defined(WEBRTC_POSIX)
+#endif // RTC_BASE_SYNCHRONIZATION_MUTEX_PTHREAD_H_
diff --git a/rtc_base/synchronization/mutex_unittest.cc b/rtc_base/synchronization/mutex_unittest.cc
new file mode 100644
index 0000000000..6a930bc042
--- /dev/null
+++ b/rtc_base/synchronization/mutex_unittest.cc
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/synchronization/mutex.h"
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "benchmark/benchmark.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/event.h"
+#include "rtc_base/location.h"
+#include "rtc_base/message_handler.h"
+#include "rtc_base/platform_thread.h"
+#include "rtc_base/synchronization/yield.h"
+#include "rtc_base/thread.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+
+using ::rtc::Event;
+using ::rtc::Message;
+using ::rtc::MessageHandler;
+using ::rtc::Thread;
+
+constexpr int kNumThreads = 16;
+
+template <class MutexType>
+class RTC_LOCKABLE RawMutexLocker {
+ public:
+ explicit RawMutexLocker(MutexType& mutex) : mutex_(mutex) {}
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { mutex_.Lock(); }
+ void Unlock() RTC_UNLOCK_FUNCTION() { mutex_.Unlock(); }
+
+ private:
+ MutexType& mutex_;
+};
+
+class RTC_LOCKABLE RawMutexTryLocker {
+ public:
+ explicit RawMutexTryLocker(Mutex& mutex) : mutex_(mutex) {}
+ void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() {
+ while (!mutex_.TryLock()) {
+ YieldCurrentThread();
+ }
+ }
+ void Unlock() RTC_UNLOCK_FUNCTION() { mutex_.Unlock(); }
+
+ private:
+ Mutex& mutex_;
+};
+
+template <class MutexType, class MutexLockType>
+class MutexLockLocker {
+ public:
+ explicit MutexLockLocker(MutexType& mutex) : mutex_(mutex) {}
+ void Lock() { lock_ = std::make_unique<MutexLockType>(&mutex_); }
+ void Unlock() { lock_ = nullptr; }
+
+ private:
+ MutexType& mutex_;
+ std::unique_ptr<MutexLockType> lock_;
+};
+
+template <class MutexType, class MutexLocker>
+class LockRunner : public MessageHandler {
+ public:
+ template <typename... Args>
+ explicit LockRunner(Args... args)
+ : threads_active_(0),
+ start_event_(true, false),
+ done_event_(true, false),
+ shared_value_(0),
+ mutex_(args...),
+ locker_(mutex_) {}
+
+ bool Run() {
+ // Signal all threads to start.
+ start_event_.Set();
+
+ // Wait for all threads to finish.
+ return done_event_.Wait(kLongTime);
+ }
+
+ void SetExpectedThreadCount(int count) { threads_active_ = count; }
+
+ int shared_value() {
+ int shared_value;
+ locker_.Lock();
+ shared_value = shared_value_;
+ locker_.Unlock();
+ return shared_value_;
+ }
+
+ void OnMessage(Message* msg) override {
+ ASSERT_TRUE(start_event_.Wait(kLongTime));
+ locker_.Lock();
+
+ EXPECT_EQ(0, shared_value_);
+ int old = shared_value_;
+
+ // Use a loop to increase the chance of race. If the |locker_|
+ // implementation is faulty, it would be improbable that the error slips
+ // through.
+ for (int i = 0; i < kOperationsToRun; ++i) {
+ benchmark::DoNotOptimize(++shared_value_);
+ }
+ EXPECT_EQ(old + kOperationsToRun, shared_value_);
+ shared_value_ = 0;
+
+ locker_.Unlock();
+ if (threads_active_.fetch_sub(1) == 1) {
+ done_event_.Set();
+ }
+ }
+
+ private:
+ static constexpr int kLongTime = 10000; // 10 seconds
+ static constexpr int kOperationsToRun = 1000;
+
+ std::atomic<int> threads_active_;
+ Event start_event_;
+ Event done_event_;
+ int shared_value_;
+ MutexType mutex_;
+ MutexLocker locker_;
+};
+
+void StartThreads(std::vector<std::unique_ptr<Thread>>& threads,
+ MessageHandler* handler) {
+ for (int i = 0; i < kNumThreads; ++i) {
+ std::unique_ptr<Thread> thread(Thread::Create());
+ thread->Start();
+ thread->Post(RTC_FROM_HERE, handler);
+ threads.push_back(std::move(thread));
+ }
+}
+
+TEST(MutexTest, ProtectsSharedResourceWithMutexAndRawMutexLocker) {
+ std::vector<std::unique_ptr<Thread>> threads;
+ LockRunner<Mutex, RawMutexLocker<Mutex>> runner;
+ StartThreads(threads, &runner);
+ runner.SetExpectedThreadCount(kNumThreads);
+ EXPECT_TRUE(runner.Run());
+ EXPECT_EQ(0, runner.shared_value());
+}
+
+TEST(MutexTest, ProtectsSharedResourceWithMutexAndRawMutexTryLocker) {
+ std::vector<std::unique_ptr<Thread>> threads;
+ LockRunner<Mutex, RawMutexTryLocker> runner;
+ StartThreads(threads, &runner);
+ runner.SetExpectedThreadCount(kNumThreads);
+ EXPECT_TRUE(runner.Run());
+ EXPECT_EQ(0, runner.shared_value());
+}
+
+TEST(MutexTest, ProtectsSharedResourceWithMutexAndMutexLocker) {
+ std::vector<std::unique_ptr<Thread>> threads;
+ LockRunner<Mutex, MutexLockLocker<Mutex, MutexLock>> runner;
+ StartThreads(threads, &runner);
+ runner.SetExpectedThreadCount(kNumThreads);
+ EXPECT_TRUE(runner.Run());
+ EXPECT_EQ(0, runner.shared_value());
+}
+
+TEST(MutexTest, ProtectsSharedResourceWithGlobalMutexAndRawMutexLocker) {
+ std::vector<std::unique_ptr<Thread>> threads;
+ LockRunner<GlobalMutex, RawMutexLocker<GlobalMutex>> runner(absl::kConstInit);
+ StartThreads(threads, &runner);
+ runner.SetExpectedThreadCount(kNumThreads);
+ EXPECT_TRUE(runner.Run());
+ EXPECT_EQ(0, runner.shared_value());
+}
+
+TEST(MutexTest, ProtectsSharedResourceWithGlobalMutexAndMutexLocker) {
+ std::vector<std::unique_ptr<Thread>> threads;
+ LockRunner<GlobalMutex, MutexLockLocker<GlobalMutex, GlobalMutexLock>> runner(
+ absl::kConstInit);
+ StartThreads(threads, &runner);
+ runner.SetExpectedThreadCount(kNumThreads);
+ EXPECT_TRUE(runner.Run());
+ EXPECT_EQ(0, runner.shared_value());
+}
+
+TEST(MutexTest, GlobalMutexCanHaveStaticStorageDuration) {
+ ABSL_CONST_INIT static GlobalMutex global_lock(absl::kConstInit);
+ global_lock.Lock();
+ global_lock.Unlock();
+}
+
+} // namespace
+} // namespace webrtc
diff --git a/rtc_base/synchronization/sequence_checker.cc b/rtc_base/synchronization/sequence_checker.cc
index d64f32a616..1de26cf0fe 100644
--- a/rtc_base/synchronization/sequence_checker.cc
+++ b/rtc_base/synchronization/sequence_checker.cc
@@ -13,6 +13,8 @@
#include <dispatch/dispatch.h>
#endif
+#include "rtc_base/strings/string_builder.h"
+
namespace webrtc {
namespace {
// On Mac, returns the label of the current dispatch queue; elsewhere, return
@@ -24,8 +26,16 @@ const void* GetSystemQueueRef() {
return nullptr;
#endif
}
+
} // namespace
+std::string ExpectationToString(const webrtc::SequenceChecker* checker) {
+#if RTC_DCHECK_IS_ON
+ return checker->ExpectationToString();
+#endif
+ return std::string();
+}
+
SequenceCheckerImpl::SequenceCheckerImpl()
: attached_(true),
valid_thread_(rtc::CurrentThreadRef()),
@@ -38,7 +48,7 @@ bool SequenceCheckerImpl::IsCurrent() const {
const TaskQueueBase* const current_queue = TaskQueueBase::Current();
const rtc::PlatformThreadRef current_thread = rtc::CurrentThreadRef();
const void* const current_system_queue = GetSystemQueueRef();
- rtc::CritScope scoped_lock(&lock_);
+ MutexLock scoped_lock(&lock_);
if (!attached_) { // Previously detached.
attached_ = true;
valid_thread_ = current_thread;
@@ -56,10 +66,47 @@ bool SequenceCheckerImpl::IsCurrent() const {
}
void SequenceCheckerImpl::Detach() {
- rtc::CritScope scoped_lock(&lock_);
+ MutexLock scoped_lock(&lock_);
attached_ = false;
// We don't need to touch the other members here, they will be
// reset on the next call to IsCurrent().
}
+#if RTC_DCHECK_IS_ON
+std::string SequenceCheckerImpl::ExpectationToString() const {
+ const TaskQueueBase* const current_queue = TaskQueueBase::Current();
+ const rtc::PlatformThreadRef current_thread = rtc::CurrentThreadRef();
+ const void* const current_system_queue = GetSystemQueueRef();
+ MutexLock scoped_lock(&lock_);
+ if (!attached_)
+ return "Checker currently not attached.";
+
+ // The format of the string is meant to compliment the one we have inside of
+ // FatalLog() (checks.cc). Example:
+ //
+ // # Expected: TQ: 0x0 SysQ: 0x7fff69541330 Thread: 0x11dcf6dc0
+ // # Actual: TQ: 0x7fa8f0604190 SysQ: 0x7fa8f0604a30 Thread: 0x700006f1a000
+ // TaskQueue doesn't match
+
+ rtc::StringBuilder message;
+ message.AppendFormat(
+ "# Expected: TQ: %p SysQ: %p Thread: %p\n"
+ "# Actual: TQ: %p SysQ: %p Thread: %p\n",
+ valid_queue_, valid_system_queue_,
+ reinterpret_cast<const void*>(valid_thread_), current_queue,
+ current_system_queue, reinterpret_cast<const void*>(current_thread));
+
+ if ((valid_queue_ || current_queue) && valid_queue_ != current_queue) {
+ message << "TaskQueue doesn't match\n";
+ } else if (valid_system_queue_ &&
+ valid_system_queue_ != current_system_queue) {
+ message << "System queue doesn't match\n";
+ } else if (!rtc::IsThreadRefEqual(valid_thread_, current_thread)) {
+ message << "Threads don't match\n";
+ }
+
+ return message.Release();
+}
+#endif // RTC_DCHECK_IS_ON
+
} // namespace webrtc
diff --git a/rtc_base/synchronization/sequence_checker.h b/rtc_base/synchronization/sequence_checker.h
index fe644fa14e..ecf8490cec 100644
--- a/rtc_base/synchronization/sequence_checker.h
+++ b/rtc_base/synchronization/sequence_checker.h
@@ -10,9 +10,11 @@
#ifndef RTC_BASE_SYNCHRONIZATION_SEQUENCE_CHECKER_H_
#define RTC_BASE_SYNCHRONIZATION_SEQUENCE_CHECKER_H_
+#include <type_traits>
+
#include "api/task_queue/task_queue_base.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread_annotations.h"
@@ -34,8 +36,13 @@ class RTC_EXPORT SequenceCheckerImpl {
// used exclusively on another thread.
void Detach();
+ // Returns a string that is formatted to match with the error string printed
+ // by RTC_CHECK() when a condition is not met.
+ // This is used in conjunction with the RTC_DCHECK_RUN_ON() macro.
+ std::string ExpectationToString() const;
+
private:
- rtc::CriticalSection lock_;
+ mutable Mutex lock_;
// These are mutable so that IsCurrent can set them.
mutable bool attached_ RTC_GUARDED_BY(lock_);
mutable rtc::PlatformThreadRef valid_thread_ RTC_GUARDED_BY(lock_);
@@ -162,8 +169,19 @@ class RTC_SCOPED_LOCKABLE SequenceCheckerScope {
#define RTC_RUN_ON(x) \
RTC_THREAD_ANNOTATION_ATTRIBUTE__(exclusive_locks_required(x))
+namespace webrtc {
+std::string ExpectationToString(const webrtc::SequenceChecker* checker);
+
+// Catch-all implementation for types other than explicitly supported above.
+template <typename ThreadLikeObject>
+std::string ExpectationToString(const ThreadLikeObject*) {
+ return std::string();
+}
+
+} // namespace webrtc
+
#define RTC_DCHECK_RUN_ON(x) \
webrtc::webrtc_seq_check_impl::SequenceCheckerScope seq_check_scope(x); \
- RTC_DCHECK((x)->IsCurrent())
+ RTC_DCHECK((x)->IsCurrent()) << webrtc::ExpectationToString(x)
#endif // RTC_BASE_SYNCHRONIZATION_SEQUENCE_CHECKER_H_
diff --git a/rtc_base/synchronization/sequence_checker_unittest.cc b/rtc_base/synchronization/sequence_checker_unittest.cc
index 1e62e9759b..6fcb522c54 100644
--- a/rtc_base/synchronization/sequence_checker_unittest.cc
+++ b/rtc_base/synchronization/sequence_checker_unittest.cc
@@ -31,7 +31,7 @@ class CompileTimeTestForGuardedBy {
int CalledOnSequence() RTC_RUN_ON(sequence_checker_) { return guarded_; }
void CallMeFromSequence() {
- RTC_DCHECK_RUN_ON(&sequence_checker_) << "Should be called on sequence";
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
guarded_ = 41;
}
@@ -158,7 +158,12 @@ void TestAnnotationsOnWrongQueue() {
}
#if RTC_DCHECK_IS_ON
-TEST(SequenceCheckerTest, TestAnnotationsOnWrongQueueDebug) {
+// Note: Ending the test suite name with 'DeathTest' is important as it causes
+// gtest to order this test before any other non-death-tests, to avoid potential
+// global process state pollution such as shared worker threads being started
+// (e.g. a side effect of calling InitCocoaMultiThreading() on Mac causes one or
+// two additional threads to be created).
+TEST(SequenceCheckerDeathTest, TestAnnotationsOnWrongQueueDebug) {
ASSERT_DEATH({ TestAnnotationsOnWrongQueue(); }, "");
}
#else
diff --git a/rtc_base/synchronization/yield.cc b/rtc_base/synchronization/yield.cc
new file mode 100644
index 0000000000..cbb58d12ab
--- /dev/null
+++ b/rtc_base/synchronization/yield.cc
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/synchronization/yield.h"
+
+#if defined(WEBRTC_WIN)
+#include <windows.h>
+#else
+#include <sched.h>
+#include <time.h>
+#endif
+
+namespace webrtc {
+
+void YieldCurrentThread() {
+ // TODO(bugs.webrtc.org/11634): use dedicated OS functionality instead of
+ // sleep for yielding.
+#if defined(WEBRTC_WIN)
+ ::Sleep(0);
+#elif defined(WEBRTC_MAC) && defined(RTC_USE_NATIVE_MUTEX_ON_MAC) && \
+ !RTC_USE_NATIVE_MUTEX_ON_MAC
+ sched_yield();
+#else
+ static const struct timespec ts_null = {0};
+ nanosleep(&ts_null, nullptr);
+#endif
+}
+
+} // namespace webrtc
diff --git a/rtc_base/synchronization/yield.h b/rtc_base/synchronization/yield.h
new file mode 100644
index 0000000000..d4f5f99f37
--- /dev/null
+++ b/rtc_base/synchronization/yield.h
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef RTC_BASE_SYNCHRONIZATION_YIELD_H_
+#define RTC_BASE_SYNCHRONIZATION_YIELD_H_
+
+namespace webrtc {
+
+// Request rescheduling of threads.
+void YieldCurrentThread();
+
+} // namespace webrtc
+
+#endif // RTC_BASE_SYNCHRONIZATION_YIELD_H_
diff --git a/rtc_base/synchronization/yield_policy_unittest.cc b/rtc_base/synchronization/yield_policy_unittest.cc
index e0c622510a..0bf38f4537 100644
--- a/rtc_base/synchronization/yield_policy_unittest.cc
+++ b/rtc_base/synchronization/yield_policy_unittest.cc
@@ -20,7 +20,7 @@ namespace rtc {
namespace {
class MockYieldHandler : public YieldInterface {
public:
- MOCK_METHOD0(YieldExecution, void());
+ MOCK_METHOD(void, YieldExecution, (), (override));
};
} // namespace
TEST(YieldPolicyTest, HandlerReceivesYieldSignalWhenSet) {
diff --git a/rtc_base/system/BUILD.gn b/rtc_base/system/BUILD.gn
index 79cb301038..fdb3f96e00 100644
--- a/rtc_base/system/BUILD.gn
+++ b/rtc_base/system/BUILD.gn
@@ -58,7 +58,7 @@ if (is_mac || is_ios) {
"cocoa_threading.mm",
]
deps = [ "..:checks" ]
- libs = [ "Foundation.framework" ]
+ frameworks = [ "Foundation.framework" ]
}
rtc_library("gcd_helpers") {
@@ -72,13 +72,14 @@ if (is_mac || is_ios) {
rtc_source_set("thread_registry") {
sources = [ "thread_registry.h" ]
- deps = [ "..:rtc_base_approved" ]
+ deps = [
+ "..:rtc_base_approved",
+ "../synchronization:mutex",
+ ]
if (is_android && !build_with_chromium) {
sources += [ "thread_registry.cc" ]
- deps += [
- "../../sdk/android:native_api_stacktrace",
- "//third_party/abseil-cpp/absl/base:core_headers",
- ]
+ deps += [ "../../sdk/android:native_api_stacktrace" ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers" ]
}
}
diff --git a/rtc_base/system/file_wrapper.h b/rtc_base/system/file_wrapper.h
index 24c333a6c3..42c463cb15 100644
--- a/rtc_base/system/file_wrapper.h
+++ b/rtc_base/system/file_wrapper.h
@@ -14,7 +14,7 @@
#include <stddef.h>
#include <stdio.h>
-#include "rtc_base/critical_section.h"
+#include <string>
// Implementation that can read (exclusive) or write from/to a file.
diff --git a/rtc_base/system/thread_registry.cc b/rtc_base/system/thread_registry.cc
index 86605446c7..b0e83ca1e9 100644
--- a/rtc_base/system/thread_registry.cc
+++ b/rtc_base/system/thread_registry.cc
@@ -14,9 +14,9 @@
#include <utility>
#include "absl/base/attributes.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/synchronization/mutex.h"
#include "sdk/android/native_api/stacktrace/stacktrace.h"
namespace webrtc {
@@ -30,7 +30,7 @@ struct ThreadData {
// The map of registered threads, and the lock that protects it. We create the
// map on first use, and never destroy it.
-ABSL_CONST_INIT rtc::GlobalLock g_thread_registry_lock;
+ABSL_CONST_INIT GlobalMutex g_thread_registry_lock(absl::kConstInit);
ABSL_CONST_INIT std::map<const ScopedRegisterThreadForDebugging*, ThreadData>*
g_registered_threads = nullptr;
@@ -38,7 +38,7 @@ ABSL_CONST_INIT std::map<const ScopedRegisterThreadForDebugging*, ThreadData>*
ScopedRegisterThreadForDebugging::ScopedRegisterThreadForDebugging(
rtc::Location location) {
- rtc::GlobalLockScope gls(&g_thread_registry_lock);
+ GlobalMutexLock gls(&g_thread_registry_lock);
if (g_registered_threads == nullptr) {
g_registered_threads =
new std::map<const ScopedRegisterThreadForDebugging*, ThreadData>();
@@ -49,14 +49,14 @@ ScopedRegisterThreadForDebugging::ScopedRegisterThreadForDebugging(
}
ScopedRegisterThreadForDebugging::~ScopedRegisterThreadForDebugging() {
- rtc::GlobalLockScope gls(&g_thread_registry_lock);
+ GlobalMutexLock gls(&g_thread_registry_lock);
RTC_DCHECK(g_registered_threads != nullptr);
const int num_erased = g_registered_threads->erase(this);
RTC_DCHECK_EQ(num_erased, 1);
}
void PrintStackTracesOfRegisteredThreads() {
- rtc::GlobalLockScope gls(&g_thread_registry_lock);
+ GlobalMutexLock gls(&g_thread_registry_lock);
if (g_registered_threads == nullptr) {
return;
}
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index 349a5f21fc..38660cd5a2 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -29,11 +29,11 @@
#include "api/task_queue/task_queue_base.h"
#include "base/third_party/libevent/event.h"
#include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/platform_thread_types.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
@@ -130,7 +130,7 @@ class TaskQueueLibevent final : public TaskQueueBase {
event_base* event_base_;
event wakeup_event_;
rtc::PlatformThread thread_;
- rtc::CriticalSection pending_lock_;
+ Mutex pending_lock_;
absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
RTC_GUARDED_BY(pending_lock_);
// Holds a list of events pending timers for cleanup when the loop exits.
@@ -216,7 +216,7 @@ void TaskQueueLibevent::Delete() {
void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
{
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
bool had_pending_tasks = !pending_.empty();
pending_.push_back(std::move(task));
@@ -282,7 +282,7 @@ void TaskQueueLibevent::OnWakeup(int socket,
case kRunTasks: {
absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
{
- rtc::CritScope lock(&me->pending_lock_);
+ MutexLock lock(&me->pending_lock_);
tasks.swap(me->pending_);
}
RTC_DCHECK(!tasks.empty());
diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc
index 7052f7c6db..5de634512e 100644
--- a/rtc_base/task_queue_stdlib.cc
+++ b/rtc_base/task_queue_stdlib.cc
@@ -22,10 +22,10 @@
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
@@ -97,7 +97,7 @@ class TaskQueueStdlib final : public TaskQueueBase {
// tasks (including delayed tasks).
rtc::PlatformThread thread_;
- rtc::CriticalSection pending_lock_;
+ Mutex pending_lock_;
// Indicates if the worker thread needs to shutdown now.
bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_){false};
@@ -135,7 +135,7 @@ void TaskQueueStdlib::Delete() {
RTC_DCHECK(!IsCurrent());
{
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
thread_should_quit_ = true;
}
@@ -148,7 +148,7 @@ void TaskQueueStdlib::Delete() {
void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) {
{
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
OrderId order = thread_posting_order_++;
pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>(
@@ -166,7 +166,7 @@ void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task,
delay.next_fire_at_ms_ = fire_at;
{
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
delay.order_ = ++thread_posting_order_;
delayed_queue_[delay] = std::move(task);
}
@@ -179,7 +179,7 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
auto tick = rtc::TimeMillis();
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
if (thread_should_quit_) {
result.final_task_ = true;
diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc
index 8c11b8764a..5eb3776cea 100644
--- a/rtc_base/task_queue_win.cc
+++ b/rtc_base/task_queue_win.cc
@@ -33,12 +33,12 @@
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/arraysize.h"
#include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/time_utils.h"
+#include "rtc_base/synchronization/mutex.h"
namespace webrtc {
namespace {
@@ -205,7 +205,7 @@ class TaskQueueWin : public TaskQueueBase {
timer_tasks_;
UINT_PTR timer_id_ = 0;
WorkerThread thread_;
- rtc::CriticalSection pending_lock_;
+ Mutex pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_
RTC_GUARDED_BY(pending_lock_);
HANDLE in_queue_;
@@ -235,7 +235,7 @@ void TaskQueueWin::Delete() {
}
void TaskQueueWin::PostTask(std::unique_ptr<QueuedTask> task) {
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
pending_.push(std::move(task));
::SetEvent(in_queue_);
}
@@ -262,7 +262,7 @@ void TaskQueueWin::RunPendingTasks() {
while (true) {
std::unique_ptr<QueuedTask> task;
{
- rtc::CritScope lock(&pending_lock_);
+ MutexLock lock(&pending_lock_);
if (pending_.empty())
break;
task = std::move(pending_.front());
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 1882cd9ee8..54f9a048f0 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -21,9 +21,10 @@ rtc_library("repeating_task") {
"../../api/task_queue",
"../../api/units:time_delta",
"../../api/units:timestamp",
+ "../../system_wrappers:system_wrappers",
"../synchronization:sequence_checker",
- "//third_party/abseil-cpp/absl/memory",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/memory" ]
}
rtc_library("pending_task_safety_flag") {
@@ -81,7 +82,7 @@ if (rtc_include_tests) {
":to_queued_task",
"../../api/task_queue",
"../../test:test_support",
- "//third_party/abseil-cpp/absl/memory",
]
+ absl_deps = [ "//third_party/abseil-cpp/absl/memory" ]
}
}
diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc
index 307d2d594c..4be2131f3f 100644
--- a/rtc_base/task_utils/pending_task_safety_flag.cc
+++ b/rtc_base/task_utils/pending_task_safety_flag.cc
@@ -15,7 +15,7 @@
namespace webrtc {
// static
-PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
+rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::Create() {
return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
}
diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h
index 1b301c8034..580fb3f912 100644
--- a/rtc_base/task_utils/pending_task_safety_flag.h
+++ b/rtc_base/task_utils/pending_task_safety_flag.h
@@ -36,12 +36,17 @@ namespace webrtc {
// MyMethod();
// }));
//
+// Or implicitly by letting ToQueuedTask do the checking:
+//
+// // Running outside of the main thread.
+// my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_,
+// [this]() { MyMethod(); }));
+//
// Note that checking the state only works on the construction/destruction
// thread of the ReceiveStatisticsProxy instance.
class PendingTaskSafetyFlag : public rtc::RefCountInterface {
public:
- using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
- static Pointer Create();
+ static rtc::scoped_refptr<PendingTaskSafetyFlag> Create();
~PendingTaskSafetyFlag() = default;
@@ -56,6 +61,25 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface {
SequenceChecker main_sequence_;
};
+// Makes using PendingTaskSafetyFlag very simple. Automatic PTSF creation
+// and signalling of destruction when the ScopedTaskSafety instance goes out
+// of scope.
+// Should be used by the class that wants tasks dropped after destruction.
+// Requirements are that the instance be constructed and destructed on
+// the same thread as the potentially dropped tasks would be running on.
+class ScopedTaskSafety {
+ public:
+ ScopedTaskSafety() = default;
+ ~ScopedTaskSafety() { flag_->SetNotAlive(); }
+
+ // Returns a new reference to the safety flag.
+ rtc::scoped_refptr<PendingTaskSafetyFlag> flag() const { return flag_; }
+
+ private:
+ rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ =
+ PendingTaskSafetyFlag::Create();
+};
+
} // namespace webrtc
#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
index 0c1c3c8e52..6df2fe2ffb 100644
--- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
+++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
@@ -29,7 +29,7 @@ using ::testing::Return;
} // namespace
TEST(PendingTaskSafetyFlagTest, Basic) {
- PendingTaskSafetyFlag::Pointer safety_flag;
+ rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag;
{
// Scope for the |owner| instance.
class Owner {
@@ -37,12 +37,27 @@ TEST(PendingTaskSafetyFlagTest, Basic) {
Owner() = default;
~Owner() { flag_->SetNotAlive(); }
- PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+ rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ =
+ PendingTaskSafetyFlag::Create();
} owner;
EXPECT_TRUE(owner.flag_->alive());
safety_flag = owner.flag_;
EXPECT_TRUE(safety_flag->alive());
}
+ // |owner| now out of scope.
+ EXPECT_FALSE(safety_flag->alive());
+}
+
+TEST(PendingTaskSafetyFlagTest, BasicScoped) {
+ rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag;
+ {
+ struct Owner {
+ ScopedTaskSafety safety;
+ } owner;
+ safety_flag = owner.safety.flag();
+ EXPECT_TRUE(safety_flag->alive());
+ }
+ // |owner| now out of scope.
EXPECT_FALSE(safety_flag->alive());
}
@@ -72,7 +87,8 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
private:
TaskQueueBase* const tq_main_;
bool stuff_done_ = false;
- PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+ rtc::scoped_refptr<PendingTaskSafetyFlag> flag_{
+ PendingTaskSafetyFlag::Create()};
};
std::unique_ptr<Owner> owner;
@@ -106,22 +122,18 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
}
~Owner() {
RTC_DCHECK(tq_main_->IsCurrent());
- flag_->SetNotAlive();
}
void DoStuff() {
RTC_DCHECK(!tq_main_->IsCurrent());
- tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
- if (!safe->alive())
- return;
- *stuff_done_ = true;
- }));
+ tq_main_->PostTask(
+ ToQueuedTask(safety_, [this]() { *stuff_done_ = true; }));
}
private:
TaskQueueBase* const tq_main_;
bool* const stuff_done_;
- PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+ ScopedTaskSafety safety_;
};
std::unique_ptr<Owner> owner;
diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc
index 4e460bb082..574e6331f1 100644
--- a/rtc_base/task_utils/repeating_task.cc
+++ b/rtc_base/task_utils/repeating_task.cc
@@ -17,10 +17,13 @@
namespace webrtc {
namespace webrtc_repeating_task_impl {
+
RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
- TimeDelta first_delay)
+ TimeDelta first_delay,
+ Clock* clock)
: task_queue_(task_queue),
- next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {}
+ clock_(clock),
+ next_run_time_(clock_->CurrentTime() + first_delay) {}
RepeatingTaskBase::~RepeatingTaskBase() = default;
@@ -38,7 +41,7 @@ bool RepeatingTaskBase::Run() {
return true;
RTC_DCHECK(delay.IsFinite());
- TimeDelta lost_time = Timestamp::Micros(rtc::TimeMicros()) - next_run_time_;
+ TimeDelta lost_time = clock_->CurrentTime() - next_run_time_;
next_run_time_ += delay;
delay -= lost_time;
delay = std::max(delay, TimeDelta::Zero());
@@ -51,6 +54,7 @@ bool RepeatingTaskBase::Run() {
}
void RepeatingTaskBase::Stop() {
+ RTC_DCHECK_RUN_ON(task_queue_);
RTC_DCHECK(next_run_time_.IsFinite());
next_run_time_ = Timestamp::PlusInfinity();
}
@@ -75,7 +79,6 @@ RepeatingTaskHandle::RepeatingTaskHandle(
void RepeatingTaskHandle::Stop() {
if (repeating_task_) {
- RTC_DCHECK_RUN_ON(repeating_task_->task_queue_);
repeating_task_->Stop();
repeating_task_ = nullptr;
}
diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h
index 1545d6f757..487b7d19d4 100644
--- a/rtc_base/task_utils/repeating_task.h
+++ b/rtc_base/task_utils/repeating_task.h
@@ -19,8 +19,7 @@
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
-#include "rtc_base/synchronization/sequence_checker.h"
-#include "rtc_base/thread_checker.h"
+#include "system_wrappers/include/clock.h"
namespace webrtc {
@@ -29,17 +28,20 @@ class RepeatingTaskHandle;
namespace webrtc_repeating_task_impl {
class RepeatingTaskBase : public QueuedTask {
public:
- RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay);
+ RepeatingTaskBase(TaskQueueBase* task_queue,
+ TimeDelta first_delay,
+ Clock* clock);
~RepeatingTaskBase() override;
- virtual TimeDelta RunClosure() = 0;
+
+ void Stop();
private:
- friend class ::webrtc::RepeatingTaskHandle;
+ virtual TimeDelta RunClosure() = 0;
bool Run() final;
- void Stop() RTC_RUN_ON(task_queue_);
TaskQueueBase* const task_queue_;
+ Clock* const clock_;
// This is always finite, except for the special case where it's PlusInfinity
// to signal that the task should stop.
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
@@ -51,8 +53,9 @@ class RepeatingTaskImpl final : public RepeatingTaskBase {
public:
RepeatingTaskImpl(TaskQueueBase* task_queue,
TimeDelta first_delay,
- Closure&& closure)
- : RepeatingTaskBase(task_queue, first_delay),
+ Closure&& closure,
+ Clock* clock)
+ : RepeatingTaskBase(task_queue, first_delay, clock),
closure_(std::forward<Closure>(closure)) {
static_assert(
std::is_same<TimeDelta,
@@ -61,9 +64,9 @@ class RepeatingTaskImpl final : public RepeatingTaskBase {
"");
}
+ private:
TimeDelta RunClosure() override { return closure_(); }
- private:
typename std::remove_const<
typename std::remove_reference<Closure>::type>::type closure_;
};
@@ -92,10 +95,11 @@ class RepeatingTaskHandle {
// repeated task is owned by the TaskQueue.
template <class Closure>
static RepeatingTaskHandle Start(TaskQueueBase* task_queue,
- Closure&& closure) {
+ Closure&& closure,
+ Clock* clock = Clock::GetRealTimeClock()) {
auto repeating_task = std::make_unique<
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
- task_queue, TimeDelta::Zero(), std::forward<Closure>(closure));
+ task_queue, TimeDelta::Zero(), std::forward<Closure>(closure), clock);
auto* repeating_task_ptr = repeating_task.get();
task_queue->PostTask(std::move(repeating_task));
return RepeatingTaskHandle(repeating_task_ptr);
@@ -104,12 +108,14 @@ class RepeatingTaskHandle {
// DelayedStart is equivalent to Start except that the first invocation of the
// closure will be delayed by the given amount.
template <class Closure>
- static RepeatingTaskHandle DelayedStart(TaskQueueBase* task_queue,
- TimeDelta first_delay,
- Closure&& closure) {
+ static RepeatingTaskHandle DelayedStart(
+ TaskQueueBase* task_queue,
+ TimeDelta first_delay,
+ Closure&& closure,
+ Clock* clock = Clock::GetRealTimeClock()) {
auto repeating_task = std::make_unique<
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
- task_queue, first_delay, std::forward<Closure>(closure));
+ task_queue, first_delay, std::forward<Closure>(closure), clock);
auto* repeating_task_ptr = repeating_task.get();
task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms());
return RepeatingTaskHandle(repeating_task_ptr);
diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc
index 83efb29209..2fb15d1e5a 100644
--- a/rtc_base/task_utils/repeating_task_unittest.cc
+++ b/rtc_base/task_utils/repeating_task_unittest.cc
@@ -40,8 +40,23 @@ void Sleep(TimeDelta time_delta) {
class MockClosure {
public:
- MOCK_METHOD0(Call, TimeDelta());
- MOCK_METHOD0(Delete, void());
+ MOCK_METHOD(TimeDelta, Call, ());
+ MOCK_METHOD(void, Delete, ());
+};
+
+class MockTaskQueue : public TaskQueueBase {
+ public:
+ MockTaskQueue() : task_queue_setter_(this) {}
+
+ MOCK_METHOD(void, Delete, (), (override));
+ MOCK_METHOD(void, PostTask, (std::unique_ptr<QueuedTask> task), (override));
+ MOCK_METHOD(void,
+ PostDelayedTask,
+ (std::unique_ptr<QueuedTask> task, uint32_t milliseconds),
+ (override));
+
+ private:
+ CurrentTaskQueueSetter task_queue_setter_;
};
class MoveOnlyClosure {
@@ -228,4 +243,37 @@ TEST(RepeatingTaskTest, Example) {
// task queue destruction and running the desctructor closure.
}
+TEST(RepeatingTaskTest, ClockIntegration) {
+ std::unique_ptr<QueuedTask> delayed_task;
+ uint32_t expected_ms = 0;
+ SimulatedClock clock(Timestamp::Millis(0));
+
+ NiceMock<MockTaskQueue> task_queue;
+ ON_CALL(task_queue, PostDelayedTask)
+ .WillByDefault(
+ Invoke([&delayed_task, &expected_ms](std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
+ EXPECT_EQ(milliseconds, expected_ms);
+ delayed_task = std::move(task);
+ }));
+
+ expected_ms = 100;
+ RepeatingTaskHandle handle = RepeatingTaskHandle::DelayedStart(
+ &task_queue, TimeDelta::Millis(100),
+ [&clock]() {
+ EXPECT_EQ(Timestamp::Millis(100), clock.CurrentTime());
+ // Simulate work happening for 10ms.
+ clock.AdvanceTimeMilliseconds(10);
+ return TimeDelta::Millis(100);
+ },
+ &clock);
+
+ clock.AdvanceTimeMilliseconds(100);
+ QueuedTask* task_to_run = delayed_task.release();
+ expected_ms = 90;
+ EXPECT_FALSE(task_to_run->Run());
+ EXPECT_NE(nullptr, delayed_task.get());
+ handle.Stop();
+}
+
} // namespace webrtc
diff --git a/rtc_base/task_utils/to_queued_task.h b/rtc_base/task_utils/to_queued_task.h
index cc9325ebd6..07ab0ebe26 100644
--- a/rtc_base/task_utils/to_queued_task.h
+++ b/rtc_base/task_utils/to_queued_task.h
@@ -39,7 +39,7 @@ class ClosureTask : public QueuedTask {
template <typename Closure>
class SafetyClosureTask : public QueuedTask {
public:
- explicit SafetyClosureTask(PendingTaskSafetyFlag::Pointer safety,
+ explicit SafetyClosureTask(rtc::scoped_refptr<PendingTaskSafetyFlag> safety,
Closure&& closure)
: closure_(std::forward<Closure>(closure)),
safety_flag_(std::move(safety)) {}
@@ -52,7 +52,7 @@ class SafetyClosureTask : public QueuedTask {
}
typename std::decay<Closure>::type closure_;
- PendingTaskSafetyFlag::Pointer safety_flag_;
+ rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag_;
};
// Extends ClosureTask to also allow specifying cleanup code.
@@ -81,13 +81,25 @@ std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure) {
}
template <typename Closure>
-std::unique_ptr<QueuedTask> ToQueuedTask(PendingTaskSafetyFlag::Pointer safety,
- Closure&& closure) {
+std::unique_ptr<QueuedTask> ToQueuedTask(
+ rtc::scoped_refptr<PendingTaskSafetyFlag> safety,
+ Closure&& closure) {
return std::make_unique<webrtc_new_closure_impl::SafetyClosureTask<Closure>>(
std::move(safety), std::forward<Closure>(closure));
}
-template <typename Closure, typename Cleanup>
+template <typename Closure>
+std::unique_ptr<QueuedTask> ToQueuedTask(const ScopedTaskSafety& safety,
+ Closure&& closure) {
+ return ToQueuedTask(safety.flag(), std::forward<Closure>(closure));
+}
+
+template <typename Closure,
+ typename Cleanup,
+ typename std::enable_if<!std::is_same<
+ typename std::remove_const<
+ typename std::remove_reference<Closure>::type>::type,
+ ScopedTaskSafety>::value>::type* = nullptr>
std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure, Cleanup&& cleanup) {
return std::make_unique<
webrtc_new_closure_impl::ClosureTaskWithCleanup<Closure, Cleanup>>(
diff --git a/rtc_base/task_utils/to_queued_task_unittest.cc b/rtc_base/task_utils/to_queued_task_unittest.cc
index e98c81e9ce..261b9e891b 100644
--- a/rtc_base/task_utils/to_queued_task_unittest.cc
+++ b/rtc_base/task_utils/to_queued_task_unittest.cc
@@ -127,7 +127,8 @@ TEST(ToQueuedTaskTest, AcceptsMoveOnlyCleanup) {
}
TEST(ToQueuedTaskTest, PendingTaskSafetyFlag) {
- PendingTaskSafetyFlag::Pointer flag(PendingTaskSafetyFlag::Create());
+ rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
+ PendingTaskSafetyFlag::Create();
int count = 0;
// Create two identical tasks that increment the |count|.
diff --git a/rtc_base/test_client.cc b/rtc_base/test_client.cc
index e5aa9d7987..f23ac2aec0 100644
--- a/rtc_base/test_client.cc
+++ b/rtc_base/test_client.cc
@@ -75,7 +75,7 @@ std::unique_ptr<TestClient::Packet> TestClient::NextPacket(int timeout_ms) {
int64_t end = TimeAfter(timeout_ms);
while (TimeUntil(end) > 0) {
{
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
if (packets_.size() != 0) {
break;
}
@@ -85,7 +85,7 @@ std::unique_ptr<TestClient::Packet> TestClient::NextPacket(int timeout_ms) {
// Return the first packet placed in the queue.
std::unique_ptr<Packet> packet;
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
if (packets_.size() > 0) {
packet = std::move(packets_.front());
packets_.erase(packets_.begin());
@@ -149,7 +149,7 @@ void TestClient::OnPacket(AsyncPacketSocket* socket,
size_t size,
const SocketAddress& remote_addr,
const int64_t& packet_time_us) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
packets_.push_back(
std::make_unique<Packet>(remote_addr, buf, size, packet_time_us));
}
diff --git a/rtc_base/test_client.h b/rtc_base/test_client.h
index b45cf005bb..6989fe1d57 100644
--- a/rtc_base/test_client.h
+++ b/rtc_base/test_client.h
@@ -16,8 +16,8 @@
#include "rtc_base/async_udp_socket.h"
#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/fake_clock.h"
+#include "rtc_base/synchronization/mutex.h"
namespace rtc {
@@ -105,7 +105,7 @@ class TestClient : public sigslot::has_slots<> {
void AdvanceTime(int ms);
ThreadProcessingFakeClock* fake_clock_ = nullptr;
- CriticalSection crit_;
+ webrtc::Mutex mutex_;
std::unique_ptr<AsyncPacketSocket> socket_;
std::vector<std::unique_ptr<Packet>> packets_;
int ready_to_send_count_ = 0;
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 0fb2e813e0..2882f50da3 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -31,9 +31,10 @@
#include "absl/algorithm/container.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h"
+#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
@@ -87,8 +88,8 @@ class MessageHandlerWithTask final : public MessageHandler {
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
public:
- MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
- RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
+ MarkProcessingCritScope(const RecursiveCriticalSection* cs,
+ size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
: cs_(cs), processing_(processing) {
cs_->Enter();
*processing_ += 1;
@@ -100,7 +101,7 @@ class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
}
private:
- const CriticalSection* const cs_;
+ const RecursiveCriticalSection* const cs_;
size_t* processing_;
RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
@@ -168,8 +169,8 @@ void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
// We check the pre-existing who-sends-to-who graph for any path from target
// to source. This loop is guaranteed to terminate because per the send graph
// invariant, there are no cycles in the graph.
- for (auto it = all_targets.begin(); it != all_targets.end(); ++it) {
- const auto& targets = send_graph_[*it];
+ for (size_t i = 0; i < all_targets.size(); i++) {
+ const auto& targets = send_graph_[all_targets[i]];
all_targets.insert(all_targets.end(), targets.begin(), targets.end());
}
RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
@@ -296,6 +297,21 @@ void ThreadManager::SetCurrentThread(Thread* thread) {
RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
}
#endif // RTC_DLOG_IS_ON
+
+ if (thread) {
+ thread->EnsureIsCurrentTaskQueue();
+ } else {
+ Thread* current = CurrentThread();
+ if (current) {
+ // The current thread is being cleared, e.g. as a result of
+ // UnwrapCurrent() being called or when a thread is being stopped
+ // (see PreRun()). This signals that the Thread instance is being detached
+ // from the thread, which also means that TaskQueue::Current() must not
+ // return a pointer to the Thread instance.
+ current->ClearCurrentTaskQueue();
+ }
+ }
+
SetCurrentThreadInternal(thread);
}
@@ -824,7 +840,6 @@ void* Thread::PreRun(void* pv) {
Thread* thread = static_cast<Thread*>(pv);
ThreadManager::Instance()->SetCurrentThread(thread);
rtc::SetCurrentThreadName(thread->name_.c_str());
- CurrentTaskQueueSetter set_current_task_queue(thread);
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
@@ -878,6 +893,7 @@ void Thread::Send(const Location& posted_from,
AutoThread thread;
Thread* current_thread = Thread::Current();
RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this
+ RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
#if RTC_DCHECK_IS_ON
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
@@ -935,6 +951,17 @@ void Thread::InvokeInternal(const Location& posted_from,
Send(posted_from, &handler);
}
+// Called by the ThreadManager when being set as the current thread.
+void Thread::EnsureIsCurrentTaskQueue() {
+ task_queue_registration_ =
+ std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
+}
+
+// Called by the ThreadManager when being set as the current thread.
+void Thread::ClearCurrentTaskQueue() {
+ task_queue_registration_.reset();
+}
+
void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
RTC_DCHECK(msg);
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
@@ -949,6 +976,50 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
task.release();
}
+void Thread::AllowInvokesToThread(Thread* thread) {
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+ if (!IsCurrent()) {
+ PostTask(webrtc::ToQueuedTask(
+ [thread, this]() { AllowInvokesToThread(thread); }));
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ allowed_threads_.push_back(thread);
+ invoke_policy_enabled_ = true;
+#endif
+}
+
+void Thread::DisallowAllInvokes() {
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+ if (!IsCurrent()) {
+ PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); }));
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ allowed_threads_.clear();
+ invoke_policy_enabled_ = true;
+#endif
+}
+
+// Returns true if no policies added or if there is at least one policy
+// that permits invocation to |target| thread.
+bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+ RTC_DCHECK_RUN_ON(this);
+ if (!invoke_policy_enabled_) {
+ return true;
+ }
+ for (const auto* thread : allowed_threads_) {
+ if (thread == target) {
+ return true;
+ }
+ }
+ return false;
+#else
+ return true;
+#endif
+}
+
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 74aab623c8..27a5b7b510 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -29,7 +29,7 @@
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/constructor_magic.h"
-#include "rtc_base/critical_section.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/location.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/platform_thread_types.h"
@@ -140,7 +140,7 @@ class RTC_EXPORT ThreadManager {
// Methods that don't modify the list of message queues may be called in a
// re-entrant fashion. "processing_" keeps track of the depth of re-entrant
// calls.
- CriticalSection crit_;
+ RecursiveCriticalSection crit_;
size_t processing_ RTC_GUARDED_BY(crit_) = 0;
#if RTC_DCHECK_IS_ON
// Represents all thread seand actions by storing all send targets per thread.
@@ -338,6 +338,18 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
InvokeInternal(posted_from, functor);
}
+ // Allows invoke to specified |thread|. Thread never will be dereferenced and
+ // will be used only for reference-based comparison, so instance can be safely
+ // deleted. If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing.
+ void AllowInvokesToThread(Thread* thread);
+ // If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing.
+ void DisallowAllInvokes();
+ // Returns true if |target| was allowed by AllowInvokesToThread() or if no
+ // calls were made to AllowInvokesToThread and DisallowAllInvokes. Otherwise
+ // returns false.
+ // If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined always returns true.
+ bool IsInvokeToThreadAllowed(rtc::Thread* target);
+
// Posts a task to invoke the functor on |this| thread asynchronously, i.e.
// without blocking the thread that invoked PostTask(). Ownership of |functor|
// is passed and (usually, see below) destroyed on |this| thread after it is
@@ -519,7 +531,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
friend class ScopedDisallowBlockingCalls;
- CriticalSection* CritForTest() { return &crit_; }
+ RecursiveCriticalSection* CritForTest() { return &crit_; }
private:
class QueuedTaskHandler final : public MessageHandler {
@@ -551,6 +563,12 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
void InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor);
+ // Called by the ThreadManager when being set as the current thread.
+ void EnsureIsCurrentTaskQueue();
+
+ // Called by the ThreadManager when being unset as the current thread.
+ void ClearCurrentTaskQueue();
+
// Returns a static-lifetime MessageHandler which runs message with
// MessageLikeTask payload data.
static MessageHandler* GetPostTaskMessageHandler();
@@ -560,7 +578,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
MessageList messages_ RTC_GUARDED_BY(crit_);
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
- CriticalSection crit_;
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+ std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this);
+ bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false;
+#endif
+ RecursiveCriticalSection crit_;
bool fInitialized_;
bool fDestroyed_;
@@ -595,6 +617,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Runs webrtc::QueuedTask posted to the Thread.
QueuedTaskHandler queued_task_handler_;
+ std::unique_ptr<TaskQueueBase::CurrentTaskQueueSetter>
+ task_queue_registration_;
friend class ThreadManager;
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index d53a387914..d3cae34dfa 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -22,12 +22,14 @@
#include "rtc_base/null_socket_server.h"
#include "rtc_base/physical_socket_server.h"
#include "rtc_base/socket_address.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "test/testsupport/rtc_expect_death.h"
#if defined(WEBRTC_WIN)
#include <comdef.h> // NOLINT
+
#endif
namespace rtc {
@@ -161,17 +163,17 @@ class AtomicBool {
public:
explicit AtomicBool(bool value = false) : flag_(value) {}
AtomicBool& operator=(bool value) {
- CritScope scoped_lock(&cs_);
+ webrtc::MutexLock scoped_lock(&mutex_);
flag_ = value;
return *this;
}
bool get() const {
- CritScope scoped_lock(&cs_);
+ webrtc::MutexLock scoped_lock(&mutex_);
return flag_;
}
private:
- CriticalSection cs_;
+ mutable webrtc::Mutex mutex_;
bool flag_;
};
@@ -288,6 +290,63 @@ TEST(ThreadTest, Wrap) {
ThreadManager::Instance()->SetCurrentThread(current_thread);
}
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) {
+ // Create and start the thread.
+ auto thread1 = Thread::CreateWithSocketServer();
+ auto thread2 = Thread::CreateWithSocketServer();
+
+ thread1->PostTask(ToQueuedTask(
+ [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
+ Thread* th_main = Thread::Current();
+ th_main->ProcessMessages(100);
+}
+
+TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) {
+ // Create and start the thread.
+ auto thread1 = Thread::CreateWithSocketServer();
+ auto thread2 = Thread::CreateWithSocketServer();
+ auto thread3 = Thread::CreateWithSocketServer();
+ auto thread4 = Thread::CreateWithSocketServer();
+
+ thread1->AllowInvokesToThread(thread2.get());
+ thread1->AllowInvokesToThread(thread3.get());
+
+ thread1->PostTask(ToQueuedTask([&]() {
+ EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
+ EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
+ EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
+ }));
+ Thread* th_main = Thread::Current();
+ th_main->ProcessMessages(100);
+}
+
+TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) {
+ // Create and start the thread.
+ auto thread1 = Thread::CreateWithSocketServer();
+ auto thread2 = Thread::CreateWithSocketServer();
+
+ thread1->DisallowAllInvokes();
+
+ thread1->PostTask(ToQueuedTask([&]() {
+ EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
+ }));
+ Thread* th_main = Thread::Current();
+ th_main->ProcessMessages(100);
+}
+#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+
+TEST(ThreadTest, InvokesAllowedByDefault) {
+ // Create and start the thread.
+ auto thread1 = Thread::CreateWithSocketServer();
+ auto thread2 = Thread::CreateWithSocketServer();
+
+ thread1->PostTask(ToQueuedTask(
+ [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
+ Thread* th_main = Thread::Current();
+ th_main->ProcessMessages(100);
+}
+
TEST(ThreadTest, Invoke) {
// Create and start the thread.
auto thread = Thread::CreateWithSocketServer();
@@ -356,18 +415,18 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
explicit LockedBool(bool value) : value_(value) {}
void Set(bool value) {
- CritScope lock(&crit_);
+ webrtc::MutexLock lock(&mutex_);
value_ = value;
}
bool Get() {
- CritScope lock(&crit_);
+ webrtc::MutexLock lock(&mutex_);
return value_;
}
private:
- CriticalSection crit_;
- bool value_ RTC_GUARDED_BY(crit_);
+ webrtc::Mutex mutex_;
+ bool value_ RTC_GUARDED_BY(mutex_);
};
struct LocalFuncs {
@@ -390,7 +449,6 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
Thread* thread1,
Thread* thread2,
LockedBool* out) {
- CriticalSection crit;
LockedBool async_invoked(false);
invoker->AsyncInvoke<void>(
@@ -780,105 +838,6 @@ TEST_F(AsyncInvokeTest, FlushWithIds) {
EXPECT_TRUE(flag2.get());
}
-class GuardedAsyncInvokeTest : public ::testing::Test {
- public:
- void IntCallback(int value) {
- EXPECT_EQ(expected_thread_, Thread::Current());
- int_value_ = value;
- }
- void SetExpectedThreadForIntCallback(Thread* thread) {
- expected_thread_ = thread;
- }
-
- protected:
- constexpr static int kWaitTimeout = 1000;
- GuardedAsyncInvokeTest() : int_value_(0), expected_thread_(nullptr) {}
-
- int int_value_;
- Thread* expected_thread_;
-};
-
-// Functor for creating an invoker.
-struct CreateInvoker {
- CreateInvoker(std::unique_ptr<GuardedAsyncInvoker>* invoker)
- : invoker_(invoker) {}
- void operator()() { invoker_->reset(new GuardedAsyncInvoker()); }
- std::unique_ptr<GuardedAsyncInvoker>* invoker_;
-};
-
-// Test that we can call AsyncInvoke<void>() after the thread died.
-TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) {
- // Create and start the thread.
- std::unique_ptr<Thread> thread(Thread::Create());
- thread->Start();
- std::unique_ptr<GuardedAsyncInvoker> invoker;
- // Create the invoker on |thread|.
- thread->Invoke<void>(RTC_FROM_HERE, CreateInvoker(&invoker));
- // Kill |thread|.
- thread = nullptr;
- // Try calling functor.
- AtomicBool called;
- EXPECT_FALSE(invoker->AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&called)));
- // With thread gone, nothing should happen.
- WAIT(called.get(), kWaitTimeout);
- EXPECT_FALSE(called.get());
-}
-
-// The remaining tests check that GuardedAsyncInvoker behaves as AsyncInvoker
-// when Thread is still alive.
-TEST_F(GuardedAsyncInvokeTest, FireAndForget) {
- GuardedAsyncInvoker invoker;
- // Try calling functor.
- AtomicBool called;
- EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&called)));
- EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
-}
-
-TEST_F(GuardedAsyncInvokeTest, NonCopyableFunctor) {
- GuardedAsyncInvoker invoker;
- // Try calling functor.
- AtomicBool called;
- EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorD(&called)));
- EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
-}
-
-TEST_F(GuardedAsyncInvokeTest, Flush) {
- GuardedAsyncInvoker invoker;
- AtomicBool flag1;
- AtomicBool flag2;
- // Queue two async calls to the current thread.
- EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag1)));
- EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag2)));
- // Because we haven't pumped messages, these should not have run yet.
- EXPECT_FALSE(flag1.get());
- EXPECT_FALSE(flag2.get());
- // Force them to run now.
- EXPECT_TRUE(invoker.Flush());
- EXPECT_TRUE(flag1.get());
- EXPECT_TRUE(flag2.get());
-}
-
-TEST_F(GuardedAsyncInvokeTest, FlushWithIds) {
- GuardedAsyncInvoker invoker;
- AtomicBool flag1;
- AtomicBool flag2;
- // Queue two async calls to the current thread, one with a message id.
- EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag1), 5));
- EXPECT_TRUE(invoker.AsyncInvoke<void>(RTC_FROM_HERE, FunctorB(&flag2)));
- // Because we haven't pumped messages, these should not have run yet.
- EXPECT_FALSE(flag1.get());
- EXPECT_FALSE(flag2.get());
- // Execute pending calls with id == 5.
- EXPECT_TRUE(invoker.Flush(5));
- EXPECT_TRUE(flag1.get());
- EXPECT_FALSE(flag2.get());
- flag1 = false;
- // Execute all pending calls. The id == 5 call should not execute again.
- EXPECT_TRUE(invoker.Flush());
- EXPECT_FALSE(flag1.get());
- EXPECT_TRUE(flag2.get());
-}
-
void ThreadIsCurrent(Thread* thread, bool* result, Event* event) {
*result = thread->IsCurrent();
event->Set();
@@ -1148,6 +1107,18 @@ TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) {
EXPECT_TRUE(fourth.Wait(0));
}
+TEST(ThreadPostDelayedTaskTest, IsCurrentTaskQueue) {
+ auto current_tq = webrtc::TaskQueueBase::Current();
+ {
+ std::unique_ptr<rtc::Thread> thread(rtc::Thread::Create());
+ thread->WrapCurrent();
+ EXPECT_EQ(webrtc::TaskQueueBase::Current(),
+ static_cast<webrtc::TaskQueueBase*>(thread.get()));
+ thread->UnwrapCurrent();
+ }
+ EXPECT_EQ(webrtc::TaskQueueBase::Current(), current_tq);
+}
+
class ThreadFactory : public webrtc::TaskQueueFactory {
public:
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
diff --git a/rtc_base/time_utils.cc b/rtc_base/time_utils.cc
index 8d919262d3..11c9d5a47f 100644
--- a/rtc_base/time_utils.cc
+++ b/rtc_base/time_utils.cc
@@ -247,7 +247,7 @@ int64_t TimestampWrapAroundHandler::Unwrap(uint32_t ts) {
++num_wrap_;
} else if ((ts - last_ts_) > 0xf0000000) {
// Backwards wrap. Unwrap with last wrap count and don't update last_ts_.
- return ts + ((num_wrap_ - 1) << 32);
+ return ts + (num_wrap_ - 1) * (int64_t{1} << 32);
}
last_ts_ = ts;
diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc
index d42873e18b..3d412d66cc 100644
--- a/rtc_base/virtual_socket_server.cc
+++ b/rtc_base/virtual_socket_server.cc
@@ -19,6 +19,7 @@
#include "absl/algorithm/container.h"
#include "rtc_base/checks.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/logging.h"
#include "rtc_base/physical_socket_server.h"
diff --git a/rtc_base/virtual_socket_server.h b/rtc_base/virtual_socket_server.h
index f45fabf0af..84f8fb1bdc 100644
--- a/rtc_base/virtual_socket_server.h
+++ b/rtc_base/virtual_socket_server.h
@@ -17,6 +17,7 @@
#include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/event.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/message_handler.h"
@@ -294,7 +295,7 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
std::map<rtc::IPAddress, rtc::IPAddress> alternative_address_mapping_;
std::unique_ptr<Function> delay_dist_;
- CriticalSection delay_crit_;
+ RecursiveCriticalSection delay_crit_;
double drop_prob_;
bool sending_blocked_ = false;
@@ -379,7 +380,7 @@ class VirtualSocket : public AsyncSocket,
bool ready_to_send_ = true;
// Critical section to protect the recv_buffer and queue_
- CriticalSection crit_;
+ RecursiveCriticalSection crit_;
// Network model that enforces bandwidth and capacity constraints
NetworkQueue network_;
diff --git a/rtc_base/win32_socket_server.cc b/rtc_base/win32_socket_server.cc
index 8a5b93a608..cfe21a3630 100644
--- a/rtc_base/win32_socket_server.cc
+++ b/rtc_base/win32_socket_server.cc
@@ -733,7 +733,7 @@ bool Win32SocketServer::Wait(int cms, bool process_io) {
MSG msg;
b = GetMessage(&msg, nullptr, s_wm_wakeup_id, s_wm_wakeup_id);
{
- CritScope scope(&cs_);
+ webrtc::MutexLock lock(&mutex_);
posted_ = false;
}
} else {
@@ -747,7 +747,7 @@ void Win32SocketServer::WakeUp() {
if (wnd_.handle()) {
// Set the "message pending" flag, if not already set.
{
- CritScope scope(&cs_);
+ webrtc::MutexLock lock(&mutex_);
if (posted_)
return;
posted_ = true;
@@ -760,7 +760,7 @@ void Win32SocketServer::WakeUp() {
void Win32SocketServer::Pump() {
// Clear the "message pending" flag.
{
- CritScope scope(&cs_);
+ webrtc::MutexLock lock(&mutex_);
posted_ = false;
}
diff --git a/rtc_base/win32_socket_server.h b/rtc_base/win32_socket_server.h
index 92fd68cd83..317acce0d2 100644
--- a/rtc_base/win32_socket_server.h
+++ b/rtc_base/win32_socket_server.h
@@ -13,10 +13,10 @@
#if defined(WEBRTC_WIN)
#include "rtc_base/async_socket.h"
-#include "rtc_base/critical_section.h"
#include "rtc_base/socket.h"
#include "rtc_base/socket_factory.h"
#include "rtc_base/socket_server.h"
+#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread.h"
#include "rtc_base/win32_window.h"
@@ -123,7 +123,7 @@ class Win32SocketServer : public SocketServer {
static const wchar_t kWindowName[];
Thread* message_queue_;
MessageWindow wnd_;
- CriticalSection cs_;
+ webrtc::Mutex mutex_;
bool posted_;
HWND hdlg_;
};