diff options
author | Victor Boivie <boivie@webrtc.org> | 2023-12-01 20:47:05 +0100 |
---|---|---|
committer | WebRTC LUCI CQ <webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2023-12-04 15:10:15 +0000 |
commit | 9abc4865a4fd5b456fd42d5903cda740f033414a (patch) | |
tree | beccc54f6276da87fd651393c8849a17c6488744 | |
parent | fe66dda7339bb1f868e514fdb847bf41fbca5ab1 (diff) | |
download | webrtc-9abc4865a4fd5b456fd42d5903cda740f033414a.tar.gz |
dcsctp: Use std::deque for outstanding_data
A std::map is a fairly inefficient data structure. Accessing an item
is O(log(N)), but as every item is a separate allocation, iterating it
and searching for items is not very kind to the data caches.
As the outstanding data is a contiguous list (no gaps) where you only
append to the end and remove from the front, use a std::deque instead.
Bug: webrtc:15699
Co-authored-by: Daniel Collins <dpcollins@google.com>
Change-Id: I1f5fe97d06204c75b2b9553856af24e50f2ce715
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/329422
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41310}
-rw-r--r-- | net/dcsctp/tx/outstanding_data.cc | 176 | ||||
-rw-r--r-- | net/dcsctp/tx/outstanding_data.h | 11 |
2 files changed, 110 insertions, 77 deletions
diff --git a/net/dcsctp/tx/outstanding_data.cc b/net/dcsctp/tx/outstanding_data.cc index 1e584064a6..5c1bbd09ce 100644 --- a/net/dcsctp/tx/outstanding_data.cc +++ b/net/dcsctp/tx/outstanding_data.cc @@ -19,6 +19,7 @@ #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/public/types.h" +#include "rtc_base/checks.h" #include "rtc_base/logging.h" namespace dcsctp { @@ -86,7 +87,9 @@ bool OutstandingData::IsConsistent() const { to_be_fast_retransmitted_.end()); std::set<UnwrappedTSN> actual_combined_to_be_retransmitted; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (item.is_outstanding()) { actual_outstanding_bytes += GetSerializedChunkSize(item.data()); ++actual_outstanding_items; @@ -103,22 +106,22 @@ bool OutstandingData::IsConsistent() const { } void OutstandingData::AckChunk(AckInfo& ack_info, - std::map<UnwrappedTSN, Item>::iterator iter) { - if (!iter->second.is_acked()) { - size_t serialized_size = GetSerializedChunkSize(iter->second.data()); + UnwrappedTSN tsn, + Item& item) { + if (!item.is_acked()) { + size_t serialized_size = GetSerializedChunkSize(item.data()); ack_info.bytes_acked += serialized_size; - if (iter->second.is_outstanding()) { + if (item.is_outstanding()) { outstanding_bytes_ -= serialized_size; --outstanding_items_; } - if (iter->second.should_be_retransmitted()) { - RTC_DCHECK(to_be_fast_retransmitted_.find(iter->first) == + if (item.should_be_retransmitted()) { + RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) == to_be_fast_retransmitted_.end()); - to_be_retransmitted_.erase(iter->first); + to_be_retransmitted_.erase(tsn); } - iter->second.Ack(); - ack_info.highest_tsn_acked = - std::max(ack_info.highest_tsn_acked, iter->first); + item.Ack(); + ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn); } } @@ -141,24 +144,43 @@ OutstandingData::AckInfo OutstandingData::HandleSack( return ack_info; } +OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + +const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info) { - auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); - - for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) { - AckChunk(ack_info, iter); - if (iter->second.lifecycle_id().IsSet()) { - RTC_DCHECK(iter->second.data().is_end); - if (iter->second.is_abandoned()) { - ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id()); + while (!outstanding_data_.empty() && + last_cumulative_tsn_ack_ < cumulative_tsn_ack) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value(); + Item& item = outstanding_data_.front(); + AckChunk(ack_info, tsn, item); + if (item.lifecycle_id().IsSet()) { + RTC_DCHECK(item.data().is_end); + if (item.is_abandoned()) { + ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id()); } else { - ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id()); + ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id()); } } + outstanding_data_.pop_front(); + last_cumulative_tsn_ack_.Increment(); } - outstanding_data_.erase(outstanding_data_.begin(), first_unacked); - last_cumulative_tsn_ack_ = cumulative_tsn_ack; stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(), stream_reset_breakpoint_tsns_.upper_bound( cumulative_tsn_ack.next_value())); @@ -174,12 +196,13 @@ void OutstandingData::AckGapBlocks( // handled differently. for (auto& block : gap_ack_blocks) { - auto start = outstanding_data_.lower_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); - auto end = outstanding_data_.upper_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); - for (auto iter = start; iter != end; ++iter) { - AckChunk(ack_info, iter); + UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + Item& item = GetItem(tsn); + AckChunk(ack_info, tsn, item); + } } } } @@ -214,13 +237,13 @@ void OutstandingData::NackBetweenAckBlocks( for (auto& block : gap_ack_blocks) { UnwrappedTSN cur_block_first_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); - for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); - iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { - if (iter->first <= max_tsn_to_nack) { - ack_info.has_packet_loss |= - NackItem(iter->first, iter->second, /*retransmit_now=*/false, - /*do_fast_retransmit=*/!is_in_fast_recovery); - } + for (UnwrappedTSN tsn = prev_block_last_acked.next_value(); + tsn < cur_block_first_acked && tsn <= max_tsn_to_nack; + tsn = tsn.next_value()) { + Item& item = GetItem(tsn); + ack_info.has_packet_loss |= + NackItem(tsn, item, /*retransmit_now=*/false, + /*do_fast_retransmit=*/!is_in_fast_recovery); } prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); } @@ -275,14 +298,10 @@ void OutstandingData::AbandonAllFor(const Item& item) { Data::IsBeginning(false), Data::IsEnd(true), item.data().is_unordered); UnwrappedTSN tsn = next_tsn(); - Item& added_item = - outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple( - item.message_id(), std::move(message_end), - Timestamp::Zero(), MaxRetransmits(0), - Timestamp::PlusInfinity(), LifecycleId::NotSet())) - .first->second; + Item& added_item = outstanding_data_.emplace_back( + item.message_id(), std::move(message_end), Timestamp::Zero(), + MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet()); + // The added chunk shouldn't be included in `outstanding_bytes`, so set it // as acked. added_item.Ack(); @@ -290,7 +309,9 @@ void OutstandingData::AbandonAllFor(const Item& item) { << *tsn.Wrap(); } - for (auto& [tsn, other] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (Item& other : outstanding_data_) { + tsn.Increment(); if (!other.is_abandoned() && other.data().stream_id == item.data().stream_id && other.message_id() == item.message_id()) { @@ -312,9 +333,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( for (auto it = chunks.begin(); it != chunks.end();) { UnwrappedTSN tsn = *it; - auto elem = outstanding_data_.find(tsn); - RTC_DCHECK(elem != outstanding_data_.end()); - Item& item = elem->second; + Item& item = GetItem(tsn); RTC_DCHECK(item.should_be_retransmitted()); RTC_DCHECK(!item.is_outstanding()); RTC_DCHECK(!item.is_abandoned()); @@ -368,7 +387,9 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted( } void OutstandingData::ExpireOutstandingChunks(Timestamp now) { - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); // Chunks that are nacked can be expired. Care should be taken not to expire // unacked (in-flight) chunks as they might have been received, but the SACK // is either delayed or in-flight and may be received later. @@ -404,20 +425,17 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert( outstanding_bytes_ += chunk_size; ++outstanding_items_; UnwrappedTSN tsn = next_tsn(); - auto it = outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple(message_id, data.Clone(), - time_sent, max_retransmissions, - expires_at, lifecycle_id)) - .first; - - if (it->second.has_expired(time_sent)) { + Item& item = outstanding_data_.emplace_back(message_id, data.Clone(), + time_sent, max_retransmissions, + expires_at, lifecycle_id); + + if (item.has_expired(time_sent)) { // No need to send it - it was expired when it was in the send // queue. - RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " - << *it->first.Wrap() << " and message " - << *it->second.data().mid << " as expired"; - AbandonAllFor(it->second); + RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap() + << " and message " << *item.data().mid + << " as expired"; + AbandonAllFor(item); RTC_DCHECK(IsConsistent()); return absl::nullopt; } @@ -427,7 +445,9 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert( } void OutstandingData::NackAll() { - for (auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (Item& item : outstanding_data_) { + tsn.Increment(); if (!item.is_acked()) { NackItem(tsn, item, /*retransmit_now=*/true, /*do_fast_retransmit=*/false); @@ -438,14 +458,16 @@ void OutstandingData::NackAll() { webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now, UnwrappedTSN tsn) const { - auto it = outstanding_data_.find(tsn); - if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) { - // https://tools.ietf.org/html/rfc4960#section-6.3.1 - // "Karn's algorithm: RTT measurements MUST NOT be made using - // packets that were retransmitted (and thus for which it is ambiguous - // whether the reply was for the first instance of the chunk or for a - // later instance)" - return now - it->second.time_sent(); + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + const Item& item = GetItem(tsn); + if (!item.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + return now - item.time_sent(); + } } return webrtc::TimeDelta::PlusInfinity(); } @@ -454,7 +476,9 @@ std::vector<std::pair<TSN, OutstandingData::State>> OutstandingData::GetChunkStatesForTesting() const { std::vector<std::pair<TSN, State>> states; states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); State state; if (item.is_abandoned()) { state = State::kAbandoned; @@ -475,9 +499,7 @@ OutstandingData::GetChunkStatesForTesting() const { bool OutstandingData::ShouldSendForwardTsn() const { if (!outstanding_data_.empty()) { - auto it = outstanding_data_.begin(); - return it->first == last_cumulative_tsn_ack_.next_value() && - it->second.is_abandoned(); + return outstanding_data_.front().is_abandoned(); } return false; } @@ -486,7 +508,9 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const { std::map<StreamID, SSN> skipped_per_ordered_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -510,7 +534,9 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; diff --git a/net/dcsctp/tx/outstanding_data.h b/net/dcsctp/tx/outstanding_data.h index 406f80bbc1..509e10c451 100644 --- a/net/dcsctp/tx/outstanding_data.h +++ b/net/dcsctp/tx/outstanding_data.h @@ -10,6 +10,7 @@ #ifndef NET_DCSCTP_TX_OUTSTANDING_DATA_H_ #define NET_DCSCTP_TX_OUTSTANDING_DATA_H_ +#include <deque> #include <map> #include <set> #include <utility> @@ -290,6 +291,9 @@ class OutstandingData { // Returns how large a chunk will be, serialized, carrying the data size_t GetSerializedChunkSize(const Data& data) const; + Item& GetItem(UnwrappedTSN tsn); + const Item& GetItem(UnwrappedTSN tsn) const; + // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items // in the retransmission queue up until this value and will update `ack_info` // by setting `bytes_acked_by_cumulative_tsn_ack`. @@ -313,7 +317,7 @@ class OutstandingData { // Process the acknowledgement of the chunk referenced by `iter` and updates // state in `ack_info` and the object's state. - void AckChunk(AckInfo& ack_info, std::map<UnwrappedTSN, Item>::iterator iter); + void AckChunk(AckInfo& ack_info, UnwrappedTSN tsn, Item& item); // Helper method to process an incoming nack of an item and perform the // correct operations given the action indicated when nacking an item (e.g. @@ -346,7 +350,10 @@ class OutstandingData { // Callback when to discard items from the send queue. std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue_; - std::map<UnwrappedTSN, Item> outstanding_data_; + // Outstanding items. If non-empty, the first element has + // `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict + // increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`. + std::deque<Item> outstanding_data_; // The number of bytes that are in-flight (sent but not yet acked or nacked). size_t outstanding_bytes_ = 0; // The number of DATA chunks that are in-flight (sent but not yet acked or |