diff options
Diffstat (limited to 'modules/rtp_rtcp/source/rtp_rtcp_impl2.cc')
-rw-r--r-- | modules/rtp_rtcp/source/rtp_rtcp_impl2.cc | 121 |
1 files changed, 110 insertions, 11 deletions
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index 78ccf9907f..77054576a8 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -19,11 +19,17 @@ #include <string> #include <utility> +#include "absl/types/optional.h" +#include "api/sequence_checker.h" #include "api/transport/field_trial_based_config.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "modules/rtp_rtcp/source/rtcp_packet/dlrr.h" #include "modules/rtp_rtcp/source/rtp_rtcp_config.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/time_utils.h" #include "system_wrappers/include/ntp_time.h" #ifdef _WIN32 @@ -37,6 +43,22 @@ const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5; const int64_t kDefaultExpectedRetransmissionTimeMs = 125; constexpr TimeDelta kRttUpdateInterval = TimeDelta::Millis(1000); + +RTCPSender::Configuration AddRtcpSendEvaluationCallback( + RTCPSender::Configuration config, + std::function<void(TimeDelta)> send_evaluation_callback) { + config.schedule_next_rtcp_send_evaluation_function = + std::move(send_evaluation_callback); + return config; +} + +int DelayMillisForDuration(TimeDelta duration) { + // TimeDelta::ms() rounds downwards sometimes which leads to too little time + // slept. Account for this, unless |duration| is exactly representable in + // millisecs. + return (duration.us() + rtc::kNumMillisecsPerSec - 1) / + rtc::kNumMicrosecsPerMillisec; +} } // namespace ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( @@ -55,7 +77,11 @@ void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber( ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) : worker_queue_(TaskQueueBase::Current()), - rtcp_sender_(configuration), + rtcp_sender_(AddRtcpSendEvaluationCallback( + RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration), + [this](TimeDelta duration) { + ScheduleRtcpSendEvaluation(duration); + })), rtcp_receiver_(configuration, this), clock_(configuration.clock), last_rtt_process_time_(clock_->TimeInMilliseconds()), @@ -69,6 +95,7 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) rtt_ms_(0) { RTC_DCHECK(worker_queue_); process_thread_checker_.Detach(); + packet_sequence_checker_.Detach(); if (!configuration.receiver_only) { rtp_sender_ = std::make_unique<RtpSenderContext>(configuration); // Make sure rtcp sender use same timestamp offset as rtp sender. @@ -135,11 +162,6 @@ void ModuleRtpRtcpImpl2::Process() { rtcp_sender_.SetTargetBitrate(target_bitrate); } } - - // TODO(bugs.webrtc.org/11581): Run this on a separate set of delayed tasks - // based off of next_time_to_send_rtcp_ in RTCPSender. - if (rtcp_sender_.TimeToSendRTCPReport()) - rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); } void ModuleRtpRtcpImpl2::SetRtxSendStatus(int mode) { @@ -169,6 +191,7 @@ absl::optional<uint32_t> ModuleRtpRtcpImpl2::FlexfecSsrc() const { void ModuleRtpRtcpImpl2::IncomingRtcpPacket(const uint8_t* rtcp_packet, const size_t length) { + RTC_DCHECK_RUN_ON(&packet_sequence_checker_); rtcp_receiver_.IncomingPacket(rtcp_packet, length); } @@ -219,6 +242,12 @@ RtpState ModuleRtpRtcpImpl2::GetRtxState() const { return rtp_sender_->packet_generator.GetRtxRtpState(); } +uint32_t ModuleRtpRtcpImpl2::local_media_ssrc() const { + RTC_DCHECK_RUN_ON(&packet_sequence_checker_); + RTC_DCHECK_EQ(rtcp_receiver_.local_media_ssrc(), rtcp_sender_.SSRC()); + return rtcp_receiver_.local_media_ssrc(); +} + void ModuleRtpRtcpImpl2::SetRid(const std::string& rid) { if (rtp_sender_) { rtp_sender_->packet_generator.SetRid(rid); @@ -286,9 +315,7 @@ RTCPSender::FeedbackState ModuleRtpRtcpImpl2::GetFeedbackState() { int32_t ModuleRtpRtcpImpl2::SetSendingStatus(const bool sending) { if (rtcp_sender_.Sending() != sending) { // Sends RTCP BYE when going from true to false - if (rtcp_sender_.SetSendingStatus(GetFeedbackState(), sending) != 0) { - RTC_LOG(LS_WARNING) << "Failed to send RTCP BYE"; - } + rtcp_sender_.SetSendingStatus(GetFeedbackState(), sending); } return 0; } @@ -330,7 +357,16 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp, if (!Sending()) return false; - rtcp_sender_.SetLastRtpTime(timestamp, capture_time_ms, payload_type); + // TODO(bugs.webrtc.org/12873): Migrate this method and it's users to use + // optional Timestamps. + absl::optional<Timestamp> capture_time; + if (capture_time_ms > 0) { + capture_time = Timestamp::Millis(capture_time_ms); + } + absl::optional<int> payload_type_optional; + if (payload_type >= 0) + payload_type_optional = payload_type; + rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional); // Make sure an RTCP report isn't queued behind a key frame. if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report)) rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); @@ -652,8 +688,15 @@ void ModuleRtpRtcpImpl2::SetRemoteSSRC(const uint32_t ssrc) { rtcp_receiver_.SetRemoteSSRC(ssrc); } +void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) { + RTC_DCHECK_RUN_ON(&packet_sequence_checker_); + rtcp_receiver_.set_local_media_ssrc(local_ssrc); + rtcp_sender_.SetSsrc(local_ssrc); +} + RtpSendRates ModuleRtpRtcpImpl2::GetSendRates() const { - RTC_DCHECK_RUN_ON(worker_queue_); + // Typically called on the `rtp_transport_queue_` owned by an + // RtpTransportControllerSendInterface instance. return rtp_sender_->packet_sender.GetSendRates(); } @@ -746,4 +789,60 @@ void ModuleRtpRtcpImpl2::PeriodicUpdate() { rtcp_receiver_.NotifyTmmbrUpdated(); } +// RTC_RUN_ON(worker_queue_); +void ModuleRtpRtcpImpl2::MaybeSendRtcp() { + if (rtcp_sender_.TimeToSendRTCPReport()) + rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); +} + +// TODO(bugs.webrtc.org/12889): Consider removing this function when the issue +// is resolved. +// RTC_RUN_ON(worker_queue_); +void ModuleRtpRtcpImpl2::MaybeSendRtcpAtOrAfterTimestamp( + Timestamp execution_time) { + Timestamp now = clock_->CurrentTime(); + if (now >= execution_time) { + MaybeSendRtcp(); + return; + } + + RTC_DLOG(LS_WARNING) + << "BUGBUG: Task queue scheduled delayed call too early."; + + ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, execution_time - now); +} + +void ModuleRtpRtcpImpl2::ScheduleRtcpSendEvaluation(TimeDelta duration) { + // We end up here under various sequences including the worker queue, and + // the RTCPSender lock is held. + // We're assuming that the fact that RTCPSender executes under other sequences + // than the worker queue on which it's created on implies that external + // synchronization is present and removes this activity before destruction. + if (duration.IsZero()) { + worker_queue_->PostTask(ToQueuedTask(task_safety_, [this] { + RTC_DCHECK_RUN_ON(worker_queue_); + MaybeSendRtcp(); + })); + } else { + Timestamp execution_time = clock_->CurrentTime() + duration; + ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, duration); + } +} + +void ModuleRtpRtcpImpl2::ScheduleMaybeSendRtcpAtOrAfterTimestamp( + Timestamp execution_time, + TimeDelta duration) { + // We end up here under various sequences including the worker queue, and + // the RTCPSender lock is held. + // See note in ScheduleRtcpSendEvaluation about why |worker_queue_| can be + // accessed. + worker_queue_->PostDelayedTask( + ToQueuedTask(task_safety_, + [this, execution_time] { + RTC_DCHECK_RUN_ON(worker_queue_); + MaybeSendRtcpAtOrAfterTimestamp(execution_time); + }), + DelayMillisForDuration(duration)); +} + } // namespace webrtc |