diff options
Diffstat (limited to 'net/dcsctp/tx')
-rw-r--r-- | net/dcsctp/tx/BUILD.gn | 141 | ||||
-rw-r--r-- | net/dcsctp/tx/mock_send_queue.h | 60 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_error_counter.cc | 37 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_error_counter.h | 51 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_error_counter_test.cc | 76 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_queue.cc | 889 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_queue.h | 371 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_queue_test.cc | 1007 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_timeout.cc | 69 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_timeout.h | 58 | ||||
-rw-r--r-- | net/dcsctp/tx/retransmission_timeout_test.cc | 151 | ||||
-rw-r--r-- | net/dcsctp/tx/rr_send_queue.cc | 432 | ||||
-rw-r--r-- | net/dcsctp/tx/rr_send_queue.h | 238 | ||||
-rw-r--r-- | net/dcsctp/tx/rr_send_queue_test.cc | 742 | ||||
-rw-r--r-- | net/dcsctp/tx/send_queue.h | 128 |
15 files changed, 4450 insertions, 0 deletions
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn new file mode 100644 index 0000000000..2f0b27afc6 --- /dev/null +++ b/net/dcsctp/tx/BUILD.gn @@ -0,0 +1,141 @@ +# Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +import("../../../webrtc.gni") + +rtc_source_set("send_queue") { + deps = [ + "../../../api:array_view", + "../common:internal_types", + "../packet:chunk", + "../packet:data", + "../public:types", + ] + sources = [ "send_queue.h" ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] +} + +rtc_library("rr_send_queue") { + deps = [ + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + "../common:pair_hash", + "../packet:data", + "../public:socket", + "../public:types", + ] + sources = [ + "rr_send_queue.cc", + "rr_send_queue.h", + ] + absl_deps = [ + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] +} + +rtc_library("retransmission_error_counter") { + deps = [ + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + "../public:types", + ] + sources = [ + "retransmission_error_counter.cc", + "retransmission_error_counter.h", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] +} + +rtc_library("retransmission_timeout") { + deps = [ + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + "../public:types", + ] + sources = [ + "retransmission_timeout.cc", + "retransmission_timeout.h", + ] +} + +rtc_library("retransmission_queue") { + deps = [ + ":retransmission_timeout", + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + "../common:math", + "../common:pair_hash", + "../common:sequence_numbers", + "../common:str_join", + "../packet:chunk", + "../packet:data", + "../public:types", + "../timer", + ] + sources = [ + "retransmission_queue.cc", + "retransmission_queue.h", + ] + absl_deps = [ + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] +} + +if (rtc_include_tests) { + rtc_source_set("mock_send_queue") { + testonly = true + deps = [ + ":send_queue", + "../../../api:array_view", + "../../../test:test_support", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] + sources = [ "mock_send_queue.h" ] + } + + rtc_library("dcsctp_tx_unittests") { + testonly = true + + deps = [ + ":mock_send_queue", + ":retransmission_error_counter", + ":retransmission_queue", + ":retransmission_timeout", + ":rr_send_queue", + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:gunit_helpers", + "../../../rtc_base:rtc_base_approved", + "../../../test:test_support", + "../packet:chunk", + "../packet:data", + "../public:socket", + "../public:types", + "../testing:data_generator", + "../testing:testing_macros", + "../timer", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] + sources = [ + "retransmission_error_counter_test.cc", + "retransmission_queue_test.cc", + "retransmission_timeout_test.cc", + "rr_send_queue_test.cc", + ] + } +} diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h new file mode 100644 index 0000000000..0cf64583ae --- /dev/null +++ b/net/dcsctp/tx/mock_send_queue.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ + +#include <cstdint> + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/tx/send_queue.h" +#include "test/gmock.h" + +namespace dcsctp { + +class MockSendQueue : public SendQueue { + public: + MockSendQueue() { + ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) { + return absl::nullopt; + }); + } + + MOCK_METHOD(absl::optional<SendQueue::DataToSend>, + Produce, + (TimeMs now, size_t max_size), + (override)); + MOCK_METHOD(bool, + Discard, + (IsUnordered unordered, StreamID stream_id, MID message_id), + (override)); + MOCK_METHOD(void, + PrepareResetStreams, + (rtc::ArrayView<const StreamID> streams), + (override)); + MOCK_METHOD(bool, CanResetStreams, (), (const, override)); + MOCK_METHOD(void, CommitResetStreams, (), (override)); + MOCK_METHOD(void, RollbackResetStreams, (), (override)); + MOCK_METHOD(void, Reset, (), (override)); + MOCK_METHOD(size_t, buffered_amount, (StreamID stream_id), (const, override)); + MOCK_METHOD(size_t, total_buffered_amount, (), (const, override)); + MOCK_METHOD(size_t, + buffered_amount_low_threshold, + (StreamID stream_id), + (const, override)); + MOCK_METHOD(void, + SetBufferedAmountLowThreshold, + (StreamID stream_id, size_t bytes), + (override)); +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ diff --git a/net/dcsctp/tx/retransmission_error_counter.cc b/net/dcsctp/tx/retransmission_error_counter.cc new file mode 100644 index 0000000000..111b6efe96 --- /dev/null +++ b/net/dcsctp/tx/retransmission_error_counter.cc @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_error_counter.h" + +#include "absl/strings/string_view.h" +#include "rtc_base/logging.h" + +namespace dcsctp { +bool RetransmissionErrorCounter::Increment(absl::string_view reason) { + ++counter_; + if (counter_ > limit_) { + RTC_DLOG(LS_INFO) << log_prefix_ << reason + << ", too many retransmissions, counter=" << counter_; + return false; + } + + RTC_DLOG(LS_VERBOSE) << log_prefix_ << reason << ", new counter=" << counter_ + << ", max=" << limit_; + return true; +} + +void RetransmissionErrorCounter::Clear() { + if (counter_ > 0) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "recovered from counter=" << counter_; + counter_ = 0; + } +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_error_counter.h b/net/dcsctp/tx/retransmission_error_counter.h new file mode 100644 index 0000000000..bb8d1f754d --- /dev/null +++ b/net/dcsctp/tx/retransmission_error_counter.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_ +#define NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_ + +#include <functional> +#include <string> +#include <utility> + +#include "absl/strings/string_view.h" +#include "net/dcsctp/public/dcsctp_options.h" + +namespace dcsctp { + +// The RetransmissionErrorCounter is a simple counter with a limit, and when +// the limit is exceeded, the counter is exhausted and the connection will +// be closed. It's incremented on retransmission errors, such as the T3-RTX +// timer expiring, but also missing heartbeats and stream reset requests. +class RetransmissionErrorCounter { + public: + RetransmissionErrorCounter(absl::string_view log_prefix, + const DcSctpOptions& options) + : log_prefix_(std::string(log_prefix) + "rtx-errors: "), + limit_(options.max_retransmissions) {} + + // Increments the retransmission timer. If the maximum error count has been + // reached, `false` will be returned. + bool Increment(absl::string_view reason); + bool IsExhausted() const { return counter_ > limit_; } + + // Clears the retransmission errors. + void Clear(); + + // Returns its current value + int value() const { return counter_; } + + private: + const std::string log_prefix_; + const int limit_; + int counter_ = 0; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_ diff --git a/net/dcsctp/tx/retransmission_error_counter_test.cc b/net/dcsctp/tx/retransmission_error_counter_test.cc new file mode 100644 index 0000000000..61ee82926d --- /dev/null +++ b/net/dcsctp/tx/retransmission_error_counter_test.cc @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_error_counter.h" + +#include "net/dcsctp/public/dcsctp_options.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { + +TEST(RetransmissionErrorCounterTest, HasInitialValue) { + DcSctpOptions options; + RetransmissionErrorCounter counter("log: ", options); + EXPECT_EQ(counter.value(), 0); +} + +TEST(RetransmissionErrorCounterTest, ReturnsFalseAtMaximumValue) { + DcSctpOptions options; + options.max_retransmissions = 5; + RetransmissionErrorCounter counter("log: ", options); + EXPECT_TRUE(counter.Increment("test")); // 1 + EXPECT_TRUE(counter.Increment("test")); // 2 + EXPECT_TRUE(counter.Increment("test")); // 3 + EXPECT_TRUE(counter.Increment("test")); // 4 + EXPECT_TRUE(counter.Increment("test")); // 5 + EXPECT_FALSE(counter.Increment("test")); // Too many retransmissions +} + +TEST(RetransmissionErrorCounterTest, CanHandleZeroRetransmission) { + DcSctpOptions options; + options.max_retransmissions = 0; + RetransmissionErrorCounter counter("log: ", options); + EXPECT_FALSE(counter.Increment("test")); // One is too many. +} + +TEST(RetransmissionErrorCounterTest, IsExhaustedAtMaximum) { + DcSctpOptions options; + options.max_retransmissions = 3; + RetransmissionErrorCounter counter("log: ", options); + EXPECT_TRUE(counter.Increment("test")); // 1 + EXPECT_FALSE(counter.IsExhausted()); + EXPECT_TRUE(counter.Increment("test")); // 2 + EXPECT_FALSE(counter.IsExhausted()); + EXPECT_TRUE(counter.Increment("test")); // 3 + EXPECT_FALSE(counter.IsExhausted()); + EXPECT_FALSE(counter.Increment("test")); // Too many retransmissions + EXPECT_TRUE(counter.IsExhausted()); + EXPECT_FALSE(counter.Increment("test")); // One after too many + EXPECT_TRUE(counter.IsExhausted()); +} + +TEST(RetransmissionErrorCounterTest, ClearingCounter) { + DcSctpOptions options; + options.max_retransmissions = 3; + RetransmissionErrorCounter counter("log: ", options); + EXPECT_TRUE(counter.Increment("test")); // 1 + EXPECT_TRUE(counter.Increment("test")); // 2 + counter.Clear(); + EXPECT_TRUE(counter.Increment("test")); // 1 + EXPECT_TRUE(counter.Increment("test")); // 2 + EXPECT_TRUE(counter.Increment("test")); // 3 + EXPECT_FALSE(counter.IsExhausted()); + EXPECT_FALSE(counter.Increment("test")); // Too many retransmissions + EXPECT_TRUE(counter.IsExhausted()); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc new file mode 100644 index 0000000000..ef2f0e3172 --- /dev/null +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -0,0 +1,889 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_queue.h" + +#include <algorithm> +#include <cstdint> +#include <functional> +#include <iterator> +#include <map> +#include <set> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "absl/algorithm/container.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/math.h" +#include "net/dcsctp/common/pair_hash.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/strings/string_builder.h" + +namespace dcsctp { +namespace { + +// The number of times a packet must be NACKed before it's retransmitted. +// See https://tools.ietf.org/html/rfc4960#section-7.2.4 +constexpr size_t kNumberOfNacksForRetransmission = 3; +} // namespace + +RetransmissionQueue::RetransmissionQueue( + absl::string_view log_prefix, + TSN initial_tsn, + size_t a_rwnd, + SendQueue& send_queue, + std::function<void(DurationMs rtt)> on_new_rtt, + std::function<void()> on_clear_retransmission_counter, + Timer& t3_rtx, + const DcSctpOptions& options, + bool supports_partial_reliability, + bool use_message_interleaving) + : options_(options), + partial_reliability_(supports_partial_reliability), + log_prefix_(std::string(log_prefix) + "tx: "), + data_chunk_header_size_(use_message_interleaving + ? IDataChunk::kHeaderSize + : DataChunk::kHeaderSize), + on_new_rtt_(std::move(on_new_rtt)), + on_clear_retransmission_counter_( + std::move(on_clear_retransmission_counter)), + t3_rtx_(t3_rtx), + cwnd_(options_.cwnd_mtus_initial * options_.mtu), + rwnd_(a_rwnd), + // https://tools.ietf.org/html/rfc4960#section-7.2.1 + // "The initial value of ssthresh MAY be arbitrarily high (for + // example, implementations MAY use the size of the receiver advertised + // window)."" + ssthresh_(rwnd_), + next_tsn_(tsn_unwrapper_.Unwrap(initial_tsn)), + last_cumulative_tsn_ack_(tsn_unwrapper_.Unwrap(TSN(*initial_tsn - 1))), + send_queue_(send_queue) {} + +bool RetransmissionQueue::IsConsistent() const { + size_t actual_outstanding_bytes = 0; + + std::set<UnwrappedTSN> actual_to_be_retransmitted; + for (const auto& elem : outstanding_data_) { + if (elem.second.is_outstanding()) { + actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data()); + } + + if (elem.second.should_be_retransmitted()) { + actual_to_be_retransmitted.insert(elem.first); + } + } + + return actual_outstanding_bytes == outstanding_bytes_ && + actual_to_be_retransmitted == to_be_retransmitted_; +} + +// Returns how large a chunk will be, serialized, carrying the data +size_t RetransmissionQueue::GetSerializedChunkSize(const Data& data) const { + return RoundUpTo4(data_chunk_header_size_ + data.size()); +} + +void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, + AckInfo& ack_info) { + auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); + + for (auto it = outstanding_data_.begin(); it != first_unacked; ++it) { + ack_info.bytes_acked_by_cumulative_tsn_ack += it->second.data().size(); + ack_info.acked_tsns.push_back(it->first.Wrap()); + if (it->second.is_outstanding()) { + outstanding_bytes_ -= GetSerializedChunkSize(it->second.data()); + } else if (it->second.should_be_retransmitted()) { + to_be_retransmitted_.erase(it->first); + } + } + + outstanding_data_.erase(outstanding_data_.begin(), first_unacked); +} + +void RetransmissionQueue::AckGapBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks, + AckInfo& ack_info) { + // Mark all non-gaps as ACKED (but they can't be removed) as (from RFC) + // "SCTP considers the information carried in the Gap Ack Blocks in the + // SACK chunk as advisory.". Note that when NR-SACK is supported, this can be + // handled differently. + + for (auto& block : gap_ack_blocks) { + auto start = outstanding_data_.lower_bound( + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); + auto end = outstanding_data_.upper_bound( + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); + for (auto iter = start; iter != end; ++iter) { + if (!iter->second.is_acked()) { + ack_info.bytes_acked_by_new_gap_ack_blocks += + iter->second.data().size(); + if (iter->second.is_outstanding()) { + outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data()); + } + if (iter->second.should_be_retransmitted()) { + to_be_retransmitted_.erase(iter->first); + } + iter->second.Ack(); + ack_info.highest_tsn_acked = + std::max(ack_info.highest_tsn_acked, iter->first); + ack_info.acked_tsns.push_back(iter->first.Wrap()); + } + } + } +} + +void RetransmissionQueue::NackBetweenAckBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks, + AckInfo& ack_info) { + // Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED. + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "Mark the DATA chunk(s) with three miss indications for retransmission." + // "For each incoming SACK, miss indications are incremented only for + // missing TSNs prior to the highest TSN newly acknowledged in the SACK." + // + // What this means is that only when there is a increasing stream of data + // received and there are new packets seen (since last time), packets that are + // in-flight and between gaps should be nacked. This means that SCTP relies on + // the T3-RTX-timer to re-send packets otherwise. + UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked; + if (is_in_fast_recovery() && cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If an endpoint is in Fast Recovery and a SACK arrives that advances + // the Cumulative TSN Ack Point, the miss indications are incremented for + // all TSNs reported missing in the SACK." + max_tsn_to_nack = UnwrappedTSN::AddTo( + cumulative_tsn_ack, + gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end); + } + + UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack; + for (auto& block : gap_ack_blocks) { + UnwrappedTSN cur_block_first_acked = + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); + iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { + if (iter->first <= max_tsn_to_nack) { + if (iter->second.is_outstanding()) { + outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data()); + } + + if (iter->second.Nack()) { + ack_info.has_packet_loss = true; + to_be_retransmitted_.insert(iter->first); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap() + << " marked for retransmission"; + } + } + } + prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + } + + // Note that packets are not NACKED which are above the highest gap-ack-block + // (or above the cumulative ack TSN if no gap-ack-blocks) as only packets + // up until the highest_tsn_acked (see above) should be considered when + // NACKing. +} + +void RetransmissionQueue::MaybeExitFastRecovery( + UnwrappedTSN cumulative_tsn_ack) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "When a SACK acknowledges all TSNs up to and including this [fast + // recovery] exit point, Fast Recovery is exited." + if (fast_recovery_exit_tsn_.has_value() && + cumulative_tsn_ack >= *fast_recovery_exit_tsn_) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "exit_point=" << *fast_recovery_exit_tsn_->Wrap() + << " reached - exiting fast recovery"; + fast_recovery_exit_tsn_ = absl::nullopt; + } +} + +void RetransmissionQueue::HandleIncreasedCumulativeTsnAck( + size_t outstanding_bytes, + size_t total_bytes_acked) { + // Allow some margin for classifying as fully utilized, due to e.g. that too + // small packets (less than kMinimumFragmentedPayload) are not sent + + // overhead. + bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_; + size_t old_cwnd = cwnd_; + if (phase() == CongestionAlgorithmPhase::kSlowStart) { + if (is_fully_utilized && !is_in_fast_recovery()) { + // https://tools.ietf.org/html/rfc4960#section-7.2.1 + // "Only when these three conditions are met can the cwnd be + // increased; otherwise, the cwnd MUST not be increased. If these + // conditions are met, then cwnd MUST be increased by, at most, the + // lesser of 1) the total size of the previously outstanding DATA + // chunk(s) acknowledged, and 2) the destination's path MTU." + if (options_.slow_start_tcp_style) { + cwnd_ += std::min(total_bytes_acked, cwnd_); + } else { + cwnd_ += std::min(total_bytes_acked, options_.mtu); + } + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "SS increase cwnd=" << cwnd_ + << " (" << old_cwnd << ")"; + } + } else if (phase() == CongestionAlgorithmPhase::kCongestionAvoidance) { + // https://tools.ietf.org/html/rfc4960#section-7.2.2 + // "Whenever cwnd is greater than ssthresh, upon each SACK arrival + // that advances the Cumulative TSN Ack Point, increase + // partial_bytes_acked by the total number of bytes of all new chunks + // acknowledged in that SACK including chunks acknowledged by the new + // Cumulative TSN Ack and by Gap Ack Blocks." + size_t old_pba = partial_bytes_acked_; + partial_bytes_acked_ += total_bytes_acked; + + if (partial_bytes_acked_ >= cwnd_ && is_fully_utilized) { + // https://tools.ietf.org/html/rfc4960#section-7.2.2 + // "When partial_bytes_acked is equal to or greater than cwnd and + // before the arrival of the SACK the sender had cwnd or more bytes of + // data outstanding (i.e., before arrival of the SACK, flightsize was + // greater than or equal to cwnd), increase cwnd by MTU, and reset + // partial_bytes_acked to (partial_bytes_acked - cwnd)." + cwnd_ += options_.mtu; + partial_bytes_acked_ -= cwnd_; + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA increase cwnd=" << cwnd_ + << " (" << old_cwnd << ") ssthresh=" << ssthresh_ + << ", pba=" << partial_bytes_acked_ << " (" + << old_pba << ")"; + } else { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA unchanged cwnd=" << cwnd_ + << " (" << old_cwnd << ") ssthresh=" << ssthresh_ + << ", pba=" << partial_bytes_acked_ << " (" + << old_pba << ")"; + } + } +} + +void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) { + if (!is_in_fast_recovery()) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If not in Fast Recovery, adjust the ssthresh and cwnd of the + // destination address(es) to which the missing DATA chunks were last + // sent, according to the formula described in Section 7.2.3." + size_t old_cwnd = cwnd_; + size_t old_pba = partial_bytes_acked_; + ssthresh_ = std::max(cwnd_ / 2, options_.cwnd_mtus_min * options_.mtu); + cwnd_ = ssthresh_; + partial_bytes_acked_ = 0; + + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "packet loss detected (not fast recovery). cwnd=" + << cwnd_ << " (" << old_cwnd + << "), ssthresh=" << ssthresh_ + << ", pba=" << partial_bytes_acked_ << " (" << old_pba + << ")"; + + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If not in Fast Recovery, enter Fast Recovery and mark the highest + // outstanding TSN as the Fast Recovery exit point." + fast_recovery_exit_tsn_ = outstanding_data_.empty() + ? last_cumulative_tsn_ack_ + : outstanding_data_.rbegin()->first; + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "fast recovery initiated with exit_point=" + << *fast_recovery_exit_tsn_->Wrap(); + } else { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "While in Fast Recovery, the ssthresh and cwnd SHOULD NOT change for + // any destinations due to a subsequent Fast Recovery event (i.e., one + // SHOULD NOT reduce the cwnd further due to a subsequent Fast Retransmit)." + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "packet loss detected (fast recovery). No changes."; + } +} + +void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) { + rwnd_ = outstanding_bytes_ >= a_rwnd ? 0 : a_rwnd - outstanding_bytes_; +} + +void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() { + // Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to + // be retransmitted. + if (outstanding_data_.empty()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Whenever all outstanding data sent to an address have been + // acknowledged, turn off the T3-rtx timer of that address. + // Note: Already stopped in `StopT3RtxTimerOnIncreasedCumulativeTsnAck`." + } else { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Whenever a SACK is received that acknowledges the DATA chunk + // with the earliest outstanding TSN for that address, restart the T3-rtx + // timer for that address with its current RTO (if there is still + // outstanding data on that address)." + // "Whenever a SACK is received missing a TSN that was previously + // acknowledged via a Gap Ack Block, start the T3-rtx for the destination + // address to which the DATA chunk was originally transmitted if it is not + // already running." + if (!t3_rtx_.is_running()) { + t3_rtx_.Start(); + } + } +} + +bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const { + // https://tools.ietf.org/html/rfc4960#section-6.2.1 + // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point, + // then drop the SACK. Since Cumulative TSN Ack is monotonically increasing, + // a SACK whose Cumulative TSN Ack is less than the Cumulative TSN Ack Point + // indicates an out-of- order SACK." + // + // Note: Important not to drop SACKs with identical TSN to that previously + // received, as the gap ack blocks or dup tsn fields may have changed. + UnwrappedTSN cumulative_tsn_ack = + tsn_unwrapper_.PeekUnwrap(sack.cumulative_tsn_ack()); + if (cumulative_tsn_ack < last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-6.2.1 + // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point, + // then drop the SACK. Since Cumulative TSN Ack is monotonically + // increasing, a SACK whose Cumulative TSN Ack is less than the Cumulative + // TSN Ack Point indicates an out-of- order SACK." + return false; + } else if (outstanding_data_.empty() && + cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // No in-flight data and cum-tsn-ack above what was last ACKed - not valid. + return false; + } else if (!outstanding_data_.empty() && + cumulative_tsn_ack > outstanding_data_.rbegin()->first) { + // There is in-flight data, but the cum-tsn-ack is beyond that - not valid. + return false; + } + return true; +} + +bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { + if (!IsSackValid(sack)) { + return false; + } + + size_t old_outstanding_bytes = outstanding_bytes_; + size_t old_rwnd = rwnd_; + UnwrappedTSN cumulative_tsn_ack = + tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack()); + + if (sack.gap_ack_blocks().empty()) { + UpdateRTT(now, cumulative_tsn_ack); + } + + AckInfo ack_info(cumulative_tsn_ack); + // Erase all items up to cumulative_tsn_ack. + RemoveAcked(cumulative_tsn_ack, ack_info); + + // ACK packets reported in the gap ack blocks + AckGapBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info); + + // NACK and possibly mark for retransmit chunks that weren't acked. + NackBetweenAckBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info); + + // Update of outstanding_data_ is now done. Congestion control remains. + UpdateReceiverWindow(sack.a_rwnd()); + + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK. Acked TSN: " + << StrJoin(ack_info.acked_tsns, ",", + [](rtc::StringBuilder& sb, TSN tsn) { + sb << *tsn; + }) + << ", cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " (" + << *last_cumulative_tsn_ack_.Wrap() + << "), outstanding_bytes=" << outstanding_bytes_ << " (" + << old_outstanding_bytes << "), rwnd=" << rwnd_ << " (" + << old_rwnd << ")"; + + MaybeExitFastRecovery(cumulative_tsn_ack); + + if (cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Whenever a SACK is received that acknowledges the DATA chunk + // with the earliest outstanding TSN for that address, restart the T3-rtx + // timer for that address with its current RTO (if there is still + // outstanding data on that address)." + // Note: It may be started again in a bit further down. + t3_rtx_.Stop(); + + HandleIncreasedCumulativeTsnAck( + old_outstanding_bytes, ack_info.bytes_acked_by_cumulative_tsn_ack + + ack_info.bytes_acked_by_new_gap_ack_blocks); + } + + if (ack_info.has_packet_loss) { + is_in_fast_retransmit_ = true; + HandlePacketLoss(ack_info.highest_tsn_acked); + } + + // https://tools.ietf.org/html/rfc4960#section-8.2 + // "When an outstanding TSN is acknowledged [...] the endpoint shall clear + // the error counter ..." + if (ack_info.bytes_acked_by_cumulative_tsn_ack > 0 || + ack_info.bytes_acked_by_new_gap_ack_blocks > 0) { + on_clear_retransmission_counter_(); + } + + last_cumulative_tsn_ack_ = cumulative_tsn_ack; + StartT3RtxTimerIfOutstandingData(); + RTC_DCHECK(IsConsistent()); + return true; +} + +void RetransmissionQueue::UpdateRTT(TimeMs now, + UnwrappedTSN cumulative_tsn_ack) { + // RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C, + // Halvorsen P (2006) Considerations of SCTP retransmission delays for thin + // streams. + // Due to delayed acknowledgement, the SACK may be sent much later which + // increases the calculated RTT. + // TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and + // use only those packets for measurement. + + auto it = outstanding_data_.find(cumulative_tsn_ack); + if (it != outstanding_data_.end()) { + if (!it->second.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + DurationMs rtt = now - it->second.time_sent(); + on_new_rtt_(rtt); + } + } +} + +void RetransmissionQueue::HandleT3RtxTimerExpiry() { + size_t old_cwnd = cwnd_; + size_t old_outstanding_bytes = outstanding_bytes_; + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "For the destination address for which the timer expires, adjust + // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU." + ssthresh_ = std::max(cwnd_ / 2, 4 * options_.mtu); + cwnd_ = 1 * options_.mtu; + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "For the destination address for which the timer expires, set RTO + // <- RTO * 2 ("back off the timer"). The maximum value discussed in rule C7 + // above (RTO.max) may be used to provide an upper bound to this doubling + // operation." + + // Already done by the Timer implementation. + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "Determine how many of the earliest (i.e., lowest TSN) outstanding + // DATA chunks for the address for which the T3-rtx has expired will fit into + // a single packet" + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "Note: Any DATA chunks that were sent to the address for which the + // T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be + // marked for retransmission and sent as soon as cwnd allows (normally, when a + // SACK arrives)." + int count = 0; + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + TxData& item = elem.second; + if (!item.is_acked()) { + if (item.is_outstanding()) { + outstanding_bytes_ -= GetSerializedChunkSize(item.data()); + } + if (item.Nack(/*retransmit_now=*/true)) { + to_be_retransmitted_.insert(tsn); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap() + << " will be retransmitted due to T3-RTX"; + ++count; + } + } + } + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "Start the retransmission timer T3-rtx on the destination address + // to which the retransmission is sent, if rule R1 above indicates to do so." + + // Already done by the Timer implementation. + + RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ + << " (" << old_cwnd << "), ssthresh=" << ssthresh_ + << ", rtx-packets=" << count << ", outstanding_bytes " + << outstanding_bytes_ << " (" << old_outstanding_bytes + << ")"; + RTC_DCHECK(IsConsistent()); +} + +std::vector<std::pair<TSN, Data>> +RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) { + std::vector<std::pair<TSN, Data>> result; + + for (auto it = to_be_retransmitted_.begin(); + it != to_be_retransmitted_.end();) { + UnwrappedTSN tsn = *it; + auto elem = outstanding_data_.find(tsn); + RTC_DCHECK(elem != outstanding_data_.end()); + TxData& item = elem->second; + RTC_DCHECK(item.should_be_retransmitted()); + RTC_DCHECK(!item.is_outstanding()); + RTC_DCHECK(!item.is_abandoned()); + RTC_DCHECK(!item.is_acked()); + + size_t serialized_size = GetSerializedChunkSize(item.data()); + if (serialized_size <= max_size) { + item.Retransmit(); + result.emplace_back(tsn.Wrap(), item.data().Clone()); + max_size -= serialized_size; + outstanding_bytes_ += serialized_size; + it = to_be_retransmitted_.erase(it); + } else { + ++it; + } + // No point in continuing if the packet is full. + if (max_size <= data_chunk_header_size_) { + break; + } + } + + return result; +} + +std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( + TimeMs now, + size_t bytes_remaining_in_packet) { + // Chunks are always padded to even divisible by four. + RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); + + std::vector<std::pair<TSN, Data>> to_be_sent; + size_t old_outstanding_bytes = outstanding_bytes_; + size_t old_rwnd = rwnd_; + if (is_in_fast_retransmit()) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "Determine how many of the earliest (i.e., lowest TSN) DATA chunks + // marked for retransmission will fit into a single packet ... Retransmit + // those K DATA chunks in a single packet. When a Fast Retransmit is being + // performed, the sender SHOULD ignore the value of cwnd and SHOULD NOT + // delay retransmission for this single packet." + is_in_fast_retransmit_ = false; + to_be_sent = GetChunksToBeRetransmitted(bytes_remaining_in_packet); + size_t to_be_sent_bytes = absl::c_accumulate( + to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) { + return r + GetSerializedChunkSize(d.second); + }); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "fast-retransmit: sending " + << to_be_sent.size() << " chunks, " << to_be_sent_bytes + << " bytes"; + } else { + // Normal sending. Calculate the bandwidth budget (how many bytes that is + // allowed to be sent), and fill that up first with chunks that are + // scheduled to be retransmitted. If there is still budget, send new chunks + // (which will have their TSN assigned here.) + size_t remaining_cwnd_bytes = + outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_; + size_t max_bytes = RoundDownTo4(std::min( + std::min(bytes_remaining_in_packet, rwnd()), remaining_cwnd_bytes)); + + to_be_sent = GetChunksToBeRetransmitted(max_bytes); + max_bytes -= absl::c_accumulate( + to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) { + return r + GetSerializedChunkSize(d.second); + }); + + while (max_bytes > data_chunk_header_size_) { + RTC_DCHECK(IsDivisibleBy4(max_bytes)); + absl::optional<SendQueue::DataToSend> chunk_opt = + send_queue_.Produce(now, max_bytes - data_chunk_header_size_); + if (!chunk_opt.has_value()) { + break; + } + + UnwrappedTSN tsn = next_tsn_; + next_tsn_.Increment(); + to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone()); + + // All chunks are always padded to be even divisible by 4. + size_t chunk_size = GetSerializedChunkSize(chunk_opt->data); + max_bytes -= chunk_size; + outstanding_bytes_ += chunk_size; + rwnd_ -= chunk_size; + outstanding_data_.emplace( + tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data), + chunk_opt->max_retransmissions, now, + chunk_opt->expires_at)); + } + } + + if (!to_be_sent.empty()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Every time a DATA chunk is sent to any address (including a + // retransmission), if the T3-rtx timer of that address is not running, + // start it running so that it will expire after the RTO of that address." + if (!t3_rtx_.is_running()) { + t3_rtx_.Start(); + } + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Sending TSN " + << StrJoin(to_be_sent, ",", + [&](rtc::StringBuilder& sb, + const std::pair<TSN, Data>& c) { + sb << *c.first; + }) + << " - " + << absl::c_accumulate( + to_be_sent, 0, + [&](size_t r, const std::pair<TSN, Data>& d) { + return r + GetSerializedChunkSize(d.second); + }) + << " bytes. outstanding_bytes=" << outstanding_bytes_ + << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_ + << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")"; + } + RTC_DCHECK(IsConsistent()); + return to_be_sent; +} + +std::vector<std::pair<TSN, RetransmissionQueue::State>> +RetransmissionQueue::GetChunkStatesForTesting() const { + std::vector<std::pair<TSN, RetransmissionQueue::State>> states; + states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); + for (const auto& elem : outstanding_data_) { + State state; + if (elem.second.is_abandoned()) { + state = State::kAbandoned; + } else if (elem.second.should_be_retransmitted()) { + state = State::kToBeRetransmitted; + } else if (elem.second.is_acked()) { + state = State::kAcked; + } else if (elem.second.is_outstanding()) { + state = State::kInFlight; + } else { + state = State::kNacked; + } + + states.emplace_back(elem.first.Wrap(), state); + } + return states; +} + +bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { + if (!partial_reliability_) { + return false; + } + ExpireChunks(now); + if (!outstanding_data_.empty()) { + auto it = outstanding_data_.begin(); + return it->first == last_cumulative_tsn_ack_.next_value() && + it->second.is_abandoned(); + } + RTC_DCHECK(IsConsistent()); + return false; +} + +void RetransmissionQueue::TxData::Ack() { + ack_state_ = AckState::kAcked; + should_be_retransmitted_ = false; +} + +bool RetransmissionQueue::TxData::Nack(bool retransmit_now) { + ack_state_ = AckState::kNacked; + ++nack_count_; + if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) && + !is_abandoned_) { + should_be_retransmitted_ = true; + return true; + } + return false; +} + +void RetransmissionQueue::TxData::Retransmit() { + ack_state_ = AckState::kUnacked; + should_be_retransmitted_ = false; + + nack_count_ = 0; + ++num_retransmissions_; +} + +void RetransmissionQueue::TxData::Abandon() { + is_abandoned_ = true; + should_be_retransmitted_ = false; +} + +bool RetransmissionQueue::TxData::has_expired(TimeMs now) const { + if (ack_state_ != AckState::kAcked && !is_abandoned_) { + if (max_retransmissions_.has_value() && + num_retransmissions_ >= *max_retransmissions_) { + return true; + } else if (expires_at_.has_value() && *expires_at_ <= now) { + return true; + } + } + return false; +} + +void RetransmissionQueue::ExpireChunks(TimeMs now) { + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const TxData& item = elem.second; + + // Chunks that are in-flight (possibly lost?), nacked or to be retransmitted + // can be expired easily. There is always a risk that a message is expired + // that was already received by the peer, but for which there haven't been + // a SACK received. But that's acceptable, and handled. + if (item.is_abandoned()) { + // Already abandoned. + } else if (item.has_expired(now)) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap() + << " and message " << *item.data().message_id + << " as expired"; + ExpireAllFor(item); + } else { + // A non-expired chunk. No need to iterate any further. + break; + } + } +} + +void RetransmissionQueue::ExpireAllFor( + const RetransmissionQueue::TxData& item) { + // Erase all remaining chunks from the producer, if any. + if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id, + item.data().message_id)) { + // There were remaining chunks to be produced for this message. Since the + // receiver may have already received all chunks (up till now) for this + // message, we can't just FORWARD-TSN to the last fragment in this + // (abandoned) message and start sending a new message, as the receiver will + // then see a new message before the end of the previous one was seen (or + // skipped over). So create a new fragment, representing the end, that the + // received will never see as it is abandoned immediately and used as cum + // TSN in the sent FORWARD-TSN. + UnwrappedTSN tsn = next_tsn_; + next_tsn_.Increment(); + Data message_end(item.data().stream_id, item.data().ssn, + item.data().message_id, item.data().fsn, item.data().ppid, + std::vector<uint8_t>(), Data::IsBeginning(false), + Data::IsEnd(true), item.data().is_unordered); + TxData& added_item = + outstanding_data_ + .emplace(tsn, RetransmissionQueue::TxData(std::move(message_end), + absl::nullopt, TimeMs(0), + absl::nullopt)) + .first->second; + // The added chunk shouldn't be included in `outstanding_bytes`, so set it + // as acked. + added_item.Ack(); + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Adding unsent end placeholder for message at tsn=" + << *tsn.Wrap(); + } + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + TxData& other = elem.second; + + if (!other.is_abandoned() && + other.data().stream_id == item.data().stream_id && + other.data().is_unordered == item.data().is_unordered && + other.data().message_id == item.data().message_id) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap() + << " as abandoned"; + if (other.should_be_retransmitted()) { + to_be_retransmitted_.erase(tsn); + } + other.Abandon(); + } + } +} + +ForwardTsnChunk RetransmissionQueue::CreateForwardTsn() const { + std::unordered_map<StreamID, SSN, StreamID::Hasher> + skipped_per_ordered_stream; + UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; + + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const TxData& item = elem.second; + + if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { + break; + } + new_cumulative_ack = tsn; + if (!item.data().is_unordered && + item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) { + skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn; + } + } + + std::vector<ForwardTsnChunk::SkippedStream> skipped_streams; + skipped_streams.reserve(skipped_per_ordered_stream.size()); + for (const auto& elem : skipped_per_ordered_stream) { + skipped_streams.emplace_back(elem.first, elem.second); + } + return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams)); +} + +IForwardTsnChunk RetransmissionQueue::CreateIForwardTsn() const { + std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash> + skipped_per_stream; + UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; + + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const TxData& item = elem.second; + + if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { + break; + } + new_cumulative_ack = tsn; + std::pair<IsUnordered, StreamID> stream_id = + std::make_pair(item.data().is_unordered, item.data().stream_id); + + if (item.data().message_id > skipped_per_stream[stream_id]) { + skipped_per_stream[stream_id] = item.data().message_id; + } + } + + std::vector<IForwardTsnChunk::SkippedStream> skipped_streams; + skipped_streams.reserve(skipped_per_stream.size()); + for (const auto& elem : skipped_per_stream) { + const std::pair<IsUnordered, StreamID>& stream = elem.first; + MID message_id = elem.second; + skipped_streams.emplace_back(stream.first, stream.second, message_id); + } + + return IForwardTsnChunk(new_cumulative_ack.Wrap(), + std::move(skipped_streams)); +} + +void RetransmissionQueue::PrepareResetStreams( + rtc::ArrayView<const StreamID> streams) { + // TODO(boivie): These calls are now only affecting the send queue. The + // packet buffer can also change behavior - for example draining the chunk + // producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request" + // can be sent quickly, with a known `sender_last_assigned_tsn`. + send_queue_.PrepareResetStreams(streams); +} +bool RetransmissionQueue::CanResetStreams() const { + return send_queue_.CanResetStreams(); +} +void RetransmissionQueue::CommitResetStreams() { + send_queue_.CommitResetStreams(); +} +void RetransmissionQueue::RollbackResetStreams() { + send_queue_.RollbackResetStreams(); +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h new file mode 100644 index 0000000000..7f5baf9fff --- /dev/null +++ b/net/dcsctp/tx/retransmission_queue.h @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_ +#define NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_ + +#include <cstdint> +#include <functional> +#include <map> +#include <set> +#include <string> +#include <utility> +#include <vector> + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/retransmission_timeout.h" +#include "net/dcsctp/tx/send_queue.h" + +namespace dcsctp { + +// The RetransmissionQueue manages all DATA/I-DATA chunks that are in-flight and +// schedules them to be retransmitted if necessary. Chunks are retransmitted +// when they have been lost for a number of consecutive SACKs, or when the +// retransmission timer, `t3_rtx` expires. +// +// As congestion control is tightly connected with the state of transmitted +// packets, that's also managed here to limit the amount of data that is +// in-flight (sent, but not yet acknowledged). +class RetransmissionQueue { + public: + static constexpr size_t kMinimumFragmentedPayload = 10; + // State for DATA chunks (message fragments) in the queue - used in tests. + enum class State { + // The chunk has been sent but not received yet (from the sender's point of + // view, as no SACK has been received yet that reference this chunk). + kInFlight, + // A SACK has been received which explicitly marked this chunk as missing - + // it's now NACKED and may be retransmitted if NACKED enough times. + kNacked, + // A chunk that will be retransmitted when possible. + kToBeRetransmitted, + // A SACK has been received which explicitly marked this chunk as received. + kAcked, + // A chunk whose message has expired or has been retransmitted too many + // times (RFC3758). It will not be retransmitted anymore. + kAbandoned, + }; + + // Creates a RetransmissionQueue which will send data using `initial_tsn` as + // the first TSN to use for sent fragments. It will poll data from + // `send_queue` and call `on_send_queue_empty` when it is empty. When + // SACKs are received, it will estimate the RTT, and call `on_new_rtt`. When + // an outstanding chunk has been ACKed, it will call + // `on_clear_retransmission_counter` and will also use `t3_rtx`, which is the + // SCTP retransmission timer to manage retransmissions. + RetransmissionQueue(absl::string_view log_prefix, + TSN initial_tsn, + size_t a_rwnd, + SendQueue& send_queue, + std::function<void(DurationMs rtt)> on_new_rtt, + std::function<void()> on_clear_retransmission_counter, + Timer& t3_rtx, + const DcSctpOptions& options, + bool supports_partial_reliability = true, + bool use_message_interleaving = false); + + // Handles a received SACK. Returns true if the `sack` was processed and + // false if it was discarded due to received out-of-order and not relevant. + bool HandleSack(TimeMs now, const SackChunk& sack); + + // Handles an expired retransmission timer. + void HandleT3RtxTimerExpiry(); + + // Returns a list of chunks to send that would fit in one SCTP packet with + // `bytes_remaining_in_packet` bytes available. This may be further limited by + // the congestion control windows. Note that `ShouldSendForwardTSN` must be + // called prior to this method, to abandon expired chunks, as this method will + // not expire any chunks. + std::vector<std::pair<TSN, Data>> GetChunksToSend( + TimeMs now, + size_t bytes_remaining_in_packet); + + // Returns the internal state of all queued chunks. This is only used in + // unit-tests. + std::vector<std::pair<TSN, State>> GetChunkStatesForTesting() const; + + // Returns the next TSN that will be allocated for sent DATA chunks. + TSN next_tsn() const { return next_tsn_.Wrap(); } + + // Returns the size of the congestion window, in bytes. This is the number of + // bytes that may be in-flight. + size_t cwnd() const { return cwnd_; } + + // Overrides the current congestion window size. + void set_cwnd(size_t cwnd) { cwnd_ = cwnd; } + + // Returns the current receiver window size. + size_t rwnd() const { return rwnd_; } + + // Returns the number of bytes of packets that are in-flight. + size_t outstanding_bytes() const { return outstanding_bytes_; } + + // Given the current time `now`, it will evaluate if there are chunks that + // have expired and that need to be discarded. It returns true if a + // FORWARD-TSN should be sent. + bool ShouldSendForwardTsn(TimeMs now); + + // Creates a FORWARD-TSN chunk. + ForwardTsnChunk CreateForwardTsn() const; + + // Creates an I-FORWARD-TSN chunk. + IForwardTsnChunk CreateIForwardTsn() const; + + // See the SendQueue for a longer description of these methods related + // to stream resetting. + void PrepareResetStreams(rtc::ArrayView<const StreamID> streams); + bool CanResetStreams() const; + void CommitResetStreams(); + void RollbackResetStreams(); + + private: + enum class CongestionAlgorithmPhase { + kSlowStart, + kCongestionAvoidance, + }; + + // A fragmented message's DATA chunk while in the retransmission queue, and + // its associated metadata. + class TxData { + public: + explicit TxData(Data data, + absl::optional<size_t> max_retransmissions, + TimeMs time_sent, + absl::optional<TimeMs> expires_at) + : max_retransmissions_(max_retransmissions), + time_sent_(time_sent), + expires_at_(expires_at), + data_(std::move(data)) {} + + TimeMs time_sent() const { return time_sent_; } + + const Data& data() const { return data_; } + + // Acks an item. + void Ack(); + + // Nacks an item. If it has been nacked enough times, or if `retransmit_now` + // is set, it might be marked for retransmission, which is indicated by the + // return value. + bool Nack(bool retransmit_now = false); + + // Prepares the item to be retransmitted. Sets it as outstanding and + // clears all nack counters. + void Retransmit(); + + // Marks this item as abandoned. + void Abandon(); + + bool is_outstanding() const { return ack_state_ == AckState::kUnacked; } + bool is_acked() const { return ack_state_ == AckState::kAcked; } + bool is_abandoned() const { return is_abandoned_; } + + // Indicates if this chunk should be retransmitted. + bool should_be_retransmitted() const { return should_be_retransmitted_; } + // Indicates if this chunk has ever been retransmitted. + bool has_been_retransmitted() const { return num_retransmissions_ > 0; } + + // Given the current time, and the current state of this DATA chunk, it will + // indicate if it has expired (SCTP Partial Reliability Extension). + bool has_expired(TimeMs now) const; + + private: + enum class AckState { + kUnacked, + kAcked, + kNacked, + }; + // Indicates the presence of this chunk, if it's in flight (Unacked), has + // been received (Acked) or is lost (Nacked). + AckState ack_state_ = AckState::kUnacked; + // Indicates if this chunk has been abandoned, which is a terminal state. + bool is_abandoned_ = false; + // Indicates if this chunk should be retransmitted. + bool should_be_retransmitted_ = false; + + // The number of times the DATA chunk has been nacked (by having received a + // SACK which doesn't include it). Will be cleared on retransmissions. + size_t nack_count_ = 0; + // The number of times the DATA chunk has been retransmitted. + size_t num_retransmissions_ = 0; + // If the message was sent with a maximum number of retransmissions, this is + // set to that number. The value zero (0) means that it will never be + // retransmitted. + const absl::optional<size_t> max_retransmissions_; + // When the packet was sent, and placed in this queue. + const TimeMs time_sent_; + // If the message was sent with an expiration time, this is set. + const absl::optional<TimeMs> expires_at_; + // The actual data to send/retransmit. + Data data_; + }; + + // Contains variables scoped to a processing of an incoming SACK. + struct AckInfo { + explicit AckInfo(UnwrappedTSN cumulative_tsn_ack) + : highest_tsn_acked(cumulative_tsn_ack) {} + + // All TSNs that have been acked (for the first time) in this SACK. + std::vector<TSN> acked_tsns; + + // Bytes acked by increasing cumulative_tsn_ack in this SACK + size_t bytes_acked_by_cumulative_tsn_ack = 0; + + // Bytes acked by gap blocks in this SACK. + size_t bytes_acked_by_new_gap_ack_blocks = 0; + + // Indicates if this SACK indicates that packet loss has occurred. Just + // because a packet is missing in the SACK doesn't necessarily mean that + // there is packet loss as that packet might be in-flight and received + // out-of-order. But when it has been reported missing consecutive times, it + // will eventually be considered "lost" and this will be set. + bool has_packet_loss = false; + + // Highest TSN Newly Acknowledged, an SCTP variable. + UnwrappedTSN highest_tsn_acked; + }; + + bool IsConsistent() const; + + // Returns how large a chunk will be, serialized, carrying the data + size_t GetSerializedChunkSize(const Data& data) const; + + // Indicates if the congestion control algorithm is in "fast recovery". + bool is_in_fast_recovery() const { + return fast_recovery_exit_tsn_.has_value(); + } + + // Indicates if the congestion control algorithm is in "fast retransmit". + bool is_in_fast_retransmit() const { return is_in_fast_retransmit_; } + + // Indicates if the provided SACK is valid given what has previously been + // received. If it returns false, the SACK is most likely a duplicate of + // something already seen, so this returning false doesn't necessarily mean + // that the SACK is illegal. + bool IsSackValid(const SackChunk& sack) const; + + // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items + // in the retransmission queue up until this value and will update `ack_info` + // by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`. + void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info); + + // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK + // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`. + void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks, + AckInfo& ack_info); + + // Mark chunks reported as "missing", as "nacked" or "to be retransmitted" + // depending how many times this has happened. Only packets up until + // `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are + // nacked/retransmitted. The method will set `ack_info.has_packet_loss`. + void NackBetweenAckBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks, + AckInfo& ack_info); + + // When a SACK chunk is received, this method will be called which _may_ call + // into the `RetransmissionTimeout` to update the RTO. + void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack); + + // If the congestion control is in "fast recovery mode", this may be exited + // now. + void MaybeExitFastRecovery(UnwrappedTSN cumulative_tsn_ack); + + // If chunks have been ACKed, stop the retransmission timer. + void StopT3RtxTimerOnIncreasedCumulativeTsnAck( + UnwrappedTSN cumulative_tsn_ack); + + // Update the congestion control algorithm given as the cumulative ack TSN + // value has increased, as reported in an incoming SACK chunk. + void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes, + size_t total_bytes_acked); + // Update the congestion control algorithm, given as packet loss has been + // detected, as reported in an incoming SACK chunk. + void HandlePacketLoss(UnwrappedTSN highest_tsn_acked); + // Update the view of the receiver window size. + void UpdateReceiverWindow(uint32_t a_rwnd); + // Given `max_size` of space left in a packet, which chunks can be added to + // it? + std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size); + // If there is data sent and not ACKED, ensure that the retransmission timer + // is running. + void StartT3RtxTimerIfOutstandingData(); + + // Given the current time `now_ms`, expire chunks that have a limited + // lifetime. + void ExpireChunks(TimeMs now); + // Given that a message fragment, `item` has expired, expire all other + // fragments that share the same message - even never-before-sent fragments + // that are still in the SendQueue. + void ExpireAllFor(const RetransmissionQueue::TxData& item); + + // Returns the current congestion control algorithm phase. + CongestionAlgorithmPhase phase() const { + return (cwnd_ <= ssthresh_) + ? CongestionAlgorithmPhase::kSlowStart + : CongestionAlgorithmPhase::kCongestionAvoidance; + } + + const DcSctpOptions options_; + // If the peer supports RFC3758 - SCTP Partial Reliability Extension. + const bool partial_reliability_; + const std::string log_prefix_; + // The size of the data chunk (DATA/I-DATA) header that is used. + const size_t data_chunk_header_size_; + // Called when a new RTT measurement has been done + const std::function<void(DurationMs rtt)> on_new_rtt_; + // Called when a SACK has been seen that cleared the retransmission counter. + const std::function<void()> on_clear_retransmission_counter_; + // The retransmission counter. + Timer& t3_rtx_; + // Unwraps TSNs + UnwrappedTSN::Unwrapper tsn_unwrapper_; + + // Congestion Window. Number of bytes that may be in-flight (sent, not acked). + size_t cwnd_; + // Receive Window. Number of bytes available in the receiver's RX buffer. + size_t rwnd_; + // Slow Start Threshold. See RFC4960. + size_t ssthresh_; + // Partial Bytes Acked. See RFC4960. + size_t partial_bytes_acked_ = 0; + // If set, fast recovery is enabled until this TSN has been cumulative + // acked. + absl::optional<UnwrappedTSN> fast_recovery_exit_tsn_ = absl::nullopt; + // Indicates if the congestion algorithm is in fast retransmit. + bool is_in_fast_retransmit_ = false; + + // Next TSN to used. + UnwrappedTSN next_tsn_; + // The last cumulative TSN ack number + UnwrappedTSN last_cumulative_tsn_ack_; + // The send queue. + SendQueue& send_queue_; + // All the outstanding data chunks that are in-flight and that have not been + // cumulative acked. Note that it also contains chunks that have been acked in + // gap ack blocks. + std::map<UnwrappedTSN, TxData> outstanding_data_; + // Data chunks that are to be retransmitted. + std::set<UnwrappedTSN> to_be_retransmitted_; + // The number of bytes that are in-flight (sent but not yet acked or nacked). + size_t outstanding_bytes_ = 0; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_ diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc new file mode 100644 index 0000000000..e02b111b5a --- /dev/null +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -0,0 +1,1007 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_queue.h" + +#include <cstddef> +#include <cstdint> +#include <functional> +#include <memory> +#include <utility> +#include <vector> + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/testing/data_generator.h" +#include "net/dcsctp/timer/fake_timeout.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/mock_send_queue.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::MockFunction; +using State = ::dcsctp::RetransmissionQueue::State; +using ::testing::_; +using ::testing::ElementsAre; +using ::testing::IsEmpty; +using ::testing::NiceMock; +using ::testing::Pair; +using ::testing::Return; +using ::testing::SizeIs; +using ::testing::UnorderedElementsAre; + +constexpr uint32_t kArwnd = 100000; +constexpr uint32_t kMaxMtu = 1191; + +class RetransmissionQueueTest : public testing::Test { + protected: + RetransmissionQueueTest() + : gen_(MID(42)), + timeout_manager_([this]() { return now_; }), + timer_manager_([this]() { return timeout_manager_.CreateTimeout(); }), + timer_(timer_manager_.CreateTimer( + "test/t3_rtx", + []() { return absl::nullopt; }, + TimerOptions(DurationMs(0)))) {} + + std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk() { + return [this](TimeMs now, size_t max_size) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }; + } + + std::vector<TSN> GetSentPacketTSNs(RetransmissionQueue& queue) { + std::vector<TSN> tsns; + for (const auto& elem : queue.GetChunksToSend(now_, 10000)) { + tsns.push_back(elem.first); + } + return tsns; + } + + RetransmissionQueue CreateQueue(bool supports_partial_reliability = true, + bool use_message_interleaving = false) { + DcSctpOptions options; + options.mtu = kMaxMtu; + return RetransmissionQueue( + "", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(), + on_clear_retransmission_counter_.AsStdFunction(), *timer_, options, + supports_partial_reliability, use_message_interleaving); + } + + DataGenerator gen_; + TimeMs now_ = TimeMs(0); + FakeTimeoutManager timeout_manager_; + TimerManager timer_manager_; + NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_; + NiceMock<MockFunction<void()>> on_clear_retransmission_counter_; + NiceMock<MockSendQueue> producer_; + std::unique_ptr<Timer> timer_; +}; + +TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, SendOneChunk) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); + + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12))); + + queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 5)}, + {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kNacked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kNacked), // + Pair(TSN(17), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + // Send more chunks, but leave some as gaps to force retransmission after + // three NACKs. + + // Send 18 + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); + + // Ack 12, 14-15, 17-18 + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 6)}, + {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kNacked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kNacked), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked))); + + // Send 19 + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); + + // Ack 12, 14-15, 17-19 + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 7)}, + {})); + + // Send 20 + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); + + // Ack 12, 14-15, 17-20 + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 8)}, + {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kToBeRetransmitted), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kToBeRetransmitted), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked), // + Pair(TSN(19), State::kAcked), // + Pair(TSN(20), State::kAcked))); + + // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be + // resent right now. The send queue will not even be queried. + EXPECT_CALL(producer_, Produce).Times(0); + + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13), TSN(16))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kInFlight), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked), // + Pair(TSN(19), State::kAcked), // + Pair(TSN(20), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }) + .WillOnce([this](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + std::vector<std::pair<TSN, Data>> chunks_to_rtx = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { + RetransmissionQueue queue = + CreateQueue(/*supports_partial_reliability=*/false); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(0); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); +} // namespace dcsctp + +TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned))); + + std::vector<std::pair<TSN, Data>> chunks_to_rtx = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_rtx, testing::IsEmpty()); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned))); +} + +TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 3; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(0); + + // Retransmission 1 + queue.HandleT3RtxTimerExpiry(); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); + + // Retransmission 2 + queue.HandleT3RtxTimerExpiry(); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); + + // Retransmission 3 + queue.HandleT3RtxTimerExpiry(); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); + + // Retransmission 4 - not allowed. + queue.HandleT3RtxTimerExpiry(); + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned))); +} + +TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { + RetransmissionQueue queue = CreateQueue(); + static constexpr size_t kCwnd = 1200; + queue.set_cwnd(kCwnd); + EXPECT_EQ(queue.cwnd(), kCwnd); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + std::vector<uint8_t> payload(1000); + EXPECT_CALL(producer_, Produce) + .WillOnce([this, payload](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1500); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + std::vector<std::pair<TSN, Data>> chunks_to_rtx = + queue.GetChunksToSend(now_, 1500); + EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); +} + +TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + // Send and ack first chunk (TSN 10) + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight))); + + // Chunk 10 is acked, but the remaining are lost + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kToBeRetransmitted), // + Pair(TSN(12), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(true)); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + // NOTE: The TSN=13 represents the end fragment. + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAbandoned))); + + ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); + EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13)); + EXPECT_THAT(forward_tsn.skipped_streams(), + UnorderedElementsAre( + ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42)))); +} + +TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + // Send and ack first chunk (TSN 10) + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight))); + + // Chunk 10 is acked, but the remaining are lost + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kToBeRetransmitted), // + Pair(TSN(12), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + // NOTE: No additional TSN representing the end fragment, as that's TSN=12. + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned))); + + ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); + EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12)); + EXPECT_THAT(forward_tsn.skipped_streams(), + UnorderedElementsAre( + ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42)))); +} + +TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { + RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(1); + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(2); + SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(3); + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(4); + SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _), Pair(TSN(13), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight))); + + // Chunk 13 is acked, but the remaining are lost + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {})); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kNacked), // + Pair(TSN(12), State::kNacked), // + Pair(TSN(13), State::kAcked))); + + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted), // + Pair(TSN(11), State::kToBeRetransmitted), // + Pair(TSN(12), State::kToBeRetransmitted), // + Pair(TSN(13), State::kAcked))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(true)); + EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42))) + .WillOnce(Return(true)); + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42))) + .WillOnce(Return(true)); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAcked), + // Representing end fragments of stream 1-3 + Pair(TSN(14), State::kAbandoned), // + Pair(TSN(15), State::kAbandoned), // + Pair(TSN(16), State::kAbandoned))); + + IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn(); + EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12)); + EXPECT_THAT( + forward_tsn1.skipped_streams(), + UnorderedElementsAre(IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(1), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(true), StreamID(2), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(3), MID(42)))); + + // When TSN 13 is acked, the placeholder "end fragments" must be skipped as + // well. + + // A receiver is more likely to ack TSN 13, but do it incrementally. + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); + + EXPECT_CALL(producer_, Discard).Times(0); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAbandoned), // + Pair(TSN(15), State::kAbandoned), // + Pair(TSN(16), State::kAbandoned))); + + IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn(); + EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16)); + EXPECT_THAT( + forward_tsn2.skipped_streams(), + UnorderedElementsAre(IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(1), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(true), StreamID(2), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(3), MID(42)))); +} + +TEST_F(RetransmissionQueueTest, MeasureRTT) { + RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + + now_ = now_ + DurationMs(123); + + EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1); + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); +} + +TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) { + RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); + + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {}))); + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}))); +} + +TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { + RetransmissionQueue queue = CreateQueue(); + + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {}))); + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {}))); +} + +TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + // Ack 9, 20-25. This is an invalid SACK, but should still be handled. + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight), // + Pair(TSN(14), State::kInFlight), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) { + RetransmissionQueue queue = CreateQueue(); + + // Nothing produced - nothing in retransmission queue + + // Ack 9, 12-13 + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {})); + + // Gap ack blocks are just ignore. + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + // Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be + // adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the + // cum-tsn-ack should not move, as the gap-ack-blocks are just advisory. + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { + RetransmissionQueue queue = CreateQueue(); + + // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the + // magic numbers in this test. + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t size) { + EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); + + std::vector<uint8_t> payload(183); + return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + }) + .WillOnce([this](TimeMs, size_t size) { + EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); + + std::vector<uint8_t> payload(957); + return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + }); + + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1188 - 12); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); +} + +TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + // Send and ack first chunk (TSN 10) + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight))); + EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); + + // Discard the message while it was outstanding. + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned))); + EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); + + // Now ACK those, one at a time. + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u); + + queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); + EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u); + + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); + EXPECT_EQ(queue.outstanding_bytes(), 0u); +} + +TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + // Send and ack first chunk (TSN 10) + std::vector<std::pair<TSN, Data>> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight))); + EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); + + // Mark the message as lost. + queue.HandleT3RtxTimerExpiry(); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned))); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + // Now ACK those, one at a time. + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); + EXPECT_EQ(queue.outstanding_bytes(), 0u); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_timeout.cc b/net/dcsctp/tx/retransmission_timeout.cc new file mode 100644 index 0000000000..7d545a07d0 --- /dev/null +++ b/net/dcsctp/tx/retransmission_timeout.cc @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_timeout.h" + +#include <cmath> +#include <cstdint> + +#include "net/dcsctp/public/dcsctp_options.h" + +namespace dcsctp { +namespace { +// https://tools.ietf.org/html/rfc4960#section-15 +constexpr double kRtoAlpha = 0.125; +constexpr double kRtoBeta = 0.25; +} // namespace + +RetransmissionTimeout::RetransmissionTimeout(const DcSctpOptions& options) + : min_rto_(*options.rto_min), + max_rto_(*options.rto_max), + max_rtt_(*options.rtt_max), + rto_(*options.rto_initial) {} + +void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { + double rtt = *measured_rtt; + + // Unrealistic values will be skipped. If a wrongly measured (or otherwise + // corrupt) value was processed, it could change the state in a way that would + // take a very long time to recover. + if (rtt < 0.0 || rtt > max_rtt_) { + return; + } + + if (first_measurement_) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "When the first RTT measurement R is made, set + // SRTT <- R, + // RTTVAR <- R/2, and + // RTO <- SRTT + 4 * RTTVAR." + srtt_ = rtt; + rttvar_ = rtt * 0.5; + rto_ = srtt_ + 4 * rttvar_; + first_measurement_ = false; + } else { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "When a new RTT measurement R' is made, set + // RTTVAR <- (1 - RTO.Beta) * RTTVAR + RTO.Beta * |SRTT - R'| + // SRTT <- (1 - RTO.Alpha) * SRTT + RTO.Alpha * R' + // RTO <- SRTT + 4 * RTTVAR." + rttvar_ = (1 - kRtoBeta) * rttvar_ + kRtoBeta * std::abs(srtt_ - rtt); + srtt_ = (1 - kRtoAlpha) * srtt_ + kRtoAlpha * rtt; + rto_ = srtt_ + 4 * rttvar_; + } + + // If the RTO becomes smaller or equal to RTT, expiration timers will be + // scheduled at the same time as packets are expected. Only happens in + // extremely stable RTTs, i.e. in simulations. + rto_ = std::fmax(rto_, rtt + 1); + + // Clamp RTO between min and max. + rto_ = std::fmin(std::fmax(rto_, min_rto_), max_rto_); +} +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_timeout.h b/net/dcsctp/tx/retransmission_timeout.h new file mode 100644 index 0000000000..0fac33e59c --- /dev/null +++ b/net/dcsctp/tx/retransmission_timeout.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_ +#define NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_ + +#include <cstdint> +#include <functional> + +#include "net/dcsctp/public/dcsctp_options.h" + +namespace dcsctp { + +// Manages updating of the Retransmission Timeout (RTO) SCTP variable, which is +// used directly as the base timeout for T3-RTX and for other timers, such as +// delayed ack. +// +// When a round-trip-time (RTT) is calculated (outside this class), `Observe` +// is called, which calculates the retransmission timeout (RTO) value. The RTO +// value will become larger if the RTT is high and/or the RTT values are varying +// a lot, which is an indicator of a bad connection. +class RetransmissionTimeout { + public: + explicit RetransmissionTimeout(const DcSctpOptions& options); + + // To be called when a RTT has been measured, to update the RTO value. + void ObserveRTT(DurationMs measured_rtt); + + // Returns the Retransmission Timeout (RTO) value, in milliseconds. + DurationMs rto() const { return DurationMs(rto_); } + + // Returns the smoothed RTT value, in milliseconds. + DurationMs srtt() const { return DurationMs(srtt_); } + + private: + // Note that all intermediate state calculation is done in the floating point + // domain, to maintain precision. + const double min_rto_; + const double max_rto_; + const double max_rtt_; + // If this is the first measurement + bool first_measurement_ = true; + // Smoothed Round-Trip Time + double srtt_ = 0.0; + // Round-Trip Time Variation + double rttvar_ = 0.0; + // Retransmission Timeout + double rto_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_ diff --git a/net/dcsctp/tx/retransmission_timeout_test.cc b/net/dcsctp/tx/retransmission_timeout_test.cc new file mode 100644 index 0000000000..3b2e3399fe --- /dev/null +++ b/net/dcsctp/tx/retransmission_timeout_test.cc @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_timeout.h" + +#include "net/dcsctp/public/dcsctp_options.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { + +constexpr DurationMs kMaxRtt = DurationMs(8'000); +constexpr DurationMs kInitialRto = DurationMs(200); +constexpr DurationMs kMaxRto = DurationMs(800); +constexpr DurationMs kMinRto = DurationMs(120); + +DcSctpOptions MakeOptions() { + DcSctpOptions options; + options.rtt_max = kMaxRtt; + options.rto_initial = kInitialRto; + options.rto_max = kMaxRto; + options.rto_min = kMinRto; + return options; +} + +TEST(RetransmissionTimeoutTest, HasValidInitialRto) { + RetransmissionTimeout rto_(MakeOptions()); + EXPECT_EQ(rto_.rto(), kInitialRto); +} + +TEST(RetransmissionTimeoutTest, NegativeValuesDoNotAffectRTO) { + RetransmissionTimeout rto_(MakeOptions()); + // Initial negative value + rto_.ObserveRTT(DurationMs(-10)); + EXPECT_EQ(rto_.rto(), kInitialRto); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 372); + // Subsequent negative value + rto_.ObserveRTT(DurationMs(-10)); + EXPECT_EQ(*rto_.rto(), 372); +} + +TEST(RetransmissionTimeoutTest, TooLargeValuesDoNotAffectRTO) { + RetransmissionTimeout rto_(MakeOptions()); + // Initial too large value + rto_.ObserveRTT(kMaxRtt + DurationMs(100)); + EXPECT_EQ(rto_.rto(), kInitialRto); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 372); + // Subsequent too large value + rto_.ObserveRTT(kMaxRtt + DurationMs(100)); + EXPECT_EQ(*rto_.rto(), 372); +} + +TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) { + RetransmissionTimeout rto_(MakeOptions()); + for (int i = 0; i < 1000; ++i) { + rto_.ObserveRTT(DurationMs(1)); + } + EXPECT_GE(rto_.rto(), kMinRto); +} + +TEST(RetransmissionTimeoutTest, WillNeverGoAboveMaximumRto) { + RetransmissionTimeout rto_(MakeOptions()); + for (int i = 0; i < 1000; ++i) { + rto_.ObserveRTT(kMaxRtt - DurationMs(1)); + // Adding jitter, which would make it RTO be well above RTT. + rto_.ObserveRTT(kMaxRtt - DurationMs(100)); + } + EXPECT_LE(rto_.rto(), kMaxRto); +} + +TEST(RetransmissionTimeoutTest, CalculatesRtoForStableRtt) { + RetransmissionTimeout rto_(MakeOptions()); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(DurationMs(128)); + EXPECT_EQ(*rto_.rto(), 314); + rto_.ObserveRTT(DurationMs(123)); + EXPECT_EQ(*rto_.rto(), 268); + rto_.ObserveRTT(DurationMs(125)); + EXPECT_EQ(*rto_.rto(), 233); + rto_.ObserveRTT(DurationMs(127)); + EXPECT_EQ(*rto_.rto(), 208); +} + +TEST(RetransmissionTimeoutTest, CalculatesRtoForUnstableRtt) { + RetransmissionTimeout rto_(MakeOptions()); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(DurationMs(402)); + EXPECT_EQ(*rto_.rto(), 622); + rto_.ObserveRTT(DurationMs(728)); + EXPECT_EQ(*rto_.rto(), 800); + rto_.ObserveRTT(DurationMs(89)); + EXPECT_EQ(*rto_.rto(), 800); + rto_.ObserveRTT(DurationMs(126)); + EXPECT_EQ(*rto_.rto(), 800); +} + +TEST(RetransmissionTimeoutTest, WillStabilizeAfterAWhile) { + RetransmissionTimeout rto_(MakeOptions()); + rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(DurationMs(402)); + rto_.ObserveRTT(DurationMs(728)); + rto_.ObserveRTT(DurationMs(89)); + rto_.ObserveRTT(DurationMs(126)); + EXPECT_EQ(*rto_.rto(), 800); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 800); + rto_.ObserveRTT(DurationMs(122)); + EXPECT_EQ(*rto_.rto(), 709); + rto_.ObserveRTT(DurationMs(123)); + EXPECT_EQ(*rto_.rto(), 630); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 561); + rto_.ObserveRTT(DurationMs(122)); + EXPECT_EQ(*rto_.rto(), 504); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 453); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 409); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(DurationMs(124)); + EXPECT_EQ(*rto_.rto(), 339); +} + +TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) { + // In simulations, it's quite common to have a very stable RTT, and having an + // RTO at the same value will cause issues as expiry timers will be scheduled + // to be expire exactly when a packet is supposed to arrive. The RTO must be + // larger than the RTT. In non-simulated environments, this is a non-issue as + // any jitter will increase the RTO. + RetransmissionTimeout rto_(MakeOptions()); + + for (int i = 0; i < 100; ++i) { + rto_.ObserveRTT(DurationMs(124)); + } + EXPECT_GT(*rto_.rto(), 124); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc new file mode 100644 index 0000000000..4bfbaf718b --- /dev/null +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -0,0 +1,432 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/rr_send_queue.h" + +#include <cstdint> +#include <deque> +#include <map> +#include <utility> +#include <vector> + +#include "absl/algorithm/container.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { + while (!items_.empty()) { + RRSendQueue::OutgoingStream::Item& item = items_.front(); + if (item.message_id.has_value()) { + // Already partially sent messages can always continue to be sent. + return true; + } + + // Message has expired. Remove it and inspect the next one. + if (item.expires_at.has_value() && *item.expires_at <= now) { + buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); + items_.pop_front(); + RTC_DCHECK(IsConsistent()); + continue; + } + + if (is_paused_) { + // The stream has paused (and there is no partially sent message). + return false; + } + return true; + } + return false; +} + +bool RRSendQueue::IsConsistent() const { + size_t total_buffered_amount = 0; + for (const auto& stream_entry : streams_) { + total_buffered_amount += stream_entry.second.buffered_amount().value(); + } + + if (previous_message_has_ended_) { + auto it = streams_.find(current_stream_id_); + if (it != streams_.end() && it->second.has_partially_sent_message()) { + RTC_DLOG(LS_ERROR) + << "Previous message has ended, but still partial message in stream"; + return false; + } + } else { + auto it = streams_.find(current_stream_id_); + if (it == streams_.end() || !it->second.has_partially_sent_message()) { + RTC_DLOG(LS_ERROR) + << "Previous message has NOT ended, but there is no partial message"; + return false; + } + } + + return total_buffered_amount == total_buffered_amount_.value(); +} + +bool RRSendQueue::OutgoingStream::IsConsistent() const { + size_t bytes = 0; + for (const auto& item : items_) { + bytes += item.remaining_size; + } + return bytes == buffered_amount_.value(); +} + +void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) { + RTC_DCHECK(bytes <= value_); + size_t old_value = value_; + value_ -= bytes; + + if (old_value > low_threshold_ && value_ <= low_threshold_) { + on_threshold_reached_(); + } +} + +void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) { + // Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted. + if (low_threshold_ < value_ && low_threshold >= value_) { + on_threshold_reached_(); + } + low_threshold_ = low_threshold; +} + +void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, + absl::optional<TimeMs> expires_at, + const SendOptions& send_options) { + buffered_amount_.Increase(message.payload().size()); + total_buffered_amount_.Increase(message.payload().size()); + items_.emplace_back(std::move(message), expires_at, send_options); + + RTC_DCHECK(IsConsistent()); +} + +absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce( + TimeMs now, + size_t max_size) { + RTC_DCHECK(!items_.empty()); + + Item* item = &items_.front(); + DcSctpMessage& message = item->message; + + if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) { + RTC_DCHECK(IsConsistent()); + return absl::nullopt; + } + + // Allocate Message ID and SSN when the first fragment is sent. + if (!item->message_id.has_value()) { + MID& mid = + item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_; + item->message_id = mid; + mid = MID(*mid + 1); + } + if (!item->send_options.unordered && !item->ssn.has_value()) { + item->ssn = next_ssn_; + next_ssn_ = SSN(*next_ssn_ + 1); + } + + // Grab the next `max_size` fragment from this message and calculate flags. + rtc::ArrayView<const uint8_t> chunk_payload = + item->message.payload().subview(item->remaining_offset, max_size); + rtc::ArrayView<const uint8_t> message_payload = message.payload(); + Data::IsBeginning is_beginning(chunk_payload.data() == + message_payload.data()); + Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) == + (message_payload.data() + message_payload.size())); + + StreamID stream_id = message.stream_id(); + PPID ppid = message.ppid(); + + // Zero-copy the payload if the message fits in a single chunk. + std::vector<uint8_t> payload = + is_beginning && is_end + ? std::move(message).ReleasePayload() + : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end()); + + FSN fsn(item->current_fsn); + item->current_fsn = FSN(*item->current_fsn + 1); + buffered_amount_.Decrease(payload.size()); + total_buffered_amount_.Decrease(payload.size()); + + SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)), + item->message_id.value(), fsn, ppid, + std::move(payload), is_beginning, is_end, + item->send_options.unordered)); + chunk.max_retransmissions = item->send_options.max_retransmissions; + chunk.expires_at = item->expires_at; + + if (is_end) { + // The entire message has been sent, and its last data copied to `chunk`, so + // it can safely be discarded. + items_.pop_front(); + } else { + item->remaining_offset += chunk_payload.size(); + item->remaining_size -= chunk_payload.size(); + RTC_DCHECK(item->remaining_offset + item->remaining_size == + item->message.payload().size()); + RTC_DCHECK(item->remaining_size > 0); + } + RTC_DCHECK(IsConsistent()); + return chunk; +} + +bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, + MID message_id) { + bool result = false; + if (!items_.empty()) { + Item& item = items_.front(); + if (item.send_options.unordered == unordered && + item.message_id.has_value() && *item.message_id == message_id) { + buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); + items_.pop_front(); + // As the item still existed, it had unsent data. + result = true; + } + } + RTC_DCHECK(IsConsistent()); + return result; +} + +void RRSendQueue::OutgoingStream::Pause() { + is_paused_ = true; + + // A stream is paused when it's about to be reset. In this implementation, + // it will throw away all non-partially send messages. This is subject to + // change. It will however not discard any partially sent messages - only + // whole messages. Partially delivered messages (at the time of receiving a + // Stream Reset command) will always deliver all the fragments before + // actually resetting the stream. + for (auto it = items_.begin(); it != items_.end();) { + if (it->remaining_offset == 0) { + buffered_amount_.Decrease(it->remaining_size); + total_buffered_amount_.Decrease(it->remaining_size); + it = items_.erase(it); + } else { + ++it; + } + } + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::OutgoingStream::Reset() { + if (!items_.empty()) { + // If this message has been partially sent, reset it so that it will be + // re-sent. + auto& item = items_.front(); + buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); + total_buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); + item.remaining_offset = 0; + item.remaining_size = item.message.payload().size(); + item.message_id = absl::nullopt; + item.ssn = absl::nullopt; + item.current_fsn = FSN(0); + } + is_paused_ = false; + next_ordered_mid_ = MID(0); + next_unordered_mid_ = MID(0); + next_ssn_ = SSN(0); + RTC_DCHECK(IsConsistent()); +} + +bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { + if (items_.empty()) { + return false; + } + return items_.front().message_id.has_value(); +} + +void RRSendQueue::Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options) { + RTC_DCHECK(!message.payload().empty()); + // Any limited lifetime should start counting from now - when the message + // has been added to the queue. + absl::optional<TimeMs> expires_at = absl::nullopt; + if (send_options.lifetime.has_value()) { + // `expires_at` is the time when it expires. Which is slightly larger than + // the message's lifetime, as the message is alive during its entire + // lifetime (which may be zero). + expires_at = now + *send_options.lifetime + DurationMs(1); + } + GetOrCreateStreamInfo(message.stream_id()) + .Add(std::move(message), expires_at, send_options); + RTC_DCHECK(IsConsistent()); +} + +bool RRSendQueue::IsFull() const { + return total_buffered_amount() >= buffer_size_; +} + +bool RRSendQueue::IsEmpty() const { + return total_buffered_amount() == 0; +} + +std::map<StreamID, RRSendQueue::OutgoingStream>::iterator +RRSendQueue::GetNextStream(TimeMs now) { + auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1)); + + for (auto it = start_it; it != streams_.end(); ++it) { + if (it->second.HasDataToSend(now)) { + current_stream_id_ = it->first; + return it; + } + } + + for (auto it = streams_.begin(); it != start_it; ++it) { + if (it->second.HasDataToSend(now)) { + current_stream_id_ = it->first; + return it; + } + } + return streams_.end(); +} + +absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now, + size_t max_size) { + std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it; + + if (previous_message_has_ended_) { + // Previous message has ended. Round-robin to a different stream, if there + // even is one with data to send. + stream_it = GetNextStream(now); + if (stream_it == streams_.end()) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "There is no stream with data; Can't produce any data."; + return absl::nullopt; + } + } else { + // The previous message has not ended; Continue from the current stream. + stream_it = streams_.find(current_stream_id_); + RTC_DCHECK(stream_it != streams_.end()); + } + + absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size); + if (data.has_value()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type=" + << (data->data.is_unordered ? "unordered" : "ordered") + << "::" + << (*data->data.is_beginning && *data->data.is_end + ? "complete" + : *data->data.is_beginning + ? "first" + : *data->data.is_end ? "last" : "middle") + << ", stream_id=" << *stream_it->first + << ", ppid=" << *data->data.ppid + << ", length=" << data->data.payload.size(); + + previous_message_has_ended_ = *data->data.is_end; + } + + RTC_DCHECK(IsConsistent()); + return data; +} + +bool RRSendQueue::Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) { + bool has_discarded = + GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id); + if (has_discarded) { + // Only partially sent messages are discarded, so if a message was + // discarded, then it was the currently sent message. + previous_message_has_ended_ = true; + } + + return has_discarded; +} + +void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) { + for (StreamID stream_id : streams) { + GetOrCreateStreamInfo(stream_id).Pause(); + } + RTC_DCHECK(IsConsistent()); +} + +bool RRSendQueue::CanResetStreams() const { + // Streams can be reset if those streams that are paused don't have any + // messages that are partially sent. + for (auto& stream : streams_) { + if (stream.second.is_paused() && + stream.second.has_partially_sent_message()) { + return false; + } + } + return true; +} + +void RRSendQueue::CommitResetStreams() { + Reset(); + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::RollbackResetStreams() { + for (auto& stream_entry : streams_) { + stream_entry.second.Resume(); + } + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::Reset() { + // Recalculate buffered amount, as partially sent messages may have been put + // fully back in the queue. + for (auto& stream_entry : streams_) { + OutgoingStream& stream = stream_entry.second; + stream.Reset(); + } + previous_message_has_ended_ = true; +} + +size_t RRSendQueue::buffered_amount(StreamID stream_id) const { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + return 0; + } + return it->second.buffered_amount().value(); +} + +size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + return 0; + } + return it->second.buffered_amount().low_threshold(); +} + +void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id, + size_t bytes) { + GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes); +} + +RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( + StreamID stream_id) { + auto it = streams_.find(stream_id); + if (it != streams_.end()) { + return it->second; + } + + return streams_ + .emplace(stream_id, + OutgoingStream( + [this, stream_id]() { on_buffered_amount_low_(stream_id); }, + total_buffered_amount_)) + .first->second; +} +} // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h new file mode 100644 index 0000000000..3ec45af17d --- /dev/null +++ b/net/dcsctp/tx/rr_send_queue.h @@ -0,0 +1,238 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_ + +#include <cstdint> +#include <deque> +#include <map> +#include <string> +#include <utility> + +#include "absl/algorithm/container.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/pair_hash.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" + +namespace dcsctp { + +// The Round Robin SendQueue holds all messages that the client wants to send, +// but that haven't yet been split into chunks and fully sent on the wire. +// +// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2, +// it will cycle to send messages from different streams. It will send all +// fragments from one message before continuing with a different message on +// possibly a different stream, until support for message interleaving has been +// implemented. +// +// As messages can be (requested to be) sent before the connection is properly +// established, this send queue is always present - even for closed connections. +class RRSendQueue : public SendQueue { + public: + // How small a data chunk's payload may be, if having to fragment a message. + static constexpr size_t kMinimumFragmentedPayload = 10; + + RRSendQueue(absl::string_view log_prefix, + size_t buffer_size, + std::function<void(StreamID)> on_buffered_amount_low, + size_t total_buffered_amount_low_threshold, + std::function<void()> on_total_buffered_amount_low) + : log_prefix_(std::string(log_prefix) + "fcfs: "), + buffer_size_(buffer_size), + on_buffered_amount_low_(std::move(on_buffered_amount_low)), + total_buffered_amount_(std::move(on_total_buffered_amount_low)) { + total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); + } + + // Indicates if the buffer is full. Note that it's up to the caller to ensure + // that the buffer is not full prior to adding new items to it. + bool IsFull() const; + // Indicates if the buffer is empty. + bool IsEmpty() const; + + // Adds the message to be sent using the `send_options` provided. The current + // time should be in `now`. Note that it's the responsibility of the caller to + // ensure that the buffer is not full (by calling `IsFull`) before adding + // messages to it. + void Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options = {}); + + // Implementation of `SendQueue`. + absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override; + bool Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) override; + void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) override; + bool CanResetStreams() const override; + void CommitResetStreams() override; + void RollbackResetStreams() override; + void Reset() override; + size_t buffered_amount(StreamID stream_id) const override; + size_t total_buffered_amount() const override { + return total_buffered_amount_.value(); + } + size_t buffered_amount_low_threshold(StreamID stream_id) const override; + void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; + + private: + // Represents a value and a "low threshold" that when the value reaches or + // goes under the "low threshold", will trigger `on_threshold_reached` + // callback. + class ThresholdWatcher { + public: + explicit ThresholdWatcher(std::function<void()> on_threshold_reached) + : on_threshold_reached_(std::move(on_threshold_reached)) {} + // Increases the value. + void Increase(size_t bytes) { value_ += bytes; } + // Decreases the value and triggers `on_threshold_reached` if it's at or + // below `low_threshold()`. + void Decrease(size_t bytes); + + size_t value() const { return value_; } + size_t low_threshold() const { return low_threshold_; } + void SetLowThreshold(size_t low_threshold); + + private: + const std::function<void()> on_threshold_reached_; + size_t value_ = 0; + size_t low_threshold_ = 0; + }; + + // Per-stream information. + class OutgoingStream { + public: + explicit OutgoingStream(std::function<void()> on_buffered_amount_low, + ThresholdWatcher& total_buffered_amount) + : buffered_amount_(std::move(on_buffered_amount_low)), + total_buffered_amount_(total_buffered_amount) {} + + // Enqueues a message to this stream. + void Add(DcSctpMessage message, + absl::optional<TimeMs> expires_at, + const SendOptions& send_options); + + // Possibly produces a data chunk to send. + absl::optional<DataToSend> Produce(TimeMs now, size_t max_size); + + const ThresholdWatcher& buffered_amount() const { return buffered_amount_; } + ThresholdWatcher& buffered_amount() { return buffered_amount_; } + + // Discards a partially sent message, see `SendQueue::Discard`. + bool Discard(IsUnordered unordered, MID message_id); + + // Pauses this stream, which is used before resetting it. + void Pause(); + + // Resumes a paused stream. + void Resume() { is_paused_ = false; } + + bool is_paused() const { return is_paused_; } + + // Resets this stream, meaning MIDs and SSNs are set to zero. + void Reset(); + + // Indicates if this stream has a partially sent message in it. + bool has_partially_sent_message() const; + + // Indicates if the stream has data to send. It will also try to remove any + // expired non-partially sent message. + bool HasDataToSend(TimeMs now); + + private: + // An enqueued message and metadata. + struct Item { + explicit Item(DcSctpMessage msg, + absl::optional<TimeMs> expires_at, + const SendOptions& send_options) + : message(std::move(msg)), + expires_at(expires_at), + send_options(send_options), + remaining_offset(0), + remaining_size(message.payload().size()) {} + DcSctpMessage message; + absl::optional<TimeMs> expires_at; + SendOptions send_options; + // The remaining payload (offset and size) to be sent, when it has been + // fragmented. + size_t remaining_offset; + size_t remaining_size; + // If set, an allocated Message ID and SSN. Will be allocated when the + // first fragment is sent. + absl::optional<MID> message_id = absl::nullopt; + absl::optional<SSN> ssn = absl::nullopt; + // The current Fragment Sequence Number, incremented for each fragment. + FSN current_fsn = FSN(0); + }; + + bool IsConsistent() const; + + // Streams are pause when they are about to be reset. + bool is_paused_ = false; + // MIDs are different for unordered and ordered messages sent on a stream. + MID next_unordered_mid_ = MID(0); + MID next_ordered_mid_ = MID(0); + + SSN next_ssn_ = SSN(0); + // Enqueued messages, and metadata. + std::deque<Item> items_; + + // The current amount of buffered data. + ThresholdWatcher buffered_amount_; + + // Reference to the total buffered amount, which is updated directly by each + // stream. + ThresholdWatcher& total_buffered_amount_; + }; + + bool IsConsistent() const; + OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); + absl::optional<DataToSend> Produce( + std::map<StreamID, OutgoingStream>::iterator it, + TimeMs now, + size_t max_size); + + // Return the next stream, in round-robin fashion. + std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now); + + const std::string log_prefix_; + const size_t buffer_size_; + + // Called when the buffered amount is below what has been set using + // `SetBufferedAmountLowThreshold`. + const std::function<void(StreamID)> on_buffered_amount_low_; + + // Called when the total buffered amount is below what has been set using + // `SetTotalBufferedAmountLowThreshold`. + const std::function<void()> on_total_buffered_amount_low_; + + // The total amount of buffer data, for all streams. + ThresholdWatcher total_buffered_amount_; + + // Indicates if the previous fragment sent was the end of a message. For + // non-interleaved sending, this means that the next message may come from a + // different stream. If not true, the next fragment must be produced from the + // same stream as last time. + bool previous_message_has_ended_ = true; + + // The current stream to send chunks from. Modified by `GetNextStream`. + StreamID current_stream_id_ = StreamID(0); + + // All streams, and messages added to those. + std::map<StreamID, OutgoingStream> streams_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_ diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc new file mode 100644 index 0000000000..682c16af0b --- /dev/null +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -0,0 +1,742 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/rr_send_queue.h" + +#include <cstdint> +#include <type_traits> +#include <vector> + +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/testing/testing_macros.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::SizeIs; + +constexpr TimeMs kNow = TimeMs(0); +constexpr StreamID kStreamID(1); +constexpr PPID kPPID(53); +constexpr size_t kMaxQueueSize = 1000; +constexpr size_t kBufferedAmountLowThreshold = 500; +constexpr size_t kOneFragmentPacketSize = 100; +constexpr size_t kTwoFragmentPacketSize = 101; + +class RRSendQueueTest : public testing::Test { + protected: + RRSendQueueTest() + : buf_("log: ", + kMaxQueueSize, + on_buffered_amount_low_.AsStdFunction(), + kBufferedAmountLowThreshold, + on_total_buffered_amount_low_.AsStdFunction()) {} + + const DcSctpOptions options_; + testing::NiceMock<testing::MockFunction<void(StreamID)>> + on_buffered_amount_low_; + testing::NiceMock<testing::MockFunction<void()>> + on_total_buffered_amount_low_; + RRSendQueue buf_; +}; + +TEST_F(RRSendQueueTest, EmptyBuffer) { + EXPECT_TRUE(buf_.IsEmpty()); + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + EXPECT_FALSE(buf_.IsFull()); +} + +TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); + + EXPECT_FALSE(buf_.IsEmpty()); + EXPECT_FALSE(buf_.IsFull()); + absl::optional<SendQueue::DataToSend> chunk_opt = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_opt.has_value()); + EXPECT_TRUE(chunk_opt->data.is_beginning); + EXPECT_TRUE(chunk_opt->data.is_end); +} + +TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) { + std::vector<uint8_t> payload(60); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional<SendQueue::DataToSend> chunk_beg = + buf_.Produce(kNow, /*max_size=*/20); + ASSERT_TRUE(chunk_beg.has_value()); + EXPECT_TRUE(chunk_beg->data.is_beginning); + EXPECT_FALSE(chunk_beg->data.is_end); + + absl::optional<SendQueue::DataToSend> chunk_mid = + buf_.Produce(kNow, /*max_size=*/20); + ASSERT_TRUE(chunk_mid.has_value()); + EXPECT_FALSE(chunk_mid->data.is_beginning); + EXPECT_FALSE(chunk_mid->data.is_end); + + absl::optional<SendQueue::DataToSend> chunk_end = + buf_.Produce(kNow, /*max_size=*/20); + ASSERT_TRUE(chunk_end.has_value()); + EXPECT_FALSE(chunk_end->data.is_beginning); + EXPECT_TRUE(chunk_end->data.is_end); + + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); +} + +TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { + std::vector<uint8_t> payload(60); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); + + absl::optional<SendQueue::DataToSend> chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(chunk_one->data.ppid, kPPID); + EXPECT_TRUE(chunk_one->data.is_beginning); + EXPECT_TRUE(chunk_one->data.is_end); + + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); + EXPECT_EQ(chunk_two->data.ppid, PPID(54)); + EXPECT_TRUE(chunk_two->data.is_beginning); + EXPECT_TRUE(chunk_two->data.is_end); +} + +TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { + std::vector<uint8_t> payload(600); + EXPECT_FALSE(buf_.IsFull()); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_FALSE(buf_.IsFull()); + buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); + EXPECT_TRUE(buf_.IsFull()); + // However, it's still possible to add messages. It's a soft limit, and it + // might be necessary to forcefully add messages due to e.g. external + // fragmentation. + buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); + EXPECT_TRUE(buf_.IsFull()); + + absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(chunk_one->data.ppid, kPPID); + + EXPECT_TRUE(buf_.IsFull()); + + absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); + EXPECT_EQ(chunk_two->data.ppid, PPID(54)); + + EXPECT_FALSE(buf_.IsFull()); + EXPECT_FALSE(buf_.IsEmpty()); + + absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); + EXPECT_EQ(chunk_three->data.ppid, PPID(55)); + + EXPECT_FALSE(buf_.IsFull()); + EXPECT_TRUE(buf_.IsEmpty()); +} + +TEST_F(RRSendQueueTest, WillNotSendTooSmallPacket) { + std::vector<uint8_t> payload(RRSendQueue::kMinimumFragmentedPayload + 1); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + // Wouldn't fit enough payload (wouldn't want to fragment) + EXPECT_FALSE( + buf_.Produce(kNow, + /*max_size=*/RRSendQueue::kMinimumFragmentedPayload - 1) + .has_value()); + + // Minimum fragment + absl::optional<SendQueue::DataToSend> chunk_one = + buf_.Produce(kNow, + /*max_size=*/RRSendQueue::kMinimumFragmentedPayload); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(chunk_one->data.ppid, kPPID); + + // There is only one byte remaining - it can be fetched as it doesn't require + // additional fragmentation. + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, /*max_size=*/1); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.stream_id, kStreamID); + EXPECT_EQ(chunk_two->data.ppid, kPPID); + + EXPECT_TRUE(buf_.IsEmpty()); +} + +TEST_F(RRSendQueueTest, DefaultsToOrderedSend) { + std::vector<uint8_t> payload(20); + + // Default is ordered + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + absl::optional<SendQueue::DataToSend> chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_FALSE(chunk_one->data.is_unordered); + + // Explicitly unordered. + SendOptions opts; + opts.unordered = IsUnordered(true); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts); + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_TRUE(chunk_two->data.is_unordered); +} + +TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { + std::vector<uint8_t> payload(20); + + // Default is no expiry + TimeMs now = kNow; + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); + now += DurationMs(1000000); + ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); + + SendOptions expires_2_seconds; + expires_2_seconds.lifetime = DurationMs(2000); + + // Add and consume within lifetime + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + now += DurationMs(2000); + ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); + + // Add and consume just outside lifetime + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + now += DurationMs(2001); + ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); + + // A long time after expiry + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + now += DurationMs(1000000); + ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); + + // Expire one message, but produce the second that is not expired. + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + + SendOptions expires_4_seconds; + expires_4_seconds.lifetime = DurationMs(4000); + + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); + now += DurationMs(2001); + + ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); +} + +TEST_F(RRSendQueueTest, DiscardPartialPackets) { + std::vector<uint8_t> payload(120); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload)); + + absl::optional<SendQueue::DataToSend> chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_FALSE(chunk_one->data.is_end); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, + chunk_one->data.message_id); + + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_FALSE(chunk_two->data.is_end); + EXPECT_EQ(chunk_two->data.stream_id, StreamID(2)); + + absl::optional<SendQueue::DataToSend> chunk_three = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_TRUE(chunk_three->data.is_end); + EXPECT_EQ(chunk_three->data.stream_id, StreamID(2)); + ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); + + // Calling it again shouldn't cause issues. + buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, + chunk_one->data.message_id); + ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); +} + +TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); + buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); + EXPECT_EQ(buf_.total_buffered_amount(), 8u); + + buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)})); + EXPECT_EQ(buf_.total_buffered_amount(), 5u); + buf_.CommitResetStreams(); + buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(2)})); + EXPECT_EQ(buf_.total_buffered_amount(), 0u); +} + +TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { + std::vector<uint8_t> payload(120); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50); + + StreamID stream_ids[] = {StreamID(1)}; + buf_.PrepareResetStreams(stream_ids); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50); +} + +TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { + std::vector<uint8_t> payload(50); + + buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)})); + EXPECT_EQ(buf_.total_buffered_amount(), 0u); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + buf_.CommitResetStreams(); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + + absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(buf_.total_buffered_amount(), 0u); +} + +TEST_F(RRSendQueueTest, CommittingResetsSSN) { + std::vector<uint8_t> payload(50); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional<SendQueue::DataToSend> chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.ssn, SSN(0)); + + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.ssn, SSN(1)); + + StreamID stream_ids[] = {StreamID(1)}; + buf_.PrepareResetStreams(stream_ids); + + // Buffered + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + EXPECT_TRUE(buf_.CanResetStreams()); + buf_.CommitResetStreams(); + + absl::optional<SendQueue::DataToSend> chunk_three = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_EQ(chunk_three->data.ssn, SSN(0)); +} + +TEST_F(RRSendQueueTest, RollBackResumesSSN) { + std::vector<uint8_t> payload(50); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional<SendQueue::DataToSend> chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.ssn, SSN(0)); + + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.ssn, SSN(1)); + + buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)})); + + // Buffered + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + EXPECT_TRUE(buf_.CanResetStreams()); + buf_.RollbackResetStreams(); + + absl::optional<SendQueue::DataToSend> chunk_three = + buf_.Produce(kNow, kOneFragmentPacketSize); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_EQ(chunk_three->data.ssn, SSN(2)); +} + +TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) { + std::vector<uint8_t> payload(200); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(2)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(2)); +} + +TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) { + std::vector<uint8_t> payload(kTwoFragmentPacketSize); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, + SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk4.data.payload, + SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize)); +} + +TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) { + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1))); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2))); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3))); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4))); + buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5))); + buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6))); + buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7))); + buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk2.data.payload, SizeIs(3)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(3)); + EXPECT_THAT(chunk3.data.payload, SizeIs(5)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(4)); + EXPECT_THAT(chunk4.data.payload, SizeIs(7)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk5.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk5.data.payload, SizeIs(2)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk6.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk6.data.payload, SizeIs(4)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk7.data.stream_id, StreamID(3)); + EXPECT_THAT(chunk7.data.payload, SizeIs(6)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk8.data.stream_id, StreamID(4)); + EXPECT_THAT(chunk8.data.payload, SizeIs(8)); +} + +TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u); +} + +TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) { + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); +} + +TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) { + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1))); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u); + + // Should now trigger again, as buffer_amount went above the threshold. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(1)); +} + +TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) { + buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(10))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u); + + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(10)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(20))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(20)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); +} + +TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.SetBufferedAmountLowThreshold(StreamID(1), 700); + + std::vector<uint8_t> payload(1000); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u); + + // Doesn't trigger when reducing even further. + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); +} + +TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.SetBufferedAmountLowThreshold(StreamID(1), 700); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1000))); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, 400)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(400)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); + + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(200))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u); + + // Will trigger again, as it went above the limit. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, 200)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(200)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); +} + +TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(100))); + + // Modifying the threshold, still under buffered_amount, should not trigger. + buf_.SetBufferedAmountLowThreshold(StreamID(1), 50); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 99); + + // When the threshold reaches buffered_amount, it will trigger. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 100); + + // But not when it's set low again. + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 50); + + // But it will trigger when it overshoots. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 150); + + // But not when it's set low again. + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 0); +} + +TEST_F(RRSendQueueTest, + OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) { + EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(0); + std::vector<uint8_t> payload(kBufferedAmountLowThreshold - 1); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + + // Will not trigger if going above but never below. + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, + std::vector<uint8_t>(kOneFragmentPacketSize))); +} + +TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) { + EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(0); + std::vector<uint8_t> payload(kBufferedAmountLowThreshold); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + + // Reaches it. + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1))); + + // Drain it a bit - will trigger. + EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(1); + absl::optional<SendQueue::DataToSend> chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); +} + +TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) { + buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector<uint8_t>(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(5)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + + // Next, it should pick a different stream. + + buf_.Add(kNow, + DcSctpMessage(StreamID(1), kPPID, + std::vector<uint8_t>(kOneFragmentPacketSize * 2))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); + + // It should still stay on the Stream1 now, even if might be tempted to switch + // to this stream, as it's the stream following 5. + buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + + // After stream id 1 is complete, it's time to do stream 6. + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(6)); + EXPECT_THAT(chunk4.data.payload, SizeIs(1)); + + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); +} + +TEST_F(RRSendQueueTest, WillStayInStreamWhenOnlySmallFragmentRemaining) { + buf_.Add(kNow, + DcSctpMessage(StreamID(5), kPPID, + std::vector<uint8_t>(kOneFragmentPacketSize * 2))); + buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(5)); + EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); + + // Now assume that there will be a lot of previous chunks that need to be + // retransmitted, which fills up the next packet and there is little space + // left in the packet for new chunks. What it should NOT do right now is to + // try to send a message from StreamID 6. And it should not try to send a very + // small fragment from StreamID 5 either. So just skip this one. + EXPECT_FALSE(buf_.Produce(kNow, 8).has_value()); + + // When the next produce request comes with a large buffer to fill, continue + // sending from StreamID 5. + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(5)); + EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); + + // Lastly, produce a message on StreamID 6. + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(6)); + EXPECT_THAT(chunk3.data.payload, SizeIs(1)); + + EXPECT_FALSE(buf_.Produce(kNow, 8).has_value()); +} +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h new file mode 100644 index 0000000000..877dbdda59 --- /dev/null +++ b/net/dcsctp/tx/send_queue.h @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_SEND_QUEUE_H_ + +#include <cstdint> +#include <limits> +#include <utility> +#include <vector> + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/internal_types.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/types.h" + +namespace dcsctp { + +class SendQueue { + public: + // Container for a data chunk that is produced by the SendQueue + struct DataToSend { + explicit DataToSend(Data data) : data(std::move(data)) {} + // The data to send, including all parameters. + Data data; + + // Partial reliability - RFC3758 + absl::optional<int> max_retransmissions; + absl::optional<TimeMs> expires_at; + }; + + virtual ~SendQueue() = default; + + // TODO(boivie): This interface is obviously missing an "Add" function, but + // that is postponed a bit until the story around how to model message + // prioritization, which is important for any advanced stream scheduler, is + // further clarified. + + // Produce a chunk to be sent. + // + // `max_size` refers to how many payload bytes that may be produced, not + // including any headers. + virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0; + + // Discards a partially sent message identified by the parameters `unordered`, + // `stream_id` and `message_id`. The `message_id` comes from the returned + // information when having called `Produce`. A partially sent message means + // that it has had at least one fragment of it returned when `Produce` was + // called prior to calling this method). + // + // This is used when a message has been found to be expired (by the partial + // reliability extension), and the retransmission queue will signal the + // receiver that any partially received message fragments should be skipped. + // This means that any remaining fragments in the Send Queue must be removed + // as well so that they are not sent. + // + // This function returns true if this message had unsent fragments still in + // the queue that were discarded, and false if there were no such fragments. + virtual bool Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) = 0; + + // Prepares the streams to be reset. This is used to close a WebRTC data + // channel and will be signaled to the other side. + // + // Concretely, it discards all whole (not partly sent) messages in the given + // streams and pauses those streams so that future added messages aren't + // produced until `ResumeStreams` is called. + // + // TODO(boivie): Investigate if it really should discard any message at all. + // RFC8831 only mentions that "[RFC6525] also guarantees that all the messages + // are delivered (or abandoned) before the stream is reset." + // + // This method can be called multiple times to add more streams to be + // reset, and paused while they are resetting. This is the first part of the + // two-phase commit protocol to reset streams, where the caller completes the + // procedure by either calling `CommitResetStreams` or `RollbackResetStreams`. + virtual void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) = 0; + + // Returns true if all non-discarded messages during `PrepareResetStreams` + // (which are those that was partially sent before that method was called) + // have been sent. + virtual bool CanResetStreams() const = 0; + + // Called to commit to reset the streams provided to `PrepareResetStreams`. + // It will reset the stream sequence numbers (SSNs) and message identifiers + // (MIDs) and resume the paused streams. + virtual void CommitResetStreams() = 0; + + // Called to abort the resetting of streams provided to `PrepareResetStreams`. + // Will resume the paused streams without resetting the stream sequence + // numbers (SSNs) or message identifiers (MIDs). Note that the non-partial + // messages that were discarded when calling `PrepareResetStreams` will not be + // recovered, to better match the intention from the sender to "close the + // channel". + virtual void RollbackResetStreams() = 0; + + // Resets all message identifier counters (MID, SSN) and makes all partially + // messages be ready to be re-sent in full. This is used when the peer has + // been detected to have restarted and is used to try to minimize the amount + // of data loss. However, data loss cannot be completely guaranteed when a + // peer restarts. + virtual void Reset() = 0; + + // Returns the amount of buffered data. This doesn't include packets that are + // e.g. inflight. + virtual size_t buffered_amount(StreamID stream_id) const = 0; + + // Returns the total amount of buffer data, for all streams. + virtual size_t total_buffered_amount() const = 0; + + // Returns the limit for the `OnBufferedAmountLow` event. Default value is 0. + virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0; + + // Sets a limit for the `OnBufferedAmountLow` event. + virtual void SetBufferedAmountLowThreshold(StreamID stream_id, + size_t bytes) = 0; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_SEND_QUEUE_H_ |