aboutsummaryrefslogtreecommitdiff
path: root/net/dcsctp/tx
diff options
context:
space:
mode:
Diffstat (limited to 'net/dcsctp/tx')
-rw-r--r--net/dcsctp/tx/BUILD.gn141
-rw-r--r--net/dcsctp/tx/mock_send_queue.h60
-rw-r--r--net/dcsctp/tx/retransmission_error_counter.cc37
-rw-r--r--net/dcsctp/tx/retransmission_error_counter.h51
-rw-r--r--net/dcsctp/tx/retransmission_error_counter_test.cc76
-rw-r--r--net/dcsctp/tx/retransmission_queue.cc889
-rw-r--r--net/dcsctp/tx/retransmission_queue.h371
-rw-r--r--net/dcsctp/tx/retransmission_queue_test.cc1007
-rw-r--r--net/dcsctp/tx/retransmission_timeout.cc69
-rw-r--r--net/dcsctp/tx/retransmission_timeout.h58
-rw-r--r--net/dcsctp/tx/retransmission_timeout_test.cc151
-rw-r--r--net/dcsctp/tx/rr_send_queue.cc432
-rw-r--r--net/dcsctp/tx/rr_send_queue.h238
-rw-r--r--net/dcsctp/tx/rr_send_queue_test.cc742
-rw-r--r--net/dcsctp/tx/send_queue.h128
15 files changed, 4450 insertions, 0 deletions
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
new file mode 100644
index 0000000000..2f0b27afc6
--- /dev/null
+++ b/net/dcsctp/tx/BUILD.gn
@@ -0,0 +1,141 @@
+# Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+#
+# Use of this source code is governed by a BSD-style license
+# that can be found in the LICENSE file in the root of the source
+# tree. An additional intellectual property rights grant can be found
+# in the file PATENTS. All contributing project authors may
+# be found in the AUTHORS file in the root of the source tree.
+
+import("../../../webrtc.gni")
+
+rtc_source_set("send_queue") {
+ deps = [
+ "../../../api:array_view",
+ "../common:internal_types",
+ "../packet:chunk",
+ "../packet:data",
+ "../public:types",
+ ]
+ sources = [ "send_queue.h" ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+}
+
+rtc_library("rr_send_queue") {
+ deps = [
+ ":send_queue",
+ "../../../api:array_view",
+ "../../../rtc_base:checks",
+ "../../../rtc_base:rtc_base_approved",
+ "../common:pair_hash",
+ "../packet:data",
+ "../public:socket",
+ "../public:types",
+ ]
+ sources = [
+ "rr_send_queue.cc",
+ "rr_send_queue.h",
+ ]
+ absl_deps = [
+ "//third_party/abseil-cpp/absl/algorithm:container",
+ "//third_party/abseil-cpp/absl/algorithm:container",
+ "//third_party/abseil-cpp/absl/strings",
+ "//third_party/abseil-cpp/absl/types:optional",
+ ]
+}
+
+rtc_library("retransmission_error_counter") {
+ deps = [
+ "../../../rtc_base:checks",
+ "../../../rtc_base:rtc_base_approved",
+ "../public:types",
+ ]
+ sources = [
+ "retransmission_error_counter.cc",
+ "retransmission_error_counter.h",
+ ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
+}
+
+rtc_library("retransmission_timeout") {
+ deps = [
+ "../../../rtc_base:checks",
+ "../../../rtc_base:rtc_base_approved",
+ "../public:types",
+ ]
+ sources = [
+ "retransmission_timeout.cc",
+ "retransmission_timeout.h",
+ ]
+}
+
+rtc_library("retransmission_queue") {
+ deps = [
+ ":retransmission_timeout",
+ ":send_queue",
+ "../../../api:array_view",
+ "../../../rtc_base:checks",
+ "../../../rtc_base:rtc_base_approved",
+ "../common:math",
+ "../common:pair_hash",
+ "../common:sequence_numbers",
+ "../common:str_join",
+ "../packet:chunk",
+ "../packet:data",
+ "../public:types",
+ "../timer",
+ ]
+ sources = [
+ "retransmission_queue.cc",
+ "retransmission_queue.h",
+ ]
+ absl_deps = [
+ "//third_party/abseil-cpp/absl/algorithm:container",
+ "//third_party/abseil-cpp/absl/strings",
+ "//third_party/abseil-cpp/absl/types:optional",
+ ]
+}
+
+if (rtc_include_tests) {
+ rtc_source_set("mock_send_queue") {
+ testonly = true
+ deps = [
+ ":send_queue",
+ "../../../api:array_view",
+ "../../../test:test_support",
+ ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+ sources = [ "mock_send_queue.h" ]
+ }
+
+ rtc_library("dcsctp_tx_unittests") {
+ testonly = true
+
+ deps = [
+ ":mock_send_queue",
+ ":retransmission_error_counter",
+ ":retransmission_queue",
+ ":retransmission_timeout",
+ ":rr_send_queue",
+ ":send_queue",
+ "../../../api:array_view",
+ "../../../rtc_base:checks",
+ "../../../rtc_base:gunit_helpers",
+ "../../../rtc_base:rtc_base_approved",
+ "../../../test:test_support",
+ "../packet:chunk",
+ "../packet:data",
+ "../public:socket",
+ "../public:types",
+ "../testing:data_generator",
+ "../testing:testing_macros",
+ "../timer",
+ ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+ sources = [
+ "retransmission_error_counter_test.cc",
+ "retransmission_queue_test.cc",
+ "retransmission_timeout_test.cc",
+ "rr_send_queue_test.cc",
+ ]
+ }
+}
diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h
new file mode 100644
index 0000000000..0cf64583ae
--- /dev/null
+++ b/net/dcsctp/tx/mock_send_queue.h
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
+#define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
+
+#include <cstdint>
+
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+
+class MockSendQueue : public SendQueue {
+ public:
+ MockSendQueue() {
+ ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) {
+ return absl::nullopt;
+ });
+ }
+
+ MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
+ Produce,
+ (TimeMs now, size_t max_size),
+ (override));
+ MOCK_METHOD(bool,
+ Discard,
+ (IsUnordered unordered, StreamID stream_id, MID message_id),
+ (override));
+ MOCK_METHOD(void,
+ PrepareResetStreams,
+ (rtc::ArrayView<const StreamID> streams),
+ (override));
+ MOCK_METHOD(bool, CanResetStreams, (), (const, override));
+ MOCK_METHOD(void, CommitResetStreams, (), (override));
+ MOCK_METHOD(void, RollbackResetStreams, (), (override));
+ MOCK_METHOD(void, Reset, (), (override));
+ MOCK_METHOD(size_t, buffered_amount, (StreamID stream_id), (const, override));
+ MOCK_METHOD(size_t, total_buffered_amount, (), (const, override));
+ MOCK_METHOD(size_t,
+ buffered_amount_low_threshold,
+ (StreamID stream_id),
+ (const, override));
+ MOCK_METHOD(void,
+ SetBufferedAmountLowThreshold,
+ (StreamID stream_id, size_t bytes),
+ (override));
+};
+
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
diff --git a/net/dcsctp/tx/retransmission_error_counter.cc b/net/dcsctp/tx/retransmission_error_counter.cc
new file mode 100644
index 0000000000..111b6efe96
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_error_counter.cc
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/retransmission_error_counter.h"
+
+#include "absl/strings/string_view.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+bool RetransmissionErrorCounter::Increment(absl::string_view reason) {
+ ++counter_;
+ if (counter_ > limit_) {
+ RTC_DLOG(LS_INFO) << log_prefix_ << reason
+ << ", too many retransmissions, counter=" << counter_;
+ return false;
+ }
+
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << reason << ", new counter=" << counter_
+ << ", max=" << limit_;
+ return true;
+}
+
+void RetransmissionErrorCounter::Clear() {
+ if (counter_ > 0) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "recovered from counter=" << counter_;
+ counter_ = 0;
+ }
+}
+
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/retransmission_error_counter.h b/net/dcsctp/tx/retransmission_error_counter.h
new file mode 100644
index 0000000000..bb8d1f754d
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_error_counter.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_
+#define NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_
+
+#include <functional>
+#include <string>
+#include <utility>
+
+#include "absl/strings/string_view.h"
+#include "net/dcsctp/public/dcsctp_options.h"
+
+namespace dcsctp {
+
+// The RetransmissionErrorCounter is a simple counter with a limit, and when
+// the limit is exceeded, the counter is exhausted and the connection will
+// be closed. It's incremented on retransmission errors, such as the T3-RTX
+// timer expiring, but also missing heartbeats and stream reset requests.
+class RetransmissionErrorCounter {
+ public:
+ RetransmissionErrorCounter(absl::string_view log_prefix,
+ const DcSctpOptions& options)
+ : log_prefix_(std::string(log_prefix) + "rtx-errors: "),
+ limit_(options.max_retransmissions) {}
+
+ // Increments the retransmission timer. If the maximum error count has been
+ // reached, `false` will be returned.
+ bool Increment(absl::string_view reason);
+ bool IsExhausted() const { return counter_ > limit_; }
+
+ // Clears the retransmission errors.
+ void Clear();
+
+ // Returns its current value
+ int value() const { return counter_; }
+
+ private:
+ const std::string log_prefix_;
+ const int limit_;
+ int counter_ = 0;
+};
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_
diff --git a/net/dcsctp/tx/retransmission_error_counter_test.cc b/net/dcsctp/tx/retransmission_error_counter_test.cc
new file mode 100644
index 0000000000..61ee82926d
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_error_counter_test.cc
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/retransmission_error_counter.h"
+
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+
+TEST(RetransmissionErrorCounterTest, HasInitialValue) {
+ DcSctpOptions options;
+ RetransmissionErrorCounter counter("log: ", options);
+ EXPECT_EQ(counter.value(), 0);
+}
+
+TEST(RetransmissionErrorCounterTest, ReturnsFalseAtMaximumValue) {
+ DcSctpOptions options;
+ options.max_retransmissions = 5;
+ RetransmissionErrorCounter counter("log: ", options);
+ EXPECT_TRUE(counter.Increment("test")); // 1
+ EXPECT_TRUE(counter.Increment("test")); // 2
+ EXPECT_TRUE(counter.Increment("test")); // 3
+ EXPECT_TRUE(counter.Increment("test")); // 4
+ EXPECT_TRUE(counter.Increment("test")); // 5
+ EXPECT_FALSE(counter.Increment("test")); // Too many retransmissions
+}
+
+TEST(RetransmissionErrorCounterTest, CanHandleZeroRetransmission) {
+ DcSctpOptions options;
+ options.max_retransmissions = 0;
+ RetransmissionErrorCounter counter("log: ", options);
+ EXPECT_FALSE(counter.Increment("test")); // One is too many.
+}
+
+TEST(RetransmissionErrorCounterTest, IsExhaustedAtMaximum) {
+ DcSctpOptions options;
+ options.max_retransmissions = 3;
+ RetransmissionErrorCounter counter("log: ", options);
+ EXPECT_TRUE(counter.Increment("test")); // 1
+ EXPECT_FALSE(counter.IsExhausted());
+ EXPECT_TRUE(counter.Increment("test")); // 2
+ EXPECT_FALSE(counter.IsExhausted());
+ EXPECT_TRUE(counter.Increment("test")); // 3
+ EXPECT_FALSE(counter.IsExhausted());
+ EXPECT_FALSE(counter.Increment("test")); // Too many retransmissions
+ EXPECT_TRUE(counter.IsExhausted());
+ EXPECT_FALSE(counter.Increment("test")); // One after too many
+ EXPECT_TRUE(counter.IsExhausted());
+}
+
+TEST(RetransmissionErrorCounterTest, ClearingCounter) {
+ DcSctpOptions options;
+ options.max_retransmissions = 3;
+ RetransmissionErrorCounter counter("log: ", options);
+ EXPECT_TRUE(counter.Increment("test")); // 1
+ EXPECT_TRUE(counter.Increment("test")); // 2
+ counter.Clear();
+ EXPECT_TRUE(counter.Increment("test")); // 1
+ EXPECT_TRUE(counter.Increment("test")); // 2
+ EXPECT_TRUE(counter.Increment("test")); // 3
+ EXPECT_FALSE(counter.IsExhausted());
+ EXPECT_FALSE(counter.Increment("test")); // Too many retransmissions
+ EXPECT_TRUE(counter.IsExhausted());
+}
+
+} // namespace
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc
new file mode 100644
index 0000000000..ef2f0e3172
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_queue.cc
@@ -0,0 +1,889 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/retransmission_queue.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <iterator>
+#include <map>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/math.h"
+#include "net/dcsctp/common/pair_hash.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/common/str_join.h"
+#include "net/dcsctp/packet/chunk/data_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/chunk/idata_chunk.h"
+#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/sack_chunk.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/timer/timer.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/strings/string_builder.h"
+
+namespace dcsctp {
+namespace {
+
+// The number of times a packet must be NACKed before it's retransmitted.
+// See https://tools.ietf.org/html/rfc4960#section-7.2.4
+constexpr size_t kNumberOfNacksForRetransmission = 3;
+} // namespace
+
+RetransmissionQueue::RetransmissionQueue(
+ absl::string_view log_prefix,
+ TSN initial_tsn,
+ size_t a_rwnd,
+ SendQueue& send_queue,
+ std::function<void(DurationMs rtt)> on_new_rtt,
+ std::function<void()> on_clear_retransmission_counter,
+ Timer& t3_rtx,
+ const DcSctpOptions& options,
+ bool supports_partial_reliability,
+ bool use_message_interleaving)
+ : options_(options),
+ partial_reliability_(supports_partial_reliability),
+ log_prefix_(std::string(log_prefix) + "tx: "),
+ data_chunk_header_size_(use_message_interleaving
+ ? IDataChunk::kHeaderSize
+ : DataChunk::kHeaderSize),
+ on_new_rtt_(std::move(on_new_rtt)),
+ on_clear_retransmission_counter_(
+ std::move(on_clear_retransmission_counter)),
+ t3_rtx_(t3_rtx),
+ cwnd_(options_.cwnd_mtus_initial * options_.mtu),
+ rwnd_(a_rwnd),
+ // https://tools.ietf.org/html/rfc4960#section-7.2.1
+ // "The initial value of ssthresh MAY be arbitrarily high (for
+ // example, implementations MAY use the size of the receiver advertised
+ // window).""
+ ssthresh_(rwnd_),
+ next_tsn_(tsn_unwrapper_.Unwrap(initial_tsn)),
+ last_cumulative_tsn_ack_(tsn_unwrapper_.Unwrap(TSN(*initial_tsn - 1))),
+ send_queue_(send_queue) {}
+
+bool RetransmissionQueue::IsConsistent() const {
+ size_t actual_outstanding_bytes = 0;
+
+ std::set<UnwrappedTSN> actual_to_be_retransmitted;
+ for (const auto& elem : outstanding_data_) {
+ if (elem.second.is_outstanding()) {
+ actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data());
+ }
+
+ if (elem.second.should_be_retransmitted()) {
+ actual_to_be_retransmitted.insert(elem.first);
+ }
+ }
+
+ return actual_outstanding_bytes == outstanding_bytes_ &&
+ actual_to_be_retransmitted == to_be_retransmitted_;
+}
+
+// Returns how large a chunk will be, serialized, carrying the data
+size_t RetransmissionQueue::GetSerializedChunkSize(const Data& data) const {
+ return RoundUpTo4(data_chunk_header_size_ + data.size());
+}
+
+void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
+ AckInfo& ack_info) {
+ auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack);
+
+ for (auto it = outstanding_data_.begin(); it != first_unacked; ++it) {
+ ack_info.bytes_acked_by_cumulative_tsn_ack += it->second.data().size();
+ ack_info.acked_tsns.push_back(it->first.Wrap());
+ if (it->second.is_outstanding()) {
+ outstanding_bytes_ -= GetSerializedChunkSize(it->second.data());
+ } else if (it->second.should_be_retransmitted()) {
+ to_be_retransmitted_.erase(it->first);
+ }
+ }
+
+ outstanding_data_.erase(outstanding_data_.begin(), first_unacked);
+}
+
+void RetransmissionQueue::AckGapBlocks(
+ UnwrappedTSN cumulative_tsn_ack,
+ rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
+ AckInfo& ack_info) {
+ // Mark all non-gaps as ACKED (but they can't be removed) as (from RFC)
+ // "SCTP considers the information carried in the Gap Ack Blocks in the
+ // SACK chunk as advisory.". Note that when NR-SACK is supported, this can be
+ // handled differently.
+
+ for (auto& block : gap_ack_blocks) {
+ auto start = outstanding_data_.lower_bound(
+ UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start));
+ auto end = outstanding_data_.upper_bound(
+ UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end));
+ for (auto iter = start; iter != end; ++iter) {
+ if (!iter->second.is_acked()) {
+ ack_info.bytes_acked_by_new_gap_ack_blocks +=
+ iter->second.data().size();
+ if (iter->second.is_outstanding()) {
+ outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
+ }
+ if (iter->second.should_be_retransmitted()) {
+ to_be_retransmitted_.erase(iter->first);
+ }
+ iter->second.Ack();
+ ack_info.highest_tsn_acked =
+ std::max(ack_info.highest_tsn_acked, iter->first);
+ ack_info.acked_tsns.push_back(iter->first.Wrap());
+ }
+ }
+ }
+}
+
+void RetransmissionQueue::NackBetweenAckBlocks(
+ UnwrappedTSN cumulative_tsn_ack,
+ rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
+ AckInfo& ack_info) {
+ // Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED.
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "Mark the DATA chunk(s) with three miss indications for retransmission."
+ // "For each incoming SACK, miss indications are incremented only for
+ // missing TSNs prior to the highest TSN newly acknowledged in the SACK."
+ //
+ // What this means is that only when there is a increasing stream of data
+ // received and there are new packets seen (since last time), packets that are
+ // in-flight and between gaps should be nacked. This means that SCTP relies on
+ // the T3-RTX-timer to re-send packets otherwise.
+ UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked;
+ if (is_in_fast_recovery() && cumulative_tsn_ack > last_cumulative_tsn_ack_) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "If an endpoint is in Fast Recovery and a SACK arrives that advances
+ // the Cumulative TSN Ack Point, the miss indications are incremented for
+ // all TSNs reported missing in the SACK."
+ max_tsn_to_nack = UnwrappedTSN::AddTo(
+ cumulative_tsn_ack,
+ gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end);
+ }
+
+ UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack;
+ for (auto& block : gap_ack_blocks) {
+ UnwrappedTSN cur_block_first_acked =
+ UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
+ for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
+ iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
+ if (iter->first <= max_tsn_to_nack) {
+ if (iter->second.is_outstanding()) {
+ outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
+ }
+
+ if (iter->second.Nack()) {
+ ack_info.has_packet_loss = true;
+ to_be_retransmitted_.insert(iter->first);
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap()
+ << " marked for retransmission";
+ }
+ }
+ }
+ prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
+ }
+
+ // Note that packets are not NACKED which are above the highest gap-ack-block
+ // (or above the cumulative ack TSN if no gap-ack-blocks) as only packets
+ // up until the highest_tsn_acked (see above) should be considered when
+ // NACKing.
+}
+
+void RetransmissionQueue::MaybeExitFastRecovery(
+ UnwrappedTSN cumulative_tsn_ack) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "When a SACK acknowledges all TSNs up to and including this [fast
+ // recovery] exit point, Fast Recovery is exited."
+ if (fast_recovery_exit_tsn_.has_value() &&
+ cumulative_tsn_ack >= *fast_recovery_exit_tsn_) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "exit_point=" << *fast_recovery_exit_tsn_->Wrap()
+ << " reached - exiting fast recovery";
+ fast_recovery_exit_tsn_ = absl::nullopt;
+ }
+}
+
+void RetransmissionQueue::HandleIncreasedCumulativeTsnAck(
+ size_t outstanding_bytes,
+ size_t total_bytes_acked) {
+ // Allow some margin for classifying as fully utilized, due to e.g. that too
+ // small packets (less than kMinimumFragmentedPayload) are not sent +
+ // overhead.
+ bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_;
+ size_t old_cwnd = cwnd_;
+ if (phase() == CongestionAlgorithmPhase::kSlowStart) {
+ if (is_fully_utilized && !is_in_fast_recovery()) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.1
+ // "Only when these three conditions are met can the cwnd be
+ // increased; otherwise, the cwnd MUST not be increased. If these
+ // conditions are met, then cwnd MUST be increased by, at most, the
+ // lesser of 1) the total size of the previously outstanding DATA
+ // chunk(s) acknowledged, and 2) the destination's path MTU."
+ if (options_.slow_start_tcp_style) {
+ cwnd_ += std::min(total_bytes_acked, cwnd_);
+ } else {
+ cwnd_ += std::min(total_bytes_acked, options_.mtu);
+ }
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "SS increase cwnd=" << cwnd_
+ << " (" << old_cwnd << ")";
+ }
+ } else if (phase() == CongestionAlgorithmPhase::kCongestionAvoidance) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.2
+ // "Whenever cwnd is greater than ssthresh, upon each SACK arrival
+ // that advances the Cumulative TSN Ack Point, increase
+ // partial_bytes_acked by the total number of bytes of all new chunks
+ // acknowledged in that SACK including chunks acknowledged by the new
+ // Cumulative TSN Ack and by Gap Ack Blocks."
+ size_t old_pba = partial_bytes_acked_;
+ partial_bytes_acked_ += total_bytes_acked;
+
+ if (partial_bytes_acked_ >= cwnd_ && is_fully_utilized) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.2
+ // "When partial_bytes_acked is equal to or greater than cwnd and
+ // before the arrival of the SACK the sender had cwnd or more bytes of
+ // data outstanding (i.e., before arrival of the SACK, flightsize was
+ // greater than or equal to cwnd), increase cwnd by MTU, and reset
+ // partial_bytes_acked to (partial_bytes_acked - cwnd)."
+ cwnd_ += options_.mtu;
+ partial_bytes_acked_ -= cwnd_;
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA increase cwnd=" << cwnd_
+ << " (" << old_cwnd << ") ssthresh=" << ssthresh_
+ << ", pba=" << partial_bytes_acked_ << " ("
+ << old_pba << ")";
+ } else {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA unchanged cwnd=" << cwnd_
+ << " (" << old_cwnd << ") ssthresh=" << ssthresh_
+ << ", pba=" << partial_bytes_acked_ << " ("
+ << old_pba << ")";
+ }
+ }
+}
+
+void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) {
+ if (!is_in_fast_recovery()) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "If not in Fast Recovery, adjust the ssthresh and cwnd of the
+ // destination address(es) to which the missing DATA chunks were last
+ // sent, according to the formula described in Section 7.2.3."
+ size_t old_cwnd = cwnd_;
+ size_t old_pba = partial_bytes_acked_;
+ ssthresh_ = std::max(cwnd_ / 2, options_.cwnd_mtus_min * options_.mtu);
+ cwnd_ = ssthresh_;
+ partial_bytes_acked_ = 0;
+
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "packet loss detected (not fast recovery). cwnd="
+ << cwnd_ << " (" << old_cwnd
+ << "), ssthresh=" << ssthresh_
+ << ", pba=" << partial_bytes_acked_ << " (" << old_pba
+ << ")";
+
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "If not in Fast Recovery, enter Fast Recovery and mark the highest
+ // outstanding TSN as the Fast Recovery exit point."
+ fast_recovery_exit_tsn_ = outstanding_data_.empty()
+ ? last_cumulative_tsn_ack_
+ : outstanding_data_.rbegin()->first;
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "fast recovery initiated with exit_point="
+ << *fast_recovery_exit_tsn_->Wrap();
+ } else {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "While in Fast Recovery, the ssthresh and cwnd SHOULD NOT change for
+ // any destinations due to a subsequent Fast Recovery event (i.e., one
+ // SHOULD NOT reduce the cwnd further due to a subsequent Fast Retransmit)."
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "packet loss detected (fast recovery). No changes.";
+ }
+}
+
+void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) {
+ rwnd_ = outstanding_bytes_ >= a_rwnd ? 0 : a_rwnd - outstanding_bytes_;
+}
+
+void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() {
+ // Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to
+ // be retransmitted.
+ if (outstanding_data_.empty()) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.2
+ // "Whenever all outstanding data sent to an address have been
+ // acknowledged, turn off the T3-rtx timer of that address.
+ // Note: Already stopped in `StopT3RtxTimerOnIncreasedCumulativeTsnAck`."
+ } else {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.2
+ // "Whenever a SACK is received that acknowledges the DATA chunk
+ // with the earliest outstanding TSN for that address, restart the T3-rtx
+ // timer for that address with its current RTO (if there is still
+ // outstanding data on that address)."
+ // "Whenever a SACK is received missing a TSN that was previously
+ // acknowledged via a Gap Ack Block, start the T3-rtx for the destination
+ // address to which the DATA chunk was originally transmitted if it is not
+ // already running."
+ if (!t3_rtx_.is_running()) {
+ t3_rtx_.Start();
+ }
+ }
+}
+
+bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const {
+ // https://tools.ietf.org/html/rfc4960#section-6.2.1
+ // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point,
+ // then drop the SACK. Since Cumulative TSN Ack is monotonically increasing,
+ // a SACK whose Cumulative TSN Ack is less than the Cumulative TSN Ack Point
+ // indicates an out-of- order SACK."
+ //
+ // Note: Important not to drop SACKs with identical TSN to that previously
+ // received, as the gap ack blocks or dup tsn fields may have changed.
+ UnwrappedTSN cumulative_tsn_ack =
+ tsn_unwrapper_.PeekUnwrap(sack.cumulative_tsn_ack());
+ if (cumulative_tsn_ack < last_cumulative_tsn_ack_) {
+ // https://tools.ietf.org/html/rfc4960#section-6.2.1
+ // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point,
+ // then drop the SACK. Since Cumulative TSN Ack is monotonically
+ // increasing, a SACK whose Cumulative TSN Ack is less than the Cumulative
+ // TSN Ack Point indicates an out-of- order SACK."
+ return false;
+ } else if (outstanding_data_.empty() &&
+ cumulative_tsn_ack > last_cumulative_tsn_ack_) {
+ // No in-flight data and cum-tsn-ack above what was last ACKed - not valid.
+ return false;
+ } else if (!outstanding_data_.empty() &&
+ cumulative_tsn_ack > outstanding_data_.rbegin()->first) {
+ // There is in-flight data, but the cum-tsn-ack is beyond that - not valid.
+ return false;
+ }
+ return true;
+}
+
+bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
+ if (!IsSackValid(sack)) {
+ return false;
+ }
+
+ size_t old_outstanding_bytes = outstanding_bytes_;
+ size_t old_rwnd = rwnd_;
+ UnwrappedTSN cumulative_tsn_ack =
+ tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack());
+
+ if (sack.gap_ack_blocks().empty()) {
+ UpdateRTT(now, cumulative_tsn_ack);
+ }
+
+ AckInfo ack_info(cumulative_tsn_ack);
+ // Erase all items up to cumulative_tsn_ack.
+ RemoveAcked(cumulative_tsn_ack, ack_info);
+
+ // ACK packets reported in the gap ack blocks
+ AckGapBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info);
+
+ // NACK and possibly mark for retransmit chunks that weren't acked.
+ NackBetweenAckBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info);
+
+ // Update of outstanding_data_ is now done. Congestion control remains.
+ UpdateReceiverWindow(sack.a_rwnd());
+
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK. Acked TSN: "
+ << StrJoin(ack_info.acked_tsns, ",",
+ [](rtc::StringBuilder& sb, TSN tsn) {
+ sb << *tsn;
+ })
+ << ", cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " ("
+ << *last_cumulative_tsn_ack_.Wrap()
+ << "), outstanding_bytes=" << outstanding_bytes_ << " ("
+ << old_outstanding_bytes << "), rwnd=" << rwnd_ << " ("
+ << old_rwnd << ")";
+
+ MaybeExitFastRecovery(cumulative_tsn_ack);
+
+ if (cumulative_tsn_ack > last_cumulative_tsn_ack_) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.2
+ // "Whenever a SACK is received that acknowledges the DATA chunk
+ // with the earliest outstanding TSN for that address, restart the T3-rtx
+ // timer for that address with its current RTO (if there is still
+ // outstanding data on that address)."
+ // Note: It may be started again in a bit further down.
+ t3_rtx_.Stop();
+
+ HandleIncreasedCumulativeTsnAck(
+ old_outstanding_bytes, ack_info.bytes_acked_by_cumulative_tsn_ack +
+ ack_info.bytes_acked_by_new_gap_ack_blocks);
+ }
+
+ if (ack_info.has_packet_loss) {
+ is_in_fast_retransmit_ = true;
+ HandlePacketLoss(ack_info.highest_tsn_acked);
+ }
+
+ // https://tools.ietf.org/html/rfc4960#section-8.2
+ // "When an outstanding TSN is acknowledged [...] the endpoint shall clear
+ // the error counter ..."
+ if (ack_info.bytes_acked_by_cumulative_tsn_ack > 0 ||
+ ack_info.bytes_acked_by_new_gap_ack_blocks > 0) {
+ on_clear_retransmission_counter_();
+ }
+
+ last_cumulative_tsn_ack_ = cumulative_tsn_ack;
+ StartT3RtxTimerIfOutstandingData();
+ RTC_DCHECK(IsConsistent());
+ return true;
+}
+
+void RetransmissionQueue::UpdateRTT(TimeMs now,
+ UnwrappedTSN cumulative_tsn_ack) {
+ // RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C,
+ // Halvorsen P (2006) Considerations of SCTP retransmission delays for thin
+ // streams.
+ // Due to delayed acknowledgement, the SACK may be sent much later which
+ // increases the calculated RTT.
+ // TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and
+ // use only those packets for measurement.
+
+ auto it = outstanding_data_.find(cumulative_tsn_ack);
+ if (it != outstanding_data_.end()) {
+ if (!it->second.has_been_retransmitted()) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.1
+ // "Karn's algorithm: RTT measurements MUST NOT be made using
+ // packets that were retransmitted (and thus for which it is ambiguous
+ // whether the reply was for the first instance of the chunk or for a
+ // later instance)"
+ DurationMs rtt = now - it->second.time_sent();
+ on_new_rtt_(rtt);
+ }
+ }
+}
+
+void RetransmissionQueue::HandleT3RtxTimerExpiry() {
+ size_t old_cwnd = cwnd_;
+ size_t old_outstanding_bytes = outstanding_bytes_;
+ // https://tools.ietf.org/html/rfc4960#section-6.3.3
+ // "For the destination address for which the timer expires, adjust
+ // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU."
+ ssthresh_ = std::max(cwnd_ / 2, 4 * options_.mtu);
+ cwnd_ = 1 * options_.mtu;
+
+ // https://tools.ietf.org/html/rfc4960#section-6.3.3
+ // "For the destination address for which the timer expires, set RTO
+ // <- RTO * 2 ("back off the timer"). The maximum value discussed in rule C7
+ // above (RTO.max) may be used to provide an upper bound to this doubling
+ // operation."
+
+ // Already done by the Timer implementation.
+
+ // https://tools.ietf.org/html/rfc4960#section-6.3.3
+ // "Determine how many of the earliest (i.e., lowest TSN) outstanding
+ // DATA chunks for the address for which the T3-rtx has expired will fit into
+ // a single packet"
+
+ // https://tools.ietf.org/html/rfc4960#section-6.3.3
+ // "Note: Any DATA chunks that were sent to the address for which the
+ // T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be
+ // marked for retransmission and sent as soon as cwnd allows (normally, when a
+ // SACK arrives)."
+ int count = 0;
+ for (auto& elem : outstanding_data_) {
+ UnwrappedTSN tsn = elem.first;
+ TxData& item = elem.second;
+ if (!item.is_acked()) {
+ if (item.is_outstanding()) {
+ outstanding_bytes_ -= GetSerializedChunkSize(item.data());
+ }
+ if (item.Nack(/*retransmit_now=*/true)) {
+ to_be_retransmitted_.insert(tsn);
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap()
+ << " will be retransmitted due to T3-RTX";
+ ++count;
+ }
+ }
+ }
+
+ // https://tools.ietf.org/html/rfc4960#section-6.3.3
+ // "Start the retransmission timer T3-rtx on the destination address
+ // to which the retransmission is sent, if rule R1 above indicates to do so."
+
+ // Already done by the Timer implementation.
+
+ RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
+ << " (" << old_cwnd << "), ssthresh=" << ssthresh_
+ << ", rtx-packets=" << count << ", outstanding_bytes "
+ << outstanding_bytes_ << " (" << old_outstanding_bytes
+ << ")";
+ RTC_DCHECK(IsConsistent());
+}
+
+std::vector<std::pair<TSN, Data>>
+RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
+ std::vector<std::pair<TSN, Data>> result;
+
+ for (auto it = to_be_retransmitted_.begin();
+ it != to_be_retransmitted_.end();) {
+ UnwrappedTSN tsn = *it;
+ auto elem = outstanding_data_.find(tsn);
+ RTC_DCHECK(elem != outstanding_data_.end());
+ TxData& item = elem->second;
+ RTC_DCHECK(item.should_be_retransmitted());
+ RTC_DCHECK(!item.is_outstanding());
+ RTC_DCHECK(!item.is_abandoned());
+ RTC_DCHECK(!item.is_acked());
+
+ size_t serialized_size = GetSerializedChunkSize(item.data());
+ if (serialized_size <= max_size) {
+ item.Retransmit();
+ result.emplace_back(tsn.Wrap(), item.data().Clone());
+ max_size -= serialized_size;
+ outstanding_bytes_ += serialized_size;
+ it = to_be_retransmitted_.erase(it);
+ } else {
+ ++it;
+ }
+ // No point in continuing if the packet is full.
+ if (max_size <= data_chunk_header_size_) {
+ break;
+ }
+ }
+
+ return result;
+}
+
+std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
+ TimeMs now,
+ size_t bytes_remaining_in_packet) {
+ // Chunks are always padded to even divisible by four.
+ RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet));
+
+ std::vector<std::pair<TSN, Data>> to_be_sent;
+ size_t old_outstanding_bytes = outstanding_bytes_;
+ size_t old_rwnd = rwnd_;
+ if (is_in_fast_retransmit()) {
+ // https://tools.ietf.org/html/rfc4960#section-7.2.4
+ // "Determine how many of the earliest (i.e., lowest TSN) DATA chunks
+ // marked for retransmission will fit into a single packet ... Retransmit
+ // those K DATA chunks in a single packet. When a Fast Retransmit is being
+ // performed, the sender SHOULD ignore the value of cwnd and SHOULD NOT
+ // delay retransmission for this single packet."
+ is_in_fast_retransmit_ = false;
+ to_be_sent = GetChunksToBeRetransmitted(bytes_remaining_in_packet);
+ size_t to_be_sent_bytes = absl::c_accumulate(
+ to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
+ return r + GetSerializedChunkSize(d.second);
+ });
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "fast-retransmit: sending "
+ << to_be_sent.size() << " chunks, " << to_be_sent_bytes
+ << " bytes";
+ } else {
+ // Normal sending. Calculate the bandwidth budget (how many bytes that is
+ // allowed to be sent), and fill that up first with chunks that are
+ // scheduled to be retransmitted. If there is still budget, send new chunks
+ // (which will have their TSN assigned here.)
+ size_t remaining_cwnd_bytes =
+ outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_;
+ size_t max_bytes = RoundDownTo4(std::min(
+ std::min(bytes_remaining_in_packet, rwnd()), remaining_cwnd_bytes));
+
+ to_be_sent = GetChunksToBeRetransmitted(max_bytes);
+ max_bytes -= absl::c_accumulate(
+ to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
+ return r + GetSerializedChunkSize(d.second);
+ });
+
+ while (max_bytes > data_chunk_header_size_) {
+ RTC_DCHECK(IsDivisibleBy4(max_bytes));
+ absl::optional<SendQueue::DataToSend> chunk_opt =
+ send_queue_.Produce(now, max_bytes - data_chunk_header_size_);
+ if (!chunk_opt.has_value()) {
+ break;
+ }
+
+ UnwrappedTSN tsn = next_tsn_;
+ next_tsn_.Increment();
+ to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone());
+
+ // All chunks are always padded to be even divisible by 4.
+ size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
+ max_bytes -= chunk_size;
+ outstanding_bytes_ += chunk_size;
+ rwnd_ -= chunk_size;
+ outstanding_data_.emplace(
+ tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data),
+ chunk_opt->max_retransmissions, now,
+ chunk_opt->expires_at));
+ }
+ }
+
+ if (!to_be_sent.empty()) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.2
+ // "Every time a DATA chunk is sent to any address (including a
+ // retransmission), if the T3-rtx timer of that address is not running,
+ // start it running so that it will expire after the RTO of that address."
+ if (!t3_rtx_.is_running()) {
+ t3_rtx_.Start();
+ }
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Sending TSN "
+ << StrJoin(to_be_sent, ",",
+ [&](rtc::StringBuilder& sb,
+ const std::pair<TSN, Data>& c) {
+ sb << *c.first;
+ })
+ << " - "
+ << absl::c_accumulate(
+ to_be_sent, 0,
+ [&](size_t r, const std::pair<TSN, Data>& d) {
+ return r + GetSerializedChunkSize(d.second);
+ })
+ << " bytes. outstanding_bytes=" << outstanding_bytes_
+ << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_
+ << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")";
+ }
+ RTC_DCHECK(IsConsistent());
+ return to_be_sent;
+}
+
+std::vector<std::pair<TSN, RetransmissionQueue::State>>
+RetransmissionQueue::GetChunkStatesForTesting() const {
+ std::vector<std::pair<TSN, RetransmissionQueue::State>> states;
+ states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked);
+ for (const auto& elem : outstanding_data_) {
+ State state;
+ if (elem.second.is_abandoned()) {
+ state = State::kAbandoned;
+ } else if (elem.second.should_be_retransmitted()) {
+ state = State::kToBeRetransmitted;
+ } else if (elem.second.is_acked()) {
+ state = State::kAcked;
+ } else if (elem.second.is_outstanding()) {
+ state = State::kInFlight;
+ } else {
+ state = State::kNacked;
+ }
+
+ states.emplace_back(elem.first.Wrap(), state);
+ }
+ return states;
+}
+
+bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) {
+ if (!partial_reliability_) {
+ return false;
+ }
+ ExpireChunks(now);
+ if (!outstanding_data_.empty()) {
+ auto it = outstanding_data_.begin();
+ return it->first == last_cumulative_tsn_ack_.next_value() &&
+ it->second.is_abandoned();
+ }
+ RTC_DCHECK(IsConsistent());
+ return false;
+}
+
+void RetransmissionQueue::TxData::Ack() {
+ ack_state_ = AckState::kAcked;
+ should_be_retransmitted_ = false;
+}
+
+bool RetransmissionQueue::TxData::Nack(bool retransmit_now) {
+ ack_state_ = AckState::kNacked;
+ ++nack_count_;
+ if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) &&
+ !is_abandoned_) {
+ should_be_retransmitted_ = true;
+ return true;
+ }
+ return false;
+}
+
+void RetransmissionQueue::TxData::Retransmit() {
+ ack_state_ = AckState::kUnacked;
+ should_be_retransmitted_ = false;
+
+ nack_count_ = 0;
+ ++num_retransmissions_;
+}
+
+void RetransmissionQueue::TxData::Abandon() {
+ is_abandoned_ = true;
+ should_be_retransmitted_ = false;
+}
+
+bool RetransmissionQueue::TxData::has_expired(TimeMs now) const {
+ if (ack_state_ != AckState::kAcked && !is_abandoned_) {
+ if (max_retransmissions_.has_value() &&
+ num_retransmissions_ >= *max_retransmissions_) {
+ return true;
+ } else if (expires_at_.has_value() && *expires_at_ <= now) {
+ return true;
+ }
+ }
+ return false;
+}
+
+void RetransmissionQueue::ExpireChunks(TimeMs now) {
+ for (const auto& elem : outstanding_data_) {
+ UnwrappedTSN tsn = elem.first;
+ const TxData& item = elem.second;
+
+ // Chunks that are in-flight (possibly lost?), nacked or to be retransmitted
+ // can be expired easily. There is always a risk that a message is expired
+ // that was already received by the peer, but for which there haven't been
+ // a SACK received. But that's acceptable, and handled.
+ if (item.is_abandoned()) {
+ // Already abandoned.
+ } else if (item.has_expired(now)) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
+ << " and message " << *item.data().message_id
+ << " as expired";
+ ExpireAllFor(item);
+ } else {
+ // A non-expired chunk. No need to iterate any further.
+ break;
+ }
+ }
+}
+
+void RetransmissionQueue::ExpireAllFor(
+ const RetransmissionQueue::TxData& item) {
+ // Erase all remaining chunks from the producer, if any.
+ if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id,
+ item.data().message_id)) {
+ // There were remaining chunks to be produced for this message. Since the
+ // receiver may have already received all chunks (up till now) for this
+ // message, we can't just FORWARD-TSN to the last fragment in this
+ // (abandoned) message and start sending a new message, as the receiver will
+ // then see a new message before the end of the previous one was seen (or
+ // skipped over). So create a new fragment, representing the end, that the
+ // received will never see as it is abandoned immediately and used as cum
+ // TSN in the sent FORWARD-TSN.
+ UnwrappedTSN tsn = next_tsn_;
+ next_tsn_.Increment();
+ Data message_end(item.data().stream_id, item.data().ssn,
+ item.data().message_id, item.data().fsn, item.data().ppid,
+ std::vector<uint8_t>(), Data::IsBeginning(false),
+ Data::IsEnd(true), item.data().is_unordered);
+ TxData& added_item =
+ outstanding_data_
+ .emplace(tsn, RetransmissionQueue::TxData(std::move(message_end),
+ absl::nullopt, TimeMs(0),
+ absl::nullopt))
+ .first->second;
+ // The added chunk shouldn't be included in `outstanding_bytes`, so set it
+ // as acked.
+ added_item.Ack();
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Adding unsent end placeholder for message at tsn="
+ << *tsn.Wrap();
+ }
+ for (auto& elem : outstanding_data_) {
+ UnwrappedTSN tsn = elem.first;
+ TxData& other = elem.second;
+
+ if (!other.is_abandoned() &&
+ other.data().stream_id == item.data().stream_id &&
+ other.data().is_unordered == item.data().is_unordered &&
+ other.data().message_id == item.data().message_id) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
+ << " as abandoned";
+ if (other.should_be_retransmitted()) {
+ to_be_retransmitted_.erase(tsn);
+ }
+ other.Abandon();
+ }
+ }
+}
+
+ForwardTsnChunk RetransmissionQueue::CreateForwardTsn() const {
+ std::unordered_map<StreamID, SSN, StreamID::Hasher>
+ skipped_per_ordered_stream;
+ UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
+
+ for (const auto& elem : outstanding_data_) {
+ UnwrappedTSN tsn = elem.first;
+ const TxData& item = elem.second;
+
+ if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
+ break;
+ }
+ new_cumulative_ack = tsn;
+ if (!item.data().is_unordered &&
+ item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) {
+ skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn;
+ }
+ }
+
+ std::vector<ForwardTsnChunk::SkippedStream> skipped_streams;
+ skipped_streams.reserve(skipped_per_ordered_stream.size());
+ for (const auto& elem : skipped_per_ordered_stream) {
+ skipped_streams.emplace_back(elem.first, elem.second);
+ }
+ return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams));
+}
+
+IForwardTsnChunk RetransmissionQueue::CreateIForwardTsn() const {
+ std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash>
+ skipped_per_stream;
+ UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
+
+ for (const auto& elem : outstanding_data_) {
+ UnwrappedTSN tsn = elem.first;
+ const TxData& item = elem.second;
+
+ if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
+ break;
+ }
+ new_cumulative_ack = tsn;
+ std::pair<IsUnordered, StreamID> stream_id =
+ std::make_pair(item.data().is_unordered, item.data().stream_id);
+
+ if (item.data().message_id > skipped_per_stream[stream_id]) {
+ skipped_per_stream[stream_id] = item.data().message_id;
+ }
+ }
+
+ std::vector<IForwardTsnChunk::SkippedStream> skipped_streams;
+ skipped_streams.reserve(skipped_per_stream.size());
+ for (const auto& elem : skipped_per_stream) {
+ const std::pair<IsUnordered, StreamID>& stream = elem.first;
+ MID message_id = elem.second;
+ skipped_streams.emplace_back(stream.first, stream.second, message_id);
+ }
+
+ return IForwardTsnChunk(new_cumulative_ack.Wrap(),
+ std::move(skipped_streams));
+}
+
+void RetransmissionQueue::PrepareResetStreams(
+ rtc::ArrayView<const StreamID> streams) {
+ // TODO(boivie): These calls are now only affecting the send queue. The
+ // packet buffer can also change behavior - for example draining the chunk
+ // producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request"
+ // can be sent quickly, with a known `sender_last_assigned_tsn`.
+ send_queue_.PrepareResetStreams(streams);
+}
+bool RetransmissionQueue::CanResetStreams() const {
+ return send_queue_.CanResetStreams();
+}
+void RetransmissionQueue::CommitResetStreams() {
+ send_queue_.CommitResetStreams();
+}
+void RetransmissionQueue::RollbackResetStreams() {
+ send_queue_.RollbackResetStreams();
+}
+
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h
new file mode 100644
index 0000000000..7f5baf9fff
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_queue.h
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
+#define NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/sack_chunk.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "net/dcsctp/timer/timer.h"
+#include "net/dcsctp/tx/retransmission_timeout.h"
+#include "net/dcsctp/tx/send_queue.h"
+
+namespace dcsctp {
+
+// The RetransmissionQueue manages all DATA/I-DATA chunks that are in-flight and
+// schedules them to be retransmitted if necessary. Chunks are retransmitted
+// when they have been lost for a number of consecutive SACKs, or when the
+// retransmission timer, `t3_rtx` expires.
+//
+// As congestion control is tightly connected with the state of transmitted
+// packets, that's also managed here to limit the amount of data that is
+// in-flight (sent, but not yet acknowledged).
+class RetransmissionQueue {
+ public:
+ static constexpr size_t kMinimumFragmentedPayload = 10;
+ // State for DATA chunks (message fragments) in the queue - used in tests.
+ enum class State {
+ // The chunk has been sent but not received yet (from the sender's point of
+ // view, as no SACK has been received yet that reference this chunk).
+ kInFlight,
+ // A SACK has been received which explicitly marked this chunk as missing -
+ // it's now NACKED and may be retransmitted if NACKED enough times.
+ kNacked,
+ // A chunk that will be retransmitted when possible.
+ kToBeRetransmitted,
+ // A SACK has been received which explicitly marked this chunk as received.
+ kAcked,
+ // A chunk whose message has expired or has been retransmitted too many
+ // times (RFC3758). It will not be retransmitted anymore.
+ kAbandoned,
+ };
+
+ // Creates a RetransmissionQueue which will send data using `initial_tsn` as
+ // the first TSN to use for sent fragments. It will poll data from
+ // `send_queue` and call `on_send_queue_empty` when it is empty. When
+ // SACKs are received, it will estimate the RTT, and call `on_new_rtt`. When
+ // an outstanding chunk has been ACKed, it will call
+ // `on_clear_retransmission_counter` and will also use `t3_rtx`, which is the
+ // SCTP retransmission timer to manage retransmissions.
+ RetransmissionQueue(absl::string_view log_prefix,
+ TSN initial_tsn,
+ size_t a_rwnd,
+ SendQueue& send_queue,
+ std::function<void(DurationMs rtt)> on_new_rtt,
+ std::function<void()> on_clear_retransmission_counter,
+ Timer& t3_rtx,
+ const DcSctpOptions& options,
+ bool supports_partial_reliability = true,
+ bool use_message_interleaving = false);
+
+ // Handles a received SACK. Returns true if the `sack` was processed and
+ // false if it was discarded due to received out-of-order and not relevant.
+ bool HandleSack(TimeMs now, const SackChunk& sack);
+
+ // Handles an expired retransmission timer.
+ void HandleT3RtxTimerExpiry();
+
+ // Returns a list of chunks to send that would fit in one SCTP packet with
+ // `bytes_remaining_in_packet` bytes available. This may be further limited by
+ // the congestion control windows. Note that `ShouldSendForwardTSN` must be
+ // called prior to this method, to abandon expired chunks, as this method will
+ // not expire any chunks.
+ std::vector<std::pair<TSN, Data>> GetChunksToSend(
+ TimeMs now,
+ size_t bytes_remaining_in_packet);
+
+ // Returns the internal state of all queued chunks. This is only used in
+ // unit-tests.
+ std::vector<std::pair<TSN, State>> GetChunkStatesForTesting() const;
+
+ // Returns the next TSN that will be allocated for sent DATA chunks.
+ TSN next_tsn() const { return next_tsn_.Wrap(); }
+
+ // Returns the size of the congestion window, in bytes. This is the number of
+ // bytes that may be in-flight.
+ size_t cwnd() const { return cwnd_; }
+
+ // Overrides the current congestion window size.
+ void set_cwnd(size_t cwnd) { cwnd_ = cwnd; }
+
+ // Returns the current receiver window size.
+ size_t rwnd() const { return rwnd_; }
+
+ // Returns the number of bytes of packets that are in-flight.
+ size_t outstanding_bytes() const { return outstanding_bytes_; }
+
+ // Given the current time `now`, it will evaluate if there are chunks that
+ // have expired and that need to be discarded. It returns true if a
+ // FORWARD-TSN should be sent.
+ bool ShouldSendForwardTsn(TimeMs now);
+
+ // Creates a FORWARD-TSN chunk.
+ ForwardTsnChunk CreateForwardTsn() const;
+
+ // Creates an I-FORWARD-TSN chunk.
+ IForwardTsnChunk CreateIForwardTsn() const;
+
+ // See the SendQueue for a longer description of these methods related
+ // to stream resetting.
+ void PrepareResetStreams(rtc::ArrayView<const StreamID> streams);
+ bool CanResetStreams() const;
+ void CommitResetStreams();
+ void RollbackResetStreams();
+
+ private:
+ enum class CongestionAlgorithmPhase {
+ kSlowStart,
+ kCongestionAvoidance,
+ };
+
+ // A fragmented message's DATA chunk while in the retransmission queue, and
+ // its associated metadata.
+ class TxData {
+ public:
+ explicit TxData(Data data,
+ absl::optional<size_t> max_retransmissions,
+ TimeMs time_sent,
+ absl::optional<TimeMs> expires_at)
+ : max_retransmissions_(max_retransmissions),
+ time_sent_(time_sent),
+ expires_at_(expires_at),
+ data_(std::move(data)) {}
+
+ TimeMs time_sent() const { return time_sent_; }
+
+ const Data& data() const { return data_; }
+
+ // Acks an item.
+ void Ack();
+
+ // Nacks an item. If it has been nacked enough times, or if `retransmit_now`
+ // is set, it might be marked for retransmission, which is indicated by the
+ // return value.
+ bool Nack(bool retransmit_now = false);
+
+ // Prepares the item to be retransmitted. Sets it as outstanding and
+ // clears all nack counters.
+ void Retransmit();
+
+ // Marks this item as abandoned.
+ void Abandon();
+
+ bool is_outstanding() const { return ack_state_ == AckState::kUnacked; }
+ bool is_acked() const { return ack_state_ == AckState::kAcked; }
+ bool is_abandoned() const { return is_abandoned_; }
+
+ // Indicates if this chunk should be retransmitted.
+ bool should_be_retransmitted() const { return should_be_retransmitted_; }
+ // Indicates if this chunk has ever been retransmitted.
+ bool has_been_retransmitted() const { return num_retransmissions_ > 0; }
+
+ // Given the current time, and the current state of this DATA chunk, it will
+ // indicate if it has expired (SCTP Partial Reliability Extension).
+ bool has_expired(TimeMs now) const;
+
+ private:
+ enum class AckState {
+ kUnacked,
+ kAcked,
+ kNacked,
+ };
+ // Indicates the presence of this chunk, if it's in flight (Unacked), has
+ // been received (Acked) or is lost (Nacked).
+ AckState ack_state_ = AckState::kUnacked;
+ // Indicates if this chunk has been abandoned, which is a terminal state.
+ bool is_abandoned_ = false;
+ // Indicates if this chunk should be retransmitted.
+ bool should_be_retransmitted_ = false;
+
+ // The number of times the DATA chunk has been nacked (by having received a
+ // SACK which doesn't include it). Will be cleared on retransmissions.
+ size_t nack_count_ = 0;
+ // The number of times the DATA chunk has been retransmitted.
+ size_t num_retransmissions_ = 0;
+ // If the message was sent with a maximum number of retransmissions, this is
+ // set to that number. The value zero (0) means that it will never be
+ // retransmitted.
+ const absl::optional<size_t> max_retransmissions_;
+ // When the packet was sent, and placed in this queue.
+ const TimeMs time_sent_;
+ // If the message was sent with an expiration time, this is set.
+ const absl::optional<TimeMs> expires_at_;
+ // The actual data to send/retransmit.
+ Data data_;
+ };
+
+ // Contains variables scoped to a processing of an incoming SACK.
+ struct AckInfo {
+ explicit AckInfo(UnwrappedTSN cumulative_tsn_ack)
+ : highest_tsn_acked(cumulative_tsn_ack) {}
+
+ // All TSNs that have been acked (for the first time) in this SACK.
+ std::vector<TSN> acked_tsns;
+
+ // Bytes acked by increasing cumulative_tsn_ack in this SACK
+ size_t bytes_acked_by_cumulative_tsn_ack = 0;
+
+ // Bytes acked by gap blocks in this SACK.
+ size_t bytes_acked_by_new_gap_ack_blocks = 0;
+
+ // Indicates if this SACK indicates that packet loss has occurred. Just
+ // because a packet is missing in the SACK doesn't necessarily mean that
+ // there is packet loss as that packet might be in-flight and received
+ // out-of-order. But when it has been reported missing consecutive times, it
+ // will eventually be considered "lost" and this will be set.
+ bool has_packet_loss = false;
+
+ // Highest TSN Newly Acknowledged, an SCTP variable.
+ UnwrappedTSN highest_tsn_acked;
+ };
+
+ bool IsConsistent() const;
+
+ // Returns how large a chunk will be, serialized, carrying the data
+ size_t GetSerializedChunkSize(const Data& data) const;
+
+ // Indicates if the congestion control algorithm is in "fast recovery".
+ bool is_in_fast_recovery() const {
+ return fast_recovery_exit_tsn_.has_value();
+ }
+
+ // Indicates if the congestion control algorithm is in "fast retransmit".
+ bool is_in_fast_retransmit() const { return is_in_fast_retransmit_; }
+
+ // Indicates if the provided SACK is valid given what has previously been
+ // received. If it returns false, the SACK is most likely a duplicate of
+ // something already seen, so this returning false doesn't necessarily mean
+ // that the SACK is illegal.
+ bool IsSackValid(const SackChunk& sack) const;
+
+ // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items
+ // in the retransmission queue up until this value and will update `ack_info`
+ // by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`.
+ void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info);
+
+ // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK
+ // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`.
+ void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack,
+ rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
+ AckInfo& ack_info);
+
+ // Mark chunks reported as "missing", as "nacked" or "to be retransmitted"
+ // depending how many times this has happened. Only packets up until
+ // `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are
+ // nacked/retransmitted. The method will set `ack_info.has_packet_loss`.
+ void NackBetweenAckBlocks(
+ UnwrappedTSN cumulative_tsn_ack,
+ rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
+ AckInfo& ack_info);
+
+ // When a SACK chunk is received, this method will be called which _may_ call
+ // into the `RetransmissionTimeout` to update the RTO.
+ void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack);
+
+ // If the congestion control is in "fast recovery mode", this may be exited
+ // now.
+ void MaybeExitFastRecovery(UnwrappedTSN cumulative_tsn_ack);
+
+ // If chunks have been ACKed, stop the retransmission timer.
+ void StopT3RtxTimerOnIncreasedCumulativeTsnAck(
+ UnwrappedTSN cumulative_tsn_ack);
+
+ // Update the congestion control algorithm given as the cumulative ack TSN
+ // value has increased, as reported in an incoming SACK chunk.
+ void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes,
+ size_t total_bytes_acked);
+ // Update the congestion control algorithm, given as packet loss has been
+ // detected, as reported in an incoming SACK chunk.
+ void HandlePacketLoss(UnwrappedTSN highest_tsn_acked);
+ // Update the view of the receiver window size.
+ void UpdateReceiverWindow(uint32_t a_rwnd);
+ // Given `max_size` of space left in a packet, which chunks can be added to
+ // it?
+ std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size);
+ // If there is data sent and not ACKED, ensure that the retransmission timer
+ // is running.
+ void StartT3RtxTimerIfOutstandingData();
+
+ // Given the current time `now_ms`, expire chunks that have a limited
+ // lifetime.
+ void ExpireChunks(TimeMs now);
+ // Given that a message fragment, `item` has expired, expire all other
+ // fragments that share the same message - even never-before-sent fragments
+ // that are still in the SendQueue.
+ void ExpireAllFor(const RetransmissionQueue::TxData& item);
+
+ // Returns the current congestion control algorithm phase.
+ CongestionAlgorithmPhase phase() const {
+ return (cwnd_ <= ssthresh_)
+ ? CongestionAlgorithmPhase::kSlowStart
+ : CongestionAlgorithmPhase::kCongestionAvoidance;
+ }
+
+ const DcSctpOptions options_;
+ // If the peer supports RFC3758 - SCTP Partial Reliability Extension.
+ const bool partial_reliability_;
+ const std::string log_prefix_;
+ // The size of the data chunk (DATA/I-DATA) header that is used.
+ const size_t data_chunk_header_size_;
+ // Called when a new RTT measurement has been done
+ const std::function<void(DurationMs rtt)> on_new_rtt_;
+ // Called when a SACK has been seen that cleared the retransmission counter.
+ const std::function<void()> on_clear_retransmission_counter_;
+ // The retransmission counter.
+ Timer& t3_rtx_;
+ // Unwraps TSNs
+ UnwrappedTSN::Unwrapper tsn_unwrapper_;
+
+ // Congestion Window. Number of bytes that may be in-flight (sent, not acked).
+ size_t cwnd_;
+ // Receive Window. Number of bytes available in the receiver's RX buffer.
+ size_t rwnd_;
+ // Slow Start Threshold. See RFC4960.
+ size_t ssthresh_;
+ // Partial Bytes Acked. See RFC4960.
+ size_t partial_bytes_acked_ = 0;
+ // If set, fast recovery is enabled until this TSN has been cumulative
+ // acked.
+ absl::optional<UnwrappedTSN> fast_recovery_exit_tsn_ = absl::nullopt;
+ // Indicates if the congestion algorithm is in fast retransmit.
+ bool is_in_fast_retransmit_ = false;
+
+ // Next TSN to used.
+ UnwrappedTSN next_tsn_;
+ // The last cumulative TSN ack number
+ UnwrappedTSN last_cumulative_tsn_ack_;
+ // The send queue.
+ SendQueue& send_queue_;
+ // All the outstanding data chunks that are in-flight and that have not been
+ // cumulative acked. Note that it also contains chunks that have been acked in
+ // gap ack blocks.
+ std::map<UnwrappedTSN, TxData> outstanding_data_;
+ // Data chunks that are to be retransmitted.
+ std::set<UnwrappedTSN> to_be_retransmitted_;
+ // The number of bytes that are in-flight (sent but not yet acked or nacked).
+ size_t outstanding_bytes_ = 0;
+};
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc
new file mode 100644
index 0000000000..e02b111b5a
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_queue_test.cc
@@ -0,0 +1,1007 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/retransmission_queue.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/packet/chunk/data_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/sack_chunk.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "net/dcsctp/testing/data_generator.h"
+#include "net/dcsctp/timer/fake_timeout.h"
+#include "net/dcsctp/timer/timer.h"
+#include "net/dcsctp/tx/mock_send_queue.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::MockFunction;
+using State = ::dcsctp::RetransmissionQueue::State;
+using ::testing::_;
+using ::testing::ElementsAre;
+using ::testing::IsEmpty;
+using ::testing::NiceMock;
+using ::testing::Pair;
+using ::testing::Return;
+using ::testing::SizeIs;
+using ::testing::UnorderedElementsAre;
+
+constexpr uint32_t kArwnd = 100000;
+constexpr uint32_t kMaxMtu = 1191;
+
+class RetransmissionQueueTest : public testing::Test {
+ protected:
+ RetransmissionQueueTest()
+ : gen_(MID(42)),
+ timeout_manager_([this]() { return now_; }),
+ timer_manager_([this]() { return timeout_manager_.CreateTimeout(); }),
+ timer_(timer_manager_.CreateTimer(
+ "test/t3_rtx",
+ []() { return absl::nullopt; },
+ TimerOptions(DurationMs(0)))) {}
+
+ std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk() {
+ return [this](TimeMs now, size_t max_size) {
+ return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ };
+ }
+
+ std::vector<TSN> GetSentPacketTSNs(RetransmissionQueue& queue) {
+ std::vector<TSN> tsns;
+ for (const auto& elem : queue.GetChunksToSend(now_, 10000)) {
+ tsns.push_back(elem.first);
+ }
+ return tsns;
+ }
+
+ RetransmissionQueue CreateQueue(bool supports_partial_reliability = true,
+ bool use_message_interleaving = false) {
+ DcSctpOptions options;
+ options.mtu = kMaxMtu;
+ return RetransmissionQueue(
+ "", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(),
+ on_clear_retransmission_counter_.AsStdFunction(), *timer_, options,
+ supports_partial_reliability, use_message_interleaving);
+ }
+
+ DataGenerator gen_;
+ TimeMs now_ = TimeMs(0);
+ FakeTimeoutManager timeout_manager_;
+ TimerManager timer_manager_;
+ NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_;
+ NiceMock<MockFunction<void()>> on_clear_retransmission_counter_;
+ NiceMock<MockSendQueue> producer_;
+ std::unique_ptr<Timer> timer_;
+};
+
+TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked)));
+}
+
+TEST_F(RetransmissionQueueTest, SendOneChunk) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+}
+
+TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
+
+ queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(10), State::kAcked)));
+}
+
+TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue),
+ testing::ElementsAre(TSN(10), TSN(11), TSN(12)));
+
+ queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kInFlight)));
+}
+
+TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue),
+ testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
+ TSN(15), TSN(16), TSN(17)));
+
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
+ {SackChunk::GapAckBlock(2, 3),
+ SackChunk::GapAckBlock(5, 5)},
+ {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kNacked), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kNacked), //
+ Pair(TSN(17), State::kAcked)));
+}
+
+TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue),
+ testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
+ TSN(15), TSN(16), TSN(17)));
+
+ // Send more chunks, but leave some as gaps to force retransmission after
+ // three NACKs.
+
+ // Send 18
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18)));
+
+ // Ack 12, 14-15, 17-18
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
+ {SackChunk::GapAckBlock(2, 3),
+ SackChunk::GapAckBlock(5, 6)},
+ {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kNacked), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kNacked), //
+ Pair(TSN(17), State::kAcked), //
+ Pair(TSN(18), State::kAcked)));
+
+ // Send 19
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19)));
+
+ // Ack 12, 14-15, 17-19
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
+ {SackChunk::GapAckBlock(2, 3),
+ SackChunk::GapAckBlock(5, 7)},
+ {}));
+
+ // Send 20
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20)));
+
+ // Ack 12, 14-15, 17-20
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
+ {SackChunk::GapAckBlock(2, 3),
+ SackChunk::GapAckBlock(5, 8)},
+ {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kToBeRetransmitted), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kToBeRetransmitted), //
+ Pair(TSN(17), State::kAcked), //
+ Pair(TSN(18), State::kAcked), //
+ Pair(TSN(19), State::kAcked), //
+ Pair(TSN(20), State::kAcked)));
+
+ // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be
+ // resent right now. The send queue will not even be queried.
+ EXPECT_CALL(producer_, Produce).Times(0);
+
+ EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13), TSN(16)));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kInFlight), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kInFlight), //
+ Pair(TSN(17), State::kAcked), //
+ Pair(TSN(18), State::kAcked), //
+ Pair(TSN(19), State::kAcked), //
+ Pair(TSN(20), State::kAcked)));
+}
+
+TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight)));
+}
+
+TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+
+ // Will force chunks to be retransmitted
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted)));
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted)));
+
+ std::vector<std::pair<TSN, Data>> chunks_to_rtx =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+}
+
+TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
+ RetransmissionQueue queue =
+ CreateQueue(/*supports_partial_reliability=*/false);
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+
+ // Will force chunks to be retransmitted
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(0);
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+} // namespace dcsctp
+
+TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+
+ // Will force chunks to be retransmitted
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(1);
+
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned)));
+
+ std::vector<std::pair<TSN, Data>> chunks_to_rtx =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_rtx, testing::IsEmpty());
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned)));
+}
+
+TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ dts.max_retransmissions = 3;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(0);
+
+ // Retransmission 1
+ queue.HandleT3RtxTimerExpiry();
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
+
+ // Retransmission 2
+ queue.HandleT3RtxTimerExpiry();
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
+
+ // Retransmission 3
+ queue.HandleT3RtxTimerExpiry();
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
+
+ // Retransmission 4 - not allowed.
+ queue.HandleT3RtxTimerExpiry();
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(1);
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned)));
+}
+
+TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
+ RetransmissionQueue queue = CreateQueue();
+ static constexpr size_t kCwnd = 1200;
+ queue.set_cwnd(kCwnd);
+ EXPECT_EQ(queue.cwnd(), kCwnd);
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+
+ std::vector<uint8_t> payload(1000);
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this, payload](TimeMs, size_t) {
+ return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1500);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+ EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
+
+ // Will force chunks to be retransmitted
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted)));
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+
+ std::vector<std::pair<TSN, Data>> chunks_to_rtx =
+ queue.GetChunksToSend(now_, 1500);
+ EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight)));
+ EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
+}
+
+TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ // Send and ack first chunk (TSN 10)
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+ Pair(TSN(12), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight)));
+
+ // Chunk 10 is acked, but the remaining are lost
+ queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(10), State::kAcked), //
+ Pair(TSN(11), State::kToBeRetransmitted), //
+ Pair(TSN(12), State::kToBeRetransmitted)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .WillOnce(Return(true));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ // NOTE: The TSN=13 represents the end fragment.
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(10), State::kAcked), //
+ Pair(TSN(11), State::kAbandoned), //
+ Pair(TSN(12), State::kAbandoned), //
+ Pair(TSN(13), State::kAbandoned)));
+
+ ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
+ EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
+ EXPECT_THAT(forward_tsn.skipped_streams(),
+ UnorderedElementsAre(
+ ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
+}
+
+TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ // Send and ack first chunk (TSN 10)
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+ Pair(TSN(12), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight)));
+
+ // Chunk 10 is acked, but the remaining are lost
+ queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(10), State::kAcked), //
+ Pair(TSN(11), State::kToBeRetransmitted), //
+ Pair(TSN(12), State::kToBeRetransmitted)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .WillOnce(Return(false));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ // NOTE: No additional TSN representing the end fragment, as that's TSN=12.
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(10), State::kAcked), //
+ Pair(TSN(11), State::kAbandoned), //
+ Pair(TSN(12), State::kAbandoned)));
+
+ ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
+ EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
+ EXPECT_THAT(forward_tsn.skipped_streams(),
+ UnorderedElementsAre(
+ ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
+}
+
+TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
+ RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ DataGeneratorOptions opts;
+ opts.stream_id = StreamID(1);
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ DataGeneratorOptions opts;
+ opts.stream_id = StreamID(2);
+ SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ DataGeneratorOptions opts;
+ opts.stream_id = StreamID(3);
+ SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ DataGeneratorOptions opts;
+ opts.stream_id = StreamID(4);
+ SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+ Pair(TSN(12), _), Pair(TSN(13), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight), //
+ Pair(TSN(13), State::kInFlight)));
+
+ // Chunk 13 is acked, but the remaining are lost
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {}));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kNacked), //
+ Pair(TSN(11), State::kNacked), //
+ Pair(TSN(12), State::kNacked), //
+ Pair(TSN(13), State::kAcked)));
+
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted), //
+ Pair(TSN(11), State::kToBeRetransmitted), //
+ Pair(TSN(12), State::kToBeRetransmitted), //
+ Pair(TSN(13), State::kAcked)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .WillOnce(Return(true));
+ EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
+ .WillOnce(Return(true));
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
+ .WillOnce(Return(true));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned), //
+ Pair(TSN(11), State::kAbandoned), //
+ Pair(TSN(12), State::kAbandoned), //
+ Pair(TSN(13), State::kAcked),
+ // Representing end fragments of stream 1-3
+ Pair(TSN(14), State::kAbandoned), //
+ Pair(TSN(15), State::kAbandoned), //
+ Pair(TSN(16), State::kAbandoned)));
+
+ IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
+ EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
+ EXPECT_THAT(
+ forward_tsn1.skipped_streams(),
+ UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
+ IsUnordered(false), StreamID(1), MID(42)),
+ IForwardTsnChunk::SkippedStream(
+ IsUnordered(true), StreamID(2), MID(42)),
+ IForwardTsnChunk::SkippedStream(
+ IsUnordered(false), StreamID(3), MID(42))));
+
+ // When TSN 13 is acked, the placeholder "end fragments" must be skipped as
+ // well.
+
+ // A receiver is more likely to ack TSN 13, but do it incrementally.
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
+
+ EXPECT_CALL(producer_, Discard).Times(0);
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(13), State::kAcked), //
+ Pair(TSN(14), State::kAbandoned), //
+ Pair(TSN(15), State::kAbandoned), //
+ Pair(TSN(16), State::kAbandoned)));
+
+ IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn();
+ EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16));
+ EXPECT_THAT(
+ forward_tsn2.skipped_streams(),
+ UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
+ IsUnordered(false), StreamID(1), MID(42)),
+ IForwardTsnChunk::SkippedStream(
+ IsUnordered(true), StreamID(2), MID(42)),
+ IForwardTsnChunk::SkippedStream(
+ IsUnordered(false), StreamID(3), MID(42))));
+}
+
+TEST_F(RetransmissionQueueTest, MeasureRTT) {
+ RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+
+ now_ = now_ + DurationMs(123);
+
+ EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1);
+ queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
+}
+
+TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) {
+ RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
+
+ EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
+ EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
+}
+
+TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
+ RetransmissionQueue queue = CreateQueue();
+
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue),
+ testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
+ TSN(15), TSN(16), TSN(17)));
+
+ EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {})));
+ EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {})));
+ EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {})));
+}
+
+TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue),
+ testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
+ TSN(15), TSN(16), TSN(17)));
+
+ // Ack 9, 20-25. This is an invalid SACK, but should still be handled.
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight), //
+ Pair(TSN(13), State::kInFlight), //
+ Pair(TSN(14), State::kInFlight), //
+ Pair(TSN(15), State::kInFlight), //
+ Pair(TSN(16), State::kInFlight), //
+ Pair(TSN(17), State::kInFlight)));
+}
+
+TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) {
+ RetransmissionQueue queue = CreateQueue();
+
+ // Nothing produced - nothing in retransmission queue
+
+ // Ack 9, 12-13
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {}));
+
+ // Gap ack blocks are just ignore.
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked)));
+}
+
+TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_THAT(GetSentPacketTSNs(queue),
+ testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
+ TSN(15), TSN(16), TSN(17)));
+
+ // Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be
+ // adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the
+ // cum-tsn-ack should not move, as the gap-ack-blocks are just advisory.
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAcked), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kAcked), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kInFlight), //
+ Pair(TSN(16), State::kInFlight), //
+ Pair(TSN(17), State::kInFlight)));
+}
+
+TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
+ RetransmissionQueue queue = CreateQueue();
+
+ // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the
+ // magic numbers in this test.
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t size) {
+ EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize);
+
+ std::vector<uint8_t> payload(183);
+ return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
+ })
+ .WillOnce([this](TimeMs, size_t size) {
+ EXPECT_EQ(size, 976 - DataChunk::kHeaderSize);
+
+ std::vector<uint8_t> payload(957);
+ return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
+ });
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1188 - 12);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
+}
+
+TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ // Send and ack first chunk (TSN 10)
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+ Pair(TSN(12), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight)));
+ EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
+
+ // Discard the message while it was outstanding.
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(1);
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned), //
+ Pair(TSN(11), State::kAbandoned), //
+ Pair(TSN(12), State::kAbandoned)));
+ EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
+
+ // Now ACK those, one at a time.
+ queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
+ EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u);
+
+ queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
+ EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u);
+
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+}
+
+TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ // Send and ack first chunk (TSN 10)
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+ Pair(TSN(12), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight)));
+ EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
+
+ // Mark the message as lost.
+ queue.HandleT3RtxTimerExpiry();
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(1);
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned), //
+ Pair(TSN(11), State::kAbandoned), //
+ Pair(TSN(12), State::kAbandoned)));
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+
+ // Now ACK those, one at a time.
+ queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+
+ queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+
+ queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
+ EXPECT_EQ(queue.outstanding_bytes(), 0u);
+}
+
+} // namespace
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/retransmission_timeout.cc b/net/dcsctp/tx/retransmission_timeout.cc
new file mode 100644
index 0000000000..7d545a07d0
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_timeout.cc
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/retransmission_timeout.h"
+
+#include <cmath>
+#include <cstdint>
+
+#include "net/dcsctp/public/dcsctp_options.h"
+
+namespace dcsctp {
+namespace {
+// https://tools.ietf.org/html/rfc4960#section-15
+constexpr double kRtoAlpha = 0.125;
+constexpr double kRtoBeta = 0.25;
+} // namespace
+
+RetransmissionTimeout::RetransmissionTimeout(const DcSctpOptions& options)
+ : min_rto_(*options.rto_min),
+ max_rto_(*options.rto_max),
+ max_rtt_(*options.rtt_max),
+ rto_(*options.rto_initial) {}
+
+void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) {
+ double rtt = *measured_rtt;
+
+ // Unrealistic values will be skipped. If a wrongly measured (or otherwise
+ // corrupt) value was processed, it could change the state in a way that would
+ // take a very long time to recover.
+ if (rtt < 0.0 || rtt > max_rtt_) {
+ return;
+ }
+
+ if (first_measurement_) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.1
+ // "When the first RTT measurement R is made, set
+ // SRTT <- R,
+ // RTTVAR <- R/2, and
+ // RTO <- SRTT + 4 * RTTVAR."
+ srtt_ = rtt;
+ rttvar_ = rtt * 0.5;
+ rto_ = srtt_ + 4 * rttvar_;
+ first_measurement_ = false;
+ } else {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.1
+ // "When a new RTT measurement R' is made, set
+ // RTTVAR <- (1 - RTO.Beta) * RTTVAR + RTO.Beta * |SRTT - R'|
+ // SRTT <- (1 - RTO.Alpha) * SRTT + RTO.Alpha * R'
+ // RTO <- SRTT + 4 * RTTVAR."
+ rttvar_ = (1 - kRtoBeta) * rttvar_ + kRtoBeta * std::abs(srtt_ - rtt);
+ srtt_ = (1 - kRtoAlpha) * srtt_ + kRtoAlpha * rtt;
+ rto_ = srtt_ + 4 * rttvar_;
+ }
+
+ // If the RTO becomes smaller or equal to RTT, expiration timers will be
+ // scheduled at the same time as packets are expected. Only happens in
+ // extremely stable RTTs, i.e. in simulations.
+ rto_ = std::fmax(rto_, rtt + 1);
+
+ // Clamp RTO between min and max.
+ rto_ = std::fmin(std::fmax(rto_, min_rto_), max_rto_);
+}
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/retransmission_timeout.h b/net/dcsctp/tx/retransmission_timeout.h
new file mode 100644
index 0000000000..0fac33e59c
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_timeout.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_
+#define NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_
+
+#include <cstdint>
+#include <functional>
+
+#include "net/dcsctp/public/dcsctp_options.h"
+
+namespace dcsctp {
+
+// Manages updating of the Retransmission Timeout (RTO) SCTP variable, which is
+// used directly as the base timeout for T3-RTX and for other timers, such as
+// delayed ack.
+//
+// When a round-trip-time (RTT) is calculated (outside this class), `Observe`
+// is called, which calculates the retransmission timeout (RTO) value. The RTO
+// value will become larger if the RTT is high and/or the RTT values are varying
+// a lot, which is an indicator of a bad connection.
+class RetransmissionTimeout {
+ public:
+ explicit RetransmissionTimeout(const DcSctpOptions& options);
+
+ // To be called when a RTT has been measured, to update the RTO value.
+ void ObserveRTT(DurationMs measured_rtt);
+
+ // Returns the Retransmission Timeout (RTO) value, in milliseconds.
+ DurationMs rto() const { return DurationMs(rto_); }
+
+ // Returns the smoothed RTT value, in milliseconds.
+ DurationMs srtt() const { return DurationMs(srtt_); }
+
+ private:
+ // Note that all intermediate state calculation is done in the floating point
+ // domain, to maintain precision.
+ const double min_rto_;
+ const double max_rto_;
+ const double max_rtt_;
+ // If this is the first measurement
+ bool first_measurement_ = true;
+ // Smoothed Round-Trip Time
+ double srtt_ = 0.0;
+ // Round-Trip Time Variation
+ double rttvar_ = 0.0;
+ // Retransmission Timeout
+ double rto_;
+};
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_
diff --git a/net/dcsctp/tx/retransmission_timeout_test.cc b/net/dcsctp/tx/retransmission_timeout_test.cc
new file mode 100644
index 0000000000..3b2e3399fe
--- /dev/null
+++ b/net/dcsctp/tx/retransmission_timeout_test.cc
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/retransmission_timeout.h"
+
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+
+constexpr DurationMs kMaxRtt = DurationMs(8'000);
+constexpr DurationMs kInitialRto = DurationMs(200);
+constexpr DurationMs kMaxRto = DurationMs(800);
+constexpr DurationMs kMinRto = DurationMs(120);
+
+DcSctpOptions MakeOptions() {
+ DcSctpOptions options;
+ options.rtt_max = kMaxRtt;
+ options.rto_initial = kInitialRto;
+ options.rto_max = kMaxRto;
+ options.rto_min = kMinRto;
+ return options;
+}
+
+TEST(RetransmissionTimeoutTest, HasValidInitialRto) {
+ RetransmissionTimeout rto_(MakeOptions());
+ EXPECT_EQ(rto_.rto(), kInitialRto);
+}
+
+TEST(RetransmissionTimeoutTest, NegativeValuesDoNotAffectRTO) {
+ RetransmissionTimeout rto_(MakeOptions());
+ // Initial negative value
+ rto_.ObserveRTT(DurationMs(-10));
+ EXPECT_EQ(rto_.rto(), kInitialRto);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 372);
+ // Subsequent negative value
+ rto_.ObserveRTT(DurationMs(-10));
+ EXPECT_EQ(*rto_.rto(), 372);
+}
+
+TEST(RetransmissionTimeoutTest, TooLargeValuesDoNotAffectRTO) {
+ RetransmissionTimeout rto_(MakeOptions());
+ // Initial too large value
+ rto_.ObserveRTT(kMaxRtt + DurationMs(100));
+ EXPECT_EQ(rto_.rto(), kInitialRto);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 372);
+ // Subsequent too large value
+ rto_.ObserveRTT(kMaxRtt + DurationMs(100));
+ EXPECT_EQ(*rto_.rto(), 372);
+}
+
+TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) {
+ RetransmissionTimeout rto_(MakeOptions());
+ for (int i = 0; i < 1000; ++i) {
+ rto_.ObserveRTT(DurationMs(1));
+ }
+ EXPECT_GE(rto_.rto(), kMinRto);
+}
+
+TEST(RetransmissionTimeoutTest, WillNeverGoAboveMaximumRto) {
+ RetransmissionTimeout rto_(MakeOptions());
+ for (int i = 0; i < 1000; ++i) {
+ rto_.ObserveRTT(kMaxRtt - DurationMs(1));
+ // Adding jitter, which would make it RTO be well above RTT.
+ rto_.ObserveRTT(kMaxRtt - DurationMs(100));
+ }
+ EXPECT_LE(rto_.rto(), kMaxRto);
+}
+
+TEST(RetransmissionTimeoutTest, CalculatesRtoForStableRtt) {
+ RetransmissionTimeout rto_(MakeOptions());
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(DurationMs(128));
+ EXPECT_EQ(*rto_.rto(), 314);
+ rto_.ObserveRTT(DurationMs(123));
+ EXPECT_EQ(*rto_.rto(), 268);
+ rto_.ObserveRTT(DurationMs(125));
+ EXPECT_EQ(*rto_.rto(), 233);
+ rto_.ObserveRTT(DurationMs(127));
+ EXPECT_EQ(*rto_.rto(), 208);
+}
+
+TEST(RetransmissionTimeoutTest, CalculatesRtoForUnstableRtt) {
+ RetransmissionTimeout rto_(MakeOptions());
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(DurationMs(402));
+ EXPECT_EQ(*rto_.rto(), 622);
+ rto_.ObserveRTT(DurationMs(728));
+ EXPECT_EQ(*rto_.rto(), 800);
+ rto_.ObserveRTT(DurationMs(89));
+ EXPECT_EQ(*rto_.rto(), 800);
+ rto_.ObserveRTT(DurationMs(126));
+ EXPECT_EQ(*rto_.rto(), 800);
+}
+
+TEST(RetransmissionTimeoutTest, WillStabilizeAfterAWhile) {
+ RetransmissionTimeout rto_(MakeOptions());
+ rto_.ObserveRTT(DurationMs(124));
+ rto_.ObserveRTT(DurationMs(402));
+ rto_.ObserveRTT(DurationMs(728));
+ rto_.ObserveRTT(DurationMs(89));
+ rto_.ObserveRTT(DurationMs(126));
+ EXPECT_EQ(*rto_.rto(), 800);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 800);
+ rto_.ObserveRTT(DurationMs(122));
+ EXPECT_EQ(*rto_.rto(), 709);
+ rto_.ObserveRTT(DurationMs(123));
+ EXPECT_EQ(*rto_.rto(), 630);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 561);
+ rto_.ObserveRTT(DurationMs(122));
+ EXPECT_EQ(*rto_.rto(), 504);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 453);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 409);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(DurationMs(124));
+ EXPECT_EQ(*rto_.rto(), 339);
+}
+
+TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) {
+ // In simulations, it's quite common to have a very stable RTT, and having an
+ // RTO at the same value will cause issues as expiry timers will be scheduled
+ // to be expire exactly when a packet is supposed to arrive. The RTO must be
+ // larger than the RTT. In non-simulated environments, this is a non-issue as
+ // any jitter will increase the RTO.
+ RetransmissionTimeout rto_(MakeOptions());
+
+ for (int i = 0; i < 100; ++i) {
+ rto_.ObserveRTT(DurationMs(124));
+ }
+ EXPECT_GT(*rto_.rto(), 124);
+}
+
+} // namespace
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
new file mode 100644
index 0000000000..4bfbaf718b
--- /dev/null
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/rr_send_queue.h"
+
+#include <cstdint>
+#include <deque>
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+
+bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
+ while (!items_.empty()) {
+ RRSendQueue::OutgoingStream::Item& item = items_.front();
+ if (item.message_id.has_value()) {
+ // Already partially sent messages can always continue to be sent.
+ return true;
+ }
+
+ // Message has expired. Remove it and inspect the next one.
+ if (item.expires_at.has_value() && *item.expires_at <= now) {
+ buffered_amount_.Decrease(item.remaining_size);
+ total_buffered_amount_.Decrease(item.remaining_size);
+ items_.pop_front();
+ RTC_DCHECK(IsConsistent());
+ continue;
+ }
+
+ if (is_paused_) {
+ // The stream has paused (and there is no partially sent message).
+ return false;
+ }
+ return true;
+ }
+ return false;
+}
+
+bool RRSendQueue::IsConsistent() const {
+ size_t total_buffered_amount = 0;
+ for (const auto& stream_entry : streams_) {
+ total_buffered_amount += stream_entry.second.buffered_amount().value();
+ }
+
+ if (previous_message_has_ended_) {
+ auto it = streams_.find(current_stream_id_);
+ if (it != streams_.end() && it->second.has_partially_sent_message()) {
+ RTC_DLOG(LS_ERROR)
+ << "Previous message has ended, but still partial message in stream";
+ return false;
+ }
+ } else {
+ auto it = streams_.find(current_stream_id_);
+ if (it == streams_.end() || !it->second.has_partially_sent_message()) {
+ RTC_DLOG(LS_ERROR)
+ << "Previous message has NOT ended, but there is no partial message";
+ return false;
+ }
+ }
+
+ return total_buffered_amount == total_buffered_amount_.value();
+}
+
+bool RRSendQueue::OutgoingStream::IsConsistent() const {
+ size_t bytes = 0;
+ for (const auto& item : items_) {
+ bytes += item.remaining_size;
+ }
+ return bytes == buffered_amount_.value();
+}
+
+void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
+ RTC_DCHECK(bytes <= value_);
+ size_t old_value = value_;
+ value_ -= bytes;
+
+ if (old_value > low_threshold_ && value_ <= low_threshold_) {
+ on_threshold_reached_();
+ }
+}
+
+void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
+ // Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted.
+ if (low_threshold_ < value_ && low_threshold >= value_) {
+ on_threshold_reached_();
+ }
+ low_threshold_ = low_threshold;
+}
+
+void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
+ absl::optional<TimeMs> expires_at,
+ const SendOptions& send_options) {
+ buffered_amount_.Increase(message.payload().size());
+ total_buffered_amount_.Increase(message.payload().size());
+ items_.emplace_back(std::move(message), expires_at, send_options);
+
+ RTC_DCHECK(IsConsistent());
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
+ TimeMs now,
+ size_t max_size) {
+ RTC_DCHECK(!items_.empty());
+
+ Item* item = &items_.front();
+ DcSctpMessage& message = item->message;
+
+ if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
+ RTC_DCHECK(IsConsistent());
+ return absl::nullopt;
+ }
+
+ // Allocate Message ID and SSN when the first fragment is sent.
+ if (!item->message_id.has_value()) {
+ MID& mid =
+ item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
+ item->message_id = mid;
+ mid = MID(*mid + 1);
+ }
+ if (!item->send_options.unordered && !item->ssn.has_value()) {
+ item->ssn = next_ssn_;
+ next_ssn_ = SSN(*next_ssn_ + 1);
+ }
+
+ // Grab the next `max_size` fragment from this message and calculate flags.
+ rtc::ArrayView<const uint8_t> chunk_payload =
+ item->message.payload().subview(item->remaining_offset, max_size);
+ rtc::ArrayView<const uint8_t> message_payload = message.payload();
+ Data::IsBeginning is_beginning(chunk_payload.data() ==
+ message_payload.data());
+ Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
+ (message_payload.data() + message_payload.size()));
+
+ StreamID stream_id = message.stream_id();
+ PPID ppid = message.ppid();
+
+ // Zero-copy the payload if the message fits in a single chunk.
+ std::vector<uint8_t> payload =
+ is_beginning && is_end
+ ? std::move(message).ReleasePayload()
+ : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
+
+ FSN fsn(item->current_fsn);
+ item->current_fsn = FSN(*item->current_fsn + 1);
+ buffered_amount_.Decrease(payload.size());
+ total_buffered_amount_.Decrease(payload.size());
+
+ SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)),
+ item->message_id.value(), fsn, ppid,
+ std::move(payload), is_beginning, is_end,
+ item->send_options.unordered));
+ chunk.max_retransmissions = item->send_options.max_retransmissions;
+ chunk.expires_at = item->expires_at;
+
+ if (is_end) {
+ // The entire message has been sent, and its last data copied to `chunk`, so
+ // it can safely be discarded.
+ items_.pop_front();
+ } else {
+ item->remaining_offset += chunk_payload.size();
+ item->remaining_size -= chunk_payload.size();
+ RTC_DCHECK(item->remaining_offset + item->remaining_size ==
+ item->message.payload().size());
+ RTC_DCHECK(item->remaining_size > 0);
+ }
+ RTC_DCHECK(IsConsistent());
+ return chunk;
+}
+
+bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
+ MID message_id) {
+ bool result = false;
+ if (!items_.empty()) {
+ Item& item = items_.front();
+ if (item.send_options.unordered == unordered &&
+ item.message_id.has_value() && *item.message_id == message_id) {
+ buffered_amount_.Decrease(item.remaining_size);
+ total_buffered_amount_.Decrease(item.remaining_size);
+ items_.pop_front();
+ // As the item still existed, it had unsent data.
+ result = true;
+ }
+ }
+ RTC_DCHECK(IsConsistent());
+ return result;
+}
+
+void RRSendQueue::OutgoingStream::Pause() {
+ is_paused_ = true;
+
+ // A stream is paused when it's about to be reset. In this implementation,
+ // it will throw away all non-partially send messages. This is subject to
+ // change. It will however not discard any partially sent messages - only
+ // whole messages. Partially delivered messages (at the time of receiving a
+ // Stream Reset command) will always deliver all the fragments before
+ // actually resetting the stream.
+ for (auto it = items_.begin(); it != items_.end();) {
+ if (it->remaining_offset == 0) {
+ buffered_amount_.Decrease(it->remaining_size);
+ total_buffered_amount_.Decrease(it->remaining_size);
+ it = items_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ RTC_DCHECK(IsConsistent());
+}
+
+void RRSendQueue::OutgoingStream::Reset() {
+ if (!items_.empty()) {
+ // If this message has been partially sent, reset it so that it will be
+ // re-sent.
+ auto& item = items_.front();
+ buffered_amount_.Increase(item.message.payload().size() -
+ item.remaining_size);
+ total_buffered_amount_.Increase(item.message.payload().size() -
+ item.remaining_size);
+ item.remaining_offset = 0;
+ item.remaining_size = item.message.payload().size();
+ item.message_id = absl::nullopt;
+ item.ssn = absl::nullopt;
+ item.current_fsn = FSN(0);
+ }
+ is_paused_ = false;
+ next_ordered_mid_ = MID(0);
+ next_unordered_mid_ = MID(0);
+ next_ssn_ = SSN(0);
+ RTC_DCHECK(IsConsistent());
+}
+
+bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
+ if (items_.empty()) {
+ return false;
+ }
+ return items_.front().message_id.has_value();
+}
+
+void RRSendQueue::Add(TimeMs now,
+ DcSctpMessage message,
+ const SendOptions& send_options) {
+ RTC_DCHECK(!message.payload().empty());
+ // Any limited lifetime should start counting from now - when the message
+ // has been added to the queue.
+ absl::optional<TimeMs> expires_at = absl::nullopt;
+ if (send_options.lifetime.has_value()) {
+ // `expires_at` is the time when it expires. Which is slightly larger than
+ // the message's lifetime, as the message is alive during its entire
+ // lifetime (which may be zero).
+ expires_at = now + *send_options.lifetime + DurationMs(1);
+ }
+ GetOrCreateStreamInfo(message.stream_id())
+ .Add(std::move(message), expires_at, send_options);
+ RTC_DCHECK(IsConsistent());
+}
+
+bool RRSendQueue::IsFull() const {
+ return total_buffered_amount() >= buffer_size_;
+}
+
+bool RRSendQueue::IsEmpty() const {
+ return total_buffered_amount() == 0;
+}
+
+std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
+RRSendQueue::GetNextStream(TimeMs now) {
+ auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
+
+ for (auto it = start_it; it != streams_.end(); ++it) {
+ if (it->second.HasDataToSend(now)) {
+ current_stream_id_ = it->first;
+ return it;
+ }
+ }
+
+ for (auto it = streams_.begin(); it != start_it; ++it) {
+ if (it->second.HasDataToSend(now)) {
+ current_stream_id_ = it->first;
+ return it;
+ }
+ }
+ return streams_.end();
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+ size_t max_size) {
+ std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
+
+ if (previous_message_has_ended_) {
+ // Previous message has ended. Round-robin to a different stream, if there
+ // even is one with data to send.
+ stream_it = GetNextStream(now);
+ if (stream_it == streams_.end()) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_
+ << "There is no stream with data; Can't produce any data.";
+ return absl::nullopt;
+ }
+ } else {
+ // The previous message has not ended; Continue from the current stream.
+ stream_it = streams_.find(current_stream_id_);
+ RTC_DCHECK(stream_it != streams_.end());
+ }
+
+ absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
+ if (data.has_value()) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+ << (data->data.is_unordered ? "unordered" : "ordered")
+ << "::"
+ << (*data->data.is_beginning && *data->data.is_end
+ ? "complete"
+ : *data->data.is_beginning
+ ? "first"
+ : *data->data.is_end ? "last" : "middle")
+ << ", stream_id=" << *stream_it->first
+ << ", ppid=" << *data->data.ppid
+ << ", length=" << data->data.payload.size();
+
+ previous_message_has_ended_ = *data->data.is_end;
+ }
+
+ RTC_DCHECK(IsConsistent());
+ return data;
+}
+
+bool RRSendQueue::Discard(IsUnordered unordered,
+ StreamID stream_id,
+ MID message_id) {
+ bool has_discarded =
+ GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+ if (has_discarded) {
+ // Only partially sent messages are discarded, so if a message was
+ // discarded, then it was the currently sent message.
+ previous_message_has_ended_ = true;
+ }
+
+ return has_discarded;
+}
+
+void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
+ for (StreamID stream_id : streams) {
+ GetOrCreateStreamInfo(stream_id).Pause();
+ }
+ RTC_DCHECK(IsConsistent());
+}
+
+bool RRSendQueue::CanResetStreams() const {
+ // Streams can be reset if those streams that are paused don't have any
+ // messages that are partially sent.
+ for (auto& stream : streams_) {
+ if (stream.second.is_paused() &&
+ stream.second.has_partially_sent_message()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void RRSendQueue::CommitResetStreams() {
+ Reset();
+ RTC_DCHECK(IsConsistent());
+}
+
+void RRSendQueue::RollbackResetStreams() {
+ for (auto& stream_entry : streams_) {
+ stream_entry.second.Resume();
+ }
+ RTC_DCHECK(IsConsistent());
+}
+
+void RRSendQueue::Reset() {
+ // Recalculate buffered amount, as partially sent messages may have been put
+ // fully back in the queue.
+ for (auto& stream_entry : streams_) {
+ OutgoingStream& stream = stream_entry.second;
+ stream.Reset();
+ }
+ previous_message_has_ended_ = true;
+}
+
+size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
+ auto it = streams_.find(stream_id);
+ if (it == streams_.end()) {
+ return 0;
+ }
+ return it->second.buffered_amount().value();
+}
+
+size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const {
+ auto it = streams_.find(stream_id);
+ if (it == streams_.end()) {
+ return 0;
+ }
+ return it->second.buffered_amount().low_threshold();
+}
+
+void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id,
+ size_t bytes) {
+ GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes);
+}
+
+RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
+ StreamID stream_id) {
+ auto it = streams_.find(stream_id);
+ if (it != streams_.end()) {
+ return it->second;
+ }
+
+ return streams_
+ .emplace(stream_id,
+ OutgoingStream(
+ [this, stream_id]() { on_buffered_amount_low_(stream_id); },
+ total_buffered_amount_))
+ .first->second;
+}
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
new file mode 100644
index 0000000000..3ec45af17d
--- /dev/null
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -0,0 +1,238 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
+#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
+
+#include <cstdint>
+#include <deque>
+#include <map>
+#include <string>
+#include <utility>
+
+#include "absl/algorithm/container.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/pair_hash.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/tx/send_queue.h"
+
+namespace dcsctp {
+
+// The Round Robin SendQueue holds all messages that the client wants to send,
+// but that haven't yet been split into chunks and fully sent on the wire.
+//
+// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
+// it will cycle to send messages from different streams. It will send all
+// fragments from one message before continuing with a different message on
+// possibly a different stream, until support for message interleaving has been
+// implemented.
+//
+// As messages can be (requested to be) sent before the connection is properly
+// established, this send queue is always present - even for closed connections.
+class RRSendQueue : public SendQueue {
+ public:
+ // How small a data chunk's payload may be, if having to fragment a message.
+ static constexpr size_t kMinimumFragmentedPayload = 10;
+
+ RRSendQueue(absl::string_view log_prefix,
+ size_t buffer_size,
+ std::function<void(StreamID)> on_buffered_amount_low,
+ size_t total_buffered_amount_low_threshold,
+ std::function<void()> on_total_buffered_amount_low)
+ : log_prefix_(std::string(log_prefix) + "fcfs: "),
+ buffer_size_(buffer_size),
+ on_buffered_amount_low_(std::move(on_buffered_amount_low)),
+ total_buffered_amount_(std::move(on_total_buffered_amount_low)) {
+ total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
+ }
+
+ // Indicates if the buffer is full. Note that it's up to the caller to ensure
+ // that the buffer is not full prior to adding new items to it.
+ bool IsFull() const;
+ // Indicates if the buffer is empty.
+ bool IsEmpty() const;
+
+ // Adds the message to be sent using the `send_options` provided. The current
+ // time should be in `now`. Note that it's the responsibility of the caller to
+ // ensure that the buffer is not full (by calling `IsFull`) before adding
+ // messages to it.
+ void Add(TimeMs now,
+ DcSctpMessage message,
+ const SendOptions& send_options = {});
+
+ // Implementation of `SendQueue`.
+ absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
+ bool Discard(IsUnordered unordered,
+ StreamID stream_id,
+ MID message_id) override;
+ void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) override;
+ bool CanResetStreams() const override;
+ void CommitResetStreams() override;
+ void RollbackResetStreams() override;
+ void Reset() override;
+ size_t buffered_amount(StreamID stream_id) const override;
+ size_t total_buffered_amount() const override {
+ return total_buffered_amount_.value();
+ }
+ size_t buffered_amount_low_threshold(StreamID stream_id) const override;
+ void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
+
+ private:
+ // Represents a value and a "low threshold" that when the value reaches or
+ // goes under the "low threshold", will trigger `on_threshold_reached`
+ // callback.
+ class ThresholdWatcher {
+ public:
+ explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
+ : on_threshold_reached_(std::move(on_threshold_reached)) {}
+ // Increases the value.
+ void Increase(size_t bytes) { value_ += bytes; }
+ // Decreases the value and triggers `on_threshold_reached` if it's at or
+ // below `low_threshold()`.
+ void Decrease(size_t bytes);
+
+ size_t value() const { return value_; }
+ size_t low_threshold() const { return low_threshold_; }
+ void SetLowThreshold(size_t low_threshold);
+
+ private:
+ const std::function<void()> on_threshold_reached_;
+ size_t value_ = 0;
+ size_t low_threshold_ = 0;
+ };
+
+ // Per-stream information.
+ class OutgoingStream {
+ public:
+ explicit OutgoingStream(std::function<void()> on_buffered_amount_low,
+ ThresholdWatcher& total_buffered_amount)
+ : buffered_amount_(std::move(on_buffered_amount_low)),
+ total_buffered_amount_(total_buffered_amount) {}
+
+ // Enqueues a message to this stream.
+ void Add(DcSctpMessage message,
+ absl::optional<TimeMs> expires_at,
+ const SendOptions& send_options);
+
+ // Possibly produces a data chunk to send.
+ absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
+
+ const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
+ ThresholdWatcher& buffered_amount() { return buffered_amount_; }
+
+ // Discards a partially sent message, see `SendQueue::Discard`.
+ bool Discard(IsUnordered unordered, MID message_id);
+
+ // Pauses this stream, which is used before resetting it.
+ void Pause();
+
+ // Resumes a paused stream.
+ void Resume() { is_paused_ = false; }
+
+ bool is_paused() const { return is_paused_; }
+
+ // Resets this stream, meaning MIDs and SSNs are set to zero.
+ void Reset();
+
+ // Indicates if this stream has a partially sent message in it.
+ bool has_partially_sent_message() const;
+
+ // Indicates if the stream has data to send. It will also try to remove any
+ // expired non-partially sent message.
+ bool HasDataToSend(TimeMs now);
+
+ private:
+ // An enqueued message and metadata.
+ struct Item {
+ explicit Item(DcSctpMessage msg,
+ absl::optional<TimeMs> expires_at,
+ const SendOptions& send_options)
+ : message(std::move(msg)),
+ expires_at(expires_at),
+ send_options(send_options),
+ remaining_offset(0),
+ remaining_size(message.payload().size()) {}
+ DcSctpMessage message;
+ absl::optional<TimeMs> expires_at;
+ SendOptions send_options;
+ // The remaining payload (offset and size) to be sent, when it has been
+ // fragmented.
+ size_t remaining_offset;
+ size_t remaining_size;
+ // If set, an allocated Message ID and SSN. Will be allocated when the
+ // first fragment is sent.
+ absl::optional<MID> message_id = absl::nullopt;
+ absl::optional<SSN> ssn = absl::nullopt;
+ // The current Fragment Sequence Number, incremented for each fragment.
+ FSN current_fsn = FSN(0);
+ };
+
+ bool IsConsistent() const;
+
+ // Streams are pause when they are about to be reset.
+ bool is_paused_ = false;
+ // MIDs are different for unordered and ordered messages sent on a stream.
+ MID next_unordered_mid_ = MID(0);
+ MID next_ordered_mid_ = MID(0);
+
+ SSN next_ssn_ = SSN(0);
+ // Enqueued messages, and metadata.
+ std::deque<Item> items_;
+
+ // The current amount of buffered data.
+ ThresholdWatcher buffered_amount_;
+
+ // Reference to the total buffered amount, which is updated directly by each
+ // stream.
+ ThresholdWatcher& total_buffered_amount_;
+ };
+
+ bool IsConsistent() const;
+ OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
+ absl::optional<DataToSend> Produce(
+ std::map<StreamID, OutgoingStream>::iterator it,
+ TimeMs now,
+ size_t max_size);
+
+ // Return the next stream, in round-robin fashion.
+ std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now);
+
+ const std::string log_prefix_;
+ const size_t buffer_size_;
+
+ // Called when the buffered amount is below what has been set using
+ // `SetBufferedAmountLowThreshold`.
+ const std::function<void(StreamID)> on_buffered_amount_low_;
+
+ // Called when the total buffered amount is below what has been set using
+ // `SetTotalBufferedAmountLowThreshold`.
+ const std::function<void()> on_total_buffered_amount_low_;
+
+ // The total amount of buffer data, for all streams.
+ ThresholdWatcher total_buffered_amount_;
+
+ // Indicates if the previous fragment sent was the end of a message. For
+ // non-interleaved sending, this means that the next message may come from a
+ // different stream. If not true, the next fragment must be produced from the
+ // same stream as last time.
+ bool previous_message_has_ended_ = true;
+
+ // The current stream to send chunks from. Modified by `GetNextStream`.
+ StreamID current_stream_id_ = StreamID(0);
+
+ // All streams, and messages added to those.
+ std::map<StreamID, OutgoingStream> streams_;
+};
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
new file mode 100644
index 0000000000..682c16af0b
--- /dev/null
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -0,0 +1,742 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/rr_send_queue.h"
+
+#include <cstdint>
+#include <type_traits>
+#include <vector>
+
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/testing/testing_macros.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::SizeIs;
+
+constexpr TimeMs kNow = TimeMs(0);
+constexpr StreamID kStreamID(1);
+constexpr PPID kPPID(53);
+constexpr size_t kMaxQueueSize = 1000;
+constexpr size_t kBufferedAmountLowThreshold = 500;
+constexpr size_t kOneFragmentPacketSize = 100;
+constexpr size_t kTwoFragmentPacketSize = 101;
+
+class RRSendQueueTest : public testing::Test {
+ protected:
+ RRSendQueueTest()
+ : buf_("log: ",
+ kMaxQueueSize,
+ on_buffered_amount_low_.AsStdFunction(),
+ kBufferedAmountLowThreshold,
+ on_total_buffered_amount_low_.AsStdFunction()) {}
+
+ const DcSctpOptions options_;
+ testing::NiceMock<testing::MockFunction<void(StreamID)>>
+ on_buffered_amount_low_;
+ testing::NiceMock<testing::MockFunction<void()>>
+ on_total_buffered_amount_low_;
+ RRSendQueue buf_;
+};
+
+TEST_F(RRSendQueueTest, EmptyBuffer) {
+ EXPECT_TRUE(buf_.IsEmpty());
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+ EXPECT_FALSE(buf_.IsFull());
+}
+
+TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
+
+ EXPECT_FALSE(buf_.IsEmpty());
+ EXPECT_FALSE(buf_.IsFull());
+ absl::optional<SendQueue::DataToSend> chunk_opt =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_opt.has_value());
+ EXPECT_TRUE(chunk_opt->data.is_beginning);
+ EXPECT_TRUE(chunk_opt->data.is_end);
+}
+
+TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
+ std::vector<uint8_t> payload(60);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_beg =
+ buf_.Produce(kNow, /*max_size=*/20);
+ ASSERT_TRUE(chunk_beg.has_value());
+ EXPECT_TRUE(chunk_beg->data.is_beginning);
+ EXPECT_FALSE(chunk_beg->data.is_end);
+
+ absl::optional<SendQueue::DataToSend> chunk_mid =
+ buf_.Produce(kNow, /*max_size=*/20);
+ ASSERT_TRUE(chunk_mid.has_value());
+ EXPECT_FALSE(chunk_mid->data.is_beginning);
+ EXPECT_FALSE(chunk_mid->data.is_end);
+
+ absl::optional<SendQueue::DataToSend> chunk_end =
+ buf_.Produce(kNow, /*max_size=*/20);
+ ASSERT_TRUE(chunk_end.has_value());
+ EXPECT_FALSE(chunk_end->data.is_beginning);
+ EXPECT_TRUE(chunk_end->data.is_end);
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+}
+
+TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
+ std::vector<uint8_t> payload(60);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(chunk_one->data.ppid, kPPID);
+ EXPECT_TRUE(chunk_one->data.is_beginning);
+ EXPECT_TRUE(chunk_one->data.is_end);
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
+ EXPECT_EQ(chunk_two->data.ppid, PPID(54));
+ EXPECT_TRUE(chunk_two->data.is_beginning);
+ EXPECT_TRUE(chunk_two->data.is_end);
+}
+
+TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
+ std::vector<uint8_t> payload(600);
+ EXPECT_FALSE(buf_.IsFull());
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_FALSE(buf_.IsFull());
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
+ EXPECT_TRUE(buf_.IsFull());
+ // However, it's still possible to add messages. It's a soft limit, and it
+ // might be necessary to forcefully add messages due to e.g. external
+ // fragmentation.
+ buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
+ EXPECT_TRUE(buf_.IsFull());
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(chunk_one->data.ppid, kPPID);
+
+ EXPECT_TRUE(buf_.IsFull());
+
+ absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
+ EXPECT_EQ(chunk_two->data.ppid, PPID(54));
+
+ EXPECT_FALSE(buf_.IsFull());
+ EXPECT_FALSE(buf_.IsEmpty());
+
+ absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
+ EXPECT_EQ(chunk_three->data.ppid, PPID(55));
+
+ EXPECT_FALSE(buf_.IsFull());
+ EXPECT_TRUE(buf_.IsEmpty());
+}
+
+TEST_F(RRSendQueueTest, WillNotSendTooSmallPacket) {
+ std::vector<uint8_t> payload(RRSendQueue::kMinimumFragmentedPayload + 1);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ // Wouldn't fit enough payload (wouldn't want to fragment)
+ EXPECT_FALSE(
+ buf_.Produce(kNow,
+ /*max_size=*/RRSendQueue::kMinimumFragmentedPayload - 1)
+ .has_value());
+
+ // Minimum fragment
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow,
+ /*max_size=*/RRSendQueue::kMinimumFragmentedPayload);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(chunk_one->data.ppid, kPPID);
+
+ // There is only one byte remaining - it can be fetched as it doesn't require
+ // additional fragmentation.
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, /*max_size=*/1);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, kStreamID);
+ EXPECT_EQ(chunk_two->data.ppid, kPPID);
+
+ EXPECT_TRUE(buf_.IsEmpty());
+}
+
+TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
+ std::vector<uint8_t> payload(20);
+
+ // Default is ordered
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_FALSE(chunk_one->data.is_unordered);
+
+ // Explicitly unordered.
+ SendOptions opts;
+ opts.unordered = IsUnordered(true);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_TRUE(chunk_two->data.is_unordered);
+}
+
+TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
+ std::vector<uint8_t> payload(20);
+
+ // Default is no expiry
+ TimeMs now = kNow;
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
+ now += DurationMs(1000000);
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ SendOptions expires_2_seconds;
+ expires_2_seconds.lifetime = DurationMs(2000);
+
+ // Add and consume within lifetime
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+ now += DurationMs(2000);
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ // Add and consume just outside lifetime
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+ now += DurationMs(2001);
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ // A long time after expiry
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+ now += DurationMs(1000000);
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ // Expire one message, but produce the second that is not expired.
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+
+ SendOptions expires_4_seconds;
+ expires_4_seconds.lifetime = DurationMs(4000);
+
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
+ now += DurationMs(2001);
+
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, DiscardPartialPackets) {
+ std::vector<uint8_t> payload(120);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_FALSE(chunk_one->data.is_end);
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
+ chunk_one->data.message_id);
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_FALSE(chunk_two->data.is_end);
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_TRUE(chunk_three->data.is_end);
+ EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
+ ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
+
+ // Calling it again shouldn't cause issues.
+ buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
+ chunk_one->data.message_id);
+ ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
+ EXPECT_EQ(buf_.total_buffered_amount(), 8u);
+
+ buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
+ EXPECT_EQ(buf_.total_buffered_amount(), 5u);
+ buf_.CommitResetStreams();
+ buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(2)}));
+ EXPECT_EQ(buf_.total_buffered_amount(), 0u);
+}
+
+TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) {
+ std::vector<uint8_t> payload(120);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
+
+ StreamID stream_ids[] = {StreamID(1)};
+ buf_.PrepareResetStreams(stream_ids);
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
+}
+
+TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
+ EXPECT_EQ(buf_.total_buffered_amount(), 0u);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+ buf_.CommitResetStreams();
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 0u);
+}
+
+TEST_F(RRSendQueueTest, CommittingResetsSSN) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.ssn, SSN(0));
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.ssn, SSN(1));
+
+ StreamID stream_ids[] = {StreamID(1)};
+ buf_.PrepareResetStreams(stream_ids);
+
+ // Buffered
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ EXPECT_TRUE(buf_.CanResetStreams());
+ buf_.CommitResetStreams();
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.ssn, SSN(0));
+}
+
+TEST_F(RRSendQueueTest, RollBackResumesSSN) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.ssn, SSN(0));
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.ssn, SSN(1));
+
+ buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
+
+ // Buffered
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ EXPECT_TRUE(buf_.CanResetStreams());
+ buf_.RollbackResetStreams();
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.ssn, SSN(2));
+}
+
+TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
+ std::vector<uint8_t> payload(200);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+}
+
+TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
+ std::vector<uint8_t> payload(kTwoFragmentPacketSize);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload,
+ SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk4.data.payload,
+ SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(3));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(3));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(5));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(4));
+ EXPECT_THAT(chunk4.data.payload, SizeIs(7));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk5.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk5.data.payload, SizeIs(2));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk6.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk6.data.payload, SizeIs(4));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk7.data.stream_id, StreamID(3));
+ EXPECT_THAT(chunk7.data.payload, SizeIs(6));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
+ EXPECT_THAT(chunk8.data.payload, SizeIs(8));
+}
+
+TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) {
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u);
+}
+
+TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
+
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
+}
+
+TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
+
+ // Should now trigger again, as buffer_amount went above the threshold.
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(1));
+}
+
+TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) {
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(10)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u);
+
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(10));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(20)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u);
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(20));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
+}
+
+TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) {
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
+
+ std::vector<uint8_t> payload(1000);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u);
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
+
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u);
+
+ // Doesn't trigger when reducing even further.
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
+}
+
+TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) {
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1000)));
+
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, 400));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(400));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
+
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(200)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
+
+ // Will trigger again, as it went above the limit.
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, 200));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(200));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
+}
+
+TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) {
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(100)));
+
+ // Modifying the threshold, still under buffered_amount, should not trigger.
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 99);
+
+ // When the threshold reaches buffered_amount, it will trigger.
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 100);
+
+ // But not when it's set low again.
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
+
+ // But it will trigger when it overshoots.
+ EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 150);
+
+ // But not when it's set low again.
+ EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 0);
+}
+
+TEST_F(RRSendQueueTest,
+ OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) {
+ EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(0);
+ std::vector<uint8_t> payload(kBufferedAmountLowThreshold - 1);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ // Will not trigger if going above but never below.
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID,
+ std::vector<uint8_t>(kOneFragmentPacketSize)));
+}
+
+TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) {
+ EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(0);
+ std::vector<uint8_t> payload(kBufferedAmountLowThreshold);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ // Reaches it.
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));
+
+ // Drain it a bit - will trigger.
+ EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(1);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+}
+
+TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector<uint8_t>(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ // Next, it should pick a different stream.
+
+ buf_.Add(kNow,
+ DcSctpMessage(StreamID(1), kPPID,
+ std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // It should still stay on the Stream1 now, even if might be tempted to switch
+ // to this stream, as it's the stream following 5.
+ buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // After stream id 1 is complete, it's time to do stream 6.
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(6));
+ EXPECT_THAT(chunk4.data.payload, SizeIs(1));
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+}
+
+TEST_F(RRSendQueueTest, WillStayInStreamWhenOnlySmallFragmentRemaining) {
+ buf_.Add(kNow,
+ DcSctpMessage(StreamID(5), kPPID,
+ std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // Now assume that there will be a lot of previous chunks that need to be
+ // retransmitted, which fills up the next packet and there is little space
+ // left in the packet for new chunks. What it should NOT do right now is to
+ // try to send a message from StreamID 6. And it should not try to send a very
+ // small fragment from StreamID 5 either. So just skip this one.
+ EXPECT_FALSE(buf_.Produce(kNow, 8).has_value());
+
+ // When the next produce request comes with a large buffer to fill, continue
+ // sending from StreamID 5.
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(5));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // Lastly, produce a message on StreamID 6.
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(6));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(1));
+
+ EXPECT_FALSE(buf_.Produce(kNow, 8).has_value());
+}
+} // namespace
+} // namespace dcsctp
diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h
new file mode 100644
index 0000000000..877dbdda59
--- /dev/null
+++ b/net/dcsctp/tx/send_queue.h
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_SEND_QUEUE_H_
+#define NET_DCSCTP_TX_SEND_QUEUE_H_
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/internal_types.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/types.h"
+
+namespace dcsctp {
+
+class SendQueue {
+ public:
+ // Container for a data chunk that is produced by the SendQueue
+ struct DataToSend {
+ explicit DataToSend(Data data) : data(std::move(data)) {}
+ // The data to send, including all parameters.
+ Data data;
+
+ // Partial reliability - RFC3758
+ absl::optional<int> max_retransmissions;
+ absl::optional<TimeMs> expires_at;
+ };
+
+ virtual ~SendQueue() = default;
+
+ // TODO(boivie): This interface is obviously missing an "Add" function, but
+ // that is postponed a bit until the story around how to model message
+ // prioritization, which is important for any advanced stream scheduler, is
+ // further clarified.
+
+ // Produce a chunk to be sent.
+ //
+ // `max_size` refers to how many payload bytes that may be produced, not
+ // including any headers.
+ virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0;
+
+ // Discards a partially sent message identified by the parameters `unordered`,
+ // `stream_id` and `message_id`. The `message_id` comes from the returned
+ // information when having called `Produce`. A partially sent message means
+ // that it has had at least one fragment of it returned when `Produce` was
+ // called prior to calling this method).
+ //
+ // This is used when a message has been found to be expired (by the partial
+ // reliability extension), and the retransmission queue will signal the
+ // receiver that any partially received message fragments should be skipped.
+ // This means that any remaining fragments in the Send Queue must be removed
+ // as well so that they are not sent.
+ //
+ // This function returns true if this message had unsent fragments still in
+ // the queue that were discarded, and false if there were no such fragments.
+ virtual bool Discard(IsUnordered unordered,
+ StreamID stream_id,
+ MID message_id) = 0;
+
+ // Prepares the streams to be reset. This is used to close a WebRTC data
+ // channel and will be signaled to the other side.
+ //
+ // Concretely, it discards all whole (not partly sent) messages in the given
+ // streams and pauses those streams so that future added messages aren't
+ // produced until `ResumeStreams` is called.
+ //
+ // TODO(boivie): Investigate if it really should discard any message at all.
+ // RFC8831 only mentions that "[RFC6525] also guarantees that all the messages
+ // are delivered (or abandoned) before the stream is reset."
+ //
+ // This method can be called multiple times to add more streams to be
+ // reset, and paused while they are resetting. This is the first part of the
+ // two-phase commit protocol to reset streams, where the caller completes the
+ // procedure by either calling `CommitResetStreams` or `RollbackResetStreams`.
+ virtual void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) = 0;
+
+ // Returns true if all non-discarded messages during `PrepareResetStreams`
+ // (which are those that was partially sent before that method was called)
+ // have been sent.
+ virtual bool CanResetStreams() const = 0;
+
+ // Called to commit to reset the streams provided to `PrepareResetStreams`.
+ // It will reset the stream sequence numbers (SSNs) and message identifiers
+ // (MIDs) and resume the paused streams.
+ virtual void CommitResetStreams() = 0;
+
+ // Called to abort the resetting of streams provided to `PrepareResetStreams`.
+ // Will resume the paused streams without resetting the stream sequence
+ // numbers (SSNs) or message identifiers (MIDs). Note that the non-partial
+ // messages that were discarded when calling `PrepareResetStreams` will not be
+ // recovered, to better match the intention from the sender to "close the
+ // channel".
+ virtual void RollbackResetStreams() = 0;
+
+ // Resets all message identifier counters (MID, SSN) and makes all partially
+ // messages be ready to be re-sent in full. This is used when the peer has
+ // been detected to have restarted and is used to try to minimize the amount
+ // of data loss. However, data loss cannot be completely guaranteed when a
+ // peer restarts.
+ virtual void Reset() = 0;
+
+ // Returns the amount of buffered data. This doesn't include packets that are
+ // e.g. inflight.
+ virtual size_t buffered_amount(StreamID stream_id) const = 0;
+
+ // Returns the total amount of buffer data, for all streams.
+ virtual size_t total_buffered_amount() const = 0;
+
+ // Returns the limit for the `OnBufferedAmountLow` event. Default value is 0.
+ virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0;
+
+ // Sets a limit for the `OnBufferedAmountLow` event.
+ virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
+ size_t bytes) = 0;
+};
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_SEND_QUEUE_H_