diff options
author | Sebastian Jansson <srte@webrtc.org> | 2019-12-12 17:22:23 +0100 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2019-12-12 18:25:25 +0000 |
commit | 08c46adc1e9f9a8d74357fe132a68906ae6e6974 (patch) | |
tree | 5bbe18640471c401b1e8e31d7145a052b304693d /modules/congestion_controller | |
parent | 3a7e8b040311b8335e880fdd30d66dc2f12cbf7a (diff) | |
download | webrtc-08c46adc1e9f9a8d74357fe132a68906ae6e6974.tar.gz |
Extracts ssrc based feedback tracking from feedback adapter.
This prepares for moving TransportFeedbackAdapter to TaskQueue.
Bug: webrtc:9883
Change-Id: Ib333f6a6837ff6dd8b10813e8953e4d8cd5d8633
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162040
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30076}
Diffstat (limited to 'modules/congestion_controller')
7 files changed, 232 insertions, 151 deletions
diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn index 36a9b257bd..38a4bf19df 100644 --- a/modules/congestion_controller/rtp/BUILD.gn +++ b/modules/congestion_controller/rtp/BUILD.gn @@ -45,6 +45,8 @@ rtc_library("transport_feedback") { sources = [ "transport_feedback_adapter.cc", "transport_feedback_adapter.h", + "transport_feedback_demuxer.cc", + "transport_feedback_demuxer.h", ] deps = [ @@ -69,6 +71,7 @@ if (rtc_include_tests) { sources = [ "transport_feedback_adapter_unittest.cc", + "transport_feedback_demuxer_unittest.cc", ] deps = [ ":transport_feedback", diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index b070b0e23a..8783b737d9 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -66,30 +66,6 @@ DataSize InFlightBytesTracker::GetOutstandingData( TransportFeedbackAdapter::TransportFeedbackAdapter() = default; -TransportFeedbackAdapter::~TransportFeedbackAdapter() { - RTC_DCHECK(observers_.empty()); -} - -void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( - std::vector<uint32_t> ssrcs, - StreamFeedbackObserver* observer) { - rtc::CritScope cs(&observers_lock_); - RTC_DCHECK(observer); - RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { - return pair.second == observer; - }) == observers_.end()); - observers_.push_back({ssrcs, observer}); -} - -void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( - StreamFeedbackObserver* observer) { - rtc::CritScope cs(&observers_lock_); - RTC_DCHECK(observer); - const auto it = absl::c_find_if( - observers_, [=](const auto& pair) { return pair.second == observer; }); - RTC_DCHECK(it != observers_.end()); - observers_.erase(it); -} void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, @@ -104,10 +80,6 @@ void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, packet.local_net_id = local_net_id_; packet.remote_net_id = remote_net_id_; packet.sent.pacing_info = packet_info.pacing_info; - if (packet_info.has_rtp_sequence_number) { - packet.ssrc = packet_info.ssrc; - packet.rtp_sequence_number = packet_info.rtp_sequence_number; - } while (!history_.empty() && creation_time - history_.begin()->second.creation_time > @@ -168,32 +140,25 @@ TransportFeedbackAdapter::ProcessTransportFeedback( RTC_LOG(LS_INFO) << "Empty transport feedback packet received."; return absl::nullopt; } - std::vector<PacketFeedback> feedback_vector; + + rtc::CritScope cs(&lock_); TransportPacketsFeedback msg; msg.feedback_time = feedback_receive_time; - { - rtc::CritScope cs(&lock_); - msg.prior_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); - feedback_vector = - ProcessTransportFeedbackInner(feedback, feedback_receive_time); - if (feedback_vector.empty()) - return absl::nullopt; - for (const PacketFeedback& fb : feedback_vector) { - PacketResult res; - res.sent_packet = fb.sent; - res.receive_time = fb.receive_time; - msg.packet_feedbacks.push_back(res); - } - auto it = history_.find(last_ack_seq_num_); - if (it != history_.end()) { - msg.first_unacked_send_time = it->second.sent.send_time; - } - msg.data_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + msg.prior_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + msg.packet_feedbacks = + ProcessTransportFeedbackInner(feedback, feedback_receive_time); + if (msg.packet_feedbacks.empty()) + return absl::nullopt; + + auto it = history_.find(last_ack_seq_num_); + if (it != history_.end()) { + msg.first_unacked_send_time = it->second.sent.send_time; } - SignalObservers(feedback_vector); + msg.data_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + return msg; } @@ -209,7 +174,7 @@ DataSize TransportFeedbackAdapter::GetOutstandingData() const { return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } -std::vector<PacketFeedback> +std::vector<PacketResult> TransportFeedbackAdapter::ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, Timestamp feedback_time) { @@ -225,8 +190,8 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( } last_timestamp_ = feedback.GetBaseTime(); - std::vector<PacketFeedback> packet_feedback_vector; - packet_feedback_vector.reserve(feedback.GetPacketStatusCount()); + std::vector<PacketResult> packet_result_vector; + packet_result_vector.reserve(feedback.GetPacketStatusCount()); size_t failed_lookups = 0; size_t ignored = 0; @@ -269,7 +234,10 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( } if (packet_feedback.local_net_id == local_net_id_ && packet_feedback.remote_net_id == remote_net_id_) { - packet_feedback_vector.push_back(packet_feedback); + PacketResult result; + result.sent_packet = packet_feedback.sent; + result.receive_time = packet_feedback.receive_time; + packet_result_vector.push_back(result); } else { ++ignored; } @@ -285,27 +253,7 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( << " packets because they were sent on a different route."; } - return packet_feedback_vector; -} - -void TransportFeedbackAdapter::SignalObservers( - const std::vector<PacketFeedback>& feedback_vector) { - rtc::CritScope cs(&observers_lock_); - for (auto& observer : observers_) { - std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback; - for (const auto& packet : feedback_vector) { - if (packet.ssrc && absl::c_count(observer.first, *packet.ssrc) > 0) { - StreamFeedbackObserver::StreamPacketInfo packet_info; - packet_info.ssrc = *packet.ssrc; - packet_info.rtp_sequence_number = packet.rtp_sequence_number; - packet_info.received = packet.receive_time.IsFinite(); - selected_feedback.push_back(packet_info); - } - } - if (!selected_feedback.empty()) { - observer.second->OnPacketFeedbackVector(std::move(selected_feedback)); - } - } + return packet_result_vector; } } // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h index 699c6ed489..b6bed96711 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.h +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h @@ -38,9 +38,6 @@ struct PacketFeedback { // The network route ids that this packet is associated with. uint16_t local_net_id = 0; uint16_t remote_net_id = 0; - // The SSRC and RTP sequence number of the packet this feedback refers to. - absl::optional<uint32_t> ssrc; - uint16_t rtp_sequence_number = 0; }; class InFlightBytesTracker { @@ -55,16 +52,9 @@ class InFlightBytesTracker { std::map<RemoteAndLocalNetworkId, DataSize> in_flight_data_; }; -class TransportFeedbackAdapter : public StreamFeedbackProvider { +class TransportFeedbackAdapter { public: TransportFeedbackAdapter(); - virtual ~TransportFeedbackAdapter(); - - void RegisterStreamFeedbackObserver( - std::vector<uint32_t> ssrcs, - StreamFeedbackObserver* observer) override; - void DeRegisterStreamFeedbackObserver( - StreamFeedbackObserver* observer) override; void AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, @@ -83,15 +73,10 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider { private: enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate }; - void OnTransportFeedback(const rtcp::TransportFeedback& feedback); - - std::vector<PacketFeedback> ProcessTransportFeedbackInner( + std::vector<PacketResult> ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, Timestamp feedback_time) RTC_RUN_ON(&lock_); - void SignalObservers( - const std::vector<PacketFeedback>& packet_feedback_vector); - rtc::CriticalSection lock_; DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero(); Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); @@ -110,13 +95,6 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider { uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0; uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0; - - rtc::CriticalSection observers_lock_; - // Maps a set of ssrcs to corresponding observer. Vectors are used rather than - // set/map to ensure that the processing order is consistent independently of - // the randomized ssrcs. - std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>> - observers_ RTC_GUARDED_BY(&observers_lock_); }; } // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc index e03bcc3750..a95f866630 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc @@ -126,58 +126,6 @@ class TransportFeedbackAdapterTest : public ::testing::Test { std::unique_ptr<TransportFeedbackAdapter> adapter_; }; -TEST_F(TransportFeedbackAdapterTest, ObserverSanity) { - MockStreamFeedbackObserver mock; - adapter_->RegisterStreamFeedbackObserver({kSsrc}, &mock); - - const std::vector<PacketResult> packets = { - CreatePacket(100, 200, 0, 1000, kPacingInfo0), - CreatePacket(110, 210, 1, 2000, kPacingInfo0), - CreatePacket(120, 220, 2, 3000, kPacingInfo0)}; - - rtcp::TransportFeedback feedback; - feedback.SetBase(packets[0].sent_packet.sequence_number, - packets[0].receive_time.us()); - - for (const auto& packet : packets) { - OnSentPacket(packet); - EXPECT_TRUE(feedback.AddReceivedPacket(packet.sent_packet.sequence_number, - packet.receive_time.us())); - } - - EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1); - adapter_->ProcessTransportFeedback(feedback, clock_.CurrentTime()); - - adapter_->DeRegisterStreamFeedbackObserver(&mock); - - auto new_packet = CreatePacket(130, 230, 3, 4000, kPacingInfo0); - OnSentPacket(new_packet); - - rtcp::TransportFeedback second_feedback; - second_feedback.SetBase(new_packet.sent_packet.sequence_number, - new_packet.receive_time.us()); - EXPECT_TRUE(second_feedback.AddReceivedPacket( - new_packet.sent_packet.sequence_number, new_packet.receive_time.us())); - EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0); - adapter_->ProcessTransportFeedback(second_feedback, clock_.CurrentTime()); -} - -#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) -TEST_F(TransportFeedbackAdapterTest, ObserverDoubleRegistrationDeathTest) { - MockStreamFeedbackObserver mock; - adapter_->RegisterStreamFeedbackObserver({0}, &mock); - EXPECT_DEATH(adapter_->RegisterStreamFeedbackObserver({0}, &mock), ""); - adapter_->DeRegisterStreamFeedbackObserver(&mock); -} - -TEST_F(TransportFeedbackAdapterTest, ObserverMissingDeRegistrationDeathTest) { - MockStreamFeedbackObserver mock; - adapter_->RegisterStreamFeedbackObserver({0}, &mock); - EXPECT_DEATH(adapter_.reset(), ""); - adapter_->DeRegisterStreamFeedbackObserver(&mock); -} -#endif - TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) { std::vector<PacketResult> packets; packets.push_back(CreatePacket(100, 200, 0, 1500, kPacingInfo0)); diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc new file mode 100644 index 0000000000..c7893d71a9 --- /dev/null +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h" +#include "absl/algorithm/container.h" +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" + +namespace webrtc { +namespace { +static const size_t kMaxPacketsInHistory = 5000; +} +void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver( + std::vector<uint32_t> ssrcs, + StreamFeedbackObserver* observer) { + rtc::CritScope cs(&observers_lock_); + RTC_DCHECK(observer); + RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { + return pair.second == observer; + }) == observers_.end()); + observers_.push_back({ssrcs, observer}); +} + +void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( + StreamFeedbackObserver* observer) { + rtc::CritScope cs(&observers_lock_); + RTC_DCHECK(observer); + const auto it = absl::c_find_if( + observers_, [=](const auto& pair) { return pair.second == observer; }); + RTC_DCHECK(it != observers_.end()); + observers_.erase(it); +} + +void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { + rtc::CritScope cs(&lock_); + if (packet_info.has_rtp_sequence_number && packet_info.ssrc != 0) { + StreamFeedbackObserver::StreamPacketInfo info; + info.ssrc = packet_info.ssrc; + info.rtp_sequence_number = packet_info.rtp_sequence_number; + info.received = false; + history_.insert( + {seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number), + info}); + } + while (history_.size() > kMaxPacketsInHistory) { + history_.erase(history_.begin()); + } +} + +void TransportFeedbackDemuxer::OnTransportFeedback( + const rtcp::TransportFeedback& feedback) { + std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks; + { + rtc::CritScope cs(&lock_); + for (const auto& packet : feedback.GetAllPackets()) { + int64_t seq_num = + seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); + auto it = history_.find(seq_num); + if (it != history_.end()) { + auto packet_info = it->second; + packet_info.received = packet.received(); + stream_feedbacks.push_back(packet_info); + if (packet.received()) + history_.erase(it); + } + } + } + + rtc::CritScope cs(&observers_lock_); + for (auto& observer : observers_) { + std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback; + for (const auto& packet_info : stream_feedbacks) { + if (absl::c_count(observer.first, packet_info.ssrc) > 0) { + selected_feedback.push_back(packet_info); + } + } + if (!selected_feedback.empty()) { + observer.second->OnPacketFeedbackVector(std::move(selected_feedback)); + } + } +} + +} // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h new file mode 100644 index 0000000000..bcd25d5835 --- /dev/null +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_ +#define MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_ + +#include <map> +#include <utility> +#include <vector> + +#include "modules/include/module_common_types_public.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "rtc_base/critical_section.h" + +namespace webrtc { + +class TransportFeedbackDemuxer : public StreamFeedbackProvider { + public: + // Implements StreamFeedbackProvider interface + void RegisterStreamFeedbackObserver( + std::vector<uint32_t> ssrcs, + StreamFeedbackObserver* observer) override; + void DeRegisterStreamFeedbackObserver( + StreamFeedbackObserver* observer) override; + void AddPacket(const RtpPacketSendInfo& packet_info); + void OnTransportFeedback(const rtcp::TransportFeedback& feedback); + + private: + rtc::CriticalSection lock_; + SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); + std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_ + RTC_GUARDED_BY(&lock_); + + // Maps a set of ssrcs to corresponding observer. Vectors are used rather than + // set/map to ensure that the processing order is consistent independently of + // the randomized ssrcs. + rtc::CriticalSection observers_lock_; + std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>> + observers_ RTC_GUARDED_BY(&observers_lock_); +}; +} // namespace webrtc + +#endif // MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_ diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc new file mode 100644 index 0000000000..144e3e135d --- /dev/null +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h" + +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +using ::testing::_; +static constexpr uint32_t kSsrc = 8492; + +class MockStreamFeedbackObserver : public webrtc::StreamFeedbackObserver { + public: + MOCK_METHOD1(OnPacketFeedbackVector, + void(std::vector<StreamPacketInfo> packet_feedback_vector)); +}; + +RtpPacketSendInfo CreatePacket(uint32_t ssrc, + int16_t rtp_sequence_number, + int64_t transport_sequence_number) { + RtpPacketSendInfo res; + res.ssrc = ssrc; + res.transport_sequence_number = transport_sequence_number; + res.rtp_sequence_number = rtp_sequence_number; + res.has_rtp_sequence_number = true; + return res; +} +} // namespace +TEST(TransportFeedbackDemuxerTest, ObserverSanity) { + TransportFeedbackDemuxer demuxer; + MockStreamFeedbackObserver mock; + demuxer.RegisterStreamFeedbackObserver({kSsrc}, &mock); + + demuxer.AddPacket(CreatePacket(kSsrc, 55, 1)); + demuxer.AddPacket(CreatePacket(kSsrc, 56, 2)); + demuxer.AddPacket(CreatePacket(kSsrc, 57, 3)); + + rtcp::TransportFeedback feedback; + feedback.SetBase(1, 1000); + ASSERT_TRUE(feedback.AddReceivedPacket(1, 1000)); + ASSERT_TRUE(feedback.AddReceivedPacket(2, 2000)); + ASSERT_TRUE(feedback.AddReceivedPacket(3, 3000)); + + EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1); + demuxer.OnTransportFeedback(feedback); + + demuxer.DeRegisterStreamFeedbackObserver(&mock); + + demuxer.AddPacket(CreatePacket(kSsrc, 58, 4)); + rtcp::TransportFeedback second_feedback; + second_feedback.SetBase(4, 4000); + ASSERT_TRUE(second_feedback.AddReceivedPacket(4, 4000)); + + EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0); + demuxer.OnTransportFeedback(second_feedback); +} +} // namespace webrtc |