diff options
Diffstat (limited to 'net/dcsctp/tx/rr_send_queue.cc')
-rw-r--r-- | net/dcsctp/tx/rr_send_queue.cc | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc new file mode 100644 index 0000000000..4bfbaf718b --- /dev/null +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -0,0 +1,432 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/rr_send_queue.h" + +#include <cstdint> +#include <deque> +#include <map> +#include <utility> +#include <vector> + +#include "absl/algorithm/container.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { + while (!items_.empty()) { + RRSendQueue::OutgoingStream::Item& item = items_.front(); + if (item.message_id.has_value()) { + // Already partially sent messages can always continue to be sent. + return true; + } + + // Message has expired. Remove it and inspect the next one. + if (item.expires_at.has_value() && *item.expires_at <= now) { + buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); + items_.pop_front(); + RTC_DCHECK(IsConsistent()); + continue; + } + + if (is_paused_) { + // The stream has paused (and there is no partially sent message). + return false; + } + return true; + } + return false; +} + +bool RRSendQueue::IsConsistent() const { + size_t total_buffered_amount = 0; + for (const auto& stream_entry : streams_) { + total_buffered_amount += stream_entry.second.buffered_amount().value(); + } + + if (previous_message_has_ended_) { + auto it = streams_.find(current_stream_id_); + if (it != streams_.end() && it->second.has_partially_sent_message()) { + RTC_DLOG(LS_ERROR) + << "Previous message has ended, but still partial message in stream"; + return false; + } + } else { + auto it = streams_.find(current_stream_id_); + if (it == streams_.end() || !it->second.has_partially_sent_message()) { + RTC_DLOG(LS_ERROR) + << "Previous message has NOT ended, but there is no partial message"; + return false; + } + } + + return total_buffered_amount == total_buffered_amount_.value(); +} + +bool RRSendQueue::OutgoingStream::IsConsistent() const { + size_t bytes = 0; + for (const auto& item : items_) { + bytes += item.remaining_size; + } + return bytes == buffered_amount_.value(); +} + +void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) { + RTC_DCHECK(bytes <= value_); + size_t old_value = value_; + value_ -= bytes; + + if (old_value > low_threshold_ && value_ <= low_threshold_) { + on_threshold_reached_(); + } +} + +void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) { + // Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted. + if (low_threshold_ < value_ && low_threshold >= value_) { + on_threshold_reached_(); + } + low_threshold_ = low_threshold; +} + +void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, + absl::optional<TimeMs> expires_at, + const SendOptions& send_options) { + buffered_amount_.Increase(message.payload().size()); + total_buffered_amount_.Increase(message.payload().size()); + items_.emplace_back(std::move(message), expires_at, send_options); + + RTC_DCHECK(IsConsistent()); +} + +absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce( + TimeMs now, + size_t max_size) { + RTC_DCHECK(!items_.empty()); + + Item* item = &items_.front(); + DcSctpMessage& message = item->message; + + if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) { + RTC_DCHECK(IsConsistent()); + return absl::nullopt; + } + + // Allocate Message ID and SSN when the first fragment is sent. + if (!item->message_id.has_value()) { + MID& mid = + item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_; + item->message_id = mid; + mid = MID(*mid + 1); + } + if (!item->send_options.unordered && !item->ssn.has_value()) { + item->ssn = next_ssn_; + next_ssn_ = SSN(*next_ssn_ + 1); + } + + // Grab the next `max_size` fragment from this message and calculate flags. + rtc::ArrayView<const uint8_t> chunk_payload = + item->message.payload().subview(item->remaining_offset, max_size); + rtc::ArrayView<const uint8_t> message_payload = message.payload(); + Data::IsBeginning is_beginning(chunk_payload.data() == + message_payload.data()); + Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) == + (message_payload.data() + message_payload.size())); + + StreamID stream_id = message.stream_id(); + PPID ppid = message.ppid(); + + // Zero-copy the payload if the message fits in a single chunk. + std::vector<uint8_t> payload = + is_beginning && is_end + ? std::move(message).ReleasePayload() + : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end()); + + FSN fsn(item->current_fsn); + item->current_fsn = FSN(*item->current_fsn + 1); + buffered_amount_.Decrease(payload.size()); + total_buffered_amount_.Decrease(payload.size()); + + SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)), + item->message_id.value(), fsn, ppid, + std::move(payload), is_beginning, is_end, + item->send_options.unordered)); + chunk.max_retransmissions = item->send_options.max_retransmissions; + chunk.expires_at = item->expires_at; + + if (is_end) { + // The entire message has been sent, and its last data copied to `chunk`, so + // it can safely be discarded. + items_.pop_front(); + } else { + item->remaining_offset += chunk_payload.size(); + item->remaining_size -= chunk_payload.size(); + RTC_DCHECK(item->remaining_offset + item->remaining_size == + item->message.payload().size()); + RTC_DCHECK(item->remaining_size > 0); + } + RTC_DCHECK(IsConsistent()); + return chunk; +} + +bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, + MID message_id) { + bool result = false; + if (!items_.empty()) { + Item& item = items_.front(); + if (item.send_options.unordered == unordered && + item.message_id.has_value() && *item.message_id == message_id) { + buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); + items_.pop_front(); + // As the item still existed, it had unsent data. + result = true; + } + } + RTC_DCHECK(IsConsistent()); + return result; +} + +void RRSendQueue::OutgoingStream::Pause() { + is_paused_ = true; + + // A stream is paused when it's about to be reset. In this implementation, + // it will throw away all non-partially send messages. This is subject to + // change. It will however not discard any partially sent messages - only + // whole messages. Partially delivered messages (at the time of receiving a + // Stream Reset command) will always deliver all the fragments before + // actually resetting the stream. + for (auto it = items_.begin(); it != items_.end();) { + if (it->remaining_offset == 0) { + buffered_amount_.Decrease(it->remaining_size); + total_buffered_amount_.Decrease(it->remaining_size); + it = items_.erase(it); + } else { + ++it; + } + } + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::OutgoingStream::Reset() { + if (!items_.empty()) { + // If this message has been partially sent, reset it so that it will be + // re-sent. + auto& item = items_.front(); + buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); + total_buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); + item.remaining_offset = 0; + item.remaining_size = item.message.payload().size(); + item.message_id = absl::nullopt; + item.ssn = absl::nullopt; + item.current_fsn = FSN(0); + } + is_paused_ = false; + next_ordered_mid_ = MID(0); + next_unordered_mid_ = MID(0); + next_ssn_ = SSN(0); + RTC_DCHECK(IsConsistent()); +} + +bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { + if (items_.empty()) { + return false; + } + return items_.front().message_id.has_value(); +} + +void RRSendQueue::Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options) { + RTC_DCHECK(!message.payload().empty()); + // Any limited lifetime should start counting from now - when the message + // has been added to the queue. + absl::optional<TimeMs> expires_at = absl::nullopt; + if (send_options.lifetime.has_value()) { + // `expires_at` is the time when it expires. Which is slightly larger than + // the message's lifetime, as the message is alive during its entire + // lifetime (which may be zero). + expires_at = now + *send_options.lifetime + DurationMs(1); + } + GetOrCreateStreamInfo(message.stream_id()) + .Add(std::move(message), expires_at, send_options); + RTC_DCHECK(IsConsistent()); +} + +bool RRSendQueue::IsFull() const { + return total_buffered_amount() >= buffer_size_; +} + +bool RRSendQueue::IsEmpty() const { + return total_buffered_amount() == 0; +} + +std::map<StreamID, RRSendQueue::OutgoingStream>::iterator +RRSendQueue::GetNextStream(TimeMs now) { + auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1)); + + for (auto it = start_it; it != streams_.end(); ++it) { + if (it->second.HasDataToSend(now)) { + current_stream_id_ = it->first; + return it; + } + } + + for (auto it = streams_.begin(); it != start_it; ++it) { + if (it->second.HasDataToSend(now)) { + current_stream_id_ = it->first; + return it; + } + } + return streams_.end(); +} + +absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now, + size_t max_size) { + std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it; + + if (previous_message_has_ended_) { + // Previous message has ended. Round-robin to a different stream, if there + // even is one with data to send. + stream_it = GetNextStream(now); + if (stream_it == streams_.end()) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "There is no stream with data; Can't produce any data."; + return absl::nullopt; + } + } else { + // The previous message has not ended; Continue from the current stream. + stream_it = streams_.find(current_stream_id_); + RTC_DCHECK(stream_it != streams_.end()); + } + + absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size); + if (data.has_value()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type=" + << (data->data.is_unordered ? "unordered" : "ordered") + << "::" + << (*data->data.is_beginning && *data->data.is_end + ? "complete" + : *data->data.is_beginning + ? "first" + : *data->data.is_end ? "last" : "middle") + << ", stream_id=" << *stream_it->first + << ", ppid=" << *data->data.ppid + << ", length=" << data->data.payload.size(); + + previous_message_has_ended_ = *data->data.is_end; + } + + RTC_DCHECK(IsConsistent()); + return data; +} + +bool RRSendQueue::Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) { + bool has_discarded = + GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id); + if (has_discarded) { + // Only partially sent messages are discarded, so if a message was + // discarded, then it was the currently sent message. + previous_message_has_ended_ = true; + } + + return has_discarded; +} + +void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) { + for (StreamID stream_id : streams) { + GetOrCreateStreamInfo(stream_id).Pause(); + } + RTC_DCHECK(IsConsistent()); +} + +bool RRSendQueue::CanResetStreams() const { + // Streams can be reset if those streams that are paused don't have any + // messages that are partially sent. + for (auto& stream : streams_) { + if (stream.second.is_paused() && + stream.second.has_partially_sent_message()) { + return false; + } + } + return true; +} + +void RRSendQueue::CommitResetStreams() { + Reset(); + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::RollbackResetStreams() { + for (auto& stream_entry : streams_) { + stream_entry.second.Resume(); + } + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::Reset() { + // Recalculate buffered amount, as partially sent messages may have been put + // fully back in the queue. + for (auto& stream_entry : streams_) { + OutgoingStream& stream = stream_entry.second; + stream.Reset(); + } + previous_message_has_ended_ = true; +} + +size_t RRSendQueue::buffered_amount(StreamID stream_id) const { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + return 0; + } + return it->second.buffered_amount().value(); +} + +size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + return 0; + } + return it->second.buffered_amount().low_threshold(); +} + +void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id, + size_t bytes) { + GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes); +} + +RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( + StreamID stream_id) { + auto it = streams_.find(stream_id); + if (it != streams_.end()) { + return it->second; + } + + return streams_ + .emplace(stream_id, + OutgoingStream( + [this, stream_id]() { on_buffered_amount_low_(stream_id); }, + total_buffered_amount_)) + .first->second; +} +} // namespace dcsctp |