diff options
-rw-r--r-- | BUILD.gn | 1 | ||||
-rw-r--r-- | call/call_perf_tests.cc | 53 | ||||
-rw-r--r-- | call/rtp_video_sender_unittest.cc | 16 | ||||
-rw-r--r-- | modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc | 18 | ||||
-rw-r--r-- | rtc_base/task_utils/BUILD.gn | 27 | ||||
-rw-r--r-- | rtc_base/task_utils/pending_task_safety_flag.cc | 32 | ||||
-rw-r--r-- | rtc_base/task_utils/pending_task_safety_flag.h | 61 | ||||
-rw-r--r-- | rtc_base/task_utils/pending_task_safety_flag_unittest.cc | 151 | ||||
-rw-r--r-- | test/scenario/call_client.h | 6 | ||||
-rw-r--r-- | test/scenario/stats_collection_unittest.cc | 19 | ||||
-rw-r--r-- | video/BUILD.gn | 1 | ||||
-rw-r--r-- | video/end_to_end_tests/retransmission_tests.cc | 60 | ||||
-rw-r--r-- | video/end_to_end_tests/stats_tests.cc | 59 | ||||
-rw-r--r-- | video/receive_statistics_proxy.cc | 115 | ||||
-rw-r--r-- | video/receive_statistics_proxy.h | 32 | ||||
-rw-r--r-- | video/receive_statistics_proxy_unittest.cc | 64 | ||||
-rw-r--r-- | video/video_quality_observer.cc | 4 | ||||
-rw-r--r-- | video/video_receive_stream.cc | 29 | ||||
-rw-r--r-- | video/video_receive_stream.h | 7 |
19 files changed, 100 insertions, 655 deletions
@@ -547,7 +547,6 @@ if (rtc_include_tests) { "rtc_base:weak_ptr_unittests", "rtc_base/experiments:experiments_unittests", "rtc_base/synchronization:sequence_checker_unittests", - "rtc_base/task_utils:pending_task_safety_flag_unittests", "rtc_base/task_utils:to_queued_task_unittests", "sdk:sdk_tests", "test:rtp_test_utils", diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc index 123be7da4c..2d23087cc8 100644 --- a/call/call_perf_tests.cc +++ b/call/call_perf_tests.cc @@ -96,24 +96,21 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, static const int kMinRunTimeMs = 30000; public: - explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue, - Clock* clock, - const std::string& test_label) + explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label) : test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs), clock_(clock), test_label_(test_label), creation_time_ms_(clock_->TimeInMilliseconds()), - task_queue_(task_queue) {} + first_time_in_sync_(-1), + receive_stream_(nullptr) {} void OnFrame(const VideoFrame& video_frame) override { - task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); })); - } - - void CheckStats() { - if (!receive_stream_) - return; - - VideoReceiveStream::Stats stats = receive_stream_->GetStats(); + VideoReceiveStream::Stats stats; + { + rtc::CritScope lock(&crit_); + if (receive_stream_) + stats = receive_stream_->GetStats(); + } if (stats.sync_offset_ms == std::numeric_limits<int>::max()) return; @@ -138,8 +135,7 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, } void set_receive_stream(VideoReceiveStream* receive_stream) { - RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current()); - // Note that receive_stream may be nullptr. + rtc::CritScope lock(&crit_); receive_stream_ = receive_stream; } @@ -152,10 +148,10 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver, Clock* const clock_; std::string test_label_; const int64_t creation_time_ms_; - int64_t first_time_in_sync_ = -1; - VideoReceiveStream* receive_stream_ = nullptr; + int64_t first_time_in_sync_; + rtc::CriticalSection crit_; + VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_); std::vector<double> sync_offset_ms_list_; - TaskQueueBase* const task_queue_; }; void CallPerfTest::TestAudioVideoSync(FecMode fec, @@ -172,8 +168,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, audio_net_config.queue_delay_ms = 500; audio_net_config.loss_percent = 5; - auto observer = std::make_unique<VideoRtcpAndSyncObserver>( - task_queue(), Clock::GetRealTimeClock(), test_label); + VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label); std::map<uint8_t, MediaType> audio_pt_map; std::map<uint8_t, MediaType> video_pt_map; @@ -223,7 +218,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, }); audio_send_transport = std::make_unique<test::PacketTransport>( - task_queue(), sender_call_.get(), observer.get(), + task_queue(), sender_call_.get(), &observer, test::PacketTransport::kSender, audio_pt_map, std::make_unique<FakeNetworkPipe>( Clock::GetRealTimeClock(), @@ -231,7 +226,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, audio_send_transport->SetReceiver(receiver_call_->Receiver()); video_send_transport = std::make_unique<test::PacketTransport>( - task_queue(), sender_call_.get(), observer.get(), + task_queue(), sender_call_.get(), &observer, test::PacketTransport::kSender, video_pt_map, std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(), std::make_unique<SimulatedNetwork>( @@ -239,7 +234,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, video_send_transport->SetReceiver(receiver_call_->Receiver()); receive_transport = std::make_unique<test::PacketTransport>( - task_queue(), receiver_call_.get(), observer.get(), + task_queue(), receiver_call_.get(), &observer, test::PacketTransport::kReceiver, payload_type_map_, std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(), std::make_unique<SimulatedNetwork>( @@ -264,7 +259,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType; } video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000; - video_receive_configs_[0].renderer = observer.get(); + video_receive_configs_[0].renderer = &observer; video_receive_configs_[0].sync_group = kSyncGroup; AudioReceiveStream::Config audio_recv_config; @@ -286,7 +281,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, receiver_call_->CreateAudioReceiveStream(audio_recv_config); } EXPECT_EQ(1u, video_receive_streams_.size()); - observer->set_receive_stream(video_receive_streams_[0]); + observer.set_receive_stream(video_receive_streams_[0]); drifting_clock = std::make_unique<DriftingClock>(clock_, video_ntp_speed); CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed, kDefaultFramerate, kDefaultWidth, @@ -298,13 +293,10 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, audio_receive_stream->Start(); }); - EXPECT_TRUE(observer->Wait()) + EXPECT_TRUE(observer.Wait()) << "Timed out while waiting for audio and video to be synchronized."; SendTask(RTC_FROM_HERE, task_queue(), [&]() { - // Clear the pointer to the receive stream since it will now be deleted. - observer->set_receive_stream(nullptr); - audio_send_stream->Stop(); audio_receive_stream->Stop(); @@ -322,7 +314,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, DestroyCalls(); }); - observer->PrintResults(); + observer.PrintResults(); // In quick test synchronization may not be achieved in time. if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) { @@ -331,9 +323,6 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec, EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs")); #endif } - - task_queue()->PostTask( - ToQueuedTask([to_delete = observer.release()]() { delete to_delete; })); } TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) { diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index b6fbf77e6d..71bec5e7bb 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -510,9 +510,9 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { test::NetworkSimulationConfig net_conf; net_conf.bandwidth = DataRate::KilobitsPerSec(300); auto send_node = s.CreateSimulationNode(net_conf); - auto* callee = s.CreateClient("return", call_conf); auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node}, - callee, {s.CreateSimulationNode(net_conf)}); + s.CreateClient("return", call_conf), + {s.CreateSimulationNode(net_conf)}); test::VideoStreamConfig lossy_config; lossy_config.source.framerate = 5; @@ -540,20 +540,14 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { // from initial probing. s.RunFor(TimeDelta::Seconds(1)); rtx_packets = 0; - int decoded_baseline = 0; - callee->SendTask([&decoded_baseline, &lossy]() { - decoded_baseline = lossy->receive()->GetStats().frames_decoded; - }); + int decoded_baseline = lossy->receive()->GetStats().frames_decoded; s.RunFor(TimeDelta::Seconds(1)); // We expect both that RTX packets were sent and that an appropriate number of // frames were received. This is somewhat redundant but reduces the risk of // false positives in future regressions (e.g. RTX is send due to probing). EXPECT_GE(rtx_packets, 1); - int frames_decoded = 0; - callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() { - frames_decoded = - lossy->receive()->GetStats().frames_decoded - decoded_baseline; - }); + int frames_decoded = + lossy->receive()->GetStats().frames_decoded - decoded_baseline; EXPECT_EQ(frames_decoded, 5); } diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc index 361da92ff2..1083214fa5 100644 --- a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc +++ b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc @@ -537,8 +537,8 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) { auto ret_net = {s.CreateSimulationNode(net_conf)}; auto* client = s.CreateClient("send", CallClientConfig()); - auto* callee = s.CreateClient("return", CallClientConfig()); - auto* route = s.CreateRoutes(client, send_net, callee, ret_net); + auto* route = s.CreateRoutes( + client, send_net, s.CreateClient("return", CallClientConfig()), ret_net); // TODO(srte): Make this work with RTX enabled or remove it. auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) { c->stream.use_rtx = false; @@ -553,17 +553,9 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) { s.net()->StopCrossTraffic(tcp_traffic); s.RunFor(TimeDelta::Seconds(20)); } - - // Querying the video stats from within the expected runtime environment - // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that - // we're currently on). - VideoReceiveStream::Stats video_receive_stats; - auto* video_stream = video->receive(); - callee->SendTask([&video_stream, &video_receive_stats]() { - video_receive_stats = video_stream->GetStats(); - }); - return DataSize::Bytes( - video_receive_stats.rtp_stats.packet_counter.TotalBytes()) / + return DataSize::Bytes(video->receive() + ->GetStats() + .rtp_stats.packet_counter.TotalBytes()) / s.TimeSinceStart(); } diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 8409aa29e5..2e7d53ceb2 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -26,39 +26,12 @@ rtc_library("repeating_task") { ] } -rtc_library("pending_task_safety_flag") { - sources = [ - "pending_task_safety_flag.cc", - "pending_task_safety_flag.h", - ] - deps = [ - "..:checks", - "..:refcount", - "..:thread_checker", - "../../api:scoped_refptr", - "../synchronization:sequence_checker", - ] -} - rtc_source_set("to_queued_task") { sources = [ "to_queued_task.h" ] deps = [ "../../api/task_queue" ] } if (rtc_include_tests) { - rtc_library("pending_task_safety_flag_unittests") { - testonly = true - sources = [ "pending_task_safety_flag_unittest.cc" ] - deps = [ - ":pending_task_safety_flag", - ":to_queued_task", - "..:rtc_base_approved", - "..:rtc_task_queue", - "..:task_queue_for_test", - "../../test:test_support", - ] - } - rtc_library("repeating_task_unittests") { testonly = true sources = [ "repeating_task_unittest.cc" ] diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc deleted file mode 100644 index 307d2d594c..0000000000 --- a/rtc_base/task_utils/pending_task_safety_flag.cc +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/task_utils/pending_task_safety_flag.h" - -#include "rtc_base/ref_counted_object.h" - -namespace webrtc { - -// static -PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() { - return new rtc::RefCountedObject<PendingTaskSafetyFlag>(); -} - -void PendingTaskSafetyFlag::SetNotAlive() { - RTC_DCHECK_RUN_ON(&main_sequence_); - alive_ = false; -} - -bool PendingTaskSafetyFlag::alive() const { - RTC_DCHECK_RUN_ON(&main_sequence_); - return alive_; -} - -} // namespace webrtc diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h deleted file mode 100644 index 1b301c8034..0000000000 --- a/rtc_base/task_utils/pending_task_safety_flag.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2020 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ -#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ - -#include "api/scoped_refptr.h" -#include "rtc_base/checks.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/synchronization/sequence_checker.h" - -namespace webrtc { - -// Use this flag to drop pending tasks that have been posted to the "main" -// thread/TQ and end up running after the owning instance has been -// deleted. The owning instance signals deletion by calling SetNotAlive() from -// its destructor. -// -// When posting a task, post a copy (capture by-value in a lambda) of the flag -// instance and before performing the work, check the |alive()| state. Abort if -// alive() returns |false|: -// -// // Running outside of the main thread. -// my_task_queue_->PostTask(ToQueuedTask( -// [safety = pending_task_safety_flag_, this]() { -// // Now running on the main thread. -// if (!safety->alive()) -// return; -// MyMethod(); -// })); -// -// Note that checking the state only works on the construction/destruction -// thread of the ReceiveStatisticsProxy instance. -class PendingTaskSafetyFlag : public rtc::RefCountInterface { - public: - using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>; - static Pointer Create(); - - ~PendingTaskSafetyFlag() = default; - - void SetNotAlive(); - bool alive() const; - - protected: - PendingTaskSafetyFlag() = default; - - private: - bool alive_ = true; - SequenceChecker main_sequence_; -}; - -} // namespace webrtc - -#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc deleted file mode 100644 index 0c1c3c8e52..0000000000 --- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/task_utils/pending_task_safety_flag.h" - -#include <memory> - -#include "rtc_base/event.h" -#include "rtc_base/logging.h" -#include "rtc_base/task_queue_for_test.h" -#include "rtc_base/task_utils/to_queued_task.h" -#include "test/gmock.h" -#include "test/gtest.h" - -namespace webrtc { -namespace { -using ::testing::AtLeast; -using ::testing::Invoke; -using ::testing::MockFunction; -using ::testing::NiceMock; -using ::testing::Return; -} // namespace - -TEST(PendingTaskSafetyFlagTest, Basic) { - PendingTaskSafetyFlag::Pointer safety_flag; - { - // Scope for the |owner| instance. - class Owner { - public: - Owner() = default; - ~Owner() { flag_->SetNotAlive(); } - - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; - } owner; - EXPECT_TRUE(owner.flag_->alive()); - safety_flag = owner.flag_; - EXPECT_TRUE(safety_flag->alive()); - } - EXPECT_FALSE(safety_flag->alive()); -} - -TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) { - TaskQueueForTest tq1("OwnerHere"); - TaskQueueForTest tq2("OwnerNotHere"); - - class Owner { - public: - Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); } - ~Owner() { - RTC_DCHECK(tq_main_->IsCurrent()); - flag_->SetNotAlive(); - } - - void DoStuff() { - RTC_DCHECK(!tq_main_->IsCurrent()); - tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { - if (!safe->alive()) - return; - stuff_done_ = true; - })); - } - - bool stuff_done() const { return stuff_done_; } - - private: - TaskQueueBase* const tq_main_; - bool stuff_done_ = false; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; - }; - - std::unique_ptr<Owner> owner; - tq1.SendTask( - [&owner]() { - owner.reset(new Owner()); - EXPECT_FALSE(owner->stuff_done()); - }, - RTC_FROM_HERE); - ASSERT_TRUE(owner); - tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE); - tq1.SendTask( - [&owner]() { - EXPECT_TRUE(owner->stuff_done()); - owner.reset(); - }, - RTC_FROM_HERE); - ASSERT_FALSE(owner); -} - -TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) { - TaskQueueForTest tq1("OwnerHere"); - TaskQueueForTest tq2("OwnerNotHere"); - - class Owner { - public: - explicit Owner(bool* stuff_done) - : tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) { - RTC_DCHECK(tq_main_); - *stuff_done_ = false; - } - ~Owner() { - RTC_DCHECK(tq_main_->IsCurrent()); - flag_->SetNotAlive(); - } - - void DoStuff() { - RTC_DCHECK(!tq_main_->IsCurrent()); - tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { - if (!safe->alive()) - return; - *stuff_done_ = true; - })); - } - - private: - TaskQueueBase* const tq_main_; - bool* const stuff_done_; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; - }; - - std::unique_ptr<Owner> owner; - bool stuff_done = false; - tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); }, - RTC_FROM_HERE); - ASSERT_TRUE(owner); - // Queue up a task on tq1 that will execute before the 'DoStuff' task - // can, and delete the |owner| before the 'stuff' task can execute. - rtc::Event blocker; - tq1.PostTask([&blocker, &owner]() { - blocker.Wait(rtc::Event::kForever); - owner.reset(); - }); - - // Queue up a DoStuff... - tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE); - - ASSERT_TRUE(owner); - blocker.Set(); - - // Run an empty task on tq1 to flush all the queued tasks. - tq1.SendTask([]() {}, RTC_FROM_HERE); - ASSERT_FALSE(owner); - EXPECT_FALSE(stuff_done); -} -} // namespace webrtc diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h index 33fa2765cb..803b4a8313 100644 --- a/test/scenario/call_client.h +++ b/test/scenario/call_client.h @@ -113,11 +113,6 @@ class CallClient : public EmulatedNetworkReceiverInterface { void OnPacketReceived(EmulatedIpPacket packet) override; std::unique_ptr<RtcEventLogOutput> GetLogWriter(std::string name); - // Exposed publicly so that tests can execute tasks such as querying stats - // for media streams in the expected runtime environment (essentially what - // CallClient does internally for GetStats()). - void SendTask(std::function<void()> task); - private: friend class Scenario; friend class CallClientPair; @@ -134,6 +129,7 @@ class CallClient : public EmulatedNetworkReceiverInterface { uint32_t GetNextAudioLocalSsrc(); uint32_t GetNextRtxSsrc(); void AddExtensions(std::vector<RtpExtension> extensions); + void SendTask(std::function<void()> task); int16_t Bind(EmulatedEndpoint* endpoint); void UnBind(); diff --git a/test/scenario/stats_collection_unittest.cc b/test/scenario/stats_collection_unittest.cc index af3b982838..fae3365d5d 100644 --- a/test/scenario/stats_collection_unittest.cc +++ b/test/scenario/stats_collection_unittest.cc @@ -25,26 +25,17 @@ void CreateAnalyzedStream(Scenario* s, VideoStreamConfig::Encoder::Implementation::kSoftware; config.hooks.frame_pair_handlers = {analyzer->Handler()}; auto* caller = s->CreateClient("caller", CallClientConfig()); - auto* callee = s->CreateClient("callee", CallClientConfig()); auto route = - s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee, + s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, + s->CreateClient("callee", CallClientConfig()), {s->CreateSimulationNode(NetworkSimulationConfig())}); - VideoStreamPair* video = s->CreateVideoStream(route->forward(), config); + auto* video = s->CreateVideoStream(route->forward(), config); auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig()); s->Every(TimeDelta::Seconds(1), [=] { collectors->call.AddStats(caller->GetStats()); - collectors->video_send.AddStats(video->send()->GetStats(), s->Now()); collectors->audio_receive.AddStats(audio->receive()->GetStats()); - - // Querying the video stats from within the expected runtime environment - // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that - // we're currently on). - VideoReceiveStream::Stats video_receive_stats; - auto* video_stream = video->receive(); - callee->SendTask([&video_stream, &video_receive_stats]() { - video_receive_stats = video_stream->GetStats(); - }); - collectors->video_receive.AddStats(video_receive_stats); + collectors->video_send.AddStats(video->send()->GetStats(), s->Now()); + collectors->video_receive.AddStats(video->receive()->GetStats()); }); } } // namespace diff --git a/video/BUILD.gn b/video/BUILD.gn index 9d26ee2c37..14109c3494 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -115,7 +115,6 @@ rtc_library("video") { "../rtc_base/experiments:rate_control_settings", "../rtc_base/synchronization:sequence_checker", "../rtc_base/system:thread_registry", - "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:repeating_task", "../rtc_base/task_utils:to_queued_task", "../rtc_base/time:timestamp_extrapolator", diff --git a/video/end_to_end_tests/retransmission_tests.cc b/video/end_to_end_tests/retransmission_tests.cc index 2633cf3a67..407aa5f2dc 100644 --- a/video/end_to_end_tests/retransmission_tests.cc +++ b/video/end_to_end_tests/retransmission_tests.cc @@ -18,7 +18,6 @@ #include "call/simulated_network.h" #include "modules/rtp_rtcp/source/rtp_packet.h" #include "modules/video_coding/codecs/vp8/include/vp8.h" -#include "rtc_base/event.h" #include "rtc_base/task_queue_for_test.h" #include "system_wrappers/include/sleep.h" #include "test/call_test.h" @@ -204,7 +203,7 @@ TEST_F(RetransmissionEndToEndTest, ReceivesNackAndRetransmitsAudio) { TEST_F(RetransmissionEndToEndTest, StopSendingKeyframeRequestsForInactiveStream) { - class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask { + class KeyframeRequestObserver : public test::EndToEndTest { public: explicit KeyframeRequestObserver(TaskQueueBase* task_queue) : clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {} @@ -217,59 +216,28 @@ TEST_F(RetransmissionEndToEndTest, receive_stream_ = receive_streams[0]; } - Action OnReceiveRtcp(const uint8_t* packet, size_t length) override { - test::RtcpPacketParser parser; - EXPECT_TRUE(parser.Parse(packet, length)); - if (parser.pli()->num_packets() > 0) - task_queue_->PostTask(std::unique_ptr<QueuedTask>(this)); - return SEND_PACKET; - } - - bool PollStats() { - if (receive_stream_->GetStats().frames_decoded > 0) { - frame_decoded_ = true; - } else if (clock_->TimeInMilliseconds() - start_time_ < 5000) { - task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100); - return false; - } - return true; - } - void PerformTest() override { - start_time_ = clock_->TimeInMilliseconds(); - task_queue_->PostTask(std::unique_ptr<QueuedTask>(this)); - test_done_.Wait(rtc::Event::kForever); - } - - bool Run() override { - if (!frame_decoded_) { - if (PollStats()) { - send_stream_->Stop(); - if (!frame_decoded_) { - test_done_.Set(); - } else { - // Now we wait for the PLI packet. Once we receive it, a task - // will be posted (see OnReceiveRtcp) and we'll check the stats - // once more before signaling that we're done. - } + bool frame_decoded = false; + int64_t start_time = clock_->TimeInMilliseconds(); + while (clock_->TimeInMilliseconds() - start_time <= 5000) { + if (receive_stream_->GetStats().frames_decoded > 0) { + frame_decoded = true; + break; } - } else { - EXPECT_EQ( - 1U, - receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets); - test_done_.Set(); + SleepMs(100); } - return false; + ASSERT_TRUE(frame_decoded); + SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); }); + SleepMs(10000); + ASSERT_EQ( + 1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets); } private: - Clock* const clock_; + Clock* clock_; VideoSendStream* send_stream_; VideoReceiveStream* receive_stream_; TaskQueueBase* const task_queue_; - rtc::Event test_done_; - bool frame_decoded_ = false; - int64_t start_time_ = 0; } test(task_queue()); RunBaseTest(&test); diff --git a/video/end_to_end_tests/stats_tests.cc b/video/end_to_end_tests/stats_tests.cc index 32bcedb9c8..b43f79df0a 100644 --- a/video/end_to_end_tests/stats_tests.cc +++ b/video/end_to_end_tests/stats_tests.cc @@ -297,7 +297,6 @@ TEST_F(StatsEndToEndTest, GetStats) { const std::vector<VideoReceiveStream*>& receive_streams) override { send_stream_ = send_stream; receive_streams_ = receive_streams; - task_queue_ = TaskQueueBase::Current(); } void PerformTest() override { @@ -308,10 +307,8 @@ TEST_F(StatsEndToEndTest, GetStats) { bool send_ok = false; while (now_ms < stop_time_ms) { - if (!receive_ok && task_queue_) { - SendTask(RTC_FROM_HERE, task_queue_, - [&]() { receive_ok = CheckReceiveStats(); }); - } + if (!receive_ok) + receive_ok = CheckReceiveStats(); if (!send_ok) send_ok = CheckSendStats(); @@ -349,7 +346,6 @@ TEST_F(StatsEndToEndTest, GetStats) { rtc::Event check_stats_event_; ReceiveStreamRenderer receive_stream_renderer_; - TaskQueueBase* task_queue_ = nullptr; } test; RunBaseTest(&test); @@ -381,28 +377,22 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) { VideoSendStream* send_stream, const std::vector<VideoReceiveStream*>& receive_streams) override { receive_streams_ = receive_streams; - task_queue_ = TaskQueueBase::Current(); } void PerformTest() override { // No frames reported initially. - SendTask(RTC_FROM_HERE, task_queue_, [&]() { - for (const auto& receive_stream : receive_streams_) { - EXPECT_FALSE(receive_stream->GetStats().timing_frame_info); - } - }); + for (const auto& receive_stream : receive_streams_) { + EXPECT_FALSE(receive_stream->GetStats().timing_frame_info); + } // Wait for at least one timing frame to be sent with 100ms grace period. SleepMs(kDefaultTimingFramesDelayMs + 100); // Check that timing frames are reported for each stream. - SendTask(RTC_FROM_HERE, task_queue_, [&]() { - for (const auto& receive_stream : receive_streams_) { - EXPECT_TRUE(receive_stream->GetStats().timing_frame_info); - } - }); + for (const auto& receive_stream : receive_streams_) { + EXPECT_TRUE(receive_stream->GetStats().timing_frame_info); + } } std::vector<VideoReceiveStream*> receive_streams_; - TaskQueueBase* task_queue_ = nullptr; } test; RunBaseTest(&test); @@ -410,8 +400,7 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) { TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) { static const size_t kNumRtpPacketsToSend = 5; - class ReceivedRtpStatsObserver : public test::EndToEndTest, - public QueuedTask { + class ReceivedRtpStatsObserver : public test::EndToEndTest { public: ReceivedRtpStatsObserver() : EndToEndTest(kDefaultTimeoutMs), @@ -423,14 +412,14 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) { VideoSendStream* send_stream, const std::vector<VideoReceiveStream*>& receive_streams) override { receive_stream_ = receive_streams[0]; - task_queue_ = TaskQueueBase::Current(); - EXPECT_TRUE(task_queue_ != nullptr); } Action OnSendRtp(const uint8_t* packet, size_t length) override { if (sent_rtp_ >= kNumRtpPacketsToSend) { - // Need to check the stats on the correct thread. - task_queue_->PostTask(std::unique_ptr<QueuedTask>(this)); + VideoReceiveStream::Stats stats = receive_stream_->GetStats(); + if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) { + observation_complete_.Set(); + } return DROP_PACKET; } ++sent_rtp_; @@ -442,17 +431,8 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) { << "Timed out while verifying number of received RTP packets."; } - bool Run() override { - VideoReceiveStream::Stats stats = receive_stream_->GetStats(); - if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) { - observation_complete_.Set(); - } - return false; - } - VideoReceiveStream* receive_stream_; uint32_t sent_rtp_; - TaskQueueBase* task_queue_ = nullptr; } test; RunBaseTest(&test); @@ -598,7 +578,7 @@ TEST_F(StatsEndToEndTest, MAYBE_ContentTypeSwitches) { TEST_F(StatsEndToEndTest, VerifyNackStats) { static const int kPacketNumberToDrop = 200; - class NackObserver : public test::EndToEndTest, public QueuedTask { + class NackObserver : public test::EndToEndTest { public: NackObserver() : EndToEndTest(kLongTimeoutMs), @@ -618,7 +598,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) { dropped_rtp_packet_ = header.sequenceNumber; return DROP_PACKET; } - task_queue_->PostTask(std::unique_ptr<QueuedTask>(this)); + VerifyStats(); return SEND_PACKET; } @@ -679,14 +659,6 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) { const std::vector<VideoReceiveStream*>& receive_streams) override { send_stream_ = send_stream; receive_streams_ = receive_streams; - task_queue_ = TaskQueueBase::Current(); - EXPECT_TRUE(task_queue_ != nullptr); - } - - bool Run() override { - rtc::CritScope lock(&crit_); - VerifyStats(); - return false; } void PerformTest() override { @@ -701,7 +673,6 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) { std::vector<VideoReceiveStream*> receive_streams_; VideoSendStream* send_stream_; absl::optional<int64_t> start_runtime_ms_; - TaskQueueBase* task_queue_ = nullptr; } test; metrics::Reset(); diff --git a/video/receive_statistics_proxy.cc b/video/receive_statistics_proxy.cc index acea4e3ddc..82951c8a50 100644 --- a/video/receive_statistics_proxy.cc +++ b/video/receive_statistics_proxy.cc @@ -18,12 +18,10 @@ #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/strings/string_builder.h" -#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/metrics.h" -#include "video/video_receive_stream.h" namespace webrtc { namespace { @@ -84,9 +82,9 @@ std::string UmaSuffixForContentType(VideoContentType content_type) { ReceiveStatisticsProxy::ReceiveStatisticsProxy( const VideoReceiveStream::Config* config, - Clock* clock, - TaskQueueBase* worker_thread) + Clock* clock) : clock_(clock), + config_(*config), start_ms_(clock->TimeInMilliseconds()), enable_decode_time_histograms_( !field_trial::IsEnabled("WebRTC-DecodeTimeHistogramsKillSwitch")), @@ -119,53 +117,27 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy( last_codec_type_(kVideoCodecVP8), num_delayed_frames_rendered_(0), sum_missed_render_deadline_ms_(0), - timing_frame_info_counter_(kMovingMaxWindowMs), - worker_thread_(worker_thread) { - RTC_DCHECK(worker_thread); - decode_queue_.Detach(); - incoming_render_queue_.Detach(); - stats_.ssrc = config->rtp.remote_ssrc; -} - -ReceiveStatisticsProxy::ReceiveStatisticsProxy( - const VideoReceiveStream::Config* config, - Clock* clock) - : ReceiveStatisticsProxy(config, clock, internal::GetCurrentTaskQueue()) {} - -ReceiveStatisticsProxy::~ReceiveStatisticsProxy() { - RTC_DCHECK_RUN_ON(&main_thread_); - task_safety_flag_->SetNotAlive(); + timing_frame_info_counter_(kMovingMaxWindowMs) { + decode_thread_.Detach(); + network_thread_.Detach(); + stats_.ssrc = config_.rtp.remote_ssrc; } void ReceiveStatisticsProxy::UpdateHistograms( absl::optional<int> fraction_lost, const StreamDataCounters& rtp_stats, const StreamDataCounters* rtx_stats) { - { - // TODO(webrtc:11489): Delete this scope after refactoring. - // We're actually on the main thread here, below is the explanation for - // why we use another thread checker. Once refactored, we can clean this - // up and not use the decode_queue_ checker here. - RTC_DCHECK_RUN_ON(&main_thread_); - } - - // We're not actually running on the decoder thread, but must be called after + // Not actually running on the decoder thread, but must be called after // DecoderThreadStopped, which detaches the thread checker. It is therefore // safe to access |qp_counters_|, which were updated on the decode thread // earlier. - RTC_DCHECK_RUN_ON(&decode_queue_); + RTC_DCHECK_RUN_ON(&decode_thread_); rtc::CritScope lock(&crit_); - // TODO(webrtc:11489): Many of these variables don't need to be inside the - // scope of a lock. Also consider grabbing the lock only to copy the state - // that histograms need to be reported for, then report histograms while not - // holding the lock. char log_stream_buf[8 * 1024]; rtc::SimpleStringBuilder log_stream(log_stream_buf); - int stream_duration_sec = (clock_->TimeInMilliseconds() - start_ms_) / 1000; - if (stats_.frame_counts.key_frames > 0 || stats_.frame_counts.delta_frames > 0) { RTC_HISTOGRAM_COUNTS_100000("WebRTC.Video.ReceiveStreamLifetimeInSeconds", @@ -504,8 +476,6 @@ void ReceiveStatisticsProxy::UpdateHistograms( } void ReceiveStatisticsProxy::QualitySample() { - RTC_DCHECK_RUN_ON(&incoming_render_queue_); - int64_t now = clock_->TimeInMilliseconds(); if (last_sample_time_ + kMinSampleLengthMs > now) return; @@ -575,8 +545,6 @@ void ReceiveStatisticsProxy::QualitySample() { } void ReceiveStatisticsProxy::UpdateFramerate(int64_t now_ms) const { - // TODO(webrtc:11489): Currently seems to be called from two threads, - // main and decode. Consider moving both to main. int64_t old_frames_ms = now_ms - kRateStatisticsWindowSizeMs; while (!frame_window_.empty() && frame_window_.begin()->first < old_frames_ms) { @@ -592,9 +560,6 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms( int width, int height, int decode_time_ms) const { - RTC_DCHECK_RUN_ON(&decode_queue_); - // TODO(webrtc:11489): Consider posting the work to the worker thread. - bool is_4k = (width == 3840 || width == 4096) && height == 2160; bool is_hd = width == 1920 && height == 1080; // Only update histograms for 4k/HD and VP9/H264. @@ -649,7 +614,6 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms( absl::optional<int64_t> ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs( int64_t now_ms) const { - RTC_DCHECK_RUN_ON(&main_thread_); if (!last_estimated_playout_ntp_timestamp_ms_ || !last_estimated_playout_time_ms_) { return absl::nullopt; @@ -659,12 +623,6 @@ ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs( } VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const { - RTC_DCHECK_RUN_ON(&main_thread_); - - // Like VideoReceiveStream::GetStats, called on the worker thread from - // StatsCollector::ExtractMediaInfo via worker_thread()->Invoke(). - // WebRtcVideoChannel::GetStats(), GetVideoReceiverInfo. - rtc::CritScope lock(&crit_); // Get current frame rates here, as only updating them on new frames prevents // us from ever correctly displaying frame rate of 0. @@ -696,16 +654,12 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const { } void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) { - RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); stats_.current_payload_type = payload_type; } void ReceiveStatisticsProxy::OnDecoderImplementationName( const char* implementation_name) { - RTC_DCHECK_RUN_ON(&decode_queue_); - // TODO(webrtc:11489): is a lock needed for this variable? Currently seems to - // be only touched on the decoder queue. rtc::CritScope lock(&crit_); stats_.decoder_implementation_name = implementation_name; } @@ -717,7 +671,6 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated( int jitter_buffer_ms, int min_playout_delay_ms, int render_delay_ms) { - RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); stats_.max_decode_ms = max_decode_ms; stats_.current_delay_ms = current_delay_ms; @@ -734,14 +687,12 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated( } void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) { - RTC_DCHECK_RUN_ON(&main_thread_); rtc::CritScope lock(&crit_); num_unique_frames_.emplace(num_unique_frames); } void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated( const TimingFrameInfo& info) { - RTC_DCHECK_RUN_ON(&decode_queue_); rtc::CritScope lock(&crit_); if (info.flags != VideoSendTiming::kInvalid) { int64_t now_ms = clock_->TimeInMilliseconds(); @@ -763,28 +714,6 @@ void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated( void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated( uint32_t ssrc, const RtcpPacketTypeCounter& packet_counter) { - if (!worker_thread_->IsCurrent()) { - // RtpRtcp::Configuration has a single RtcpPacketTypeCounterObserver and - // that same configuration may be used for both receiver and sender - // (see ModuleRtpRtcpImpl::ModuleRtpRtcpImpl). - // The RTCPSender implementation currently makes calls to this function on a - // process thread whereas the RTCPReceiver implementation calls back on the - // [main] worker thread. - // So until the sender implementation has been updated, we work around this - // here by posting the update to the expected thread. We make a by value - // copy of the |task_safety_flag_| to handle the case if the queued task - // runs after the |ReceiveStatisticsProxy| has been deleted. In such a - // case the packet_counter update won't be recorded. - worker_thread_->PostTask(ToQueuedTask( - [safety = task_safety_flag_, ssrc, packet_counter, this]() { - if (!safety->alive()) - return; - RtcpPacketTypesCounterUpdated(ssrc, packet_counter); - })); - return; - } - - RTC_DCHECK_RUN_ON(&main_thread_); rtc::CritScope lock(&crit_); if (stats_.ssrc != ssrc) return; @@ -792,7 +721,6 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated( } void ReceiveStatisticsProxy::OnCname(uint32_t ssrc, absl::string_view cname) { - RTC_DCHECK_RUN_ON(&main_thread_); rtc::CritScope lock(&crit_); // TODO(pbos): Handle both local and remote ssrcs here and RTC_DCHECK that we // receive stats from one of them. @@ -805,13 +733,9 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame, absl::optional<uint8_t> qp, int32_t decode_time_ms, VideoContentType content_type) { - RTC_DCHECK_RUN_ON(&decode_queue_); - // TODO(webrtc:11489): - Same as OnRenderedFrame. Both called from within - // VideoStreamDecoder::FrameToRender - rtc::CritScope lock(&crit_); - const uint64_t now_ms = clock_->TimeInMilliseconds(); + uint64_t now_ms = clock_->TimeInMilliseconds(); if (videocontenttypehelpers::IsScreenshare(content_type) != videocontenttypehelpers::IsScreenshare(last_content_type_)) { @@ -870,10 +794,6 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame, } void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) { - RTC_DCHECK_RUN_ON(&incoming_render_queue_); - // TODO(webrtc:11489): Consider posting the work to the worker thread. - // - Called from VideoReceiveStream::OnFrame. - int width = frame.width(); int height = frame.height(); RTC_DCHECK_GT(width, 0); @@ -913,10 +833,7 @@ void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) { void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms, int64_t sync_offset_ms, double estimated_freq_khz) { - RTC_DCHECK_RUN_ON(&incoming_render_queue_); rtc::CritScope lock(&crit_); - // TODO(webrtc:11489): Lock possibly not needed for sync_offset_counter_ if - // it's only touched on the decoder thread. sync_offset_counter_.Add(std::abs(sync_offset_ms)); stats_.sync_offset_ms = sync_offset_ms; last_estimated_playout_ntp_timestamp_ms_ = video_playout_ntp_ms; @@ -969,7 +886,7 @@ void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) { } void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) { - RTC_DCHECK_RUN_ON(&decode_queue_); + RTC_DCHECK_RUN_ON(&decode_thread_); rtc::CritScope lock(&crit_); last_codec_type_ = codec_type; if (last_codec_type_ == kVideoCodecVP8 && qp != -1) { @@ -979,8 +896,6 @@ void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) { } void ReceiveStatisticsProxy::OnStreamInactive() { - RTC_DCHECK_RUN_ON(&decode_queue_); - // TODO(sprang): Figure out any other state that should be reset. rtc::CritScope lock(&crit_); @@ -991,13 +906,6 @@ void ReceiveStatisticsProxy::OnStreamInactive() { void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) { - // TODO(webrtc:11489): Is this a duplicate of VideoReceiveStream::OnRttUpdate? - // - looks like that runs on a/the module process thread. - // - - // BUGBUG - // Actually, it looks like this method is never called except from a unit - // test, GetStatsReportsDecodeTimingStats. rtc::CritScope lock(&crit_); avg_rtt_ms_ = avg_rtt_ms; } @@ -1008,7 +916,7 @@ void ReceiveStatisticsProxy::DecoderThreadStarting() { void ReceiveStatisticsProxy::DecoderThreadStopped() { RTC_DCHECK_RUN_ON(&main_thread_); - decode_queue_.Detach(); + decode_thread_.Detach(); } ReceiveStatisticsProxy::ContentSpecificStats::ContentSpecificStats() @@ -1029,5 +937,4 @@ void ReceiveStatisticsProxy::ContentSpecificStats::Add( frame_counts.delta_frames += other.frame_counts.delta_frames; interframe_delay_percentiles.Add(other.interframe_delay_percentiles); } - } // namespace webrtc diff --git a/video/receive_statistics_proxy.h b/video/receive_statistics_proxy.h index 335359b724..02043d6944 100644 --- a/video/receive_statistics_proxy.h +++ b/video/receive_statistics_proxy.h @@ -17,7 +17,6 @@ #include <vector> #include "absl/types/optional.h" -#include "api/task_queue/task_queue_base.h" #include "call/video_receive_stream.h" #include "modules/include/module_common_types.h" #include "modules/video_coding/include/video_coding_defines.h" @@ -27,8 +26,6 @@ #include "rtc_base/numerics/sample_counter.h" #include "rtc_base/rate_statistics.h" #include "rtc_base/rate_tracker.h" -#include "rtc_base/synchronization/sequence_checker.h" -#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/thread_checker.h" #include "video/quality_threshold.h" @@ -46,13 +43,8 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, public CallStatsObserver { public: ReceiveStatisticsProxy(const VideoReceiveStream::Config* config, - Clock* clock, - TaskQueueBase* worker_thread); - // TODO(webrtc:11489): Remove this ctor once all callers have been updated - // to use the above one. - ReceiveStatisticsProxy(const VideoReceiveStream::Config* config, Clock* clock); - ~ReceiveStatisticsProxy(); + ~ReceiveStatisticsProxy() = default; VideoReceiveStream::Stats GetStats() const; @@ -147,6 +139,14 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, int64_t now_ms) const RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); Clock* const clock_; + // Ownership of this object lies with the owner of the ReceiveStatisticsProxy + // instance. Lifetime is guaranteed to outlive |this|. + // TODO(tommi): In practice the config_ reference is only used for accessing + // config_.rtp.ulpfec.ulpfec_payload_type. Instead of holding a pointer back, + // we could just store the value of ulpfec_payload_type and change the + // ReceiveStatisticsProxy() ctor to accept a const& of Config (since we'll + // then no longer store a pointer to the object). + const VideoReceiveStream::Config& config_; const int64_t start_ms_; const bool enable_decode_time_histograms_; @@ -177,7 +177,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, std::map<VideoContentType, ContentSpecificStats> content_specific_stats_ RTC_GUARDED_BY(crit_); MaxCounter freq_offset_counter_ RTC_GUARDED_BY(crit_); - QpCounters qp_counters_ RTC_GUARDED_BY(decode_queue_); + QpCounters qp_counters_ RTC_GUARDED_BY(decode_thread_); int64_t avg_rtt_ms_ RTC_GUARDED_BY(crit_); mutable std::map<int64_t, size_t> frame_window_ RTC_GUARDED_BY(&crit_); VideoContentType last_content_type_ RTC_GUARDED_BY(&crit_); @@ -196,17 +196,9 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, RTC_GUARDED_BY(&crit_); absl::optional<int64_t> last_estimated_playout_time_ms_ RTC_GUARDED_BY(&crit_); - - // The thread on which this instance is constructed and some of its main - // methods are invoked on such as GetStats(). - TaskQueueBase* const worker_thread_; - - PendingTaskSafetyFlag::Pointer task_safety_flag_{ - PendingTaskSafetyFlag::Create()}; - - SequenceChecker decode_queue_; + rtc::ThreadChecker decode_thread_; + rtc::ThreadChecker network_thread_; rtc::ThreadChecker main_thread_; - SequenceChecker incoming_render_queue_; }; } // namespace webrtc diff --git a/video/receive_statistics_proxy_unittest.cc b/video/receive_statistics_proxy_unittest.cc index 59dca1de3d..626542c810 100644 --- a/video/receive_statistics_proxy_unittest.cc +++ b/video/receive_statistics_proxy_unittest.cc @@ -22,8 +22,6 @@ #include "api/video/video_frame.h" #include "api/video/video_frame_buffer.h" #include "api/video/video_rotation.h" -#include "rtc_base/task_utils/to_queued_task.h" -#include "rtc_base/thread.h" #include "system_wrappers/include/metrics.h" #include "test/field_trial.h" #include "test/gtest.h" @@ -41,63 +39,13 @@ const int kHeight = 720; // TODO(sakal): ReceiveStatisticsProxy is lacking unittesting. class ReceiveStatisticsProxyTest : public ::testing::Test { public: - ReceiveStatisticsProxyTest() - : fake_clock_(1234), - config_(GetTestConfig()), - worker_thread_(&socket_server_) { - worker_thread_.WrapCurrent(); - RTC_CHECK_EQ(webrtc::TaskQueueBase::Current(), - static_cast<TaskQueueBase*>(&worker_thread_)); - metrics::Reset(); - statistics_proxy_.reset( - new ReceiveStatisticsProxy(&config_, &fake_clock_, &worker_thread_)); - } - - ~ReceiveStatisticsProxyTest() override { - statistics_proxy_.reset(); - worker_thread_.UnwrapCurrent(); - } + ReceiveStatisticsProxyTest() : fake_clock_(1234), config_(GetTestConfig()) {} + virtual ~ReceiveStatisticsProxyTest() {} protected: - class FakeSocketServer : public rtc::SocketServer { - public: - FakeSocketServer() = default; - ~FakeSocketServer() = default; - - bool Wait(int cms, bool process_io) override { - if (fail_next_wait_) { - fail_next_wait_ = false; - return false; - } - return true; - } - - void WakeUp() override {} - - rtc::Socket* CreateSocket(int family, int type) override { return nullptr; } - rtc::AsyncSocket* CreateAsyncSocket(int family, int type) override { - return nullptr; - } - - void FailNextWait() { fail_next_wait_ = true; } - - private: - bool fail_next_wait_ = false; - }; - - class WorkerThread : public rtc::Thread { - public: - explicit WorkerThread(rtc::SocketServer* ss) - : rtc::Thread(ss), tq_setter_(this) {} - - private: - CurrentTaskQueueSetter tq_setter_; - }; - - void FlushWorker() { - worker_thread_.PostTask( - ToQueuedTask([this]() { socket_server_.FailNextWait(); })); - worker_thread_.ProcessMessages(1000); + virtual void SetUp() { + metrics::Reset(); + statistics_proxy_.reset(new ReceiveStatisticsProxy(&config_, &fake_clock_)); } VideoReceiveStream::Config GetTestConfig() { @@ -130,8 +78,6 @@ class ReceiveStatisticsProxyTest : public ::testing::Test { SimulatedClock fake_clock_; const VideoReceiveStream::Config config_; std::unique_ptr<ReceiveStatisticsProxy> statistics_proxy_; - FakeSocketServer socket_server_; - WorkerThread worker_thread_; }; TEST_F(ReceiveStatisticsProxyTest, OnDecodedFrameIncreasesFramesDecoded) { diff --git a/video/video_quality_observer.cc b/video/video_quality_observer.cc index e10def2d79..be7b08c887 100644 --- a/video/video_quality_observer.cc +++ b/video/video_quality_observer.cc @@ -49,14 +49,10 @@ VideoQualityObserver::VideoQualityObserver(VideoContentType content_type) current_resolution_(Resolution::Low), num_resolution_downgrades_(0), time_in_blocky_video_ms_(0), - // TODO(webrtc:11489): content_type_ variable isn't necessary. content_type_(content_type), is_paused_(false) {} void VideoQualityObserver::UpdateHistograms() { - // TODO(webrtc:11489): Called on the decoder thread - which _might_ be - // the same as the construction thread. - // Don't report anything on an empty video stream. if (num_frames_rendered_ == 0) { return; diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc index 5d371a59dd..b2b96db9bf 100644 --- a/video/video_receive_stream.cc +++ b/video/video_receive_stream.cc @@ -182,13 +182,6 @@ constexpr int kInactiveStreamThresholdMs = 600000; // 10 minutes. namespace internal { -TaskQueueBase* GetCurrentTaskQueue() { - TaskQueueBase* ret = TaskQueueBase::Current(); - if (!ret) - ret = rtc::ThreadManager::Instance()->CurrentThread(); - return ret; -} - VideoReceiveStream::VideoReceiveStream( TaskQueueFactory* task_queue_factory, RtpStreamReceiverControllerInterface* receiver_controller, @@ -204,11 +197,10 @@ VideoReceiveStream::VideoReceiveStream( config_(std::move(config)), num_cpu_cores_(num_cpu_cores), process_thread_(process_thread), - worker_thread_(GetCurrentTaskQueue()), clock_(clock), call_stats_(call_stats), source_tracker_(clock_), - stats_proxy_(&config_, clock_, worker_thread_), + stats_proxy_(&config_, clock_), rtp_receive_statistics_(ReceiveStatistics::Create(clock_)), timing_(timing), video_receiver_(clock_, timing_.get()), @@ -447,7 +439,6 @@ void VideoReceiveStream::Stop() { } VideoReceiveStream::Stats VideoReceiveStream::GetStats() const { - RTC_DCHECK_RUN_ON(&worker_sequence_checker_); VideoReceiveStream::Stats stats = stats_proxy_.GetStats(); stats.total_bitrate_bps = 0; StreamStatistician* statistician = @@ -466,7 +457,6 @@ VideoReceiveStream::Stats VideoReceiveStream::GetStats() const { } void VideoReceiveStream::UpdateHistograms() { - RTC_DCHECK_RUN_ON(&worker_sequence_checker_); absl::optional<int> fraction_lost; StreamDataCounters rtp_stats; StreamStatistician* statistician = @@ -503,7 +493,6 @@ bool VideoReceiveStream::SetBaseMinimumPlayoutDelayMs(int delay_ms) { return false; } - // TODO(webrtc:11489): Consider posting to worker. rtc::CritScope cs(&playout_delay_lock_); base_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); @@ -517,19 +506,19 @@ int VideoReceiveStream::GetBaseMinimumPlayoutDelayMs() const { return base_minimum_playout_delay_ms_; } -// TODO(webrtc:11489): This method grabs a lock 6 times. +// TODO(tommi): This method grabs a lock 6 times. void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) { int64_t video_playout_ntp_ms; int64_t sync_offset_ms; double estimated_freq_khz; - // TODO(webrtc:11489): GetStreamSyncOffsetInMs grabs three locks. One inside - // the function itself, another in GetChannel() and a third in + // TODO(tommi): GetStreamSyncOffsetInMs grabs three locks. One inside the + // function itself, another in GetChannel() and a third in // GetPlayoutTimestamp. Seems excessive. Anyhow, I'm assuming the function // succeeds most of the time, which leads to grabbing a fourth lock. if (rtp_stream_sync_.GetStreamSyncOffsetInMs( video_frame.timestamp(), video_frame.render_time_ms(), &video_playout_ntp_ms, &sync_offset_ms, &estimated_freq_khz)) { - // TODO(webrtc:11489): OnSyncOffsetUpdated grabs a lock. + // TODO(tommi): OnSyncOffsetUpdated grabs a lock. stats_proxy_.OnSyncOffsetUpdated(video_playout_ntp_ms, sync_offset_ms, estimated_freq_khz); } @@ -537,7 +526,7 @@ void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) { config_.renderer->OnFrame(video_frame); - // TODO(webrtc:11489): OnRenderFrame grabs a lock too. + // TODO(tommi): OnRenderFrame grabs a lock too. stats_proxy_.OnRenderedFrame(video_frame); } @@ -574,9 +563,6 @@ void VideoReceiveStream::OnCompleteFrame( } last_complete_frame_time_ms_ = time_now_ms; - // TODO(webrtc:11489): We grab the playout_delay_lock_ lock potentially twice. - // Consider checking both min/max and posting to worker if there's a change. - // If we always update playout delays on the worker, we don't need a lock. const PlayoutDelay& playout_delay = frame->EncodedImage().playout_delay_; if (playout_delay.min_ms >= 0) { rtc::CritScope cs(&playout_delay_lock_); @@ -632,7 +618,6 @@ void VideoReceiveStream::SetEstimatedPlayoutNtpTimestampMs( void VideoReceiveStream::SetMinimumPlayoutDelay(int delay_ms) { RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); - // TODO(webrtc:11489): Consider posting to worker. rtc::CritScope cs(&playout_delay_lock_); syncable_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); @@ -667,7 +652,6 @@ void VideoReceiveStream::StartNextDecode() { void VideoReceiveStream::HandleEncodedFrame( std::unique_ptr<EncodedFrame> frame) { - // Running on |decode_queue_|. int64_t now_ms = clock_->TimeInMilliseconds(); // Current OnPreDecode only cares about QP for VP8. @@ -722,7 +706,6 @@ void VideoReceiveStream::HandleKeyFrameGeneration( } void VideoReceiveStream::HandleFrameBufferTimeout() { - // Running on |decode_queue_|. int64_t now_ms = clock_->TimeInMilliseconds(); absl::optional<int64_t> last_packet_ms = rtp_video_stream_receiver_.LastReceivedPacketMs(); diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h index 1cc7dd2184..c1ebf2b600 100644 --- a/video/video_receive_stream.h +++ b/video/video_receive_stream.h @@ -45,12 +45,6 @@ class VCMTiming; namespace internal { -// Utility function that fetches the TQ that's active in the current context -// or the active rtc::Thread if no TQ is active. This is necessary at the moment -// for VideoReceiveStream and downstream classes as tests and production don't -// consistently follow the same procedures. -TaskQueueBase* GetCurrentTaskQueue(); - class VideoReceiveStream : public webrtc::VideoReceiveStream, public rtc::VideoSinkInterface<VideoFrame>, public NackSender, @@ -167,7 +161,6 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream, const VideoReceiveStream::Config config_; const int num_cpu_cores_; ProcessThread* const process_thread_; - TaskQueueBase* const worker_thread_; Clock* const clock_; CallStats* const call_stats_; |