diff options
Diffstat (limited to 'call/call.cc')
-rw-r--r-- | call/call.cc | 854 |
1 files changed, 478 insertions, 376 deletions
diff --git a/call/call.cc b/call/call.cc index 437f4df2fb..f4a7d7cc9e 100644 --- a/call/call.cc +++ b/call/call.cc @@ -13,12 +13,14 @@ #include <string.h> #include <algorithm> +#include <atomic> #include <map> #include <memory> #include <set> #include <utility> #include <vector> +#include "absl/functional/bind_front.h" #include "absl/types/optional.h" #include "api/rtc_event_log/rtc_event_log.h" #include "api/sequence_checker.h" @@ -32,6 +34,7 @@ #include "call/receive_time_calculator.h" #include "call/rtp_stream_receiver_controller.h" #include "call/rtp_transport_controller_send.h" +#include "call/rtp_transport_controller_send_factory.h" #include "call/version.h" #include "logging/rtc_event_log/events/rtc_event_audio_receive_stream_config.h" #include "logging/rtc_event_log/events/rtc_event_rtcp_packet_incoming.h" @@ -78,12 +81,10 @@ bool SendPeriodicFeedback(const std::vector<RtpExtension>& extensions) { return true; } -// TODO(nisse): This really begs for a shared context struct. -bool UseSendSideBwe(const std::vector<RtpExtension>& extensions, - bool transport_cc) { - if (!transport_cc) +bool UseSendSideBwe(const ReceiveStream::RtpConfig& rtp) { + if (!rtp.transport_cc) return false; - for (const auto& extension : extensions) { + for (const auto& extension : rtp.extensions) { if (extension.uri == RtpExtension::kTransportSequenceNumberUri || extension.uri == RtpExtension::kTransportSequenceNumberV2Uri) return true; @@ -91,18 +92,6 @@ bool UseSendSideBwe(const std::vector<RtpExtension>& extensions, return false; } -bool UseSendSideBwe(const VideoReceiveStream::Config& config) { - return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc); -} - -bool UseSendSideBwe(const AudioReceiveStream::Config& config) { - return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc); -} - -bool UseSendSideBwe(const FlexfecReceiveStream::Config& config) { - return UseSendSideBwe(config.rtp_header_extensions, config.transport_cc); -} - const int* FindKeyByValue(const std::map<int, int>& m, int v) { for (const auto& kv : m) { if (kv.second == v) @@ -167,34 +156,6 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() { return current; } -// Called from the destructor of Call to report the collected send histograms. -void UpdateSendHistograms(Timestamp now, - Timestamp first_sent_packet, - AvgCounter& estimated_send_bitrate_kbps_counter, - AvgCounter& pacer_bitrate_kbps_counter) { - TimeDelta elapsed = now - first_sent_packet; - if (elapsed.seconds() < metrics::kMinRunTimeInSeconds) - return; - - const int kMinRequiredPeriodicSamples = 5; - AggregatedStats send_bitrate_stats = - estimated_send_bitrate_kbps_counter.ProcessAndGetStats(); - if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps", - send_bitrate_stats.average); - RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, " - << send_bitrate_stats.ToString(); - } - AggregatedStats pacer_bitrate_stats = - pacer_bitrate_kbps_counter.ProcessAndGetStats(); - if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps", - pacer_bitrate_stats.average); - RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, " - << pacer_bitrate_stats.ToString(); - } -} - } // namespace namespace internal { @@ -299,10 +260,6 @@ class Call final : public webrtc::Call, DeliveryStatus DeliverPacket(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) override; - void DeliverPacketAsync(MediaType media_type, - rtc::CopyOnWriteBuffer packet, - int64_t packet_time_us, - PacketCallback callback) override; // Implements RecoveredPacketReceiver. void OnRecoveredPacket(const uint8_t* packet, size_t length) override; @@ -312,6 +269,12 @@ class Call final : public webrtc::Call, void OnAudioTransportOverheadChanged( int transport_overhead_per_packet) override; + void OnLocalSsrcUpdated(webrtc::AudioReceiveStream& stream, + uint32_t local_ssrc) override; + + void OnUpdateSyncGroup(webrtc::AudioReceiveStream& stream, + const std::string& sync_group) override; + void OnSentPacket(const rtc::SentPacket& sent_packet) override; // Implements TargetTransferRateObserver, @@ -324,45 +287,96 @@ class Call final : public webrtc::Call, void SetClientBitratePreferences(const BitrateSettings& preferences) override; private: - DeliveryStatus DeliverRtcp(MediaType media_type, - const uint8_t* packet, - size_t length) - RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); + // Thread-compatible class that collects received packet stats and exposes + // them as UMA histograms on destruction. + class ReceiveStats { + public: + explicit ReceiveStats(Clock* clock); + ~ReceiveStats(); + + void AddReceivedRtcpBytes(int bytes); + void AddReceivedAudioBytes(int bytes, webrtc::Timestamp arrival_time); + void AddReceivedVideoBytes(int bytes, webrtc::Timestamp arrival_time); + + private: + RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; + RateCounter received_bytes_per_second_counter_ + RTC_GUARDED_BY(sequence_checker_); + RateCounter received_audio_bytes_per_second_counter_ + RTC_GUARDED_BY(sequence_checker_); + RateCounter received_video_bytes_per_second_counter_ + RTC_GUARDED_BY(sequence_checker_); + RateCounter received_rtcp_bytes_per_second_counter_ + RTC_GUARDED_BY(sequence_checker_); + absl::optional<Timestamp> first_received_rtp_audio_timestamp_ + RTC_GUARDED_BY(sequence_checker_); + absl::optional<Timestamp> last_received_rtp_audio_timestamp_ + RTC_GUARDED_BY(sequence_checker_); + absl::optional<Timestamp> first_received_rtp_video_timestamp_ + RTC_GUARDED_BY(sequence_checker_); + absl::optional<Timestamp> last_received_rtp_video_timestamp_ + RTC_GUARDED_BY(sequence_checker_); + }; + + // Thread-compatible class that collects sent packet stats and exposes + // them as UMA histograms on destruction, provided SetFirstPacketTime was + // called with a non-empty packet timestamp before the destructor. + class SendStats { + public: + explicit SendStats(Clock* clock); + ~SendStats(); + + void SetFirstPacketTime(absl::optional<Timestamp> first_sent_packet_time); + void PauseSendAndPacerBitrateCounters(); + void AddTargetBitrateSample(uint32_t target_bitrate_bps); + void SetMinAllocatableRate(BitrateAllocationLimits limits); + + private: + RTC_NO_UNIQUE_ADDRESS SequenceChecker destructor_sequence_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; + Clock* const clock_ RTC_GUARDED_BY(destructor_sequence_checker_); + AvgCounter estimated_send_bitrate_kbps_counter_ + RTC_GUARDED_BY(sequence_checker_); + AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(sequence_checker_); + uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(sequence_checker_){ + 0}; + absl::optional<Timestamp> first_sent_packet_time_ + RTC_GUARDED_BY(destructor_sequence_checker_); + }; + + void DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) + RTC_RUN_ON(network_thread_); DeliveryStatus DeliverRtp(MediaType media_type, rtc::CopyOnWriteBuffer packet, - int64_t packet_time_us) - RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); - void ConfigureSync(const std::string& sync_group) - RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); + int64_t packet_time_us) RTC_RUN_ON(worker_thread_); + void ConfigureSync(const std::string& sync_group) RTC_RUN_ON(worker_thread_); void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, MediaType media_type) - RTC_SHARED_LOCKS_REQUIRED(worker_thread_); + RTC_RUN_ON(worker_thread_); - void UpdateReceiveHistograms(); void UpdateAggregateNetworkState(); // Ensure that necessary process threads are started, and any required // callbacks have been registered. - void EnsureStarted() RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); - - rtc::TaskQueue* send_transport_queue() const { - return transport_send_ptr_->GetWorkerQueue(); - } + void EnsureStarted() RTC_RUN_ON(worker_thread_); Clock* const clock_; TaskQueueFactory* const task_queue_factory_; TaskQueueBase* const worker_thread_; TaskQueueBase* const network_thread_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_; const int num_cpu_cores_; const rtc::scoped_refptr<SharedModuleThread> module_process_thread_; const std::unique_ptr<CallStats> call_stats_; const std::unique_ptr<BitrateAllocator> bitrate_allocator_; - Call::Config config_; + const Call::Config config_ RTC_GUARDED_BY(worker_thread_); + // Maps to config_.trials, can be used from any thread via `trials()`. + const WebRtcKeyValueConfig& trials_; - NetworkState audio_network_state_; - NetworkState video_network_state_; + NetworkState audio_network_state_ RTC_GUARDED_BY(worker_thread_); + NetworkState video_network_state_ RTC_GUARDED_BY(worker_thread_); // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the // network thread. bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); @@ -380,39 +394,17 @@ class Call final : public webrtc::Call, // TODO(nisse): Should eventually be injected at creation, // with a single object in the bundled case. - RtpStreamReceiverController audio_receiver_controller_; - RtpStreamReceiverController video_receiver_controller_; + RtpStreamReceiverController audio_receiver_controller_ + RTC_GUARDED_BY(worker_thread_); + RtpStreamReceiverController video_receiver_controller_ + RTC_GUARDED_BY(worker_thread_); // This extra map is used for receive processing which is // independent of media type. - // TODO(nisse): In the RTP transport refactoring, we should have a - // single mapping from ssrc to a more abstract receive stream, with - // accessor methods for all configuration we need at this level. - struct ReceiveRtpConfig { - explicit ReceiveRtpConfig(const webrtc::AudioReceiveStream::Config& config) - : extensions(config.rtp.extensions), - use_send_side_bwe(UseSendSideBwe(config)) {} - explicit ReceiveRtpConfig(const webrtc::VideoReceiveStream::Config& config) - : extensions(config.rtp.extensions), - use_send_side_bwe(UseSendSideBwe(config)) {} - explicit ReceiveRtpConfig(const FlexfecReceiveStream::Config& config) - : extensions(config.rtp_header_extensions), - use_send_side_bwe(UseSendSideBwe(config)) {} - - // Registered RTP header extensions for each stream. Note that RTP header - // extensions are negotiated per track ("m= line") in the SDP, but we have - // no notion of tracks at the Call level. We therefore store the RTP header - // extensions per SSRC instead, which leads to some storage overhead. - const RtpHeaderExtensionMap extensions; - // Set if both RTP extension the RTCP feedback message needed for - // send side BWE are negotiated. - const bool use_send_side_bwe; - }; - // TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the // network thread. - std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_ + std::map<uint32_t, ReceiveStream*> receive_rtp_config_ RTC_GUARDED_BY(worker_thread_); // Audio and Video send streams are owned by the client that creates them. @@ -421,6 +413,10 @@ class Call final : public webrtc::Call, std::map<uint32_t, VideoSendStream*> video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_); std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_); + // True if |video_send_streams_| is empty, false if not. The atomic variable + // is used to decide UMA send statistics behavior and enables avoiding a + // PostTask(). + std::atomic<bool> video_send_streams_empty_{true}; // Each forwarder wraps an adaptation resource that was added to the call. std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>> @@ -434,49 +430,41 @@ class Call final : public webrtc::Call, RtpPayloadStateMap suspended_video_payload_states_ RTC_GUARDED_BY(worker_thread_); - webrtc::RtcEventLog* event_log_; - - // The following members are only accessed (exclusively) from one thread and - // from the destructor, and therefore doesn't need any explicit - // synchronization. - RateCounter received_bytes_per_second_counter_; - RateCounter received_audio_bytes_per_second_counter_; - RateCounter received_video_bytes_per_second_counter_; - RateCounter received_rtcp_bytes_per_second_counter_; - absl::optional<int64_t> first_received_rtp_audio_ms_; - absl::optional<int64_t> last_received_rtp_audio_ms_; - absl::optional<int64_t> first_received_rtp_video_ms_; - absl::optional<int64_t> last_received_rtp_video_ms_; - - uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_); - // TODO(holmer): Remove this lock once BitrateController no longer calls - // OnNetworkChanged from multiple threads. - uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); - uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); - AvgCounter estimated_send_bitrate_kbps_counter_ - RTC_GUARDED_BY(worker_thread_); - AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_); + webrtc::RtcEventLog* const event_log_; + + // TODO(bugs.webrtc.org/11993) ready to move stats access to the network + // thread. + ReceiveStats receive_stats_ RTC_GUARDED_BY(worker_thread_); + SendStats send_stats_ RTC_GUARDED_BY(send_transport_sequence_checker_); + // |last_bandwidth_bps_| and |configured_max_padding_bitrate_bps_| being + // atomic avoids a PostTask. The variables are used for stats gathering. + std::atomic<uint32_t> last_bandwidth_bps_{0}; + std::atomic<uint32_t> configured_max_padding_bitrate_bps_{0}; ReceiveSideCongestionController receive_side_cc_; const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_; const std::unique_ptr<SendDelayStats> video_send_delay_stats_; - const int64_t start_ms_; + const Timestamp start_of_call_; // Note that |task_safety_| needs to be at a greater scope than the task queue // owned by |transport_send_| since calls might arrive on the network thread // while Call is being deleted and the task queue is being torn down. - ScopedTaskSafety task_safety_; + const ScopedTaskSafety task_safety_; // Caches transport_send_.get(), to avoid racing with destructor. // Note that this is declared before transport_send_ to ensure that it is not // invalidated until no more tasks can be running on the transport_send_ task // queue. - RtpTransportControllerSendInterface* const transport_send_ptr_; + // For more details on the background of this member variable, see: + // https://webrtc-review.googlesource.com/c/src/+/63023/9/call/call.cc + // https://bugs.chromium.org/p/chromium/issues/detail?id=992640 + RtpTransportControllerSendInterface* const transport_send_ptr_ + RTC_GUARDED_BY(send_transport_sequence_checker_); // Declared last since it will issue callbacks from a task queue. Declaring it // last ensures that it is destroyed first and any running tasks are finished. - std::unique_ptr<RtpTransportControllerSendInterface> transport_send_; + const std::unique_ptr<RtpTransportControllerSendInterface> transport_send_; bool is_started_ RTC_GUARDED_BY(worker_thread_) = false; @@ -501,11 +489,6 @@ Call* Call::Create(const Call::Config& config) { rtc::scoped_refptr<SharedModuleThread> call_thread = SharedModuleThread::Create(ProcessThread::Create("ModuleProcessThread"), nullptr); - return Create(config, std::move(call_thread)); -} - -Call* Call::Create(const Call::Config& config, - rtc::scoped_refptr<SharedModuleThread> call_thread) { return Create(config, Clock::GetRealTimeClock(), std::move(call_thread), ProcessThread::Create("PacerThread")); } @@ -515,15 +498,28 @@ Call* Call::Create(const Call::Config& config, rtc::scoped_refptr<SharedModuleThread> call_thread, std::unique_ptr<ProcessThread> pacer_thread) { RTC_DCHECK(config.task_queue_factory); + + RtpTransportControllerSendFactory transport_controller_factory_; + + RtpTransportConfig transportConfig = config.ExtractTransportConfig(); + return new internal::Call( clock, config, - std::make_unique<RtpTransportControllerSend>( - clock, config.event_log, config.network_state_predictor_factory, - config.network_controller_factory, config.bitrate_config, - std::move(pacer_thread), config.task_queue_factory, config.trials), + transport_controller_factory_.Create(transportConfig, clock, + std::move(pacer_thread)), std::move(call_thread), config.task_queue_factory); } +Call* Call::Create(const Call::Config& config, + Clock* clock, + rtc::scoped_refptr<SharedModuleThread> call_thread, + std::unique_ptr<RtpTransportControllerSendInterface> + transportControllerSend) { + RTC_DCHECK(config.task_queue_factory); + return new internal::Call(clock, config, std::move(transportControllerSend), + std::move(call_thread), config.task_queue_factory); +} + class SharedModuleThread::Impl { public: Impl(std::unique_ptr<ProcessThread> process_thread, @@ -628,6 +624,157 @@ VideoSendStream* Call::CreateVideoSendStream( namespace internal { +Call::ReceiveStats::ReceiveStats(Clock* clock) + : received_bytes_per_second_counter_(clock, nullptr, false), + received_audio_bytes_per_second_counter_(clock, nullptr, false), + received_video_bytes_per_second_counter_(clock, nullptr, false), + received_rtcp_bytes_per_second_counter_(clock, nullptr, false) { + sequence_checker_.Detach(); +} + +void Call::ReceiveStats::AddReceivedRtcpBytes(int bytes) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (received_bytes_per_second_counter_.HasSample()) { + // First RTP packet has been received. + received_bytes_per_second_counter_.Add(static_cast<int>(bytes)); + received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(bytes)); + } +} + +void Call::ReceiveStats::AddReceivedAudioBytes(int bytes, + webrtc::Timestamp arrival_time) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + received_bytes_per_second_counter_.Add(bytes); + received_audio_bytes_per_second_counter_.Add(bytes); + if (!first_received_rtp_audio_timestamp_) + first_received_rtp_audio_timestamp_ = arrival_time; + last_received_rtp_audio_timestamp_ = arrival_time; +} + +void Call::ReceiveStats::AddReceivedVideoBytes(int bytes, + webrtc::Timestamp arrival_time) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + received_bytes_per_second_counter_.Add(bytes); + received_video_bytes_per_second_counter_.Add(bytes); + if (!first_received_rtp_video_timestamp_) + first_received_rtp_video_timestamp_ = arrival_time; + last_received_rtp_video_timestamp_ = arrival_time; +} + +Call::ReceiveStats::~ReceiveStats() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (first_received_rtp_audio_timestamp_) { + RTC_HISTOGRAM_COUNTS_100000( + "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds", + (*last_received_rtp_audio_timestamp_ - + *first_received_rtp_audio_timestamp_) + .seconds()); + } + if (first_received_rtp_video_timestamp_) { + RTC_HISTOGRAM_COUNTS_100000( + "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds", + (*last_received_rtp_video_timestamp_ - + *first_received_rtp_video_timestamp_) + .seconds()); + } + const int kMinRequiredPeriodicSamples = 5; + AggregatedStats video_bytes_per_sec = + received_video_bytes_per_second_counter_.GetStats(); + if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps", + video_bytes_per_sec.average * 8 / 1000); + RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, " + << video_bytes_per_sec.ToStringWithMultiplier(8); + } + AggregatedStats audio_bytes_per_sec = + received_audio_bytes_per_second_counter_.GetStats(); + if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps", + audio_bytes_per_sec.average * 8 / 1000); + RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, " + << audio_bytes_per_sec.ToStringWithMultiplier(8); + } + AggregatedStats rtcp_bytes_per_sec = + received_rtcp_bytes_per_second_counter_.GetStats(); + if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps", + rtcp_bytes_per_sec.average * 8); + RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, " + << rtcp_bytes_per_sec.ToStringWithMultiplier(8); + } + AggregatedStats recv_bytes_per_sec = + received_bytes_per_second_counter_.GetStats(); + if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps", + recv_bytes_per_sec.average * 8 / 1000); + RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, " + << recv_bytes_per_sec.ToStringWithMultiplier(8); + } +} + +Call::SendStats::SendStats(Clock* clock) + : clock_(clock), + estimated_send_bitrate_kbps_counter_(clock, nullptr, true), + pacer_bitrate_kbps_counter_(clock, nullptr, true) { + destructor_sequence_checker_.Detach(); + sequence_checker_.Detach(); +} + +Call::SendStats::~SendStats() { + RTC_DCHECK_RUN_ON(&destructor_sequence_checker_); + if (!first_sent_packet_time_) + return; + + TimeDelta elapsed = clock_->CurrentTime() - *first_sent_packet_time_; + if (elapsed.seconds() < metrics::kMinRunTimeInSeconds) + return; + + const int kMinRequiredPeriodicSamples = 5; + AggregatedStats send_bitrate_stats = + estimated_send_bitrate_kbps_counter_.ProcessAndGetStats(); + if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps", + send_bitrate_stats.average); + RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, " + << send_bitrate_stats.ToString(); + } + AggregatedStats pacer_bitrate_stats = + pacer_bitrate_kbps_counter_.ProcessAndGetStats(); + if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps", + pacer_bitrate_stats.average); + RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, " + << pacer_bitrate_stats.ToString(); + } +} + +void Call::SendStats::SetFirstPacketTime( + absl::optional<Timestamp> first_sent_packet_time) { + RTC_DCHECK_RUN_ON(&destructor_sequence_checker_); + first_sent_packet_time_ = first_sent_packet_time; +} + +void Call::SendStats::PauseSendAndPacerBitrateCounters() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + estimated_send_bitrate_kbps_counter_.ProcessAndPause(); + pacer_bitrate_kbps_counter_.ProcessAndPause(); +} + +void Call::SendStats::AddTargetBitrateSample(uint32_t target_bitrate_bps) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); + // Pacer bitrate may be higher than bitrate estimate if enforcing min + // bitrate. + uint32_t pacer_bitrate_bps = + std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); + pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); +} + +void Call::SendStats::SetMinAllocatableRate(BitrateAllocationLimits limits) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); +} + Call::Call(Clock* clock, const Call::Config& config, std::unique_ptr<RtpTransportControllerSendInterface> transport_send, @@ -645,23 +792,22 @@ Call::Call(Clock* clock, call_stats_(new CallStats(clock_, worker_thread_)), bitrate_allocator_(new BitrateAllocator(this)), config_(config), + trials_(*config.trials), audio_network_state_(kNetworkDown), video_network_state_(kNetworkDown), aggregate_network_up_(false), event_log_(config.event_log), - received_bytes_per_second_counter_(clock_, nullptr, true), - received_audio_bytes_per_second_counter_(clock_, nullptr, true), - received_video_bytes_per_second_counter_(clock_, nullptr, true), - received_rtcp_bytes_per_second_counter_(clock_, nullptr, true), - last_bandwidth_bps_(0), - min_allocated_send_bitrate_bps_(0), - configured_max_padding_bitrate_bps_(0), - estimated_send_bitrate_kbps_counter_(clock_, nullptr, true), - pacer_bitrate_kbps_counter_(clock_, nullptr, true), - receive_side_cc_(clock_, transport_send->packet_router()), + receive_stats_(clock_), + send_stats_(clock_), + receive_side_cc_(clock, + absl::bind_front(&PacketRouter::SendCombinedRtcpPacket, + transport_send->packet_router()), + absl::bind_front(&PacketRouter::SendRemb, + transport_send->packet_router()), + /*network_state_estimator=*/nullptr), receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()), video_send_delay_stats_(new SendDelayStats(clock_)), - start_ms_(clock_->TimeInMilliseconds()), + start_of_call_(clock_->CurrentTime()), transport_send_ptr_(transport_send.get()), transport_send_(std::move(transport_send)) { RTC_DCHECK(config.event_log != nullptr); @@ -669,6 +815,8 @@ Call::Call(Clock* clock, RTC_DCHECK(network_thread_); RTC_DCHECK(worker_thread_->IsCurrent()); + send_transport_sequence_checker_.Detach(); + // Do not remove this call; it is here to convince the compiler that the // WebRTC source timestamp string needs to be in the final binary. LoadWebRTCVersionInRegister(); @@ -694,24 +842,11 @@ Call::~Call() { receive_side_cc_.GetRemoteBitrateEstimator(true)); module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_); call_stats_->DeregisterStatsObserver(&receive_side_cc_); + send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime()); - absl::optional<Timestamp> first_sent_packet_time = - transport_send_->GetFirstPacketTime(); - - Timestamp now = clock_->CurrentTime(); - - // Only update histograms after process threads have been shut down, so that - // they won't try to concurrently update stats. - if (first_sent_packet_time) { - UpdateSendHistograms(now, *first_sent_packet_time, - estimated_send_bitrate_kbps_counter_, - pacer_bitrate_kbps_counter_); - } - - UpdateReceiveHistograms(); - - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.LifetimeInSeconds", - (now.ms() - start_ms_) / 1000); + RTC_HISTOGRAM_COUNTS_100000( + "WebRTC.Call.LifetimeInSeconds", + (clock_->CurrentTime() - start_of_call_).seconds()); } void Call::EnsureStarted() { @@ -724,10 +859,10 @@ void Call::EnsureStarted() { // This call seems to kick off a number of things, so probably better left // off being kicked off on request rather than in the ctor. - transport_send_ptr_->RegisterTargetTransferRateObserver(this); + transport_send_->RegisterTargetTransferRateObserver(this); module_process_thread_->EnsureStarted(); - transport_send_ptr_->EnsureStarted(); + transport_send_->EnsureStarted(); } void Call::SetClientBitratePreferences(const BitrateSettings& preferences) { @@ -735,52 +870,6 @@ void Call::SetClientBitratePreferences(const BitrateSettings& preferences) { GetTransportControllerSend()->SetClientBitratePreferences(preferences); } -void Call::UpdateReceiveHistograms() { - if (first_received_rtp_audio_ms_) { - RTC_HISTOGRAM_COUNTS_100000( - "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds", - (*last_received_rtp_audio_ms_ - *first_received_rtp_audio_ms_) / 1000); - } - if (first_received_rtp_video_ms_) { - RTC_HISTOGRAM_COUNTS_100000( - "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds", - (*last_received_rtp_video_ms_ - *first_received_rtp_video_ms_) / 1000); - } - const int kMinRequiredPeriodicSamples = 5; - AggregatedStats video_bytes_per_sec = - received_video_bytes_per_second_counter_.GetStats(); - if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps", - video_bytes_per_sec.average * 8 / 1000); - RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, " - << video_bytes_per_sec.ToStringWithMultiplier(8); - } - AggregatedStats audio_bytes_per_sec = - received_audio_bytes_per_second_counter_.GetStats(); - if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps", - audio_bytes_per_sec.average * 8 / 1000); - RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, " - << audio_bytes_per_sec.ToStringWithMultiplier(8); - } - AggregatedStats rtcp_bytes_per_sec = - received_rtcp_bytes_per_second_counter_.GetStats(); - if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps", - rtcp_bytes_per_sec.average * 8); - RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, " - << rtcp_bytes_per_sec.ToStringWithMultiplier(8); - } - AggregatedStats recv_bytes_per_sec = - received_bytes_per_second_counter_.GetStats(); - if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps", - recv_bytes_per_sec.average * 8 / 1000); - RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, " - << recv_bytes_per_sec.ToStringWithMultiplier(8); - } -} - PacketReceiver* Call::Receiver() { return this; } @@ -804,7 +893,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( AudioSendStream* send_stream = new AudioSendStream( clock_, config, config_.audio_state, task_queue_factory_, - module_process_thread_->process_thread(), transport_send_ptr_, + module_process_thread_->process_thread(), transport_send_.get(), bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(), suspended_rtp_state); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == @@ -814,7 +903,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( // TODO(bugs.webrtc.org/11993): call AssociateSendStream and // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { - if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { + if (stream->local_ssrc() == config.rtp.ssrc) { stream->AssociateSendStream(send_stream); } } @@ -842,7 +931,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { // TODO(bugs.webrtc.org/11993): call AssociateSendStream and // UpdateAggregateNetworkState asynchronously on the network thread. for (AudioReceiveStream* stream : audio_receive_streams_) { - if (stream->config().rtp.local_ssrc == ssrc) { + if (stream->local_ssrc() == ssrc) { stream->AssociateSendStream(nullptr); } } @@ -860,20 +949,21 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>( CreateRtcLogStreamConfig(config))); - // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream| - // and |audio_receiver_controller_| out of AudioReceiveStream construction and - // set it up asynchronously on the network thread (the registration and - // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( - clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(), + clock_, transport_send_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); + audio_receive_streams_.insert(receive_stream); + + // TODO(bugs.webrtc.org/11993): Make the registration on the network thread + // (asynchronously). The registration and `audio_receiver_controller_` need + // to live on the network thread. + receive_stream->RegisterWithTransport(&audio_receiver_controller_); // TODO(bugs.webrtc.org/11993): Update the below on the network thread. // We could possibly set up the audio_receiver_controller_ association up // as part of the async setup. - receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); - audio_receive_streams_.insert(receive_stream); + receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream); ConfigureSync(config.sync_group); @@ -894,20 +984,22 @@ void Call::DestroyAudioReceiveStream( webrtc::internal::AudioReceiveStream* audio_receive_stream = static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream); + // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync + // and UpdateAggregateNetworkState on the network thread. The call to + // `UnregisterFromTransport` should also happen on the network thread. + audio_receive_stream->UnregisterFromTransport(); + + uint32_t ssrc = audio_receive_stream->remote_ssrc(); const AudioReceiveStream::Config& config = audio_receive_stream->config(); - uint32_t ssrc = config.rtp.remote_ssrc; - receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) + receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config.rtp)) ->RemoveStream(ssrc); - // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync - // and UpdateAggregateNetworkState on the network thread. audio_receive_streams_.erase(audio_receive_stream); - const std::string& sync_group = audio_receive_stream->config().sync_group; - const auto it = sync_stream_mapping_.find(sync_group); + const auto it = sync_stream_mapping_.find(config.sync_group); if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) { sync_stream_mapping_.erase(it); - ConfigureSync(sync_group); + ConfigureSync(config.sync_group); } receive_rtp_config_.erase(ssrc); @@ -942,7 +1034,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( VideoSendStream* send_stream = new VideoSendStream( clock_, num_cpu_cores_, module_process_thread_->process_thread(), - task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_, + task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_.get(), bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, suspended_video_payload_states_, std::move(fec_controller)); @@ -952,6 +1044,8 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( video_send_ssrcs_[ssrc] = send_stream; } video_send_streams_.insert(send_stream); + video_send_streams_empty_.store(false, std::memory_order_relaxed); + // Forward resources that were previously added to the call to the new stream. for (const auto& resource_forwarder : adaptation_resource_forwarders_) { resource_forwarder->OnCreateVideoSendStream(send_stream); @@ -965,6 +1059,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( webrtc::VideoSendStream* Call::CreateVideoSendStream( webrtc::VideoSendStream::Config config, VideoEncoderConfig encoder_config) { + RTC_DCHECK_RUN_ON(worker_thread_); if (config_.fec_controller_factory) { RTC_LOG(LS_INFO) << "External FEC Controller will be used."; } @@ -981,9 +1076,12 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { RTC_DCHECK(send_stream != nullptr); RTC_DCHECK_RUN_ON(worker_thread_); - send_stream->Stop(); - - VideoSendStream* send_stream_impl = nullptr; + VideoSendStream* send_stream_impl = + static_cast<VideoSendStream*>(send_stream); + VideoSendStream::RtpStateMap rtp_states; + VideoSendStream::RtpPayloadStateMap rtp_payload_states; + send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states, + &rtp_payload_states); auto it = video_send_ssrcs_.begin(); while (it != video_send_ssrcs_.end()) { @@ -994,18 +1092,15 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { ++it; } } + // Stop forwarding resources to the stream being destroyed. for (const auto& resource_forwarder : adaptation_resource_forwarders_) { resource_forwarder->OnDestroyVideoSendStream(send_stream_impl); } video_send_streams_.erase(send_stream_impl); + if (video_send_streams_.empty()) + video_send_streams_empty_.store(true, std::memory_order_relaxed); - RTC_CHECK(send_stream_impl != nullptr); - - VideoSendStream::RtpStateMap rtp_states; - VideoSendStream::RtpPayloadStateMap rtp_payload_states; - send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states, - &rtp_payload_states); for (const auto& kv : rtp_states) { suspended_video_send_ssrcs_[kv.first] = kv.second; } @@ -1014,6 +1109,8 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { } UpdateAggregateNetworkState(); + // TODO(tommi): consider deleting on the same thread as runs + // StopPermanentlyAndGetRtpStates. delete send_stream_impl; } @@ -1032,10 +1129,13 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( // and set it up asynchronously on the network thread (the registration and // |video_receiver_controller_| need to live on the network thread). VideoReceiveStream2* receive_stream = new VideoReceiveStream2( - task_queue_factory_, worker_thread_, &video_receiver_controller_, - num_cpu_cores_, transport_send_ptr_->packet_router(), - std::move(configuration), module_process_thread_->process_thread(), - call_stats_.get(), clock_, new VCMTiming(clock_)); + task_queue_factory_, this, num_cpu_cores_, + transport_send_->packet_router(), std::move(configuration), + module_process_thread_->process_thread(), call_stats_.get(), clock_, + new VCMTiming(clock_)); + // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network + // thread. + receive_stream->RegisterWithTransport(&video_receiver_controller_); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); if (config.rtp.rtx_ssrc) { @@ -1043,9 +1143,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( // stream. Since the transport_send_cc negotiation is per payload // type, we may get an incorrect value for the rtx stream, but // that is unlikely to matter in practice. - receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config)); + receive_rtp_config_.emplace(config.rtp.rtx_ssrc, receive_stream); } - receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); + receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream); video_receive_streams_.insert(receive_stream); ConfigureSync(config.sync_group); @@ -1063,6 +1163,9 @@ void Call::DestroyVideoReceiveStream( RTC_DCHECK(receive_stream != nullptr); VideoReceiveStream2* receive_stream_impl = static_cast<VideoReceiveStream2*>(receive_stream); + // TODO(bugs.webrtc.org/11993): Unregister on the network thread. + receive_stream_impl->UnregisterFromTransport(); + const VideoReceiveStream::Config& config = receive_stream_impl->config(); // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a @@ -1074,7 +1177,7 @@ void Call::DestroyVideoReceiveStream( video_receive_streams_.erase(receive_stream_impl); ConfigureSync(config.sync_group); - receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) + receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config.rtp)) ->RemoveStream(config.rtp.remote_ssrc); UpdateAggregateNetworkState(); @@ -1097,12 +1200,16 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( // OnRtpPacket until the constructor is finished and the object is // in a valid state, since OnRtpPacket runs on the same thread. receive_stream = new FlexfecReceiveStreamImpl( - clock_, &video_receiver_controller_, config, recovered_packet_receiver, - call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread()); + clock_, config, recovered_packet_receiver, call_stats_->AsRtcpRttStats(), + module_process_thread_->process_thread()); - RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == + // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network + // thread. + receive_stream->RegisterWithTransport(&video_receiver_controller_); + + RTC_DCHECK(receive_rtp_config_.find(config.rtp.remote_ssrc) == receive_rtp_config_.end()); - receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config)); + receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream); // TODO(brandtr): Store config in RtcEventLog here. @@ -1113,15 +1220,19 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream"); RTC_DCHECK_RUN_ON(worker_thread_); + FlexfecReceiveStreamImpl* receive_stream_impl = + static_cast<FlexfecReceiveStreamImpl*>(receive_stream); + // TODO(bugs.webrtc.org/11993): Unregister on the network thread. + receive_stream_impl->UnregisterFromTransport(); + RTC_DCHECK(receive_stream != nullptr); - const FlexfecReceiveStream::Config& config = receive_stream->GetConfig(); - uint32_t ssrc = config.remote_ssrc; - receive_rtp_config_.erase(ssrc); + const FlexfecReceiveStream::RtpConfig& rtp = receive_stream->rtp_config(); + receive_rtp_config_.erase(rtp.remote_ssrc); // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be // destroyed. - receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) - ->RemoveStream(ssrc); + receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(rtp)) + ->RemoveStream(rtp.remote_ssrc); delete receive_stream; } @@ -1137,7 +1248,7 @@ void Call::AddAdaptationResource(rtc::scoped_refptr<Resource> resource) { } RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { - return transport_send_ptr_; + return transport_send_.get(); } Call::Stats Call::GetStats() const { @@ -1147,7 +1258,7 @@ Call::Stats Call::GetStats() const { // TODO(srte): It is unclear if we only want to report queues if network is // available. stats.pacer_delay_ms = - aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0; + aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0; stats.rtt_ms = call_stats_->LastProcessedRtt(); @@ -1157,14 +1268,16 @@ Call::Stats Call::GetStats() const { receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate( &ssrcs, &recv_bandwidth); stats.recv_bandwidth_bps = recv_bandwidth; - stats.send_bandwidth_bps = last_bandwidth_bps_; - stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; + stats.send_bandwidth_bps = + last_bandwidth_bps_.load(std::memory_order_relaxed); + stats.max_padding_bitrate_bps = + configured_max_padding_bitrate_bps_.load(std::memory_order_relaxed); return stats; } const WebRtcKeyValueConfig& Call::trials() const { - return *config_.trials; + return trials_; } TaskQueueBase* Call::network_thread() const { @@ -1246,7 +1359,28 @@ void Call::UpdateAggregateNetworkState() { } aggregate_network_up_ = aggregate_network_up; - transport_send_ptr_->OnNetworkAvailability(aggregate_network_up); + transport_send_->OnNetworkAvailability(aggregate_network_up); +} + +void Call::OnLocalSsrcUpdated(webrtc::AudioReceiveStream& stream, + uint32_t local_ssrc) { + RTC_DCHECK_RUN_ON(worker_thread_); + webrtc::internal::AudioReceiveStream& receive_stream = + static_cast<webrtc::internal::AudioReceiveStream&>(stream); + + receive_stream.SetLocalSsrc(local_ssrc); + auto it = audio_send_ssrcs_.find(local_ssrc); + receive_stream.AssociateSendStream(it != audio_send_ssrcs_.end() ? it->second + : nullptr); +} + +void Call::OnUpdateSyncGroup(webrtc::AudioReceiveStream& stream, + const std::string& sync_group) { + RTC_DCHECK_RUN_ON(worker_thread_); + webrtc::internal::AudioReceiveStream& receive_stream = + static_cast<webrtc::internal::AudioReceiveStream&>(stream); + receive_stream.SetSyncGroup(sync_group); + ConfigureSync(sync_group); } void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { @@ -1258,56 +1392,47 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { // implementations that either just do a PostTask or use locking. video_send_delay_stats_->OnSentPacket(sent_packet.packet_id, clock_->TimeInMilliseconds()); - transport_send_ptr_->OnSentPacket(sent_packet); + transport_send_->OnSentPacket(sent_packet); } void Call::OnStartRateUpdate(DataRate start_rate) { - RTC_DCHECK_RUN_ON(send_transport_queue()); + RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>()); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - RTC_DCHECK_RUN_ON(send_transport_queue()); + RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); uint32_t target_bitrate_bps = msg.target_rate.bps(); // For controlling the rate of feedback messages. receive_side_cc_.OnBitrateChanged(target_bitrate_bps); bitrate_allocator_->OnNetworkEstimateChanged(msg); - worker_thread_->PostTask( - ToQueuedTask(task_safety_, [this, target_bitrate_bps]() { - RTC_DCHECK_RUN_ON(worker_thread_); - last_bandwidth_bps_ = target_bitrate_bps; - - // Ignore updates if bitrate is zero (the aggregate network state is - // down) or if we're not sending video. - if (target_bitrate_bps == 0 || video_send_streams_.empty()) { - estimated_send_bitrate_kbps_counter_.ProcessAndPause(); - pacer_bitrate_kbps_counter_.ProcessAndPause(); - return; - } + last_bandwidth_bps_.store(target_bitrate_bps, std::memory_order_relaxed); - estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); - // Pacer bitrate may be higher than bitrate estimate if enforcing min - // bitrate. - uint32_t pacer_bitrate_bps = - std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); - pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); - })); + // Ignore updates if bitrate is zero (the aggregate network state is + // down) or if we're not sending video. + // Using |video_send_streams_empty_| is racy but as the caller can't + // reasonably expect synchronize with changes in |video_send_streams_| (being + // on |send_transport_sequence_checker|), we can avoid a PostTask this way. + if (target_bitrate_bps == 0 || + video_send_streams_empty_.load(std::memory_order_relaxed)) { + send_stats_.PauseSendAndPacerBitrateCounters(); + } else { + send_stats_.AddTargetBitrateSample(target_bitrate_bps); + } } void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { - RTC_DCHECK_RUN_ON(send_transport_queue()); + RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); transport_send_ptr_->SetAllocatedSendBitrateLimits(limits); - - worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() { - RTC_DCHECK_RUN_ON(worker_thread_); - min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); - configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps(); - })); + send_stats_.SetMinAllocatableRate(limits); + configured_max_padding_bitrate_bps_.store(limits.max_padding_rate.bps(), + std::memory_order_relaxed); } +// RTC_RUN_ON(worker_thread_) void Call::ConfigureSync(const std::string& sync_group) { // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. // Set sync only if there was no previous one. @@ -1359,56 +1484,62 @@ void Call::ConfigureSync(const std::string& sync_group) { } } -PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type, - const uint8_t* packet, - size_t length) { +// RTC_RUN_ON(network_thread_) +void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) { TRACE_EVENT0("webrtc", "Call::DeliverRtcp"); - // TODO(pbos): Make sure it's a valid packet. - // Return DELIVERY_UNKNOWN_SSRC if it can be determined that - // there's no receiver of the packet. - if (received_bytes_per_second_counter_.HasSample()) { - // First RTP packet has been received. - received_bytes_per_second_counter_.Add(static_cast<int>(length)); - received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(length)); - } - bool rtcp_delivered = false; - if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { - for (VideoReceiveStream2* stream : video_receive_streams_) { - if (stream->DeliverRtcp(packet, length)) - rtcp_delivered = true; - } - } - if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { - for (AudioReceiveStream* stream : audio_receive_streams_) { - stream->DeliverRtcp(packet, length); - rtcp_delivered = true; - } - } - if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { - for (VideoSendStream* stream : video_send_streams_) { - stream->DeliverRtcp(packet, length); - rtcp_delivered = true; - } - } - if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { - for (auto& kv : audio_send_ssrcs_) { - kv.second->DeliverRtcp(packet, length); - rtcp_delivered = true; - } - } - if (rtcp_delivered) { - event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>( - rtc::MakeArrayView(packet, length))); - } + // TODO(bugs.webrtc.org/11993): This DCHECK is here just to maintain the + // invariant that currently the only call path to this function is via + // `PeerConnection::InitializeRtcpCallback()`. DeliverRtp on the other hand + // gets called via the channel classes and + // WebRtc[Audio|Video]Channel's `OnPacketReceived`. We'll remove the + // PeerConnection involvement as well as + // `JsepTransportController::OnRtcpPacketReceived_n` and `rtcp_handler` + // and make sure that the flow of packets is consistent from the + // `RtpTransport` class, via the *Channel and *Engine classes and into Call. + // This way we'll also know more about the context of the packet. + RTC_DCHECK_EQ(media_type, MediaType::ANY); + + // TODO(bugs.webrtc.org/11993): This should execute directly on the network + // thread. + worker_thread_->PostTask( + ToQueuedTask(task_safety_, [this, packet = std::move(packet)]() { + RTC_DCHECK_RUN_ON(worker_thread_); + + receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size())); + bool rtcp_delivered = false; + for (VideoReceiveStream2* stream : video_receive_streams_) { + if (stream->DeliverRtcp(packet.cdata(), packet.size())) + rtcp_delivered = true; + } + + for (AudioReceiveStream* stream : audio_receive_streams_) { + stream->DeliverRtcp(packet.cdata(), packet.size()); + rtcp_delivered = true; + } + + for (VideoSendStream* stream : video_send_streams_) { + stream->DeliverRtcp(packet.cdata(), packet.size()); + rtcp_delivered = true; + } + + for (auto& kv : audio_send_ssrcs_) { + kv.second->DeliverRtcp(packet.cdata(), packet.size()); + rtcp_delivered = true; + } - return rtcp_delivered ? DELIVERY_OK : DELIVERY_PACKET_ERROR; + if (rtcp_delivered) { + event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>( + rtc::MakeArrayView(packet.cdata(), packet.size()))); + } + })); } PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { TRACE_EVENT0("webrtc", "Call::DeliverRtp"); + RTC_DCHECK_NE(media_type, MediaType::ANY); RtpPacketReceived parsed_packet; if (!parsed_packet.Parse(std::move(packet))) @@ -1421,9 +1552,9 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, packet_time_us = receive_time_calculator_->ReconcileReceiveTimes( packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds()); } - parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000); + parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us)); } else { - parsed_packet.set_arrival_time_ms(clock_->TimeInMilliseconds()); + parsed_packet.set_arrival_time(clock_->CurrentTime()); } // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6. @@ -1446,7 +1577,8 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, return DELIVERY_UNKNOWN_SSRC; } - parsed_packet.IdentifyExtensions(it->second.extensions); + parsed_packet.IdentifyExtensions( + RtpHeaderExtensionMap(it->second->rtp_config().extensions)); NotifyBweOfReceivedPacket(parsed_packet, media_type); @@ -1455,29 +1587,19 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, int length = static_cast<int>(parsed_packet.size()); if (media_type == MediaType::AUDIO) { if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) { - received_bytes_per_second_counter_.Add(length); - received_audio_bytes_per_second_counter_.Add(length); + receive_stats_.AddReceivedAudioBytes(length, + parsed_packet.arrival_time()); event_log_->Log( std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet)); - const int64_t arrival_time_ms = parsed_packet.arrival_time_ms(); - if (!first_received_rtp_audio_ms_) { - first_received_rtp_audio_ms_.emplace(arrival_time_ms); - } - last_received_rtp_audio_ms_.emplace(arrival_time_ms); return DELIVERY_OK; } } else if (media_type == MediaType::VIDEO) { parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); if (video_receiver_controller_.OnRtpPacket(parsed_packet)) { - received_bytes_per_second_counter_.Add(length); - received_video_bytes_per_second_counter_.Add(length); + receive_stats_.AddReceivedVideoBytes(length, + parsed_packet.arrival_time()); event_log_->Log( std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet)); - const int64_t arrival_time_ms = parsed_packet.arrival_time_ms(); - if (!first_received_rtp_video_ms_) { - first_received_rtp_video_ms_.emplace(arrival_time_ms); - } - last_received_rtp_video_ms_.emplace(arrival_time_ms); return DELIVERY_OK; } } @@ -1488,38 +1610,16 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { - RTC_DCHECK_RUN_ON(worker_thread_); - - if (IsRtcp(packet.cdata(), packet.size())) - return DeliverRtcp(media_type, packet.cdata(), packet.size()); + if (IsRtcp(packet.cdata(), packet.size())) { + RTC_DCHECK_RUN_ON(network_thread_); + DeliverRtcp(media_type, std::move(packet)); + return DELIVERY_OK; + } + RTC_DCHECK_RUN_ON(worker_thread_); return DeliverRtp(media_type, std::move(packet), packet_time_us); } -void Call::DeliverPacketAsync(MediaType media_type, - rtc::CopyOnWriteBuffer packet, - int64_t packet_time_us, - PacketCallback callback) { - RTC_DCHECK_RUN_ON(network_thread_); - - TaskQueueBase* network_thread = rtc::Thread::Current(); - RTC_DCHECK(network_thread); - - worker_thread_->PostTask(ToQueuedTask( - task_safety_, [this, network_thread, media_type, p = std::move(packet), - packet_time_us, cb = std::move(callback)] { - RTC_DCHECK_RUN_ON(worker_thread_); - DeliveryStatus status = DeliverPacket(media_type, p, packet_time_us); - if (cb) { - network_thread->PostTask( - ToQueuedTask([cb = std::move(cb), status, media_type, - p = std::move(p), packet_time_us]() { - cb(status, media_type, std::move(p), packet_time_us); - })); - } - })); -} - void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. // This method is called synchronously via |OnRtpPacket()| (see DeliverRtp) @@ -1543,29 +1643,31 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { // which is being torn down. return; } - parsed_packet.IdentifyExtensions(it->second.extensions); + parsed_packet.IdentifyExtensions( + RtpHeaderExtensionMap(it->second->rtp_config().extensions)); // TODO(brandtr): Update here when we support protecting audio packets too. parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); video_receiver_controller_.OnRtpPacket(parsed_packet); } +// RTC_RUN_ON(worker_thread_) void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, MediaType media_type) { auto it = receive_rtp_config_.find(packet.Ssrc()); - bool use_send_side_bwe = - (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe; + bool use_send_side_bwe = (it != receive_rtp_config_.end()) && + UseSendSideBwe(it->second->rtp_config()); RTPHeader header; packet.GetHeader(&header); ReceivedPacket packet_msg; packet_msg.size = DataSize::Bytes(packet.payload_size()); - packet_msg.receive_time = Timestamp::Millis(packet.arrival_time_ms()); + packet_msg.receive_time = packet.arrival_time(); if (header.extension.hasAbsoluteSendTime) { packet_msg.send_time = header.extension.GetAbsoluteSendTimestamp(); } - transport_send_ptr_->OnReceivedPacket(packet_msg); + transport_send_->OnReceivedPacket(packet_msg); if (!use_send_side_bwe && header.extension.hasTransportSequenceNumber) { // Inconsistent configuration of send side BWE. Do nothing. @@ -1581,8 +1683,8 @@ void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, if (media_type == MediaType::VIDEO || (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) { receive_side_cc_.OnReceivedPacket( - packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(), - header); + packet.arrival_time().ms(), + packet.payload_size() + packet.padding_size(), header); } } |