diff options
author | Sebastian Jansson <srte@webrtc.org> | 2019-12-13 14:47:31 +0000 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2019-12-13 14:47:48 +0000 |
commit | 41466b7bef5aa0927e3724f53c556bc4019f80d4 (patch) | |
tree | 652c55c9992e70e3ef997b1a80e02c5f74356a21 /modules/congestion_controller | |
parent | 9d06bc2e6df9be069173ff1008806b45f1d5972f (diff) | |
download | webrtc-41466b7bef5aa0927e3724f53c556bc4019f80d4.tar.gz |
Revert "Extracts ssrc based feedback tracking from feedback adapter."
This reverts commit 08c46adc1e9f9a8d74357fe132a68906ae6e6974.
Reason for revert: Incomplete.
Original change's description:
> Extracts ssrc based feedback tracking from feedback adapter.
>
> This prepares for moving TransportFeedbackAdapter to TaskQueue.
>
> Bug: webrtc:9883
> Change-Id: Ib333f6a6837ff6dd8b10813e8953e4d8cd5d8633
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162040
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#30076}
TBR=sprang@webrtc.org,srte@webrtc.org
Change-Id: I6a79e7627f9de2d8c876d6a13ca36f3ac06fde7f
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:9883
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162200
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30087}
Diffstat (limited to 'modules/congestion_controller')
7 files changed, 151 insertions, 232 deletions
diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn index 38a4bf19df..36a9b257bd 100644 --- a/modules/congestion_controller/rtp/BUILD.gn +++ b/modules/congestion_controller/rtp/BUILD.gn @@ -45,8 +45,6 @@ rtc_library("transport_feedback") { sources = [ "transport_feedback_adapter.cc", "transport_feedback_adapter.h", - "transport_feedback_demuxer.cc", - "transport_feedback_demuxer.h", ] deps = [ @@ -71,7 +69,6 @@ if (rtc_include_tests) { sources = [ "transport_feedback_adapter_unittest.cc", - "transport_feedback_demuxer_unittest.cc", ] deps = [ ":transport_feedback", diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index 8783b737d9..b070b0e23a 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -66,6 +66,30 @@ DataSize InFlightBytesTracker::GetOutstandingData( TransportFeedbackAdapter::TransportFeedbackAdapter() = default; +TransportFeedbackAdapter::~TransportFeedbackAdapter() { + RTC_DCHECK(observers_.empty()); +} + +void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( + std::vector<uint32_t> ssrcs, + StreamFeedbackObserver* observer) { + rtc::CritScope cs(&observers_lock_); + RTC_DCHECK(observer); + RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { + return pair.second == observer; + }) == observers_.end()); + observers_.push_back({ssrcs, observer}); +} + +void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( + StreamFeedbackObserver* observer) { + rtc::CritScope cs(&observers_lock_); + RTC_DCHECK(observer); + const auto it = absl::c_find_if( + observers_, [=](const auto& pair) { return pair.second == observer; }); + RTC_DCHECK(it != observers_.end()); + observers_.erase(it); +} void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, @@ -80,6 +104,10 @@ void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, packet.local_net_id = local_net_id_; packet.remote_net_id = remote_net_id_; packet.sent.pacing_info = packet_info.pacing_info; + if (packet_info.has_rtp_sequence_number) { + packet.ssrc = packet_info.ssrc; + packet.rtp_sequence_number = packet_info.rtp_sequence_number; + } while (!history_.empty() && creation_time - history_.begin()->second.creation_time > @@ -140,25 +168,32 @@ TransportFeedbackAdapter::ProcessTransportFeedback( RTC_LOG(LS_INFO) << "Empty transport feedback packet received."; return absl::nullopt; } - - rtc::CritScope cs(&lock_); + std::vector<PacketFeedback> feedback_vector; TransportPacketsFeedback msg; msg.feedback_time = feedback_receive_time; + { + rtc::CritScope cs(&lock_); + msg.prior_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + feedback_vector = + ProcessTransportFeedbackInner(feedback, feedback_receive_time); + if (feedback_vector.empty()) + return absl::nullopt; - msg.prior_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); - msg.packet_feedbacks = - ProcessTransportFeedbackInner(feedback, feedback_receive_time); - if (msg.packet_feedbacks.empty()) - return absl::nullopt; - - auto it = history_.find(last_ack_seq_num_); - if (it != history_.end()) { - msg.first_unacked_send_time = it->second.sent.send_time; + for (const PacketFeedback& fb : feedback_vector) { + PacketResult res; + res.sent_packet = fb.sent; + res.receive_time = fb.receive_time; + msg.packet_feedbacks.push_back(res); + } + auto it = history_.find(last_ack_seq_num_); + if (it != history_.end()) { + msg.first_unacked_send_time = it->second.sent.send_time; + } + msg.data_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } - msg.data_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); - + SignalObservers(feedback_vector); return msg; } @@ -174,7 +209,7 @@ DataSize TransportFeedbackAdapter::GetOutstandingData() const { return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } -std::vector<PacketResult> +std::vector<PacketFeedback> TransportFeedbackAdapter::ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, Timestamp feedback_time) { @@ -190,8 +225,8 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( } last_timestamp_ = feedback.GetBaseTime(); - std::vector<PacketResult> packet_result_vector; - packet_result_vector.reserve(feedback.GetPacketStatusCount()); + std::vector<PacketFeedback> packet_feedback_vector; + packet_feedback_vector.reserve(feedback.GetPacketStatusCount()); size_t failed_lookups = 0; size_t ignored = 0; @@ -234,10 +269,7 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( } if (packet_feedback.local_net_id == local_net_id_ && packet_feedback.remote_net_id == remote_net_id_) { - PacketResult result; - result.sent_packet = packet_feedback.sent; - result.receive_time = packet_feedback.receive_time; - packet_result_vector.push_back(result); + packet_feedback_vector.push_back(packet_feedback); } else { ++ignored; } @@ -253,7 +285,27 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( << " packets because they were sent on a different route."; } - return packet_result_vector; + return packet_feedback_vector; +} + +void TransportFeedbackAdapter::SignalObservers( + const std::vector<PacketFeedback>& feedback_vector) { + rtc::CritScope cs(&observers_lock_); + for (auto& observer : observers_) { + std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback; + for (const auto& packet : feedback_vector) { + if (packet.ssrc && absl::c_count(observer.first, *packet.ssrc) > 0) { + StreamFeedbackObserver::StreamPacketInfo packet_info; + packet_info.ssrc = *packet.ssrc; + packet_info.rtp_sequence_number = packet.rtp_sequence_number; + packet_info.received = packet.receive_time.IsFinite(); + selected_feedback.push_back(packet_info); + } + } + if (!selected_feedback.empty()) { + observer.second->OnPacketFeedbackVector(std::move(selected_feedback)); + } + } } } // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h index b6bed96711..699c6ed489 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.h +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h @@ -38,6 +38,9 @@ struct PacketFeedback { // The network route ids that this packet is associated with. uint16_t local_net_id = 0; uint16_t remote_net_id = 0; + // The SSRC and RTP sequence number of the packet this feedback refers to. + absl::optional<uint32_t> ssrc; + uint16_t rtp_sequence_number = 0; }; class InFlightBytesTracker { @@ -52,9 +55,16 @@ class InFlightBytesTracker { std::map<RemoteAndLocalNetworkId, DataSize> in_flight_data_; }; -class TransportFeedbackAdapter { +class TransportFeedbackAdapter : public StreamFeedbackProvider { public: TransportFeedbackAdapter(); + virtual ~TransportFeedbackAdapter(); + + void RegisterStreamFeedbackObserver( + std::vector<uint32_t> ssrcs, + StreamFeedbackObserver* observer) override; + void DeRegisterStreamFeedbackObserver( + StreamFeedbackObserver* observer) override; void AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, @@ -73,10 +83,15 @@ class TransportFeedbackAdapter { private: enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate }; - std::vector<PacketResult> ProcessTransportFeedbackInner( + void OnTransportFeedback(const rtcp::TransportFeedback& feedback); + + std::vector<PacketFeedback> ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, Timestamp feedback_time) RTC_RUN_ON(&lock_); + void SignalObservers( + const std::vector<PacketFeedback>& packet_feedback_vector); + rtc::CriticalSection lock_; DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero(); Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); @@ -95,6 +110,13 @@ class TransportFeedbackAdapter { uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0; uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0; + + rtc::CriticalSection observers_lock_; + // Maps a set of ssrcs to corresponding observer. Vectors are used rather than + // set/map to ensure that the processing order is consistent independently of + // the randomized ssrcs. + std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>> + observers_ RTC_GUARDED_BY(&observers_lock_); }; } // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc index a95f866630..e03bcc3750 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc @@ -126,6 +126,58 @@ class TransportFeedbackAdapterTest : public ::testing::Test { std::unique_ptr<TransportFeedbackAdapter> adapter_; }; +TEST_F(TransportFeedbackAdapterTest, ObserverSanity) { + MockStreamFeedbackObserver mock; + adapter_->RegisterStreamFeedbackObserver({kSsrc}, &mock); + + const std::vector<PacketResult> packets = { + CreatePacket(100, 200, 0, 1000, kPacingInfo0), + CreatePacket(110, 210, 1, 2000, kPacingInfo0), + CreatePacket(120, 220, 2, 3000, kPacingInfo0)}; + + rtcp::TransportFeedback feedback; + feedback.SetBase(packets[0].sent_packet.sequence_number, + packets[0].receive_time.us()); + + for (const auto& packet : packets) { + OnSentPacket(packet); + EXPECT_TRUE(feedback.AddReceivedPacket(packet.sent_packet.sequence_number, + packet.receive_time.us())); + } + + EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1); + adapter_->ProcessTransportFeedback(feedback, clock_.CurrentTime()); + + adapter_->DeRegisterStreamFeedbackObserver(&mock); + + auto new_packet = CreatePacket(130, 230, 3, 4000, kPacingInfo0); + OnSentPacket(new_packet); + + rtcp::TransportFeedback second_feedback; + second_feedback.SetBase(new_packet.sent_packet.sequence_number, + new_packet.receive_time.us()); + EXPECT_TRUE(second_feedback.AddReceivedPacket( + new_packet.sent_packet.sequence_number, new_packet.receive_time.us())); + EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0); + adapter_->ProcessTransportFeedback(second_feedback, clock_.CurrentTime()); +} + +#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) +TEST_F(TransportFeedbackAdapterTest, ObserverDoubleRegistrationDeathTest) { + MockStreamFeedbackObserver mock; + adapter_->RegisterStreamFeedbackObserver({0}, &mock); + EXPECT_DEATH(adapter_->RegisterStreamFeedbackObserver({0}, &mock), ""); + adapter_->DeRegisterStreamFeedbackObserver(&mock); +} + +TEST_F(TransportFeedbackAdapterTest, ObserverMissingDeRegistrationDeathTest) { + MockStreamFeedbackObserver mock; + adapter_->RegisterStreamFeedbackObserver({0}, &mock); + EXPECT_DEATH(adapter_.reset(), ""); + adapter_->DeRegisterStreamFeedbackObserver(&mock); +} +#endif + TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) { std::vector<PacketResult> packets; packets.push_back(CreatePacket(100, 200, 0, 1500, kPacingInfo0)); diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc deleted file mode 100644 index c7893d71a9..0000000000 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ -#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h" -#include "absl/algorithm/container.h" -#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" - -namespace webrtc { -namespace { -static const size_t kMaxPacketsInHistory = 5000; -} -void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver( - std::vector<uint32_t> ssrcs, - StreamFeedbackObserver* observer) { - rtc::CritScope cs(&observers_lock_); - RTC_DCHECK(observer); - RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { - return pair.second == observer; - }) == observers_.end()); - observers_.push_back({ssrcs, observer}); -} - -void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( - StreamFeedbackObserver* observer) { - rtc::CritScope cs(&observers_lock_); - RTC_DCHECK(observer); - const auto it = absl::c_find_if( - observers_, [=](const auto& pair) { return pair.second == observer; }); - RTC_DCHECK(it != observers_.end()); - observers_.erase(it); -} - -void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { - rtc::CritScope cs(&lock_); - if (packet_info.has_rtp_sequence_number && packet_info.ssrc != 0) { - StreamFeedbackObserver::StreamPacketInfo info; - info.ssrc = packet_info.ssrc; - info.rtp_sequence_number = packet_info.rtp_sequence_number; - info.received = false; - history_.insert( - {seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number), - info}); - } - while (history_.size() > kMaxPacketsInHistory) { - history_.erase(history_.begin()); - } -} - -void TransportFeedbackDemuxer::OnTransportFeedback( - const rtcp::TransportFeedback& feedback) { - std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks; - { - rtc::CritScope cs(&lock_); - for (const auto& packet : feedback.GetAllPackets()) { - int64_t seq_num = - seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); - auto it = history_.find(seq_num); - if (it != history_.end()) { - auto packet_info = it->second; - packet_info.received = packet.received(); - stream_feedbacks.push_back(packet_info); - if (packet.received()) - history_.erase(it); - } - } - } - - rtc::CritScope cs(&observers_lock_); - for (auto& observer : observers_) { - std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback; - for (const auto& packet_info : stream_feedbacks) { - if (absl::c_count(observer.first, packet_info.ssrc) > 0) { - selected_feedback.push_back(packet_info); - } - } - if (!selected_feedback.empty()) { - observer.second->OnPacketFeedbackVector(std::move(selected_feedback)); - } - } -} - -} // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h deleted file mode 100644 index bcd25d5835..0000000000 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ -#ifndef MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_ -#define MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_ - -#include <map> -#include <utility> -#include <vector> - -#include "modules/include/module_common_types_public.h" -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "rtc_base/critical_section.h" - -namespace webrtc { - -class TransportFeedbackDemuxer : public StreamFeedbackProvider { - public: - // Implements StreamFeedbackProvider interface - void RegisterStreamFeedbackObserver( - std::vector<uint32_t> ssrcs, - StreamFeedbackObserver* observer) override; - void DeRegisterStreamFeedbackObserver( - StreamFeedbackObserver* observer) override; - void AddPacket(const RtpPacketSendInfo& packet_info); - void OnTransportFeedback(const rtcp::TransportFeedback& feedback); - - private: - rtc::CriticalSection lock_; - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); - std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_ - RTC_GUARDED_BY(&lock_); - - // Maps a set of ssrcs to corresponding observer. Vectors are used rather than - // set/map to ensure that the processing order is consistent independently of - // the randomized ssrcs. - rtc::CriticalSection observers_lock_; - std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>> - observers_ RTC_GUARDED_BY(&observers_lock_); -}; -} // namespace webrtc - -#endif // MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_ diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc deleted file mode 100644 index 144e3e135d..0000000000 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ -#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h" - -#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" -#include "test/gmock.h" -#include "test/gtest.h" - -namespace webrtc { -namespace { - -using ::testing::_; -static constexpr uint32_t kSsrc = 8492; - -class MockStreamFeedbackObserver : public webrtc::StreamFeedbackObserver { - public: - MOCK_METHOD1(OnPacketFeedbackVector, - void(std::vector<StreamPacketInfo> packet_feedback_vector)); -}; - -RtpPacketSendInfo CreatePacket(uint32_t ssrc, - int16_t rtp_sequence_number, - int64_t transport_sequence_number) { - RtpPacketSendInfo res; - res.ssrc = ssrc; - res.transport_sequence_number = transport_sequence_number; - res.rtp_sequence_number = rtp_sequence_number; - res.has_rtp_sequence_number = true; - return res; -} -} // namespace -TEST(TransportFeedbackDemuxerTest, ObserverSanity) { - TransportFeedbackDemuxer demuxer; - MockStreamFeedbackObserver mock; - demuxer.RegisterStreamFeedbackObserver({kSsrc}, &mock); - - demuxer.AddPacket(CreatePacket(kSsrc, 55, 1)); - demuxer.AddPacket(CreatePacket(kSsrc, 56, 2)); - demuxer.AddPacket(CreatePacket(kSsrc, 57, 3)); - - rtcp::TransportFeedback feedback; - feedback.SetBase(1, 1000); - ASSERT_TRUE(feedback.AddReceivedPacket(1, 1000)); - ASSERT_TRUE(feedback.AddReceivedPacket(2, 2000)); - ASSERT_TRUE(feedback.AddReceivedPacket(3, 3000)); - - EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1); - demuxer.OnTransportFeedback(feedback); - - demuxer.DeRegisterStreamFeedbackObserver(&mock); - - demuxer.AddPacket(CreatePacket(kSsrc, 58, 4)); - rtcp::TransportFeedback second_feedback; - second_feedback.SetBase(4, 4000); - ASSERT_TRUE(second_feedback.AddReceivedPacket(4, 4000)); - - EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0); - demuxer.OnTransportFeedback(second_feedback); -} -} // namespace webrtc |