diff options
Diffstat (limited to 'audio/channel_receive.cc')
-rw-r--r-- | audio/channel_receive.cc | 246 |
1 files changed, 145 insertions, 101 deletions
diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index 44a647b7a6..150e2074e4 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -10,8 +10,6 @@ #include "audio/channel_receive.h" -#include <assert.h> - #include <algorithm> #include <map> #include <memory> @@ -23,6 +21,7 @@ #include "api/frame_transformer_interface.h" #include "api/rtc_event_log/rtc_event_log.h" #include "api/sequence_checker.h" +#include "api/task_queue/task_queue_base.h" #include "audio/audio_level.h" #include "audio/channel_receive_frame_transformer_delegate.h" #include "audio/channel_send.h" @@ -34,7 +33,8 @@ #include "modules/pacing/packet_router.h" #include "modules/rtp_rtcp/include/receive_statistics.h" #include "modules/rtp_rtcp/include/remote_ntp_time_estimator.h" -#include "modules/rtp_rtcp/source/absolute_capture_time_receiver.h" +#include "modules/rtp_rtcp/source/absolute_capture_time_interpolator.h" +#include "modules/rtp_rtcp/source/capture_clock_offset_updater.h" #include "modules/rtp_rtcp/source/rtp_header_extensions.h" #include "modules/rtp_rtcp/source/rtp_packet_received.h" #include "modules/rtp_rtcp/source/rtp_rtcp_config.h" @@ -47,6 +47,9 @@ #include "rtc_base/numerics/safe_minmax.h" #include "rtc_base/race_checker.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/system/no_unique_address.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/metrics.h" @@ -174,44 +177,51 @@ class ChannelReceive : public ChannelReceiveInterface { rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) override; + void SetFrameDecryptor(rtc::scoped_refptr<webrtc::FrameDecryptorInterface> + frame_decryptor) override; + + void OnLocalSsrcChange(uint32_t local_ssrc) override; + uint32_t GetLocalSsrc() const override; + private: void ReceivePacket(const uint8_t* packet, size_t packet_length, - const RTPHeader& header); + const RTPHeader& header) + RTC_RUN_ON(worker_thread_checker_); int ResendPackets(const uint16_t* sequence_numbers, int length); - void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms); + void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) + RTC_RUN_ON(worker_thread_checker_); int GetRtpTimestampRateHz() const; int64_t GetRTT() const; void OnReceivedPayloadData(rtc::ArrayView<const uint8_t> payload, - const RTPHeader& rtpHeader); + const RTPHeader& rtpHeader) + RTC_RUN_ON(worker_thread_checker_); void InitFrameTransformerDelegate( - rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer); - - bool Playing() const { - MutexLock lock(&playing_lock_); - return playing_; - } + rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) + RTC_RUN_ON(worker_thread_checker_); // Thread checkers document and lock usage of some methods to specific threads // we know about. The goal is to eventually split up voe::ChannelReceive into // parts with single-threaded semantics, and thereby reduce the need for // locks. - SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; + + TaskQueueBase* const worker_thread_; + ScopedTaskSafety worker_safety_; // Methods accessed from audio and video threads are checked for sequential- // only access. We don't necessarily own and control these threads, so thread // checkers cannot be used. E.g. Chromium may transfer "ownership" from one // audio thread to another, but access is still sequential. rtc::RaceChecker audio_thread_race_checker_; - rtc::RaceChecker video_capture_thread_race_checker_; Mutex callback_mutex_; Mutex volume_settings_mutex_; - mutable Mutex playing_lock_; - bool playing_ RTC_GUARDED_BY(&playing_lock_) = false; + bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false; RtcEventLog* const event_log_; @@ -225,11 +235,10 @@ class ChannelReceive : public ChannelReceiveInterface { // Info for GetSyncInfo is updated on network or worker thread, and queried on // the worker thread. - mutable Mutex sync_info_lock_; absl::optional<uint32_t> last_received_rtp_timestamp_ - RTC_GUARDED_BY(&sync_info_lock_); + RTC_GUARDED_BY(&worker_thread_checker_); absl::optional<int64_t> last_received_rtp_system_time_ms_ - RTC_GUARDED_BY(&sync_info_lock_); + RTC_GUARDED_BY(&worker_thread_checker_); // The AcmReceiver is thread safe, using its own lock. acm2::AcmReceiver acm_receiver_; @@ -242,15 +251,14 @@ class ChannelReceive : public ChannelReceiveInterface { // Timestamp of the audio pulled from NetEq. absl::optional<uint32_t> jitter_buffer_playout_timestamp_; - mutable Mutex video_sync_lock_; - uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(video_sync_lock_); + uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(worker_thread_checker_); absl::optional<int64_t> playout_timestamp_rtp_time_ms_ - RTC_GUARDED_BY(video_sync_lock_); - uint32_t playout_delay_ms_ RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); + uint32_t playout_delay_ms_ RTC_GUARDED_BY(worker_thread_checker_); absl::optional<int64_t> playout_timestamp_ntp_ - RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); absl::optional<int64_t> playout_timestamp_ntp_time_ms_ - RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); mutable Mutex ts_stats_lock_; @@ -266,26 +274,39 @@ class ChannelReceive : public ChannelReceiveInterface { float _outputGain RTC_GUARDED_BY(volume_settings_mutex_); const ChannelSendInterface* associated_send_channel_ - RTC_GUARDED_BY(worker_thread_checker_); + RTC_GUARDED_BY(network_thread_checker_); PacketRouter* packet_router_ = nullptr; SequenceChecker construction_thread_; // E2EE Audio Frame Decryption - rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor_; + rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor_ + RTC_GUARDED_BY(worker_thread_checker_); webrtc::CryptoOptions crypto_options_; - webrtc::AbsoluteCaptureTimeReceiver absolute_capture_time_receiver_; + webrtc::AbsoluteCaptureTimeInterpolator absolute_capture_time_interpolator_ + RTC_GUARDED_BY(worker_thread_checker_); + + webrtc::CaptureClockOffsetUpdater capture_clock_offset_updater_; rtc::scoped_refptr<ChannelReceiveFrameTransformerDelegate> frame_transformer_delegate_; + + // Counter that's used to control the frequency of reporting histograms + // from the `GetAudioFrameWithInfo` callback. + int audio_frame_interval_count_ RTC_GUARDED_BY(audio_thread_race_checker_) = + 0; + // Controls how many callbacks we let pass by before reporting callback stats. + // A value of 100 means 100 callbacks, each one of which represents 10ms worth + // of data, so the stats reporting frequency will be 1Hz (modulo failures). + constexpr static int kHistogramReportingInterval = 100; }; void ChannelReceive::OnReceivedPayloadData( rtc::ArrayView<const uint8_t> payload, const RTPHeader& rtpHeader) { - if (!Playing()) { + if (!playing_) { // Avoid inserting into NetEQ when we are not playing. Count the // packet as discarded. @@ -299,7 +320,7 @@ void ChannelReceive::OnReceivedPayloadData( // updating RtpSource information. if (source_tracker_) { RtpPacketInfos::vector_type packet_vector = { - RtpPacketInfo(rtpHeader, clock_->TimeInMilliseconds())}; + RtpPacketInfo(rtpHeader, clock_->CurrentTime())}; source_tracker_->OnFrameDelivered(RtpPacketInfos(packet_vector)); } @@ -328,18 +349,20 @@ void ChannelReceive::InitFrameTransformerDelegate( rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) { RTC_DCHECK(frame_transformer); RTC_DCHECK(!frame_transformer_delegate_); + RTC_DCHECK(worker_thread_->IsCurrent()); // Pass a callback to ChannelReceive::OnReceivedPayloadData, to be called by // the delegate to receive transformed audio. ChannelReceiveFrameTransformerDelegate::ReceiveFrameCallback receive_audio_callback = [this](rtc::ArrayView<const uint8_t> packet, const RTPHeader& header) { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); OnReceivedPayloadData(packet, header); }; frame_transformer_delegate_ = - new rtc::RefCountedObject<ChannelReceiveFrameTransformerDelegate>( + rtc::make_ref_counted<ChannelReceiveFrameTransformerDelegate>( std::move(receive_audio_callback), std::move(frame_transformer), - rtc::Thread::Current()); + worker_thread_); frame_transformer_delegate_->Init(); } @@ -434,17 +457,37 @@ AudioMixer::Source::AudioFrameInfo ChannelReceive::GetAudioFrameWithInfo( } } - { - RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.TargetJitterBufferDelayMs", - acm_receiver_.TargetDelayMs()); - const int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); - MutexLock lock(&video_sync_lock_); - RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDelayEstimateMs", - jitter_buffer_delay + playout_delay_ms_); - RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverJitterBufferDelayMs", - jitter_buffer_delay); - RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDeviceDelayMs", - playout_delay_ms_); + // Fill in local capture clock offset in |audio_frame->packet_infos_|. + RtpPacketInfos::vector_type packet_infos; + for (auto& packet_info : audio_frame->packet_infos_) { + absl::optional<int64_t> local_capture_clock_offset; + if (packet_info.absolute_capture_time().has_value()) { + local_capture_clock_offset = + capture_clock_offset_updater_.AdjustEstimatedCaptureClockOffset( + packet_info.absolute_capture_time() + ->estimated_capture_clock_offset); + } + RtpPacketInfo new_packet_info(packet_info); + new_packet_info.set_local_capture_clock_offset(local_capture_clock_offset); + packet_infos.push_back(std::move(new_packet_info)); + } + audio_frame->packet_infos_ = RtpPacketInfos(packet_infos); + + ++audio_frame_interval_count_; + if (audio_frame_interval_count_ >= kHistogramReportingInterval) { + audio_frame_interval_count_ = 0; + worker_thread_->PostTask(ToQueuedTask(worker_safety_, [this]() { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.TargetJitterBufferDelayMs", + acm_receiver_.TargetDelayMs()); + const int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); + RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDelayEstimateMs", + jitter_buffer_delay + playout_delay_ms_); + RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverJitterBufferDelayMs", + jitter_buffer_delay); + RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDeviceDelayMs", + playout_delay_ms_); + })); } return muted ? AudioMixer::Source::AudioFrameInfo::kMuted @@ -480,7 +523,8 @@ ChannelReceive::ChannelReceive( rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor, const webrtc::CryptoOptions& crypto_options, rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) - : event_log_(rtc_event_log), + : worker_thread_(TaskQueueBase::Current()), + event_log_(rtc_event_log), rtp_receive_statistics_(ReceiveStatistics::Create(clock)), remote_ssrc_(remote_ssrc), acm_receiver_(AcmConfig(neteq_factory, @@ -502,10 +546,13 @@ ChannelReceive::ChannelReceive( associated_send_channel_(nullptr), frame_decryptor_(frame_decryptor), crypto_options_(crypto_options), - absolute_capture_time_receiver_(clock) { + absolute_capture_time_interpolator_(clock) { + RTC_DCHECK(worker_thread_); RTC_DCHECK(module_process_thread_); RTC_DCHECK(audio_device_module); + network_thread_checker_.Detach(); + acm_receiver_.ResetInitialDelay(); acm_receiver_.SetMinimumDelay(0); acm_receiver_.SetMaximumDelay(0); @@ -540,7 +587,7 @@ ChannelReceive::ChannelReceive( } ChannelReceive::~ChannelReceive() { - RTC_DCHECK(construction_thread_.IsCurrent()); + RTC_DCHECK_RUN_ON(&construction_thread_); // Unregister the module before stopping playout etc, to match the order // things were set up in the ctor. @@ -561,13 +608,11 @@ void ChannelReceive::SetSink(AudioSinkInterface* sink) { void ChannelReceive::StartPlayout() { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&playing_lock_); playing_ = true; } void ChannelReceive::StopPlayout() { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&playing_lock_); playing_ = false; _outputAudioLevel.ResetLevelFullRange(); } @@ -595,11 +640,8 @@ void ChannelReceive::OnRtpPacket(const RtpPacketReceived& packet) { // UpdatePlayoutTimestamp and int64_t now_ms = rtc::TimeMillis(); - { - MutexLock lock(&sync_info_lock_); - last_received_rtp_timestamp_ = packet.Timestamp(); - last_received_rtp_system_time_ms_ = now_ms; - } + last_received_rtp_timestamp_ = packet.Timestamp(); + last_received_rtp_system_time_ms_ = now_ms; // Store playout timestamp for the received RTP packet UpdatePlayoutTimestamp(false, now_ms); @@ -618,9 +660,9 @@ void ChannelReceive::OnRtpPacket(const RtpPacketReceived& packet) { // Interpolates absolute capture timestamp RTP header extension. header.extension.absolute_capture_time = - absolute_capture_time_receiver_.OnReceivePacket( - AbsoluteCaptureTimeReceiver::GetSource(header.ssrc, - header.arrOfCSRCs), + absolute_capture_time_interpolator_.OnReceivePacket( + AbsoluteCaptureTimeInterpolator::GetSource(header.ssrc, + header.arrOfCSRCs), header.timestamp, rtc::saturated_cast<uint32_t>(packet_copy.payload_type_frequency()), header.extension.absolute_capture_time); @@ -632,7 +674,7 @@ void ChannelReceive::ReceivePacket(const uint8_t* packet, size_t packet_length, const RTPHeader& header) { const uint8_t* payload = packet + header.headerLength; - assert(packet_length >= header.headerLength); + RTC_DCHECK_GE(packet_length, header.headerLength); size_t payload_length = packet_length - header.headerLength; size_t payload_data_length = payload_length - header.paddingLength; @@ -713,7 +755,7 @@ void ChannelReceive::ReceivedRTCPPacket(const uint8_t* data, size_t length) { absl::optional<int64_t> remote_to_local_clock_offset_ms = ntp_estimator_.EstimateRemoteToLocalClockOffsetMs(); if (remote_to_local_clock_offset_ms.has_value()) { - absolute_capture_time_receiver_.SetRemoteToLocalClockOffset( + capture_clock_offset_updater_.SetRemoteToLocalClockOffset( Int64MsToQ32x32(*remote_to_local_clock_offset_ms)); } } @@ -838,8 +880,7 @@ int ChannelReceive::ResendPackets(const uint16_t* sequence_numbers, void ChannelReceive::SetAssociatedSendChannel( const ChannelSendInterface* channel) { - // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); associated_send_channel_ = channel; } @@ -848,11 +889,33 @@ void ChannelReceive::SetDepacketizerToDecoderFrameTransformer( RTC_DCHECK_RUN_ON(&worker_thread_checker_); // Depending on when the channel is created, the transformer might be set // twice. Don't replace the delegate if it was already initialized. - if (!frame_transformer || frame_transformer_delegate_) + if (!frame_transformer || frame_transformer_delegate_) { + RTC_NOTREACHED() << "Not setting the transformer?"; return; + } + InitFrameTransformerDelegate(std::move(frame_transformer)); } +void ChannelReceive::SetFrameDecryptor( + rtc::scoped_refptr<webrtc::FrameDecryptorInterface> frame_decryptor) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + frame_decryptor_ = std::move(frame_decryptor); +} + +void ChannelReceive::OnLocalSsrcChange(uint32_t local_ssrc) { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + rtp_rtcp_->SetLocalSsrc(local_ssrc); +} + +uint32_t ChannelReceive::GetLocalSsrc() const { + // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + return rtp_rtcp_->local_media_ssrc(); +} + NetworkStatistics ChannelReceive::GetNetworkStatistics( bool get_and_clear_legacy_stats) const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); @@ -870,14 +933,8 @@ AudioDecodingCallStats ChannelReceive::GetDecodingCallStatistics() const { uint32_t ChannelReceive::GetDelayEstimate() const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - - uint32_t playout_delay; - { - MutexLock lock(&video_sync_lock_); - playout_delay = playout_delay_ms_; - } // Return the current jitter buffer delay + playout delay. - return acm_receiver_.FilteredCurrentDelayMs() + playout_delay; + return acm_receiver_.FilteredCurrentDelayMs() + playout_delay_ms_; } bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) { @@ -899,21 +956,17 @@ bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) { bool ChannelReceive::GetPlayoutRtpTimestamp(uint32_t* rtp_timestamp, int64_t* time_ms) const { - RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_); - { - MutexLock lock(&video_sync_lock_); - if (!playout_timestamp_rtp_time_ms_) - return false; - *rtp_timestamp = playout_timestamp_rtp_; - *time_ms = playout_timestamp_rtp_time_ms_.value(); - return true; - } + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + if (!playout_timestamp_rtp_time_ms_) + return false; + *rtp_timestamp = playout_timestamp_rtp_; + *time_ms = playout_timestamp_rtp_time_ms_.value(); + return true; } void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms, int64_t time_ms) { - RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_); - MutexLock lock(&video_sync_lock_); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); playout_timestamp_ntp_ = ntp_timestamp_ms; playout_timestamp_ntp_time_ms_ = time_ms; } @@ -921,7 +974,6 @@ void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms, absl::optional<int64_t> ChannelReceive::GetCurrentEstimatedPlayoutNtpTimestampMs(int64_t now_ms) const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&video_sync_lock_); if (!playout_timestamp_ntp_ || !playout_timestamp_ntp_time_ms_) return absl::nullopt; @@ -951,24 +1003,19 @@ absl::optional<Syncable::Info> ChannelReceive::GetSyncInfo() const { return absl::nullopt; } - { - MutexLock lock(&sync_info_lock_); - if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { - return absl::nullopt; - } - info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; - info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; + if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { + return absl::nullopt; } + info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; + info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); - { - MutexLock lock(&video_sync_lock_); - info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_; - } + info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_; return info; } +// RTC_RUN_ON(worker_thread_checker_) void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) { // TODO(bugs.webrtc.org/11993): Expect to be called exclusively on the // network thread. Once that's done, we won't need video_sync_lock_. @@ -995,14 +1042,11 @@ void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) { // Remove the playout delay. playout_timestamp -= (delay_ms * (GetRtpTimestampRateHz() / 1000)); - { - MutexLock lock(&video_sync_lock_); - if (!rtcp && playout_timestamp != playout_timestamp_rtp_) { - playout_timestamp_rtp_ = playout_timestamp; - playout_timestamp_rtp_time_ms_ = now_ms; - } - playout_delay_ms_ = delay_ms; + if (!rtcp && playout_timestamp != playout_timestamp_rtp_) { + playout_timestamp_rtp_ = playout_timestamp; + playout_timestamp_rtp_time_ms_ = now_ms; } + playout_delay_ms_ = delay_ms; } int ChannelReceive::GetRtpTimestampRateHz() const { @@ -1020,7 +1064,7 @@ int ChannelReceive::GetRtpTimestampRateHz() const { } int64_t ChannelReceive::GetRTT() const { - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(&network_thread_checker_); std::vector<ReportBlockData> report_blocks = rtp_rtcp_->GetLatestReportBlockData(); @@ -1067,8 +1111,8 @@ std::unique_ptr<ChannelReceiveInterface> CreateChannelReceive( rtcp_send_transport, rtc_event_log, local_ssrc, remote_ssrc, jitter_buffer_max_packets, jitter_buffer_fast_playout, jitter_buffer_min_delay_ms, jitter_buffer_enable_rtx_handling, - decoder_factory, codec_pair_id, frame_decryptor, crypto_options, - std::move(frame_transformer)); + decoder_factory, codec_pair_id, std::move(frame_decryptor), + crypto_options, std::move(frame_transformer)); } } // namespace voe |