aboutsummaryrefslogtreecommitdiff
path: root/net/dcsctp/rx/reassembly_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'net/dcsctp/rx/reassembly_queue.cc')
-rw-r--r--net/dcsctp/rx/reassembly_queue.cc158
1 files changed, 71 insertions, 87 deletions
diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc
index b5ab087f68..ae672731c0 100644
--- a/net/dcsctp/rx/reassembly_queue.cc
+++ b/net/dcsctp/rx/reassembly_queue.cc
@@ -23,16 +23,17 @@
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
-#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/types.h"
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include "rtc_base/logging.h"
+#include "rtc_base/strings/str_join.h"
namespace dcsctp {
namespace {
@@ -68,10 +69,9 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
use_message_interleaving)) {}
void ReassemblyQueue::Add(TSN tsn, Data data) {
- RTC_DCHECK(IsConsistent());
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
- << ", stream=" << *data.stream_id << ":"
- << *data.message_id << ":" << *data.fsn << ", type="
+ << ", stream=" << *data.stream_id << ":" << *data.mid
+ << ":" << *data.fsn << ", type="
<< (data.is_beginning && data.is_end ? "complete"
: data.is_beginning ? "first"
: data.is_end ? "last"
@@ -83,21 +83,23 @@ void ReassemblyQueue::Add(TSN tsn, Data data) {
// the future, the socket is in "deferred reset processing" mode and must
// buffer chunks until it's exited.
if (deferred_reset_streams_.has_value() &&
- unwrapped_tsn >
- tsn_unwrapper_.Unwrap(
- deferred_reset_streams_->req.sender_last_assigned_tsn())) {
+ unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn &&
+ deferred_reset_streams_->streams.contains(data.stream_id)) {
RTC_DLOG(LS_VERBOSE)
<< log_prefix_ << "Deferring chunk with tsn=" << *tsn
- << " until cum_ack_tsn="
- << *deferred_reset_streams_->req.sender_last_assigned_tsn();
+ << ", sid=" << *data.stream_id << " until tsn="
+ << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap();
// https://tools.ietf.org/html/rfc6525#section-5.2.2
// "In this mode, any data arriving with a TSN larger than the
// Sender's Last Assigned TSN for the affected stream(s) MUST be queued
// locally and held until the cumulative acknowledgment point reaches the
// Sender's Last Assigned TSN."
queued_bytes_ += data.size();
- deferred_reset_streams_->deferred_chunks.emplace_back(
- std::make_pair(tsn, std::move(data)));
+ deferred_reset_streams_->deferred_actions.push_back(
+ [this, tsn, data = std::move(data)]() mutable {
+ queued_bytes_ -= data.size();
+ Add(tsn, std::move(data));
+ });
} else {
queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
}
@@ -113,83 +115,51 @@ void ReassemblyQueue::Add(TSN tsn, Data data) {
RTC_DCHECK(IsConsistent());
}
-ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
- const OutgoingSSNResetRequestParameter& req,
- TSN cum_tsn_ack) {
- RTC_DCHECK(IsConsistent());
- if (deferred_reset_streams_.has_value()) {
- // In deferred mode already.
- return ReconfigurationResponseParameter::Result::kInProgress;
- } else if (req.request_sequence_number() <=
- last_completed_reset_req_seq_nbr_) {
- // Already performed at some time previously.
- return ReconfigurationResponseParameter::Result::kSuccessPerformed;
- }
-
- UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
- UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
-
- // https://tools.ietf.org/html/rfc6525#section-5.2.2
- // "If the Sender's Last Assigned TSN is greater than the
- // cumulative acknowledgment point, then the endpoint MUST enter "deferred
- // reset processing"."
- if (sla_tsn > unwrapped_cum_tsn_ack) {
- RTC_DLOG(LS_VERBOSE)
- << log_prefix_
- << "Entering deferred reset processing mode until cum_tsn_ack="
- << *req.sender_last_assigned_tsn();
- deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
- return ReconfigurationResponseParameter::Result::kInProgress;
- }
+void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset(
+ rtc::ArrayView<const StreamID> stream_ids) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: ["
+ << StrJoin(stream_ids, ",",
+ [](rtc::StringBuilder& sb, StreamID sid) {
+ sb << *sid;
+ })
+ << "]";
// https://tools.ietf.org/html/rfc6525#section-5.2.2
// "... streams MUST be reset to 0 as the next expected SSN."
- streams_->ResetStreams(req.stream_ids());
- last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
- RTC_DCHECK(IsConsistent());
- return ReconfigurationResponseParameter::Result::kSuccessPerformed;
-}
+ streams_->ResetStreams(stream_ids);
-bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
- RTC_DCHECK(IsConsistent());
if (deferred_reset_streams_.has_value()) {
- UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
- UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
- deferred_reset_streams_->req.sender_last_assigned_tsn());
- if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
- RTC_DLOG(LS_VERBOSE) << log_prefix_
- << "Leaving deferred reset processing with tsn="
- << *cum_ack_tsn << ", feeding back "
- << deferred_reset_streams_->deferred_chunks.size()
- << " chunks";
- // https://tools.ietf.org/html/rfc6525#section-5.2.2
- // "... streams MUST be reset to 0 as the next expected SSN."
- streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
- std::vector<std::pair<TSN, Data>> deferred_chunks =
- std::move(deferred_reset_streams_->deferred_chunks);
- // The response will not be sent now, but as a reply to the retried
- // request, which will come as "in progress" has been sent prior.
- last_completed_reset_req_seq_nbr_ =
- deferred_reset_streams_->req.request_sequence_number();
- deferred_reset_streams_ = absl::nullopt;
-
- // https://tools.ietf.org/html/rfc6525#section-5.2.2
- // "Any queued TSNs (queued at step E2) MUST now be released and processed
- // normally."
- for (auto& [tsn, data] : deferred_chunks) {
- queued_bytes_ -= data.size();
- Add(tsn, std::move(data));
- }
-
- RTC_DCHECK(IsConsistent());
- return true;
- } else {
- RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
- << *cum_ack_tsn;
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Leaving deferred reset processing, feeding back "
+ << deferred_reset_streams_->deferred_actions.size()
+ << " actions";
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "Any queued TSNs (queued at step E2) MUST now be released and processed
+ // normally."
+ auto deferred_actions =
+ std::move(deferred_reset_streams_->deferred_actions);
+ deferred_reset_streams_ = absl::nullopt;
+
+ for (auto& action : deferred_actions) {
+ action();
}
}
- return false;
+ RTC_DCHECK(IsConsistent());
+}
+
+void ReassemblyQueue::EnterDeferredReset(
+ TSN sender_last_assigned_tsn,
+ rtc::ArrayView<const StreamID> streams) {
+ if (!deferred_reset_streams_.has_value()) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Entering deferred reset; sender_last_assigned_tsn="
+ << *sender_last_assigned_tsn;
+ deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(
+ tsn_unwrapper_.Unwrap(sender_last_assigned_tsn),
+ webrtc::flat_set<StreamID>(streams.begin(), streams.end()));
+ }
+ RTC_DCHECK(IsConsistent());
}
std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
@@ -237,18 +207,32 @@ void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
}
}
-void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
- RTC_DCHECK(IsConsistent());
- UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
+void ReassemblyQueue::HandleForwardTsn(
+ TSN new_cumulative_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
+ UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn);
+ if (deferred_reset_streams_.has_value() &&
+ tsn > deferred_reset_streams_->sender_last_assigned_tsn) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
+ << "- deferring.";
+ deferred_reset_streams_->deferred_actions.emplace_back(
+ [this, new_cumulative_tsn,
+ streams = std::vector<AnyForwardTsnChunk::SkippedStream>(
+ skipped_streams.begin(), skipped_streams.end())] {
+ HandleForwardTsn(new_cumulative_tsn, streams);
+ });
+ RTC_DCHECK(IsConsistent());
+ return;
+ }
+
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
+ << " - performing.";
last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
delivered_tsns_.erase(delivered_tsns_.begin(),
delivered_tsns_.upper_bound(tsn));
-
MaybeMoveLastAssembledWatermarkFurther();
-
- queued_bytes_ -=
- streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
+ queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams);
RTC_DCHECK(IsConsistent());
}