aboutsummaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorVictor Boivie <boivie@webrtc.org>2022-05-24 21:56:28 +0200
committerWebRTC LUCI CQ <webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-07-01 13:53:14 +0000
commit8967672f6da421ced7f23d3a54b9bfdd457f683b (patch)
tree4920f76ec9e0620434f271a0e81ecfaa8797e955 /net
parent74680c0234f5d3c4b27acb05e6ad010e9e837d92 (diff)
downloadwebrtc-8967672f6da421ced7f23d3a54b9bfdd457f683b.tar.gz
dcsctp: Refactor send queue (1/2)
Let the OutgoingStream reference the parent instead of passing references to individual items it needs, as follow-up CLs will add even more items. No functional change - pure refactoring. Bug: webrtc:5696 Change-Id: I914e590c0d90e898d7d230a16170cf4faff2338c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264142 Reviewed-by: Florent Castelli <orphis@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37398}
Diffstat (limited to 'net')
-rw-r--r--net/dcsctp/tx/rr_send_queue.cc23
-rw-r--r--net/dcsctp/tx/rr_send_queue.h14
2 files changed, 17 insertions, 20 deletions
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 174d19b77c..bee9d515b8 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -123,7 +123,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
const SendOptions& send_options) {
bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size());
- total_buffered_amount_.Increase(message.payload().size());
+ parent_.total_buffered_amount_.Increase(message.payload().size());
items_.emplace_back(std::move(message), expires_at, send_options);
if (!was_active) {
@@ -148,7 +148,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
// Oops, this entire message has already expired. Try the next one.
if (item.expires_at <= now) {
buffered_amount_.Decrease(item.remaining_size);
- total_buffered_amount_.Decrease(item.remaining_size);
+ parent_.total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
continue;
}
@@ -184,7 +184,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
FSN fsn(item.current_fsn);
item.current_fsn = FSN(*item.current_fsn + 1);
buffered_amount_.Decrease(payload.size());
- total_buffered_amount_.Decrease(payload.size());
+ parent_.total_buffered_amount_.Decrease(payload.size());
SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
item.message_id.value(), fsn, ppid,
@@ -232,7 +232,7 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
if (item.send_options.unordered == unordered &&
item.message_id.has_value() && *item.message_id == message_id) {
buffered_amount_.Decrease(item.remaining_size);
- total_buffered_amount_.Decrease(item.remaining_size);
+ parent_.total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
// Only partially sent messages are discarded, so if a message was
@@ -278,7 +278,7 @@ void RRSendQueue::OutgoingStream::Pause() {
for (auto it = items_.begin(); it != items_.end();) {
if (it->remaining_offset == 0) {
buffered_amount_.Decrease(it->remaining_size);
- total_buffered_amount_.Decrease(it->remaining_size);
+ parent_.total_buffered_amount_.Decrease(it->remaining_size);
it = items_.erase(it);
} else {
++it;
@@ -320,8 +320,8 @@ void RRSendQueue::OutgoingStream::Reset() {
auto& item = items_.front();
buffered_amount_.Increase(item.message.payload().size() -
item.remaining_size);
- total_buffered_amount_.Increase(item.message.payload().size() -
- item.remaining_size);
+ parent_.total_buffered_amount_.Increase(item.message.payload().size() -
+ item.remaining_size);
item.remaining_offset = 0;
item.remaining_size = item.message.payload().size();
item.message_id = absl::nullopt;
@@ -474,9 +474,8 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
return streams_
.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple(
- &scheduler_, stream_id, default_priority_,
- [this, stream_id]() { on_buffered_amount_low_(stream_id); },
- total_buffered_amount_))
+ this, &scheduler_, stream_id, default_priority_,
+ [this, stream_id]() { on_buffered_amount_low_(stream_id); }))
.first->second;
}
@@ -520,9 +519,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
streams_.emplace(
std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple(
- &scheduler_, stream_id, StreamPriority(state_stream.priority),
+ this, &scheduler_, stream_id, StreamPriority(state_stream.priority),
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
- total_buffered_amount_, &state_stream));
+ &state_stream));
}
}
} // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index 49c36feab5..8e6085f6b8 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -120,18 +120,18 @@ class RRSendQueue : public SendQueue {
class OutgoingStream : public StreamScheduler::StreamProducer {
public:
OutgoingStream(
+ RRSendQueue* parent,
StreamScheduler* scheduler,
StreamID stream_id,
StreamPriority priority,
std::function<void()> on_buffered_amount_low,
- ThresholdWatcher& total_buffered_amount,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
- : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
+ : parent_(*parent),
+ scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)),
- buffered_amount_(std::move(on_buffered_amount_low)),
- total_buffered_amount_(total_buffered_amount) {}
+ buffered_amount_(std::move(on_buffered_amount_low)) {}
StreamID stream_id() const { return scheduler_stream_->stream_id(); }
@@ -230,6 +230,8 @@ class RRSendQueue : public SendQueue {
bool IsConsistent() const;
+ RRSendQueue& parent_;
+
const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
PauseState pause_state_ = PauseState::kNotPaused;
@@ -243,10 +245,6 @@ class RRSendQueue : public SendQueue {
// The current amount of buffered data.
ThresholdWatcher buffered_amount_;
-
- // Reference to the total buffered amount, which is updated directly by each
- // stream.
- ThresholdWatcher& total_buffered_amount_;
};
bool IsConsistent() const;