diff options
5 files changed, 67 insertions, 91 deletions
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 78ede7ea22..2a4a549596 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -29,6 +29,7 @@ bool TaskQueueExperimentEnabled() { std::unique_ptr<SendSideCongestionControllerInterface> CreateController( Clock* clock, + rtc::TaskQueue* task_queue, webrtc::RtcEventLog* event_log, PacedSender* pacer, const BitrateConstraints& bitrate_config, @@ -36,7 +37,7 @@ std::unique_ptr<SendSideCongestionControllerInterface> CreateController( if (task_queue_controller) { RTC_LOG(LS_INFO) << "Using TaskQueue based SSCC"; return rtc::MakeUnique<webrtc::webrtc_cc::SendSideCongestionController>( - clock, event_log, pacer, bitrate_config.start_bitrate_bps, + clock, task_queue, event_log, pacer, bitrate_config.start_bitrate_bps, bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps); } RTC_LOG(LS_INFO) << "Using Legacy SSCC"; @@ -59,13 +60,12 @@ RtpTransportControllerSend::RtpTransportControllerSend( bitrate_configurator_(bitrate_config), process_thread_(ProcessThread::Create("SendControllerThread")), observer_(nullptr), - send_side_cc_(CreateController(clock, - event_log, - &pacer_, - bitrate_config, - TaskQueueExperimentEnabled())), task_queue_("rtp_send_controller") { - send_side_cc_ptr_ = send_side_cc_.get(); + // Created after task_queue to be able to post to the task queue internally. + send_side_cc_ = + CreateController(clock, &task_queue_, event_log, &pacer_, bitrate_config, + TaskQueueExperimentEnabled()); + process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE); process_thread_->Start(); @@ -89,7 +89,7 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps, msg.network_estimate.at_time = msg.at_time; msg.network_estimate.bwe_period = TimeDelta::ms(probing_interval_ms); uint32_t bandwidth_bps; - if (send_side_cc_ptr_->AvailableBandwidth(&bandwidth_bps)) + if (send_side_cc_->AvailableBandwidth(&bandwidth_bps)) msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps); msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0; msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms); diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index d5bc25ed45..67248b22c6 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -93,17 +93,7 @@ class RtpTransportControllerSend final const std::unique_ptr<ProcessThread> process_thread_; rtc::CriticalSection observer_crit_; TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_); - // Caches send_side_cc_.get(), to avoid racing with destructor. - // Note that this is declared before send_side_cc_ to ensure that it is not - // invalidated until no more tasks can be running on the send_side_cc_ task - // queue. - // TODO(srte): Remove this when only the task queue based send side congestion - // controller is used and it is no longer accessed synchronously in the - // OnNetworkChanged callback. - SendSideCongestionControllerInterface* send_side_cc_ptr_; - // Declared last since it will issue callbacks from a task queue. Declaring it - // last ensures that it is destroyed first. - const std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_; + std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_; // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. // |task_queue_| is defined last to ensure all pending tasks are cancelled // and deleted before any other members. diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index a352cec851..90447e6860 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -66,6 +66,7 @@ class SendSideCongestionController public RtcpBandwidthObserver { public: SendSideCongestionController(const Clock* clock, + rtc::TaskQueue* task_queue, RtcEventLog* event_log, PacedSender* pacer, int start_bitrate_bps, @@ -150,18 +151,18 @@ class SendSideCongestionController void WaitOnTasksForTest(); private: - void MaybeCreateControllers() RTC_RUN_ON(task_queue_ptr_); - void MaybeRecreateControllers() RTC_RUN_ON(task_queue_ptr_); + void MaybeCreateControllers() RTC_RUN_ON(task_queue_); + void MaybeRecreateControllers() RTC_RUN_ON(task_queue_); - void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_ptr_); - void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_ptr_); - void UpdatePacerQueue() RTC_RUN_ON(task_queue_ptr_); + void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); + void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); + void UpdatePacerQueue() RTC_RUN_ON(task_queue_); - void UpdateStreamsConfig() RTC_RUN_ON(task_queue_ptr_); + void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, int64_t now_ms) - RTC_RUN_ON(task_queue_ptr_); + RTC_RUN_ON(task_queue_); const Clock* const clock_; // PacedSender is thread safe and doesn't need protection here. @@ -170,57 +171,47 @@ class SendSideCongestionController TransportFeedbackAdapter transport_feedback_adapter_; const std::unique_ptr<NetworkControllerFactoryInterface> - controller_factory_with_feedback_ RTC_GUARDED_BY(task_queue_ptr_); + controller_factory_with_feedback_ RTC_GUARDED_BY(task_queue_); const std::unique_ptr<NetworkControllerFactoryInterface> - controller_factory_fallback_ RTC_GUARDED_BY(task_queue_ptr_); + controller_factory_fallback_ RTC_GUARDED_BY(task_queue_); const std::unique_ptr<PacerController> pacer_controller_ - RTC_GUARDED_BY(task_queue_ptr_); + RTC_GUARDED_BY(task_queue_); std::unique_ptr<send_side_cc_internal::ControlHandler> control_handler_ - RTC_GUARDED_BY(task_queue_ptr_); + RTC_GUARDED_BY(task_queue_); std::unique_ptr<NetworkControllerInterface> controller_ - RTC_GUARDED_BY(task_queue_ptr_); + RTC_GUARDED_BY(task_queue_); - TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_ptr_); + TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); std::map<uint32_t, RTCPReportBlock> last_report_blocks_ - RTC_GUARDED_BY(task_queue_ptr_); - Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_ptr_); + RTC_GUARDED_BY(task_queue_); + Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); - NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_ptr_); - NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_ptr_); - StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_ptr_); + NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_); + NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); + StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); const bool send_side_bwe_with_overhead_; // Transport overhead is written by OnNetworkRouteChanged and read by // AddPacket. // TODO(srte): Remove atomic when feedback adapter runs on task queue. std::atomic<size_t> transport_overhead_bytes_per_packet_; - bool network_available_ RTC_GUARDED_BY(task_queue_ptr_); - bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_ptr_); - bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_ptr_); + bool network_available_ RTC_GUARDED_BY(task_queue_); + bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_); + bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_); send_side_cc_internal::PeriodicTask* pacer_queue_update_task_ - RTC_GUARDED_BY(task_queue_ptr_); + RTC_GUARDED_BY(task_queue_); send_side_cc_internal::PeriodicTask* controller_task_ - RTC_GUARDED_BY(task_queue_ptr_); + RTC_GUARDED_BY(task_queue_); // Protects access to last_packet_feedback_vector_ in feedback adapter. // TODO(srte): Remove this checker when feedback adapter runs on task queue. rtc::RaceChecker worker_race_; - // Caches task_queue_.get(), to avoid racing with destructor. - // Note that this is declared before task_queue_ to ensure that it is not - // invalidated until no more tasks can be running on the task queue. - rtc::TaskQueue* task_queue_ptr_; - - // Note that moving ownership of the task queue makes it neccessary to make - // sure that there is no outstanding tasks on it using destructed objects. - // This is currently guranteed by using explicit reset in the destructor of - // this class. It is declared last to indicate that it's lifetime is shorter - // than all other members. - std::unique_ptr<rtc::TaskQueue> task_queue_; + rtc::TaskQueue* task_queue_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendSideCongestionController); }; diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 38d23f84b4..a55beae93f 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -305,6 +305,7 @@ rtc::Optional<TargetTransferRate> ControlHandler::last_transfer_rate() { SendSideCongestionController::SendSideCongestionController( const Clock* clock, + rtc::TaskQueue* task_queue, RtcEventLog* event_log, PacedSender* pacer, int start_bitrate_bps, @@ -328,8 +329,7 @@ SendSideCongestionController::SendSideCongestionController( packet_feedback_available_(false), pacer_queue_update_task_(nullptr), controller_task_(nullptr), - task_queue_(MakeUnique<rtc::TaskQueue>("SendSideCCQueue")) { - task_queue_ptr_ = task_queue_.get(); + task_queue_(task_queue) { initial_config_.constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); RTC_DCHECK(start_bitrate_bps > 0); @@ -381,13 +381,7 @@ void SendSideCongestionController::MaybeRecreateControllers() { RTC_DCHECK(controller_); } -SendSideCongestionController::~SendSideCongestionController() { - // Must be destructed before any objects used by calls on the task queue. - task_queue_.reset(); - // Singe the task queue has been destructed, it is now safe to reset - // task_queue_raw_ which is only used by tasks on the task queue. - task_queue_ptr_ = nullptr; -} +SendSideCongestionController::~SendSideCongestionController() = default; void SendSideCongestionController::RegisterPacketFeedbackObserver( PacketFeedbackObserver* observer) { @@ -402,7 +396,7 @@ void SendSideCongestionController::DeRegisterPacketFeedbackObserver( void SendSideCongestionController::RegisterNetworkObserver( NetworkChangedObserver* observer) { task_queue_->PostTask([this, observer]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); RTC_DCHECK(observer_ == nullptr); observer_ = observer; MaybeCreateControllers(); @@ -415,7 +409,7 @@ void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps, TargetRateConstraints constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_); task_queue_->PostTask([this, constraints, start_bitrate_bps]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (controller_) { control_handler_->PostUpdates( controller_->OnTargetRateConstraints(constraints)); @@ -433,7 +427,7 @@ void SendSideCongestionController::SetAllocatedSendBitrateLimits( int64_t max_total_bitrate_bps) { task_queue_->PostTask([this, min_send_bitrate_bps, max_padding_bitrate_bps, max_total_bitrate_bps]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps); streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps); streams_config_.max_total_allocated_bitrate = @@ -460,7 +454,7 @@ void SendSideCongestionController::OnNetworkRouteChanged( if (start_bitrate_bps > 0) msg.starting_rate = DataRate::bps(start_bitrate_bps); task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (controller_) { control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg)); } else { @@ -479,7 +473,7 @@ bool SendSideCongestionController::AvailableBandwidth( // running on the task queue. // TODO(srte): Remove this function when RtpTransportControllerSend stops // calling it. - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (!control_handler_) { return false; } @@ -500,7 +494,7 @@ RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() { void SendSideCongestionController::SetPerPacketFeedbackAvailable( bool available) { task_queue_->PostTask([this, available]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); packet_feedback_available_ = available; MaybeRecreateControllers(); }); @@ -508,7 +502,7 @@ void SendSideCongestionController::SetPerPacketFeedbackAvailable( void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) { task_queue_->PostTask([this, enable]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); streams_config_.requests_alr_probing = enable; UpdateStreamsConfig(); }); @@ -533,7 +527,7 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) { msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.network_available = state == kNetworkUp; task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); network_available_ = msg.network_available; if (controller_) { control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg)); @@ -560,7 +554,7 @@ void SendSideCongestionController::OnSentPacket( msg.size = DataSize::bytes(packet->payload_size); msg.send_time = Timestamp::ms(packet->send_time_ms); task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (controller_) control_handler_->PostUpdates(controller_->OnSentPacket(msg)); }); @@ -575,7 +569,7 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, report.round_trip_time = TimeDelta::ms(avg_rtt_ms); report.smoothed = true; task_queue_->PostTask([this, report]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (controller_) control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); }); @@ -594,9 +588,9 @@ void SendSideCongestionController::StartProcessPeriodicTasks() { if (!periodic_tasks_enabled_) return; if (!pacer_queue_update_task_) { - pacer_queue_update_task_ = StartPeriodicTask( - task_queue_ptr_, PacerQueueUpdateIntervalMs, [this]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + pacer_queue_update_task_ = + StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() { + RTC_DCHECK_RUN_ON(task_queue_); UpdatePacerQueue(); }); } @@ -611,8 +605,8 @@ void SendSideCongestionController::StartProcessPeriodicTasks() { // queue is destroyed or some time after Stop() is called, whichever comes // first. controller_task_ = - StartPeriodicTask(task_queue_ptr_, process_interval_.ms(), [this]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() { + RTC_DCHECK_RUN_ON(task_queue_); UpdateControllerWithTimeInterval(); }); } @@ -668,7 +662,7 @@ void SendSideCongestionController::OnTransportFeedback( msg.data_in_flight = DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (controller_) control_handler_->PostUpdates( controller_->OnTransportPacketsFeedback(msg)); @@ -680,7 +674,7 @@ void SendSideCongestionController::MaybeUpdateOutstandingData() { DataSize in_flight_data = DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); task_queue_->PostTask([this, in_flight_data]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); pacer_controller_->OnOutstandingData(in_flight_data); }); } @@ -693,7 +687,7 @@ SendSideCongestionController::GetTransportFeedbackVector() const { void SendSideCongestionController::PostPeriodicTasksForTest() { task_queue_->PostTask([this]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); UpdateControllerWithTimeInterval(); UpdatePacerQueue(); }); @@ -707,7 +701,7 @@ void SendSideCongestionController::WaitOnTasksForTest() { void SendSideCongestionController::SetPacingFactor(float pacing_factor) { task_queue_->PostTask([this, pacing_factor]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); streams_config_.pacing_factor = pacing_factor; UpdateStreamsConfig(); }); @@ -715,7 +709,7 @@ void SendSideCongestionController::SetPacingFactor(float pacing_factor) { void SendSideCongestionController::DisablePeriodicTasks() { task_queue_->PostTask([this]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); periodic_tasks_enabled_ = false; }); } @@ -726,7 +720,7 @@ void SendSideCongestionController::OnReceivedEstimatedBitrate( msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.bandwidth = DataRate::bps(bitrate); task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); if (controller_) control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg)); }); @@ -737,12 +731,12 @@ void SendSideCongestionController::OnReceivedRtcpReceiverReport( int64_t rtt_ms, int64_t now_ms) { task_queue_->PostTask([this, report_blocks, now_ms]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); }); task_queue_->PostTask([this, now_ms, rtt_ms]() { - RTC_DCHECK_RUN_ON(task_queue_ptr_); + RTC_DCHECK_RUN_ON(task_queue_); RoundTripTimeUpdate report; report.receive_time = Timestamp::ms(now_ms); report.round_trip_time = TimeDelta::ms(rtt_ms); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index 56f010f594..0f598082e4 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -74,10 +74,10 @@ class SendSideCongestionControllerTest : public ::testing::Test { SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _)); EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 3)); EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 5)); - + task_queue_ = rtc::MakeUnique<rtc::TaskQueue>("SSCC Test"); controller_.reset(new SendSideCongestionControllerForTest( - &clock_, &event_log_, pacer_.get(), kInitialBitrateBps, 0, - 5 * kInitialBitrateBps)); + &clock_, task_queue_.get(), &event_log_, pacer_.get(), + kInitialBitrateBps, 0, 5 * kInitialBitrateBps)); controller_->DisablePeriodicTasks(); controller_->RegisterNetworkObserver(&observer_); controller_->SignalNetworkState(NetworkState::kNetworkUp); @@ -94,9 +94,10 @@ class SendSideCongestionControllerTest : public ::testing::Test { void TargetBitrateTrackingSetup() { bandwidth_observer_ = nullptr; pacer_.reset(new NiceMock<MockPacedSender>()); + task_queue_ = rtc::MakeUnique<rtc::TaskQueue>("SSCC Test"); controller_.reset(new SendSideCongestionControllerForTest( - &clock_, &event_log_, pacer_.get(), kInitialBitrateBps, 0, - 5 * kInitialBitrateBps)); + &clock_, task_queue_.get(), &event_log_, pacer_.get(), + kInitialBitrateBps, 0, 5 * kInitialBitrateBps)); controller_->DisablePeriodicTasks(); controller_->RegisterNetworkObserver(&target_bitrate_observer_); controller_->SignalNetworkState(NetworkState::kNetworkUp); @@ -166,8 +167,8 @@ class SendSideCongestionControllerTest : public ::testing::Test { PacketRouter packet_router_; std::unique_ptr<NiceMock<MockPacedSender>> pacer_; std::unique_ptr<SendSideCongestionControllerForTest> controller_; - rtc::Optional<uint32_t> target_bitrate_bps_; + std::unique_ptr<rtc::TaskQueue> task_queue_; }; TEST_F(SendSideCongestionControllerTest, OnNetworkChanged) { |