aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--BUILD.gn1
-rw-r--r--call/call_perf_tests.cc53
-rw-r--r--call/rtp_video_sender_unittest.cc16
-rw-r--r--modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc18
-rw-r--r--rtc_base/task_utils/BUILD.gn27
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag.cc32
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag.h61
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag_unittest.cc151
-rw-r--r--test/scenario/call_client.h6
-rw-r--r--test/scenario/stats_collection_unittest.cc19
-rw-r--r--video/BUILD.gn1
-rw-r--r--video/end_to_end_tests/retransmission_tests.cc60
-rw-r--r--video/end_to_end_tests/stats_tests.cc59
-rw-r--r--video/receive_statistics_proxy.cc115
-rw-r--r--video/receive_statistics_proxy.h32
-rw-r--r--video/receive_statistics_proxy_unittest.cc64
-rw-r--r--video/video_quality_observer.cc4
-rw-r--r--video/video_receive_stream.cc29
-rw-r--r--video/video_receive_stream.h7
19 files changed, 100 insertions, 655 deletions
diff --git a/BUILD.gn b/BUILD.gn
index 4e30a71e7b..b3e771071f 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -547,7 +547,6 @@ if (rtc_include_tests) {
"rtc_base:weak_ptr_unittests",
"rtc_base/experiments:experiments_unittests",
"rtc_base/synchronization:sequence_checker_unittests",
- "rtc_base/task_utils:pending_task_safety_flag_unittests",
"rtc_base/task_utils:to_queued_task_unittests",
"sdk:sdk_tests",
"test:rtp_test_utils",
diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc
index 123be7da4c..2d23087cc8 100644
--- a/call/call_perf_tests.cc
+++ b/call/call_perf_tests.cc
@@ -96,24 +96,21 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
static const int kMinRunTimeMs = 30000;
public:
- explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue,
- Clock* clock,
- const std::string& test_label)
+ explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label)
: test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs),
clock_(clock),
test_label_(test_label),
creation_time_ms_(clock_->TimeInMilliseconds()),
- task_queue_(task_queue) {}
+ first_time_in_sync_(-1),
+ receive_stream_(nullptr) {}
void OnFrame(const VideoFrame& video_frame) override {
- task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
- }
-
- void CheckStats() {
- if (!receive_stream_)
- return;
-
- VideoReceiveStream::Stats stats = receive_stream_->GetStats();
+ VideoReceiveStream::Stats stats;
+ {
+ rtc::CritScope lock(&crit_);
+ if (receive_stream_)
+ stats = receive_stream_->GetStats();
+ }
if (stats.sync_offset_ms == std::numeric_limits<int>::max())
return;
@@ -138,8 +135,7 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
}
void set_receive_stream(VideoReceiveStream* receive_stream) {
- RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current());
- // Note that receive_stream may be nullptr.
+ rtc::CritScope lock(&crit_);
receive_stream_ = receive_stream;
}
@@ -152,10 +148,10 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
Clock* const clock_;
std::string test_label_;
const int64_t creation_time_ms_;
- int64_t first_time_in_sync_ = -1;
- VideoReceiveStream* receive_stream_ = nullptr;
+ int64_t first_time_in_sync_;
+ rtc::CriticalSection crit_;
+ VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_);
std::vector<double> sync_offset_ms_list_;
- TaskQueueBase* const task_queue_;
};
void CallPerfTest::TestAudioVideoSync(FecMode fec,
@@ -172,8 +168,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_net_config.queue_delay_ms = 500;
audio_net_config.loss_percent = 5;
- auto observer = std::make_unique<VideoRtcpAndSyncObserver>(
- task_queue(), Clock::GetRealTimeClock(), test_label);
+ VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label);
std::map<uint8_t, MediaType> audio_pt_map;
std::map<uint8_t, MediaType> video_pt_map;
@@ -223,7 +218,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
});
audio_send_transport = std::make_unique<test::PacketTransport>(
- task_queue(), sender_call_.get(), observer.get(),
+ task_queue(), sender_call_.get(), &observer,
test::PacketTransport::kSender, audio_pt_map,
std::make_unique<FakeNetworkPipe>(
Clock::GetRealTimeClock(),
@@ -231,7 +226,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_send_transport->SetReceiver(receiver_call_->Receiver());
video_send_transport = std::make_unique<test::PacketTransport>(
- task_queue(), sender_call_.get(), observer.get(),
+ task_queue(), sender_call_.get(), &observer,
test::PacketTransport::kSender, video_pt_map,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@@ -239,7 +234,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
video_send_transport->SetReceiver(receiver_call_->Receiver());
receive_transport = std::make_unique<test::PacketTransport>(
- task_queue(), receiver_call_.get(), observer.get(),
+ task_queue(), receiver_call_.get(), &observer,
test::PacketTransport::kReceiver, payload_type_map_,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@@ -264,7 +259,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType;
}
video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
- video_receive_configs_[0].renderer = observer.get();
+ video_receive_configs_[0].renderer = &observer;
video_receive_configs_[0].sync_group = kSyncGroup;
AudioReceiveStream::Config audio_recv_config;
@@ -286,7 +281,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
receiver_call_->CreateAudioReceiveStream(audio_recv_config);
}
EXPECT_EQ(1u, video_receive_streams_.size());
- observer->set_receive_stream(video_receive_streams_[0]);
+ observer.set_receive_stream(video_receive_streams_[0]);
drifting_clock = std::make_unique<DriftingClock>(clock_, video_ntp_speed);
CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed,
kDefaultFramerate, kDefaultWidth,
@@ -298,13 +293,10 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_receive_stream->Start();
});
- EXPECT_TRUE(observer->Wait())
+ EXPECT_TRUE(observer.Wait())
<< "Timed out while waiting for audio and video to be synchronized.";
SendTask(RTC_FROM_HERE, task_queue(), [&]() {
- // Clear the pointer to the receive stream since it will now be deleted.
- observer->set_receive_stream(nullptr);
-
audio_send_stream->Stop();
audio_receive_stream->Stop();
@@ -322,7 +314,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
DestroyCalls();
});
- observer->PrintResults();
+ observer.PrintResults();
// In quick test synchronization may not be achieved in time.
if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) {
@@ -331,9 +323,6 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs"));
#endif
}
-
- task_queue()->PostTask(
- ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
}
TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) {
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index b6fbf77e6d..71bec5e7bb 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -510,9 +510,9 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
test::NetworkSimulationConfig net_conf;
net_conf.bandwidth = DataRate::KilobitsPerSec(300);
auto send_node = s.CreateSimulationNode(net_conf);
- auto* callee = s.CreateClient("return", call_conf);
auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node},
- callee, {s.CreateSimulationNode(net_conf)});
+ s.CreateClient("return", call_conf),
+ {s.CreateSimulationNode(net_conf)});
test::VideoStreamConfig lossy_config;
lossy_config.source.framerate = 5;
@@ -540,20 +540,14 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
// from initial probing.
s.RunFor(TimeDelta::Seconds(1));
rtx_packets = 0;
- int decoded_baseline = 0;
- callee->SendTask([&decoded_baseline, &lossy]() {
- decoded_baseline = lossy->receive()->GetStats().frames_decoded;
- });
+ int decoded_baseline = lossy->receive()->GetStats().frames_decoded;
s.RunFor(TimeDelta::Seconds(1));
// We expect both that RTX packets were sent and that an appropriate number of
// frames were received. This is somewhat redundant but reduces the risk of
// false positives in future regressions (e.g. RTX is send due to probing).
EXPECT_GE(rtx_packets, 1);
- int frames_decoded = 0;
- callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() {
- frames_decoded =
- lossy->receive()->GetStats().frames_decoded - decoded_baseline;
- });
+ int frames_decoded =
+ lossy->receive()->GetStats().frames_decoded - decoded_baseline;
EXPECT_EQ(frames_decoded, 5);
}
diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
index 361da92ff2..1083214fa5 100644
--- a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
+++ b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
@@ -537,8 +537,8 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) {
auto ret_net = {s.CreateSimulationNode(net_conf)};
auto* client = s.CreateClient("send", CallClientConfig());
- auto* callee = s.CreateClient("return", CallClientConfig());
- auto* route = s.CreateRoutes(client, send_net, callee, ret_net);
+ auto* route = s.CreateRoutes(
+ client, send_net, s.CreateClient("return", CallClientConfig()), ret_net);
// TODO(srte): Make this work with RTX enabled or remove it.
auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) {
c->stream.use_rtx = false;
@@ -553,17 +553,9 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) {
s.net()->StopCrossTraffic(tcp_traffic);
s.RunFor(TimeDelta::Seconds(20));
}
-
- // Querying the video stats from within the expected runtime environment
- // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
- // we're currently on).
- VideoReceiveStream::Stats video_receive_stats;
- auto* video_stream = video->receive();
- callee->SendTask([&video_stream, &video_receive_stats]() {
- video_receive_stats = video_stream->GetStats();
- });
- return DataSize::Bytes(
- video_receive_stats.rtp_stats.packet_counter.TotalBytes()) /
+ return DataSize::Bytes(video->receive()
+ ->GetStats()
+ .rtp_stats.packet_counter.TotalBytes()) /
s.TimeSinceStart();
}
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 8409aa29e5..2e7d53ceb2 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -26,39 +26,12 @@ rtc_library("repeating_task") {
]
}
-rtc_library("pending_task_safety_flag") {
- sources = [
- "pending_task_safety_flag.cc",
- "pending_task_safety_flag.h",
- ]
- deps = [
- "..:checks",
- "..:refcount",
- "..:thread_checker",
- "../../api:scoped_refptr",
- "../synchronization:sequence_checker",
- ]
-}
-
rtc_source_set("to_queued_task") {
sources = [ "to_queued_task.h" ]
deps = [ "../../api/task_queue" ]
}
if (rtc_include_tests) {
- rtc_library("pending_task_safety_flag_unittests") {
- testonly = true
- sources = [ "pending_task_safety_flag_unittest.cc" ]
- deps = [
- ":pending_task_safety_flag",
- ":to_queued_task",
- "..:rtc_base_approved",
- "..:rtc_task_queue",
- "..:task_queue_for_test",
- "../../test:test_support",
- ]
- }
-
rtc_library("repeating_task_unittests") {
testonly = true
sources = [ "repeating_task_unittest.cc" ]
diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc
deleted file mode 100644
index 307d2d594c..0000000000
--- a/rtc_base/task_utils/pending_task_safety_flag.cc
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2020 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-
-#include "rtc_base/ref_counted_object.h"
-
-namespace webrtc {
-
-// static
-PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
- return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
-}
-
-void PendingTaskSafetyFlag::SetNotAlive() {
- RTC_DCHECK_RUN_ON(&main_sequence_);
- alive_ = false;
-}
-
-bool PendingTaskSafetyFlag::alive() const {
- RTC_DCHECK_RUN_ON(&main_sequence_);
- return alive_;
-}
-
-} // namespace webrtc
diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h
deleted file mode 100644
index 1b301c8034..0000000000
--- a/rtc_base/task_utils/pending_task_safety_flag.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2020 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
-#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
-
-#include "api/scoped_refptr.h"
-#include "rtc_base/checks.h"
-#include "rtc_base/ref_count.h"
-#include "rtc_base/synchronization/sequence_checker.h"
-
-namespace webrtc {
-
-// Use this flag to drop pending tasks that have been posted to the "main"
-// thread/TQ and end up running after the owning instance has been
-// deleted. The owning instance signals deletion by calling SetNotAlive() from
-// its destructor.
-//
-// When posting a task, post a copy (capture by-value in a lambda) of the flag
-// instance and before performing the work, check the |alive()| state. Abort if
-// alive() returns |false|:
-//
-// // Running outside of the main thread.
-// my_task_queue_->PostTask(ToQueuedTask(
-// [safety = pending_task_safety_flag_, this]() {
-// // Now running on the main thread.
-// if (!safety->alive())
-// return;
-// MyMethod();
-// }));
-//
-// Note that checking the state only works on the construction/destruction
-// thread of the ReceiveStatisticsProxy instance.
-class PendingTaskSafetyFlag : public rtc::RefCountInterface {
- public:
- using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
- static Pointer Create();
-
- ~PendingTaskSafetyFlag() = default;
-
- void SetNotAlive();
- bool alive() const;
-
- protected:
- PendingTaskSafetyFlag() = default;
-
- private:
- bool alive_ = true;
- SequenceChecker main_sequence_;
-};
-
-} // namespace webrtc
-
-#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
deleted file mode 100644
index 0c1c3c8e52..0000000000
--- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Copyright 2019 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-
-#include <memory>
-
-#include "rtc_base/event.h"
-#include "rtc_base/logging.h"
-#include "rtc_base/task_queue_for_test.h"
-#include "rtc_base/task_utils/to_queued_task.h"
-#include "test/gmock.h"
-#include "test/gtest.h"
-
-namespace webrtc {
-namespace {
-using ::testing::AtLeast;
-using ::testing::Invoke;
-using ::testing::MockFunction;
-using ::testing::NiceMock;
-using ::testing::Return;
-} // namespace
-
-TEST(PendingTaskSafetyFlagTest, Basic) {
- PendingTaskSafetyFlag::Pointer safety_flag;
- {
- // Scope for the |owner| instance.
- class Owner {
- public:
- Owner() = default;
- ~Owner() { flag_->SetNotAlive(); }
-
- PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
- } owner;
- EXPECT_TRUE(owner.flag_->alive());
- safety_flag = owner.flag_;
- EXPECT_TRUE(safety_flag->alive());
- }
- EXPECT_FALSE(safety_flag->alive());
-}
-
-TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
- TaskQueueForTest tq1("OwnerHere");
- TaskQueueForTest tq2("OwnerNotHere");
-
- class Owner {
- public:
- Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); }
- ~Owner() {
- RTC_DCHECK(tq_main_->IsCurrent());
- flag_->SetNotAlive();
- }
-
- void DoStuff() {
- RTC_DCHECK(!tq_main_->IsCurrent());
- tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
- if (!safe->alive())
- return;
- stuff_done_ = true;
- }));
- }
-
- bool stuff_done() const { return stuff_done_; }
-
- private:
- TaskQueueBase* const tq_main_;
- bool stuff_done_ = false;
- PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
- };
-
- std::unique_ptr<Owner> owner;
- tq1.SendTask(
- [&owner]() {
- owner.reset(new Owner());
- EXPECT_FALSE(owner->stuff_done());
- },
- RTC_FROM_HERE);
- ASSERT_TRUE(owner);
- tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
- tq1.SendTask(
- [&owner]() {
- EXPECT_TRUE(owner->stuff_done());
- owner.reset();
- },
- RTC_FROM_HERE);
- ASSERT_FALSE(owner);
-}
-
-TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
- TaskQueueForTest tq1("OwnerHere");
- TaskQueueForTest tq2("OwnerNotHere");
-
- class Owner {
- public:
- explicit Owner(bool* stuff_done)
- : tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) {
- RTC_DCHECK(tq_main_);
- *stuff_done_ = false;
- }
- ~Owner() {
- RTC_DCHECK(tq_main_->IsCurrent());
- flag_->SetNotAlive();
- }
-
- void DoStuff() {
- RTC_DCHECK(!tq_main_->IsCurrent());
- tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
- if (!safe->alive())
- return;
- *stuff_done_ = true;
- }));
- }
-
- private:
- TaskQueueBase* const tq_main_;
- bool* const stuff_done_;
- PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
- };
-
- std::unique_ptr<Owner> owner;
- bool stuff_done = false;
- tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); },
- RTC_FROM_HERE);
- ASSERT_TRUE(owner);
- // Queue up a task on tq1 that will execute before the 'DoStuff' task
- // can, and delete the |owner| before the 'stuff' task can execute.
- rtc::Event blocker;
- tq1.PostTask([&blocker, &owner]() {
- blocker.Wait(rtc::Event::kForever);
- owner.reset();
- });
-
- // Queue up a DoStuff...
- tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
-
- ASSERT_TRUE(owner);
- blocker.Set();
-
- // Run an empty task on tq1 to flush all the queued tasks.
- tq1.SendTask([]() {}, RTC_FROM_HERE);
- ASSERT_FALSE(owner);
- EXPECT_FALSE(stuff_done);
-}
-} // namespace webrtc
diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h
index 33fa2765cb..803b4a8313 100644
--- a/test/scenario/call_client.h
+++ b/test/scenario/call_client.h
@@ -113,11 +113,6 @@ class CallClient : public EmulatedNetworkReceiverInterface {
void OnPacketReceived(EmulatedIpPacket packet) override;
std::unique_ptr<RtcEventLogOutput> GetLogWriter(std::string name);
- // Exposed publicly so that tests can execute tasks such as querying stats
- // for media streams in the expected runtime environment (essentially what
- // CallClient does internally for GetStats()).
- void SendTask(std::function<void()> task);
-
private:
friend class Scenario;
friend class CallClientPair;
@@ -134,6 +129,7 @@ class CallClient : public EmulatedNetworkReceiverInterface {
uint32_t GetNextAudioLocalSsrc();
uint32_t GetNextRtxSsrc();
void AddExtensions(std::vector<RtpExtension> extensions);
+ void SendTask(std::function<void()> task);
int16_t Bind(EmulatedEndpoint* endpoint);
void UnBind();
diff --git a/test/scenario/stats_collection_unittest.cc b/test/scenario/stats_collection_unittest.cc
index af3b982838..fae3365d5d 100644
--- a/test/scenario/stats_collection_unittest.cc
+++ b/test/scenario/stats_collection_unittest.cc
@@ -25,26 +25,17 @@ void CreateAnalyzedStream(Scenario* s,
VideoStreamConfig::Encoder::Implementation::kSoftware;
config.hooks.frame_pair_handlers = {analyzer->Handler()};
auto* caller = s->CreateClient("caller", CallClientConfig());
- auto* callee = s->CreateClient("callee", CallClientConfig());
auto route =
- s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee,
+ s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)},
+ s->CreateClient("callee", CallClientConfig()),
{s->CreateSimulationNode(NetworkSimulationConfig())});
- VideoStreamPair* video = s->CreateVideoStream(route->forward(), config);
+ auto* video = s->CreateVideoStream(route->forward(), config);
auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig());
s->Every(TimeDelta::Seconds(1), [=] {
collectors->call.AddStats(caller->GetStats());
- collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
collectors->audio_receive.AddStats(audio->receive()->GetStats());
-
- // Querying the video stats from within the expected runtime environment
- // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
- // we're currently on).
- VideoReceiveStream::Stats video_receive_stats;
- auto* video_stream = video->receive();
- callee->SendTask([&video_stream, &video_receive_stats]() {
- video_receive_stats = video_stream->GetStats();
- });
- collectors->video_receive.AddStats(video_receive_stats);
+ collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
+ collectors->video_receive.AddStats(video->receive()->GetStats());
});
}
} // namespace
diff --git a/video/BUILD.gn b/video/BUILD.gn
index 9d26ee2c37..14109c3494 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -115,7 +115,6 @@ rtc_library("video") {
"../rtc_base/experiments:rate_control_settings",
"../rtc_base/synchronization:sequence_checker",
"../rtc_base/system:thread_registry",
- "../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:repeating_task",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/time:timestamp_extrapolator",
diff --git a/video/end_to_end_tests/retransmission_tests.cc b/video/end_to_end_tests/retransmission_tests.cc
index 2633cf3a67..407aa5f2dc 100644
--- a/video/end_to_end_tests/retransmission_tests.cc
+++ b/video/end_to_end_tests/retransmission_tests.cc
@@ -18,7 +18,6 @@
#include "call/simulated_network.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/video_coding/codecs/vp8/include/vp8.h"
-#include "rtc_base/event.h"
#include "rtc_base/task_queue_for_test.h"
#include "system_wrappers/include/sleep.h"
#include "test/call_test.h"
@@ -204,7 +203,7 @@ TEST_F(RetransmissionEndToEndTest, ReceivesNackAndRetransmitsAudio) {
TEST_F(RetransmissionEndToEndTest,
StopSendingKeyframeRequestsForInactiveStream) {
- class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask {
+ class KeyframeRequestObserver : public test::EndToEndTest {
public:
explicit KeyframeRequestObserver(TaskQueueBase* task_queue)
: clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
@@ -217,59 +216,28 @@ TEST_F(RetransmissionEndToEndTest,
receive_stream_ = receive_streams[0];
}
- Action OnReceiveRtcp(const uint8_t* packet, size_t length) override {
- test::RtcpPacketParser parser;
- EXPECT_TRUE(parser.Parse(packet, length));
- if (parser.pli()->num_packets() > 0)
- task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
- return SEND_PACKET;
- }
-
- bool PollStats() {
- if (receive_stream_->GetStats().frames_decoded > 0) {
- frame_decoded_ = true;
- } else if (clock_->TimeInMilliseconds() - start_time_ < 5000) {
- task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100);
- return false;
- }
- return true;
- }
-
void PerformTest() override {
- start_time_ = clock_->TimeInMilliseconds();
- task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
- test_done_.Wait(rtc::Event::kForever);
- }
-
- bool Run() override {
- if (!frame_decoded_) {
- if (PollStats()) {
- send_stream_->Stop();
- if (!frame_decoded_) {
- test_done_.Set();
- } else {
- // Now we wait for the PLI packet. Once we receive it, a task
- // will be posted (see OnReceiveRtcp) and we'll check the stats
- // once more before signaling that we're done.
- }
+ bool frame_decoded = false;
+ int64_t start_time = clock_->TimeInMilliseconds();
+ while (clock_->TimeInMilliseconds() - start_time <= 5000) {
+ if (receive_stream_->GetStats().frames_decoded > 0) {
+ frame_decoded = true;
+ break;
}
- } else {
- EXPECT_EQ(
- 1U,
- receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
- test_done_.Set();
+ SleepMs(100);
}
- return false;
+ ASSERT_TRUE(frame_decoded);
+ SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); });
+ SleepMs(10000);
+ ASSERT_EQ(
+ 1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
}
private:
- Clock* const clock_;
+ Clock* clock_;
VideoSendStream* send_stream_;
VideoReceiveStream* receive_stream_;
TaskQueueBase* const task_queue_;
- rtc::Event test_done_;
- bool frame_decoded_ = false;
- int64_t start_time_ = 0;
} test(task_queue());
RunBaseTest(&test);
diff --git a/video/end_to_end_tests/stats_tests.cc b/video/end_to_end_tests/stats_tests.cc
index 32bcedb9c8..b43f79df0a 100644
--- a/video/end_to_end_tests/stats_tests.cc
+++ b/video/end_to_end_tests/stats_tests.cc
@@ -297,7 +297,6 @@ TEST_F(StatsEndToEndTest, GetStats) {
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
- task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
@@ -308,10 +307,8 @@ TEST_F(StatsEndToEndTest, GetStats) {
bool send_ok = false;
while (now_ms < stop_time_ms) {
- if (!receive_ok && task_queue_) {
- SendTask(RTC_FROM_HERE, task_queue_,
- [&]() { receive_ok = CheckReceiveStats(); });
- }
+ if (!receive_ok)
+ receive_ok = CheckReceiveStats();
if (!send_ok)
send_ok = CheckSendStats();
@@ -349,7 +346,6 @@ TEST_F(StatsEndToEndTest, GetStats) {
rtc::Event check_stats_event_;
ReceiveStreamRenderer receive_stream_renderer_;
- TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@@ -381,28 +377,22 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) {
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_streams_ = receive_streams;
- task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
// No frames reported initially.
- SendTask(RTC_FROM_HERE, task_queue_, [&]() {
- for (const auto& receive_stream : receive_streams_) {
- EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
- }
- });
+ for (const auto& receive_stream : receive_streams_) {
+ EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
+ }
// Wait for at least one timing frame to be sent with 100ms grace period.
SleepMs(kDefaultTimingFramesDelayMs + 100);
// Check that timing frames are reported for each stream.
- SendTask(RTC_FROM_HERE, task_queue_, [&]() {
- for (const auto& receive_stream : receive_streams_) {
- EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
- }
- });
+ for (const auto& receive_stream : receive_streams_) {
+ EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
+ }
}
std::vector<VideoReceiveStream*> receive_streams_;
- TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@@ -410,8 +400,7 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) {
TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
static const size_t kNumRtpPacketsToSend = 5;
- class ReceivedRtpStatsObserver : public test::EndToEndTest,
- public QueuedTask {
+ class ReceivedRtpStatsObserver : public test::EndToEndTest {
public:
ReceivedRtpStatsObserver()
: EndToEndTest(kDefaultTimeoutMs),
@@ -423,14 +412,14 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_stream_ = receive_streams[0];
- task_queue_ = TaskQueueBase::Current();
- EXPECT_TRUE(task_queue_ != nullptr);
}
Action OnSendRtp(const uint8_t* packet, size_t length) override {
if (sent_rtp_ >= kNumRtpPacketsToSend) {
- // Need to check the stats on the correct thread.
- task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
+ VideoReceiveStream::Stats stats = receive_stream_->GetStats();
+ if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
+ observation_complete_.Set();
+ }
return DROP_PACKET;
}
++sent_rtp_;
@@ -442,17 +431,8 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
<< "Timed out while verifying number of received RTP packets.";
}
- bool Run() override {
- VideoReceiveStream::Stats stats = receive_stream_->GetStats();
- if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
- observation_complete_.Set();
- }
- return false;
- }
-
VideoReceiveStream* receive_stream_;
uint32_t sent_rtp_;
- TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@@ -598,7 +578,7 @@ TEST_F(StatsEndToEndTest, MAYBE_ContentTypeSwitches) {
TEST_F(StatsEndToEndTest, VerifyNackStats) {
static const int kPacketNumberToDrop = 200;
- class NackObserver : public test::EndToEndTest, public QueuedTask {
+ class NackObserver : public test::EndToEndTest {
public:
NackObserver()
: EndToEndTest(kLongTimeoutMs),
@@ -618,7 +598,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
dropped_rtp_packet_ = header.sequenceNumber;
return DROP_PACKET;
}
- task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
+ VerifyStats();
return SEND_PACKET;
}
@@ -679,14 +659,6 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
- task_queue_ = TaskQueueBase::Current();
- EXPECT_TRUE(task_queue_ != nullptr);
- }
-
- bool Run() override {
- rtc::CritScope lock(&crit_);
- VerifyStats();
- return false;
}
void PerformTest() override {
@@ -701,7 +673,6 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
std::vector<VideoReceiveStream*> receive_streams_;
VideoSendStream* send_stream_;
absl::optional<int64_t> start_runtime_ms_;
- TaskQueueBase* task_queue_ = nullptr;
} test;
metrics::Reset();
diff --git a/video/receive_statistics_proxy.cc b/video/receive_statistics_proxy.cc
index acea4e3ddc..82951c8a50 100644
--- a/video/receive_statistics_proxy.cc
+++ b/video/receive_statistics_proxy.cc
@@ -18,12 +18,10 @@
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
-#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h"
-#include "video/video_receive_stream.h"
namespace webrtc {
namespace {
@@ -84,9 +82,9 @@ std::string UmaSuffixForContentType(VideoContentType content_type) {
ReceiveStatisticsProxy::ReceiveStatisticsProxy(
const VideoReceiveStream::Config* config,
- Clock* clock,
- TaskQueueBase* worker_thread)
+ Clock* clock)
: clock_(clock),
+ config_(*config),
start_ms_(clock->TimeInMilliseconds()),
enable_decode_time_histograms_(
!field_trial::IsEnabled("WebRTC-DecodeTimeHistogramsKillSwitch")),
@@ -119,53 +117,27 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy(
last_codec_type_(kVideoCodecVP8),
num_delayed_frames_rendered_(0),
sum_missed_render_deadline_ms_(0),
- timing_frame_info_counter_(kMovingMaxWindowMs),
- worker_thread_(worker_thread) {
- RTC_DCHECK(worker_thread);
- decode_queue_.Detach();
- incoming_render_queue_.Detach();
- stats_.ssrc = config->rtp.remote_ssrc;
-}
-
-ReceiveStatisticsProxy::ReceiveStatisticsProxy(
- const VideoReceiveStream::Config* config,
- Clock* clock)
- : ReceiveStatisticsProxy(config, clock, internal::GetCurrentTaskQueue()) {}
-
-ReceiveStatisticsProxy::~ReceiveStatisticsProxy() {
- RTC_DCHECK_RUN_ON(&main_thread_);
- task_safety_flag_->SetNotAlive();
+ timing_frame_info_counter_(kMovingMaxWindowMs) {
+ decode_thread_.Detach();
+ network_thread_.Detach();
+ stats_.ssrc = config_.rtp.remote_ssrc;
}
void ReceiveStatisticsProxy::UpdateHistograms(
absl::optional<int> fraction_lost,
const StreamDataCounters& rtp_stats,
const StreamDataCounters* rtx_stats) {
- {
- // TODO(webrtc:11489): Delete this scope after refactoring.
- // We're actually on the main thread here, below is the explanation for
- // why we use another thread checker. Once refactored, we can clean this
- // up and not use the decode_queue_ checker here.
- RTC_DCHECK_RUN_ON(&main_thread_);
- }
-
- // We're not actually running on the decoder thread, but must be called after
+ // Not actually running on the decoder thread, but must be called after
// DecoderThreadStopped, which detaches the thread checker. It is therefore
// safe to access |qp_counters_|, which were updated on the decode thread
// earlier.
- RTC_DCHECK_RUN_ON(&decode_queue_);
+ RTC_DCHECK_RUN_ON(&decode_thread_);
rtc::CritScope lock(&crit_);
- // TODO(webrtc:11489): Many of these variables don't need to be inside the
- // scope of a lock. Also consider grabbing the lock only to copy the state
- // that histograms need to be reported for, then report histograms while not
- // holding the lock.
char log_stream_buf[8 * 1024];
rtc::SimpleStringBuilder log_stream(log_stream_buf);
-
int stream_duration_sec = (clock_->TimeInMilliseconds() - start_ms_) / 1000;
-
if (stats_.frame_counts.key_frames > 0 ||
stats_.frame_counts.delta_frames > 0) {
RTC_HISTOGRAM_COUNTS_100000("WebRTC.Video.ReceiveStreamLifetimeInSeconds",
@@ -504,8 +476,6 @@ void ReceiveStatisticsProxy::UpdateHistograms(
}
void ReceiveStatisticsProxy::QualitySample() {
- RTC_DCHECK_RUN_ON(&incoming_render_queue_);
-
int64_t now = clock_->TimeInMilliseconds();
if (last_sample_time_ + kMinSampleLengthMs > now)
return;
@@ -575,8 +545,6 @@ void ReceiveStatisticsProxy::QualitySample() {
}
void ReceiveStatisticsProxy::UpdateFramerate(int64_t now_ms) const {
- // TODO(webrtc:11489): Currently seems to be called from two threads,
- // main and decode. Consider moving both to main.
int64_t old_frames_ms = now_ms - kRateStatisticsWindowSizeMs;
while (!frame_window_.empty() &&
frame_window_.begin()->first < old_frames_ms) {
@@ -592,9 +560,6 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms(
int width,
int height,
int decode_time_ms) const {
- RTC_DCHECK_RUN_ON(&decode_queue_);
- // TODO(webrtc:11489): Consider posting the work to the worker thread.
-
bool is_4k = (width == 3840 || width == 4096) && height == 2160;
bool is_hd = width == 1920 && height == 1080;
// Only update histograms for 4k/HD and VP9/H264.
@@ -649,7 +614,6 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms(
absl::optional<int64_t>
ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs(
int64_t now_ms) const {
- RTC_DCHECK_RUN_ON(&main_thread_);
if (!last_estimated_playout_ntp_timestamp_ms_ ||
!last_estimated_playout_time_ms_) {
return absl::nullopt;
@@ -659,12 +623,6 @@ ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs(
}
VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const {
- RTC_DCHECK_RUN_ON(&main_thread_);
-
- // Like VideoReceiveStream::GetStats, called on the worker thread from
- // StatsCollector::ExtractMediaInfo via worker_thread()->Invoke().
- // WebRtcVideoChannel::GetStats(), GetVideoReceiverInfo.
-
rtc::CritScope lock(&crit_);
// Get current frame rates here, as only updating them on new frames prevents
// us from ever correctly displaying frame rate of 0.
@@ -696,16 +654,12 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const {
}
void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
- RTC_DCHECK_RUN_ON(&decode_queue_);
rtc::CritScope lock(&crit_);
stats_.current_payload_type = payload_type;
}
void ReceiveStatisticsProxy::OnDecoderImplementationName(
const char* implementation_name) {
- RTC_DCHECK_RUN_ON(&decode_queue_);
- // TODO(webrtc:11489): is a lock needed for this variable? Currently seems to
- // be only touched on the decoder queue.
rtc::CritScope lock(&crit_);
stats_.decoder_implementation_name = implementation_name;
}
@@ -717,7 +671,6 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
int jitter_buffer_ms,
int min_playout_delay_ms,
int render_delay_ms) {
- RTC_DCHECK_RUN_ON(&decode_queue_);
rtc::CritScope lock(&crit_);
stats_.max_decode_ms = max_decode_ms;
stats_.current_delay_ms = current_delay_ms;
@@ -734,14 +687,12 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
}
void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) {
- RTC_DCHECK_RUN_ON(&main_thread_);
rtc::CritScope lock(&crit_);
num_unique_frames_.emplace(num_unique_frames);
}
void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
const TimingFrameInfo& info) {
- RTC_DCHECK_RUN_ON(&decode_queue_);
rtc::CritScope lock(&crit_);
if (info.flags != VideoSendTiming::kInvalid) {
int64_t now_ms = clock_->TimeInMilliseconds();
@@ -763,28 +714,6 @@ void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
uint32_t ssrc,
const RtcpPacketTypeCounter& packet_counter) {
- if (!worker_thread_->IsCurrent()) {
- // RtpRtcp::Configuration has a single RtcpPacketTypeCounterObserver and
- // that same configuration may be used for both receiver and sender
- // (see ModuleRtpRtcpImpl::ModuleRtpRtcpImpl).
- // The RTCPSender implementation currently makes calls to this function on a
- // process thread whereas the RTCPReceiver implementation calls back on the
- // [main] worker thread.
- // So until the sender implementation has been updated, we work around this
- // here by posting the update to the expected thread. We make a by value
- // copy of the |task_safety_flag_| to handle the case if the queued task
- // runs after the |ReceiveStatisticsProxy| has been deleted. In such a
- // case the packet_counter update won't be recorded.
- worker_thread_->PostTask(ToQueuedTask(
- [safety = task_safety_flag_, ssrc, packet_counter, this]() {
- if (!safety->alive())
- return;
- RtcpPacketTypesCounterUpdated(ssrc, packet_counter);
- }));
- return;
- }
-
- RTC_DCHECK_RUN_ON(&main_thread_);
rtc::CritScope lock(&crit_);
if (stats_.ssrc != ssrc)
return;
@@ -792,7 +721,6 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
}
void ReceiveStatisticsProxy::OnCname(uint32_t ssrc, absl::string_view cname) {
- RTC_DCHECK_RUN_ON(&main_thread_);
rtc::CritScope lock(&crit_);
// TODO(pbos): Handle both local and remote ssrcs here and RTC_DCHECK that we
// receive stats from one of them.
@@ -805,13 +733,9 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame,
absl::optional<uint8_t> qp,
int32_t decode_time_ms,
VideoContentType content_type) {
- RTC_DCHECK_RUN_ON(&decode_queue_);
- // TODO(webrtc:11489): - Same as OnRenderedFrame. Both called from within
- // VideoStreamDecoder::FrameToRender
-
rtc::CritScope lock(&crit_);
- const uint64_t now_ms = clock_->TimeInMilliseconds();
+ uint64_t now_ms = clock_->TimeInMilliseconds();
if (videocontenttypehelpers::IsScreenshare(content_type) !=
videocontenttypehelpers::IsScreenshare(last_content_type_)) {
@@ -870,10 +794,6 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame,
}
void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) {
- RTC_DCHECK_RUN_ON(&incoming_render_queue_);
- // TODO(webrtc:11489): Consider posting the work to the worker thread.
- // - Called from VideoReceiveStream::OnFrame.
-
int width = frame.width();
int height = frame.height();
RTC_DCHECK_GT(width, 0);
@@ -913,10 +833,7 @@ void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) {
void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms,
int64_t sync_offset_ms,
double estimated_freq_khz) {
- RTC_DCHECK_RUN_ON(&incoming_render_queue_);
rtc::CritScope lock(&crit_);
- // TODO(webrtc:11489): Lock possibly not needed for sync_offset_counter_ if
- // it's only touched on the decoder thread.
sync_offset_counter_.Add(std::abs(sync_offset_ms));
stats_.sync_offset_ms = sync_offset_ms;
last_estimated_playout_ntp_timestamp_ms_ = video_playout_ntp_ms;
@@ -969,7 +886,7 @@ void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) {
}
void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
- RTC_DCHECK_RUN_ON(&decode_queue_);
+ RTC_DCHECK_RUN_ON(&decode_thread_);
rtc::CritScope lock(&crit_);
last_codec_type_ = codec_type;
if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
@@ -979,8 +896,6 @@ void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
}
void ReceiveStatisticsProxy::OnStreamInactive() {
- RTC_DCHECK_RUN_ON(&decode_queue_);
-
// TODO(sprang): Figure out any other state that should be reset.
rtc::CritScope lock(&crit_);
@@ -991,13 +906,6 @@ void ReceiveStatisticsProxy::OnStreamInactive() {
void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
- // TODO(webrtc:11489): Is this a duplicate of VideoReceiveStream::OnRttUpdate?
- // - looks like that runs on a/the module process thread.
- //
-
- // BUGBUG
- // Actually, it looks like this method is never called except from a unit
- // test, GetStatsReportsDecodeTimingStats.
rtc::CritScope lock(&crit_);
avg_rtt_ms_ = avg_rtt_ms;
}
@@ -1008,7 +916,7 @@ void ReceiveStatisticsProxy::DecoderThreadStarting() {
void ReceiveStatisticsProxy::DecoderThreadStopped() {
RTC_DCHECK_RUN_ON(&main_thread_);
- decode_queue_.Detach();
+ decode_thread_.Detach();
}
ReceiveStatisticsProxy::ContentSpecificStats::ContentSpecificStats()
@@ -1029,5 +937,4 @@ void ReceiveStatisticsProxy::ContentSpecificStats::Add(
frame_counts.delta_frames += other.frame_counts.delta_frames;
interframe_delay_percentiles.Add(other.interframe_delay_percentiles);
}
-
} // namespace webrtc
diff --git a/video/receive_statistics_proxy.h b/video/receive_statistics_proxy.h
index 335359b724..02043d6944 100644
--- a/video/receive_statistics_proxy.h
+++ b/video/receive_statistics_proxy.h
@@ -17,7 +17,6 @@
#include <vector>
#include "absl/types/optional.h"
-#include "api/task_queue/task_queue_base.h"
#include "call/video_receive_stream.h"
#include "modules/include/module_common_types.h"
#include "modules/video_coding/include/video_coding_defines.h"
@@ -27,8 +26,6 @@
#include "rtc_base/numerics/sample_counter.h"
#include "rtc_base/rate_statistics.h"
#include "rtc_base/rate_tracker.h"
-#include "rtc_base/synchronization/sequence_checker.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/thread_checker.h"
#include "video/quality_threshold.h"
@@ -46,13 +43,8 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
public CallStatsObserver {
public:
ReceiveStatisticsProxy(const VideoReceiveStream::Config* config,
- Clock* clock,
- TaskQueueBase* worker_thread);
- // TODO(webrtc:11489): Remove this ctor once all callers have been updated
- // to use the above one.
- ReceiveStatisticsProxy(const VideoReceiveStream::Config* config,
Clock* clock);
- ~ReceiveStatisticsProxy();
+ ~ReceiveStatisticsProxy() = default;
VideoReceiveStream::Stats GetStats() const;
@@ -147,6 +139,14 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
int64_t now_ms) const RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
Clock* const clock_;
+ // Ownership of this object lies with the owner of the ReceiveStatisticsProxy
+ // instance. Lifetime is guaranteed to outlive |this|.
+ // TODO(tommi): In practice the config_ reference is only used for accessing
+ // config_.rtp.ulpfec.ulpfec_payload_type. Instead of holding a pointer back,
+ // we could just store the value of ulpfec_payload_type and change the
+ // ReceiveStatisticsProxy() ctor to accept a const& of Config (since we'll
+ // then no longer store a pointer to the object).
+ const VideoReceiveStream::Config& config_;
const int64_t start_ms_;
const bool enable_decode_time_histograms_;
@@ -177,7 +177,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
std::map<VideoContentType, ContentSpecificStats> content_specific_stats_
RTC_GUARDED_BY(crit_);
MaxCounter freq_offset_counter_ RTC_GUARDED_BY(crit_);
- QpCounters qp_counters_ RTC_GUARDED_BY(decode_queue_);
+ QpCounters qp_counters_ RTC_GUARDED_BY(decode_thread_);
int64_t avg_rtt_ms_ RTC_GUARDED_BY(crit_);
mutable std::map<int64_t, size_t> frame_window_ RTC_GUARDED_BY(&crit_);
VideoContentType last_content_type_ RTC_GUARDED_BY(&crit_);
@@ -196,17 +196,9 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
RTC_GUARDED_BY(&crit_);
absl::optional<int64_t> last_estimated_playout_time_ms_
RTC_GUARDED_BY(&crit_);
-
- // The thread on which this instance is constructed and some of its main
- // methods are invoked on such as GetStats().
- TaskQueueBase* const worker_thread_;
-
- PendingTaskSafetyFlag::Pointer task_safety_flag_{
- PendingTaskSafetyFlag::Create()};
-
- SequenceChecker decode_queue_;
+ rtc::ThreadChecker decode_thread_;
+ rtc::ThreadChecker network_thread_;
rtc::ThreadChecker main_thread_;
- SequenceChecker incoming_render_queue_;
};
} // namespace webrtc
diff --git a/video/receive_statistics_proxy_unittest.cc b/video/receive_statistics_proxy_unittest.cc
index 59dca1de3d..626542c810 100644
--- a/video/receive_statistics_proxy_unittest.cc
+++ b/video/receive_statistics_proxy_unittest.cc
@@ -22,8 +22,6 @@
#include "api/video/video_frame.h"
#include "api/video/video_frame_buffer.h"
#include "api/video/video_rotation.h"
-#include "rtc_base/task_utils/to_queued_task.h"
-#include "rtc_base/thread.h"
#include "system_wrappers/include/metrics.h"
#include "test/field_trial.h"
#include "test/gtest.h"
@@ -41,63 +39,13 @@ const int kHeight = 720;
// TODO(sakal): ReceiveStatisticsProxy is lacking unittesting.
class ReceiveStatisticsProxyTest : public ::testing::Test {
public:
- ReceiveStatisticsProxyTest()
- : fake_clock_(1234),
- config_(GetTestConfig()),
- worker_thread_(&socket_server_) {
- worker_thread_.WrapCurrent();
- RTC_CHECK_EQ(webrtc::TaskQueueBase::Current(),
- static_cast<TaskQueueBase*>(&worker_thread_));
- metrics::Reset();
- statistics_proxy_.reset(
- new ReceiveStatisticsProxy(&config_, &fake_clock_, &worker_thread_));
- }
-
- ~ReceiveStatisticsProxyTest() override {
- statistics_proxy_.reset();
- worker_thread_.UnwrapCurrent();
- }
+ ReceiveStatisticsProxyTest() : fake_clock_(1234), config_(GetTestConfig()) {}
+ virtual ~ReceiveStatisticsProxyTest() {}
protected:
- class FakeSocketServer : public rtc::SocketServer {
- public:
- FakeSocketServer() = default;
- ~FakeSocketServer() = default;
-
- bool Wait(int cms, bool process_io) override {
- if (fail_next_wait_) {
- fail_next_wait_ = false;
- return false;
- }
- return true;
- }
-
- void WakeUp() override {}
-
- rtc::Socket* CreateSocket(int family, int type) override { return nullptr; }
- rtc::AsyncSocket* CreateAsyncSocket(int family, int type) override {
- return nullptr;
- }
-
- void FailNextWait() { fail_next_wait_ = true; }
-
- private:
- bool fail_next_wait_ = false;
- };
-
- class WorkerThread : public rtc::Thread {
- public:
- explicit WorkerThread(rtc::SocketServer* ss)
- : rtc::Thread(ss), tq_setter_(this) {}
-
- private:
- CurrentTaskQueueSetter tq_setter_;
- };
-
- void FlushWorker() {
- worker_thread_.PostTask(
- ToQueuedTask([this]() { socket_server_.FailNextWait(); }));
- worker_thread_.ProcessMessages(1000);
+ virtual void SetUp() {
+ metrics::Reset();
+ statistics_proxy_.reset(new ReceiveStatisticsProxy(&config_, &fake_clock_));
}
VideoReceiveStream::Config GetTestConfig() {
@@ -130,8 +78,6 @@ class ReceiveStatisticsProxyTest : public ::testing::Test {
SimulatedClock fake_clock_;
const VideoReceiveStream::Config config_;
std::unique_ptr<ReceiveStatisticsProxy> statistics_proxy_;
- FakeSocketServer socket_server_;
- WorkerThread worker_thread_;
};
TEST_F(ReceiveStatisticsProxyTest, OnDecodedFrameIncreasesFramesDecoded) {
diff --git a/video/video_quality_observer.cc b/video/video_quality_observer.cc
index e10def2d79..be7b08c887 100644
--- a/video/video_quality_observer.cc
+++ b/video/video_quality_observer.cc
@@ -49,14 +49,10 @@ VideoQualityObserver::VideoQualityObserver(VideoContentType content_type)
current_resolution_(Resolution::Low),
num_resolution_downgrades_(0),
time_in_blocky_video_ms_(0),
- // TODO(webrtc:11489): content_type_ variable isn't necessary.
content_type_(content_type),
is_paused_(false) {}
void VideoQualityObserver::UpdateHistograms() {
- // TODO(webrtc:11489): Called on the decoder thread - which _might_ be
- // the same as the construction thread.
-
// Don't report anything on an empty video stream.
if (num_frames_rendered_ == 0) {
return;
diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index 5d371a59dd..b2b96db9bf 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -182,13 +182,6 @@ constexpr int kInactiveStreamThresholdMs = 600000; // 10 minutes.
namespace internal {
-TaskQueueBase* GetCurrentTaskQueue() {
- TaskQueueBase* ret = TaskQueueBase::Current();
- if (!ret)
- ret = rtc::ThreadManager::Instance()->CurrentThread();
- return ret;
-}
-
VideoReceiveStream::VideoReceiveStream(
TaskQueueFactory* task_queue_factory,
RtpStreamReceiverControllerInterface* receiver_controller,
@@ -204,11 +197,10 @@ VideoReceiveStream::VideoReceiveStream(
config_(std::move(config)),
num_cpu_cores_(num_cpu_cores),
process_thread_(process_thread),
- worker_thread_(GetCurrentTaskQueue()),
clock_(clock),
call_stats_(call_stats),
source_tracker_(clock_),
- stats_proxy_(&config_, clock_, worker_thread_),
+ stats_proxy_(&config_, clock_),
rtp_receive_statistics_(ReceiveStatistics::Create(clock_)),
timing_(timing),
video_receiver_(clock_, timing_.get()),
@@ -447,7 +439,6 @@ void VideoReceiveStream::Stop() {
}
VideoReceiveStream::Stats VideoReceiveStream::GetStats() const {
- RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
VideoReceiveStream::Stats stats = stats_proxy_.GetStats();
stats.total_bitrate_bps = 0;
StreamStatistician* statistician =
@@ -466,7 +457,6 @@ VideoReceiveStream::Stats VideoReceiveStream::GetStats() const {
}
void VideoReceiveStream::UpdateHistograms() {
- RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
absl::optional<int> fraction_lost;
StreamDataCounters rtp_stats;
StreamStatistician* statistician =
@@ -503,7 +493,6 @@ bool VideoReceiveStream::SetBaseMinimumPlayoutDelayMs(int delay_ms) {
return false;
}
- // TODO(webrtc:11489): Consider posting to worker.
rtc::CritScope cs(&playout_delay_lock_);
base_minimum_playout_delay_ms_ = delay_ms;
UpdatePlayoutDelays();
@@ -517,19 +506,19 @@ int VideoReceiveStream::GetBaseMinimumPlayoutDelayMs() const {
return base_minimum_playout_delay_ms_;
}
-// TODO(webrtc:11489): This method grabs a lock 6 times.
+// TODO(tommi): This method grabs a lock 6 times.
void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) {
int64_t video_playout_ntp_ms;
int64_t sync_offset_ms;
double estimated_freq_khz;
- // TODO(webrtc:11489): GetStreamSyncOffsetInMs grabs three locks. One inside
- // the function itself, another in GetChannel() and a third in
+ // TODO(tommi): GetStreamSyncOffsetInMs grabs three locks. One inside the
+ // function itself, another in GetChannel() and a third in
// GetPlayoutTimestamp. Seems excessive. Anyhow, I'm assuming the function
// succeeds most of the time, which leads to grabbing a fourth lock.
if (rtp_stream_sync_.GetStreamSyncOffsetInMs(
video_frame.timestamp(), video_frame.render_time_ms(),
&video_playout_ntp_ms, &sync_offset_ms, &estimated_freq_khz)) {
- // TODO(webrtc:11489): OnSyncOffsetUpdated grabs a lock.
+ // TODO(tommi): OnSyncOffsetUpdated grabs a lock.
stats_proxy_.OnSyncOffsetUpdated(video_playout_ntp_ms, sync_offset_ms,
estimated_freq_khz);
}
@@ -537,7 +526,7 @@ void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) {
config_.renderer->OnFrame(video_frame);
- // TODO(webrtc:11489): OnRenderFrame grabs a lock too.
+ // TODO(tommi): OnRenderFrame grabs a lock too.
stats_proxy_.OnRenderedFrame(video_frame);
}
@@ -574,9 +563,6 @@ void VideoReceiveStream::OnCompleteFrame(
}
last_complete_frame_time_ms_ = time_now_ms;
- // TODO(webrtc:11489): We grab the playout_delay_lock_ lock potentially twice.
- // Consider checking both min/max and posting to worker if there's a change.
- // If we always update playout delays on the worker, we don't need a lock.
const PlayoutDelay& playout_delay = frame->EncodedImage().playout_delay_;
if (playout_delay.min_ms >= 0) {
rtc::CritScope cs(&playout_delay_lock_);
@@ -632,7 +618,6 @@ void VideoReceiveStream::SetEstimatedPlayoutNtpTimestampMs(
void VideoReceiveStream::SetMinimumPlayoutDelay(int delay_ms) {
RTC_DCHECK_RUN_ON(&module_process_sequence_checker_);
- // TODO(webrtc:11489): Consider posting to worker.
rtc::CritScope cs(&playout_delay_lock_);
syncable_minimum_playout_delay_ms_ = delay_ms;
UpdatePlayoutDelays();
@@ -667,7 +652,6 @@ void VideoReceiveStream::StartNextDecode() {
void VideoReceiveStream::HandleEncodedFrame(
std::unique_ptr<EncodedFrame> frame) {
- // Running on |decode_queue_|.
int64_t now_ms = clock_->TimeInMilliseconds();
// Current OnPreDecode only cares about QP for VP8.
@@ -722,7 +706,6 @@ void VideoReceiveStream::HandleKeyFrameGeneration(
}
void VideoReceiveStream::HandleFrameBufferTimeout() {
- // Running on |decode_queue_|.
int64_t now_ms = clock_->TimeInMilliseconds();
absl::optional<int64_t> last_packet_ms =
rtp_video_stream_receiver_.LastReceivedPacketMs();
diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h
index 1cc7dd2184..c1ebf2b600 100644
--- a/video/video_receive_stream.h
+++ b/video/video_receive_stream.h
@@ -45,12 +45,6 @@ class VCMTiming;
namespace internal {
-// Utility function that fetches the TQ that's active in the current context
-// or the active rtc::Thread if no TQ is active. This is necessary at the moment
-// for VideoReceiveStream and downstream classes as tests and production don't
-// consistently follow the same procedures.
-TaskQueueBase* GetCurrentTaskQueue();
-
class VideoReceiveStream : public webrtc::VideoReceiveStream,
public rtc::VideoSinkInterface<VideoFrame>,
public NackSender,
@@ -167,7 +161,6 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
const VideoReceiveStream::Config config_;
const int num_cpu_cores_;
ProcessThread* const process_thread_;
- TaskQueueBase* const worker_thread_;
Clock* const clock_;
CallStats* const call_stats_;