aboutsummaryrefslogtreecommitdiff
path: root/net/dcsctp/rx/reassembly_queue.cc
blob: ae672731c0f8e75ac9a3d3c88ffd01c6ebc024fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "net/dcsctp/rx/reassembly_queue.h"

#include <stddef.h>

#include <algorithm>
#include <cstdint>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/str_join.h"

namespace dcsctp {
namespace {
std::unique_ptr<ReassemblyStreams> CreateStreams(
    absl::string_view log_prefix,
    ReassemblyStreams::OnAssembledMessage on_assembled_message,
    bool use_message_interleaving) {
  if (use_message_interleaving) {
    return std::make_unique<InterleavedReassemblyStreams>(
        log_prefix, std::move(on_assembled_message));
  }
  return std::make_unique<TraditionalReassemblyStreams>(
      log_prefix, std::move(on_assembled_message));
}
}  // namespace

ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
                                 TSN peer_initial_tsn,
                                 size_t max_size_bytes,
                                 bool use_message_interleaving)
    : log_prefix_(log_prefix),
      max_size_bytes_(max_size_bytes),
      watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
      last_assembled_tsn_watermark_(
          tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
      last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
      streams_(CreateStreams(
          log_prefix_,
          [this](rtc::ArrayView<const UnwrappedTSN> tsns,
                 DcSctpMessage message) {
            AddReassembledMessage(tsns, std::move(message));
          },
          use_message_interleaving)) {}

void ReassemblyQueue::Add(TSN tsn, Data data) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
                       << ", stream=" << *data.stream_id << ":" << *data.mid
                       << ":" << *data.fsn << ", type="
                       << (data.is_beginning && data.is_end ? "complete"
                           : data.is_beginning              ? "first"
                           : data.is_end                    ? "last"
                                                            : "middle");

  UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);

  // If a stream reset has been received with a "sender's last assigned tsn" in
  // the future, the socket is in "deferred reset processing" mode and must
  // buffer chunks until it's exited.
  if (deferred_reset_streams_.has_value() &&
      unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn &&
      deferred_reset_streams_->streams.contains(data.stream_id)) {
    RTC_DLOG(LS_VERBOSE)
        << log_prefix_ << "Deferring chunk with tsn=" << *tsn
        << ", sid=" << *data.stream_id << " until tsn="
        << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap();
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    // "In this mode, any data arriving with a TSN larger than the
    // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
    // locally and held until the cumulative acknowledgment point reaches the
    // Sender's Last Assigned TSN."
    queued_bytes_ += data.size();
    deferred_reset_streams_->deferred_actions.push_back(
        [this, tsn, data = std::move(data)]() mutable {
          queued_bytes_ -= data.size();
          Add(tsn, std::move(data));
        });
  } else {
    queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
  }

  // https://tools.ietf.org/html/rfc4960#section-6.9
  // "Note: If the data receiver runs out of buffer space while still
  // waiting for more fragments to complete the reassembly of the message, it
  // should dispatch part of its inbound message through a partial delivery
  // API (see Section 10), freeing some of its receive buffer space so that
  // the rest of the message may be received."

  // TODO(boivie): Support EOR flag and partial delivery?
  RTC_DCHECK(IsConsistent());
}

void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset(
    rtc::ArrayView<const StreamID> stream_ids) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: ["
                       << StrJoin(stream_ids, ",",
                                  [](rtc::StringBuilder& sb, StreamID sid) {
                                    sb << *sid;
                                  })
                       << "]";

  // https://tools.ietf.org/html/rfc6525#section-5.2.2
  // "... streams MUST be reset to 0 as the next expected SSN."
  streams_->ResetStreams(stream_ids);

  if (deferred_reset_streams_.has_value()) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Leaving deferred reset processing, feeding back "
                         << deferred_reset_streams_->deferred_actions.size()
                         << " actions";
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    // "Any queued TSNs (queued at step E2) MUST now be released and processed
    // normally."
    auto deferred_actions =
        std::move(deferred_reset_streams_->deferred_actions);
    deferred_reset_streams_ = absl::nullopt;

    for (auto& action : deferred_actions) {
      action();
    }
  }

  RTC_DCHECK(IsConsistent());
}

void ReassemblyQueue::EnterDeferredReset(
    TSN sender_last_assigned_tsn,
    rtc::ArrayView<const StreamID> streams) {
  if (!deferred_reset_streams_.has_value()) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Entering deferred reset; sender_last_assigned_tsn="
                         << *sender_last_assigned_tsn;
    deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(
        tsn_unwrapper_.Unwrap(sender_last_assigned_tsn),
        webrtc::flat_set<StreamID>(streams.begin(), streams.end()));
  }
  RTC_DCHECK(IsConsistent());
}

std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
  std::vector<DcSctpMessage> ret;
  reassembled_messages_.swap(ret);
  return ret;
}

void ReassemblyQueue::AddReassembledMessage(
    rtc::ArrayView<const UnwrappedTSN> tsns,
    DcSctpMessage message) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
                       << StrJoin(tsns, ",",
                                  [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
                                    sb << *tsn.Wrap();
                                  })
                       << "], message; stream_id=" << *message.stream_id()
                       << ", ppid=" << *message.ppid()
                       << ", payload=" << message.payload().size() << " bytes";

  for (const UnwrappedTSN tsn : tsns) {
    if (tsn == last_assembled_tsn_watermark_.next_value()) {
      // Update watermark, or insert into delivered_tsns_
      last_assembled_tsn_watermark_.Increment();
    } else {
      delivered_tsns_.insert(tsn);
    }
  }

  // With new TSNs in delivered_tsns, gaps might be filled.
  MaybeMoveLastAssembledWatermarkFurther();

  reassembled_messages_.emplace_back(std::move(message));
}

void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
  // `delivered_tsns_` contain TSNS when there is a gap between ranges of
  // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
  // that list, because if so, it can be moved.
  while (!delivered_tsns_.empty() &&
         *delivered_tsns_.begin() ==
             last_assembled_tsn_watermark_.next_value()) {
    last_assembled_tsn_watermark_.Increment();
    delivered_tsns_.erase(delivered_tsns_.begin());
  }
}

void ReassemblyQueue::HandleForwardTsn(
    TSN new_cumulative_tsn,
    rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
  UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn);

  if (deferred_reset_streams_.has_value() &&
      tsn > deferred_reset_streams_->sender_last_assigned_tsn) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
                         << "- deferring.";
    deferred_reset_streams_->deferred_actions.emplace_back(
        [this, new_cumulative_tsn,
         streams = std::vector<AnyForwardTsnChunk::SkippedStream>(
             skipped_streams.begin(), skipped_streams.end())] {
          HandleForwardTsn(new_cumulative_tsn, streams);
        });
    RTC_DCHECK(IsConsistent());
    return;
  }

  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
                       << " - performing.";
  last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
  delivered_tsns_.erase(delivered_tsns_.begin(),
                        delivered_tsns_.upper_bound(tsn));
  MaybeMoveLastAssembledWatermarkFurther();
  queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams);
  RTC_DCHECK(IsConsistent());
}

bool ReassemblyQueue::IsConsistent() const {
  // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
  // adjacent.
  if (!delivered_tsns_.empty() &&
      last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
    return false;
  }

  // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
  // enforced in this class. This comparison will still trigger if queued_bytes_
  // became "negative".
  return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
}

HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
  HandoverReadinessStatus status = streams_->GetHandoverReadiness();
  if (!delivered_tsns_.empty()) {
    status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
  }
  if (deferred_reset_streams_.has_value()) {
    status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
  }
  return status;
}

void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
  state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
  state.rx.last_completed_deferred_reset_req_sn =
      last_completed_reset_req_seq_nbr_.value();
  streams_->AddHandoverState(state);
}

void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
  // Validate that the component is in pristine state.
  RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));

  last_assembled_tsn_watermark_ =
      tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
  last_completed_reset_req_seq_nbr_ =
      ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
  streams_->RestoreFromState(state);
}
}  // namespace dcsctp