diff options
Diffstat (limited to 'net/dcsctp/rx/reassembly_queue.cc')
-rw-r--r-- | net/dcsctp/rx/reassembly_queue.cc | 158 |
1 files changed, 71 insertions, 87 deletions
diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc index b5ab087f68..ae672731c0 100644 --- a/net/dcsctp/rx/reassembly_queue.cc +++ b/net/dcsctp/rx/reassembly_queue.cc @@ -23,16 +23,17 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "net/dcsctp/common/sequence_numbers.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" #include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/types.h" #include "net/dcsctp/rx/interleaved_reassembly_streams.h" #include "net/dcsctp/rx/reassembly_streams.h" #include "net/dcsctp/rx/traditional_reassembly_streams.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { namespace { @@ -68,10 +69,9 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, use_message_interleaving)) {} void ReassemblyQueue::Add(TSN tsn, Data data) { - RTC_DCHECK(IsConsistent()); RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn - << ", stream=" << *data.stream_id << ":" - << *data.message_id << ":" << *data.fsn << ", type=" + << ", stream=" << *data.stream_id << ":" << *data.mid + << ":" << *data.fsn << ", type=" << (data.is_beginning && data.is_end ? "complete" : data.is_beginning ? "first" : data.is_end ? "last" @@ -83,21 +83,23 @@ void ReassemblyQueue::Add(TSN tsn, Data data) { // the future, the socket is in "deferred reset processing" mode and must // buffer chunks until it's exited. if (deferred_reset_streams_.has_value() && - unwrapped_tsn > - tsn_unwrapper_.Unwrap( - deferred_reset_streams_->req.sender_last_assigned_tsn())) { + unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn && + deferred_reset_streams_->streams.contains(data.stream_id)) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Deferring chunk with tsn=" << *tsn - << " until cum_ack_tsn=" - << *deferred_reset_streams_->req.sender_last_assigned_tsn(); + << ", sid=" << *data.stream_id << " until tsn=" + << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap(); // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "In this mode, any data arriving with a TSN larger than the // Sender's Last Assigned TSN for the affected stream(s) MUST be queued // locally and held until the cumulative acknowledgment point reaches the // Sender's Last Assigned TSN." queued_bytes_ += data.size(); - deferred_reset_streams_->deferred_chunks.emplace_back( - std::make_pair(tsn, std::move(data))); + deferred_reset_streams_->deferred_actions.push_back( + [this, tsn, data = std::move(data)]() mutable { + queued_bytes_ -= data.size(); + Add(tsn, std::move(data)); + }); } else { queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data)); } @@ -113,83 +115,51 @@ void ReassemblyQueue::Add(TSN tsn, Data data) { RTC_DCHECK(IsConsistent()); } -ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams( - const OutgoingSSNResetRequestParameter& req, - TSN cum_tsn_ack) { - RTC_DCHECK(IsConsistent()); - if (deferred_reset_streams_.has_value()) { - // In deferred mode already. - return ReconfigurationResponseParameter::Result::kInProgress; - } else if (req.request_sequence_number() <= - last_completed_reset_req_seq_nbr_) { - // Already performed at some time previously. - return ReconfigurationResponseParameter::Result::kSuccessPerformed; - } - - UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn()); - UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack); - - // https://tools.ietf.org/html/rfc6525#section-5.2.2 - // "If the Sender's Last Assigned TSN is greater than the - // cumulative acknowledgment point, then the endpoint MUST enter "deferred - // reset processing"." - if (sla_tsn > unwrapped_cum_tsn_ack) { - RTC_DLOG(LS_VERBOSE) - << log_prefix_ - << "Entering deferred reset processing mode until cum_tsn_ack=" - << *req.sender_last_assigned_tsn(); - deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req); - return ReconfigurationResponseParameter::Result::kInProgress; - } +void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset( + rtc::ArrayView<const StreamID> stream_ids) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: [" + << StrJoin(stream_ids, ",", + [](rtc::StringBuilder& sb, StreamID sid) { + sb << *sid; + }) + << "]"; // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "... streams MUST be reset to 0 as the next expected SSN." - streams_->ResetStreams(req.stream_ids()); - last_completed_reset_req_seq_nbr_ = req.request_sequence_number(); - RTC_DCHECK(IsConsistent()); - return ReconfigurationResponseParameter::Result::kSuccessPerformed; -} + streams_->ResetStreams(stream_ids); -bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) { - RTC_DCHECK(IsConsistent()); if (deferred_reset_streams_.has_value()) { - UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn); - UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap( - deferred_reset_streams_->req.sender_last_assigned_tsn()); - if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ - << "Leaving deferred reset processing with tsn=" - << *cum_ack_tsn << ", feeding back " - << deferred_reset_streams_->deferred_chunks.size() - << " chunks"; - // https://tools.ietf.org/html/rfc6525#section-5.2.2 - // "... streams MUST be reset to 0 as the next expected SSN." - streams_->ResetStreams(deferred_reset_streams_->req.stream_ids()); - std::vector<std::pair<TSN, Data>> deferred_chunks = - std::move(deferred_reset_streams_->deferred_chunks); - // The response will not be sent now, but as a reply to the retried - // request, which will come as "in progress" has been sent prior. - last_completed_reset_req_seq_nbr_ = - deferred_reset_streams_->req.request_sequence_number(); - deferred_reset_streams_ = absl::nullopt; - - // https://tools.ietf.org/html/rfc6525#section-5.2.2 - // "Any queued TSNs (queued at step E2) MUST now be released and processed - // normally." - for (auto& [tsn, data] : deferred_chunks) { - queued_bytes_ -= data.size(); - Add(tsn, std::move(data)); - } - - RTC_DCHECK(IsConsistent()); - return true; - } else { - RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn=" - << *cum_ack_tsn; + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Leaving deferred reset processing, feeding back " + << deferred_reset_streams_->deferred_actions.size() + << " actions"; + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "Any queued TSNs (queued at step E2) MUST now be released and processed + // normally." + auto deferred_actions = + std::move(deferred_reset_streams_->deferred_actions); + deferred_reset_streams_ = absl::nullopt; + + for (auto& action : deferred_actions) { + action(); } } - return false; + RTC_DCHECK(IsConsistent()); +} + +void ReassemblyQueue::EnterDeferredReset( + TSN sender_last_assigned_tsn, + rtc::ArrayView<const StreamID> streams) { + if (!deferred_reset_streams_.has_value()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Entering deferred reset; sender_last_assigned_tsn=" + << *sender_last_assigned_tsn; + deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>( + tsn_unwrapper_.Unwrap(sender_last_assigned_tsn), + webrtc::flat_set<StreamID>(streams.begin(), streams.end())); + } + RTC_DCHECK(IsConsistent()); } std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() { @@ -237,18 +207,32 @@ void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() { } } -void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) { - RTC_DCHECK(IsConsistent()); - UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn()); +void ReassemblyQueue::HandleForwardTsn( + TSN new_cumulative_tsn, + rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) { + UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn); + if (deferred_reset_streams_.has_value() && + tsn > deferred_reset_streams_->sender_last_assigned_tsn) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() + << "- deferring."; + deferred_reset_streams_->deferred_actions.emplace_back( + [this, new_cumulative_tsn, + streams = std::vector<AnyForwardTsnChunk::SkippedStream>( + skipped_streams.begin(), skipped_streams.end())] { + HandleForwardTsn(new_cumulative_tsn, streams); + }); + RTC_DCHECK(IsConsistent()); + return; + } + + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() + << " - performing."; last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn); delivered_tsns_.erase(delivered_tsns_.begin(), delivered_tsns_.upper_bound(tsn)); - MaybeMoveLastAssembledWatermarkFurther(); - - queued_bytes_ -= - streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams()); + queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams); RTC_DCHECK(IsConsistent()); } |