aboutsummaryrefslogtreecommitdiff
path: root/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
blob: 6ab3ad80faf556c61074351cdd523244cae938fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/*
 *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
#include "absl/algorithm/container.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"

namespace webrtc {
namespace {
static const size_t kMaxPacketsInHistory = 5000;
}
void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver(
    std::vector<uint32_t> ssrcs,
    StreamFeedbackObserver* observer) {
  MutexLock lock(&observers_lock_);
  RTC_DCHECK(observer);
  RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
               return pair.second == observer;
             }) == observers_.end());
  observers_.push_back({ssrcs, observer});
}

void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver(
    StreamFeedbackObserver* observer) {
  MutexLock lock(&observers_lock_);
  RTC_DCHECK(observer);
  const auto it = absl::c_find_if(
      observers_, [=](const auto& pair) { return pair.second == observer; });
  RTC_DCHECK(it != observers_.end());
  observers_.erase(it);
}

void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
  MutexLock lock(&lock_);

  StreamFeedbackObserver::StreamPacketInfo info;
  info.ssrc = packet_info.media_ssrc;
  info.rtp_sequence_number = packet_info.rtp_sequence_number;
  info.received = false;
  info.is_retransmission =
      packet_info.packet_type == RtpPacketMediaType::kRetransmission;
  history_.insert(
      {seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number), info});

  while (history_.size() > kMaxPacketsInHistory) {
    history_.erase(history_.begin());
  }
}

void TransportFeedbackDemuxer::OnTransportFeedback(
    const rtcp::TransportFeedback& feedback) {
  std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
  {
    MutexLock lock(&lock_);
    for (const auto& packet : feedback.GetAllPackets()) {
      int64_t seq_num =
          seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
      auto it = history_.find(seq_num);
      if (it != history_.end()) {
        auto packet_info = it->second;
        packet_info.received = packet.received();
        stream_feedbacks.push_back(packet_info);
        if (packet.received())
          history_.erase(it);
      }
    }
  }

  MutexLock lock(&observers_lock_);
  for (auto& observer : observers_) {
    std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
    for (const auto& packet_info : stream_feedbacks) {
      if (absl::c_count(observer.first, packet_info.ssrc) > 0) {
        selected_feedback.push_back(packet_info);
      }
    }
    if (!selected_feedback.empty()) {
      observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
    }
  }
}

}  // namespace webrtc