aboutsummaryrefslogtreecommitdiff
path: root/call/call.cc
diff options
context:
space:
mode:
Diffstat (limited to 'call/call.cc')
-rw-r--r--call/call.cc854
1 files changed, 478 insertions, 376 deletions
diff --git a/call/call.cc b/call/call.cc
index 437f4df2fb..f4a7d7cc9e 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -13,12 +13,14 @@
#include <string.h>
#include <algorithm>
+#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>
+#include "absl/functional/bind_front.h"
#include "absl/types/optional.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
@@ -32,6 +34,7 @@
#include "call/receive_time_calculator.h"
#include "call/rtp_stream_receiver_controller.h"
#include "call/rtp_transport_controller_send.h"
+#include "call/rtp_transport_controller_send_factory.h"
#include "call/version.h"
#include "logging/rtc_event_log/events/rtc_event_audio_receive_stream_config.h"
#include "logging/rtc_event_log/events/rtc_event_rtcp_packet_incoming.h"
@@ -78,12 +81,10 @@ bool SendPeriodicFeedback(const std::vector<RtpExtension>& extensions) {
return true;
}
-// TODO(nisse): This really begs for a shared context struct.
-bool UseSendSideBwe(const std::vector<RtpExtension>& extensions,
- bool transport_cc) {
- if (!transport_cc)
+bool UseSendSideBwe(const ReceiveStream::RtpConfig& rtp) {
+ if (!rtp.transport_cc)
return false;
- for (const auto& extension : extensions) {
+ for (const auto& extension : rtp.extensions) {
if (extension.uri == RtpExtension::kTransportSequenceNumberUri ||
extension.uri == RtpExtension::kTransportSequenceNumberV2Uri)
return true;
@@ -91,18 +92,6 @@ bool UseSendSideBwe(const std::vector<RtpExtension>& extensions,
return false;
}
-bool UseSendSideBwe(const VideoReceiveStream::Config& config) {
- return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc);
-}
-
-bool UseSendSideBwe(const AudioReceiveStream::Config& config) {
- return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc);
-}
-
-bool UseSendSideBwe(const FlexfecReceiveStream::Config& config) {
- return UseSendSideBwe(config.rtp_header_extensions, config.transport_cc);
-}
-
const int* FindKeyByValue(const std::map<int, int>& m, int v) {
for (const auto& kv : m) {
if (kv.second == v)
@@ -167,34 +156,6 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() {
return current;
}
-// Called from the destructor of Call to report the collected send histograms.
-void UpdateSendHistograms(Timestamp now,
- Timestamp first_sent_packet,
- AvgCounter& estimated_send_bitrate_kbps_counter,
- AvgCounter& pacer_bitrate_kbps_counter) {
- TimeDelta elapsed = now - first_sent_packet;
- if (elapsed.seconds() < metrics::kMinRunTimeInSeconds)
- return;
-
- const int kMinRequiredPeriodicSamples = 5;
- AggregatedStats send_bitrate_stats =
- estimated_send_bitrate_kbps_counter.ProcessAndGetStats();
- if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps",
- send_bitrate_stats.average);
- RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, "
- << send_bitrate_stats.ToString();
- }
- AggregatedStats pacer_bitrate_stats =
- pacer_bitrate_kbps_counter.ProcessAndGetStats();
- if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps",
- pacer_bitrate_stats.average);
- RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, "
- << pacer_bitrate_stats.ToString();
- }
-}
-
} // namespace
namespace internal {
@@ -299,10 +260,6 @@ class Call final : public webrtc::Call,
DeliveryStatus DeliverPacket(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
- void DeliverPacketAsync(MediaType media_type,
- rtc::CopyOnWriteBuffer packet,
- int64_t packet_time_us,
- PacketCallback callback) override;
// Implements RecoveredPacketReceiver.
void OnRecoveredPacket(const uint8_t* packet, size_t length) override;
@@ -312,6 +269,12 @@ class Call final : public webrtc::Call,
void OnAudioTransportOverheadChanged(
int transport_overhead_per_packet) override;
+ void OnLocalSsrcUpdated(webrtc::AudioReceiveStream& stream,
+ uint32_t local_ssrc) override;
+
+ void OnUpdateSyncGroup(webrtc::AudioReceiveStream& stream,
+ const std::string& sync_group) override;
+
void OnSentPacket(const rtc::SentPacket& sent_packet) override;
// Implements TargetTransferRateObserver,
@@ -324,45 +287,96 @@ class Call final : public webrtc::Call,
void SetClientBitratePreferences(const BitrateSettings& preferences) override;
private:
- DeliveryStatus DeliverRtcp(MediaType media_type,
- const uint8_t* packet,
- size_t length)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
+ // Thread-compatible class that collects received packet stats and exposes
+ // them as UMA histograms on destruction.
+ class ReceiveStats {
+ public:
+ explicit ReceiveStats(Clock* clock);
+ ~ReceiveStats();
+
+ void AddReceivedRtcpBytes(int bytes);
+ void AddReceivedAudioBytes(int bytes, webrtc::Timestamp arrival_time);
+ void AddReceivedVideoBytes(int bytes, webrtc::Timestamp arrival_time);
+
+ private:
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_;
+ RateCounter received_bytes_per_second_counter_
+ RTC_GUARDED_BY(sequence_checker_);
+ RateCounter received_audio_bytes_per_second_counter_
+ RTC_GUARDED_BY(sequence_checker_);
+ RateCounter received_video_bytes_per_second_counter_
+ RTC_GUARDED_BY(sequence_checker_);
+ RateCounter received_rtcp_bytes_per_second_counter_
+ RTC_GUARDED_BY(sequence_checker_);
+ absl::optional<Timestamp> first_received_rtp_audio_timestamp_
+ RTC_GUARDED_BY(sequence_checker_);
+ absl::optional<Timestamp> last_received_rtp_audio_timestamp_
+ RTC_GUARDED_BY(sequence_checker_);
+ absl::optional<Timestamp> first_received_rtp_video_timestamp_
+ RTC_GUARDED_BY(sequence_checker_);
+ absl::optional<Timestamp> last_received_rtp_video_timestamp_
+ RTC_GUARDED_BY(sequence_checker_);
+ };
+
+ // Thread-compatible class that collects sent packet stats and exposes
+ // them as UMA histograms on destruction, provided SetFirstPacketTime was
+ // called with a non-empty packet timestamp before the destructor.
+ class SendStats {
+ public:
+ explicit SendStats(Clock* clock);
+ ~SendStats();
+
+ void SetFirstPacketTime(absl::optional<Timestamp> first_sent_packet_time);
+ void PauseSendAndPacerBitrateCounters();
+ void AddTargetBitrateSample(uint32_t target_bitrate_bps);
+ void SetMinAllocatableRate(BitrateAllocationLimits limits);
+
+ private:
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker destructor_sequence_checker_;
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_;
+ Clock* const clock_ RTC_GUARDED_BY(destructor_sequence_checker_);
+ AvgCounter estimated_send_bitrate_kbps_counter_
+ RTC_GUARDED_BY(sequence_checker_);
+ AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(sequence_checker_);
+ uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(sequence_checker_){
+ 0};
+ absl::optional<Timestamp> first_sent_packet_time_
+ RTC_GUARDED_BY(destructor_sequence_checker_);
+ };
+
+ void DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet)
+ RTC_RUN_ON(network_thread_);
DeliveryStatus DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
- int64_t packet_time_us)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
- void ConfigureSync(const std::string& sync_group)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
+ int64_t packet_time_us) RTC_RUN_ON(worker_thread_);
+ void ConfigureSync(const std::string& sync_group) RTC_RUN_ON(worker_thread_);
void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type)
- RTC_SHARED_LOCKS_REQUIRED(worker_thread_);
+ RTC_RUN_ON(worker_thread_);
- void UpdateReceiveHistograms();
void UpdateAggregateNetworkState();
// Ensure that necessary process threads are started, and any required
// callbacks have been registered.
- void EnsureStarted() RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
-
- rtc::TaskQueue* send_transport_queue() const {
- return transport_send_ptr_->GetWorkerQueue();
- }
+ void EnsureStarted() RTC_RUN_ON(worker_thread_);
Clock* const clock_;
TaskQueueFactory* const task_queue_factory_;
TaskQueueBase* const worker_thread_;
TaskQueueBase* const network_thread_;
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_;
const int num_cpu_cores_;
const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
const std::unique_ptr<CallStats> call_stats_;
const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
- Call::Config config_;
+ const Call::Config config_ RTC_GUARDED_BY(worker_thread_);
+ // Maps to config_.trials, can be used from any thread via `trials()`.
+ const WebRtcKeyValueConfig& trials_;
- NetworkState audio_network_state_;
- NetworkState video_network_state_;
+ NetworkState audio_network_state_ RTC_GUARDED_BY(worker_thread_);
+ NetworkState video_network_state_ RTC_GUARDED_BY(worker_thread_);
// TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the
// network thread.
bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
@@ -380,39 +394,17 @@ class Call final : public webrtc::Call,
// TODO(nisse): Should eventually be injected at creation,
// with a single object in the bundled case.
- RtpStreamReceiverController audio_receiver_controller_;
- RtpStreamReceiverController video_receiver_controller_;
+ RtpStreamReceiverController audio_receiver_controller_
+ RTC_GUARDED_BY(worker_thread_);
+ RtpStreamReceiverController video_receiver_controller_
+ RTC_GUARDED_BY(worker_thread_);
// This extra map is used for receive processing which is
// independent of media type.
- // TODO(nisse): In the RTP transport refactoring, we should have a
- // single mapping from ssrc to a more abstract receive stream, with
- // accessor methods for all configuration we need at this level.
- struct ReceiveRtpConfig {
- explicit ReceiveRtpConfig(const webrtc::AudioReceiveStream::Config& config)
- : extensions(config.rtp.extensions),
- use_send_side_bwe(UseSendSideBwe(config)) {}
- explicit ReceiveRtpConfig(const webrtc::VideoReceiveStream::Config& config)
- : extensions(config.rtp.extensions),
- use_send_side_bwe(UseSendSideBwe(config)) {}
- explicit ReceiveRtpConfig(const FlexfecReceiveStream::Config& config)
- : extensions(config.rtp_header_extensions),
- use_send_side_bwe(UseSendSideBwe(config)) {}
-
- // Registered RTP header extensions for each stream. Note that RTP header
- // extensions are negotiated per track ("m= line") in the SDP, but we have
- // no notion of tracks at the Call level. We therefore store the RTP header
- // extensions per SSRC instead, which leads to some storage overhead.
- const RtpHeaderExtensionMap extensions;
- // Set if both RTP extension the RTCP feedback message needed for
- // send side BWE are negotiated.
- const bool use_send_side_bwe;
- };
-
// TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the
// network thread.
- std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
+ std::map<uint32_t, ReceiveStream*> receive_rtp_config_
RTC_GUARDED_BY(worker_thread_);
// Audio and Video send streams are owned by the client that creates them.
@@ -421,6 +413,10 @@ class Call final : public webrtc::Call,
std::map<uint32_t, VideoSendStream*> video_send_ssrcs_
RTC_GUARDED_BY(worker_thread_);
std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_);
+ // True if |video_send_streams_| is empty, false if not. The atomic variable
+ // is used to decide UMA send statistics behavior and enables avoiding a
+ // PostTask().
+ std::atomic<bool> video_send_streams_empty_{true};
// Each forwarder wraps an adaptation resource that was added to the call.
std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>>
@@ -434,49 +430,41 @@ class Call final : public webrtc::Call,
RtpPayloadStateMap suspended_video_payload_states_
RTC_GUARDED_BY(worker_thread_);
- webrtc::RtcEventLog* event_log_;
-
- // The following members are only accessed (exclusively) from one thread and
- // from the destructor, and therefore doesn't need any explicit
- // synchronization.
- RateCounter received_bytes_per_second_counter_;
- RateCounter received_audio_bytes_per_second_counter_;
- RateCounter received_video_bytes_per_second_counter_;
- RateCounter received_rtcp_bytes_per_second_counter_;
- absl::optional<int64_t> first_received_rtp_audio_ms_;
- absl::optional<int64_t> last_received_rtp_audio_ms_;
- absl::optional<int64_t> first_received_rtp_video_ms_;
- absl::optional<int64_t> last_received_rtp_video_ms_;
-
- uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_);
- // TODO(holmer): Remove this lock once BitrateController no longer calls
- // OnNetworkChanged from multiple threads.
- uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
- uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
- AvgCounter estimated_send_bitrate_kbps_counter_
- RTC_GUARDED_BY(worker_thread_);
- AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_);
+ webrtc::RtcEventLog* const event_log_;
+
+ // TODO(bugs.webrtc.org/11993) ready to move stats access to the network
+ // thread.
+ ReceiveStats receive_stats_ RTC_GUARDED_BY(worker_thread_);
+ SendStats send_stats_ RTC_GUARDED_BY(send_transport_sequence_checker_);
+ // |last_bandwidth_bps_| and |configured_max_padding_bitrate_bps_| being
+ // atomic avoids a PostTask. The variables are used for stats gathering.
+ std::atomic<uint32_t> last_bandwidth_bps_{0};
+ std::atomic<uint32_t> configured_max_padding_bitrate_bps_{0};
ReceiveSideCongestionController receive_side_cc_;
const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;
const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
- const int64_t start_ms_;
+ const Timestamp start_of_call_;
// Note that |task_safety_| needs to be at a greater scope than the task queue
// owned by |transport_send_| since calls might arrive on the network thread
// while Call is being deleted and the task queue is being torn down.
- ScopedTaskSafety task_safety_;
+ const ScopedTaskSafety task_safety_;
// Caches transport_send_.get(), to avoid racing with destructor.
// Note that this is declared before transport_send_ to ensure that it is not
// invalidated until no more tasks can be running on the transport_send_ task
// queue.
- RtpTransportControllerSendInterface* const transport_send_ptr_;
+ // For more details on the background of this member variable, see:
+ // https://webrtc-review.googlesource.com/c/src/+/63023/9/call/call.cc
+ // https://bugs.chromium.org/p/chromium/issues/detail?id=992640
+ RtpTransportControllerSendInterface* const transport_send_ptr_
+ RTC_GUARDED_BY(send_transport_sequence_checker_);
// Declared last since it will issue callbacks from a task queue. Declaring it
// last ensures that it is destroyed first and any running tasks are finished.
- std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
+ const std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
bool is_started_ RTC_GUARDED_BY(worker_thread_) = false;
@@ -501,11 +489,6 @@ Call* Call::Create(const Call::Config& config) {
rtc::scoped_refptr<SharedModuleThread> call_thread =
SharedModuleThread::Create(ProcessThread::Create("ModuleProcessThread"),
nullptr);
- return Create(config, std::move(call_thread));
-}
-
-Call* Call::Create(const Call::Config& config,
- rtc::scoped_refptr<SharedModuleThread> call_thread) {
return Create(config, Clock::GetRealTimeClock(), std::move(call_thread),
ProcessThread::Create("PacerThread"));
}
@@ -515,15 +498,28 @@ Call* Call::Create(const Call::Config& config,
rtc::scoped_refptr<SharedModuleThread> call_thread,
std::unique_ptr<ProcessThread> pacer_thread) {
RTC_DCHECK(config.task_queue_factory);
+
+ RtpTransportControllerSendFactory transport_controller_factory_;
+
+ RtpTransportConfig transportConfig = config.ExtractTransportConfig();
+
return new internal::Call(
clock, config,
- std::make_unique<RtpTransportControllerSend>(
- clock, config.event_log, config.network_state_predictor_factory,
- config.network_controller_factory, config.bitrate_config,
- std::move(pacer_thread), config.task_queue_factory, config.trials),
+ transport_controller_factory_.Create(transportConfig, clock,
+ std::move(pacer_thread)),
std::move(call_thread), config.task_queue_factory);
}
+Call* Call::Create(const Call::Config& config,
+ Clock* clock,
+ rtc::scoped_refptr<SharedModuleThread> call_thread,
+ std::unique_ptr<RtpTransportControllerSendInterface>
+ transportControllerSend) {
+ RTC_DCHECK(config.task_queue_factory);
+ return new internal::Call(clock, config, std::move(transportControllerSend),
+ std::move(call_thread), config.task_queue_factory);
+}
+
class SharedModuleThread::Impl {
public:
Impl(std::unique_ptr<ProcessThread> process_thread,
@@ -628,6 +624,157 @@ VideoSendStream* Call::CreateVideoSendStream(
namespace internal {
+Call::ReceiveStats::ReceiveStats(Clock* clock)
+ : received_bytes_per_second_counter_(clock, nullptr, false),
+ received_audio_bytes_per_second_counter_(clock, nullptr, false),
+ received_video_bytes_per_second_counter_(clock, nullptr, false),
+ received_rtcp_bytes_per_second_counter_(clock, nullptr, false) {
+ sequence_checker_.Detach();
+}
+
+void Call::ReceiveStats::AddReceivedRtcpBytes(int bytes) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ if (received_bytes_per_second_counter_.HasSample()) {
+ // First RTP packet has been received.
+ received_bytes_per_second_counter_.Add(static_cast<int>(bytes));
+ received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(bytes));
+ }
+}
+
+void Call::ReceiveStats::AddReceivedAudioBytes(int bytes,
+ webrtc::Timestamp arrival_time) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ received_bytes_per_second_counter_.Add(bytes);
+ received_audio_bytes_per_second_counter_.Add(bytes);
+ if (!first_received_rtp_audio_timestamp_)
+ first_received_rtp_audio_timestamp_ = arrival_time;
+ last_received_rtp_audio_timestamp_ = arrival_time;
+}
+
+void Call::ReceiveStats::AddReceivedVideoBytes(int bytes,
+ webrtc::Timestamp arrival_time) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ received_bytes_per_second_counter_.Add(bytes);
+ received_video_bytes_per_second_counter_.Add(bytes);
+ if (!first_received_rtp_video_timestamp_)
+ first_received_rtp_video_timestamp_ = arrival_time;
+ last_received_rtp_video_timestamp_ = arrival_time;
+}
+
+Call::ReceiveStats::~ReceiveStats() {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ if (first_received_rtp_audio_timestamp_) {
+ RTC_HISTOGRAM_COUNTS_100000(
+ "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds",
+ (*last_received_rtp_audio_timestamp_ -
+ *first_received_rtp_audio_timestamp_)
+ .seconds());
+ }
+ if (first_received_rtp_video_timestamp_) {
+ RTC_HISTOGRAM_COUNTS_100000(
+ "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds",
+ (*last_received_rtp_video_timestamp_ -
+ *first_received_rtp_video_timestamp_)
+ .seconds());
+ }
+ const int kMinRequiredPeriodicSamples = 5;
+ AggregatedStats video_bytes_per_sec =
+ received_video_bytes_per_second_counter_.GetStats();
+ if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
+ RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps",
+ video_bytes_per_sec.average * 8 / 1000);
+ RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, "
+ << video_bytes_per_sec.ToStringWithMultiplier(8);
+ }
+ AggregatedStats audio_bytes_per_sec =
+ received_audio_bytes_per_second_counter_.GetStats();
+ if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
+ RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps",
+ audio_bytes_per_sec.average * 8 / 1000);
+ RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, "
+ << audio_bytes_per_sec.ToStringWithMultiplier(8);
+ }
+ AggregatedStats rtcp_bytes_per_sec =
+ received_rtcp_bytes_per_second_counter_.GetStats();
+ if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
+ RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps",
+ rtcp_bytes_per_sec.average * 8);
+ RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, "
+ << rtcp_bytes_per_sec.ToStringWithMultiplier(8);
+ }
+ AggregatedStats recv_bytes_per_sec =
+ received_bytes_per_second_counter_.GetStats();
+ if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
+ RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps",
+ recv_bytes_per_sec.average * 8 / 1000);
+ RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, "
+ << recv_bytes_per_sec.ToStringWithMultiplier(8);
+ }
+}
+
+Call::SendStats::SendStats(Clock* clock)
+ : clock_(clock),
+ estimated_send_bitrate_kbps_counter_(clock, nullptr, true),
+ pacer_bitrate_kbps_counter_(clock, nullptr, true) {
+ destructor_sequence_checker_.Detach();
+ sequence_checker_.Detach();
+}
+
+Call::SendStats::~SendStats() {
+ RTC_DCHECK_RUN_ON(&destructor_sequence_checker_);
+ if (!first_sent_packet_time_)
+ return;
+
+ TimeDelta elapsed = clock_->CurrentTime() - *first_sent_packet_time_;
+ if (elapsed.seconds() < metrics::kMinRunTimeInSeconds)
+ return;
+
+ const int kMinRequiredPeriodicSamples = 5;
+ AggregatedStats send_bitrate_stats =
+ estimated_send_bitrate_kbps_counter_.ProcessAndGetStats();
+ if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
+ RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps",
+ send_bitrate_stats.average);
+ RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, "
+ << send_bitrate_stats.ToString();
+ }
+ AggregatedStats pacer_bitrate_stats =
+ pacer_bitrate_kbps_counter_.ProcessAndGetStats();
+ if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
+ RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps",
+ pacer_bitrate_stats.average);
+ RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, "
+ << pacer_bitrate_stats.ToString();
+ }
+}
+
+void Call::SendStats::SetFirstPacketTime(
+ absl::optional<Timestamp> first_sent_packet_time) {
+ RTC_DCHECK_RUN_ON(&destructor_sequence_checker_);
+ first_sent_packet_time_ = first_sent_packet_time;
+}
+
+void Call::SendStats::PauseSendAndPacerBitrateCounters() {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ estimated_send_bitrate_kbps_counter_.ProcessAndPause();
+ pacer_bitrate_kbps_counter_.ProcessAndPause();
+}
+
+void Call::SendStats::AddTargetBitrateSample(uint32_t target_bitrate_bps) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
+ // Pacer bitrate may be higher than bitrate estimate if enforcing min
+ // bitrate.
+ uint32_t pacer_bitrate_bps =
+ std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
+ pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
+}
+
+void Call::SendStats::SetMinAllocatableRate(BitrateAllocationLimits limits) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
+}
+
Call::Call(Clock* clock,
const Call::Config& config,
std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
@@ -645,23 +792,22 @@ Call::Call(Clock* clock,
call_stats_(new CallStats(clock_, worker_thread_)),
bitrate_allocator_(new BitrateAllocator(this)),
config_(config),
+ trials_(*config.trials),
audio_network_state_(kNetworkDown),
video_network_state_(kNetworkDown),
aggregate_network_up_(false),
event_log_(config.event_log),
- received_bytes_per_second_counter_(clock_, nullptr, true),
- received_audio_bytes_per_second_counter_(clock_, nullptr, true),
- received_video_bytes_per_second_counter_(clock_, nullptr, true),
- received_rtcp_bytes_per_second_counter_(clock_, nullptr, true),
- last_bandwidth_bps_(0),
- min_allocated_send_bitrate_bps_(0),
- configured_max_padding_bitrate_bps_(0),
- estimated_send_bitrate_kbps_counter_(clock_, nullptr, true),
- pacer_bitrate_kbps_counter_(clock_, nullptr, true),
- receive_side_cc_(clock_, transport_send->packet_router()),
+ receive_stats_(clock_),
+ send_stats_(clock_),
+ receive_side_cc_(clock,
+ absl::bind_front(&PacketRouter::SendCombinedRtcpPacket,
+ transport_send->packet_router()),
+ absl::bind_front(&PacketRouter::SendRemb,
+ transport_send->packet_router()),
+ /*network_state_estimator=*/nullptr),
receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()),
video_send_delay_stats_(new SendDelayStats(clock_)),
- start_ms_(clock_->TimeInMilliseconds()),
+ start_of_call_(clock_->CurrentTime()),
transport_send_ptr_(transport_send.get()),
transport_send_(std::move(transport_send)) {
RTC_DCHECK(config.event_log != nullptr);
@@ -669,6 +815,8 @@ Call::Call(Clock* clock,
RTC_DCHECK(network_thread_);
RTC_DCHECK(worker_thread_->IsCurrent());
+ send_transport_sequence_checker_.Detach();
+
// Do not remove this call; it is here to convince the compiler that the
// WebRTC source timestamp string needs to be in the final binary.
LoadWebRTCVersionInRegister();
@@ -694,24 +842,11 @@ Call::~Call() {
receive_side_cc_.GetRemoteBitrateEstimator(true));
module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_);
call_stats_->DeregisterStatsObserver(&receive_side_cc_);
+ send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime());
- absl::optional<Timestamp> first_sent_packet_time =
- transport_send_->GetFirstPacketTime();
-
- Timestamp now = clock_->CurrentTime();
-
- // Only update histograms after process threads have been shut down, so that
- // they won't try to concurrently update stats.
- if (first_sent_packet_time) {
- UpdateSendHistograms(now, *first_sent_packet_time,
- estimated_send_bitrate_kbps_counter_,
- pacer_bitrate_kbps_counter_);
- }
-
- UpdateReceiveHistograms();
-
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.LifetimeInSeconds",
- (now.ms() - start_ms_) / 1000);
+ RTC_HISTOGRAM_COUNTS_100000(
+ "WebRTC.Call.LifetimeInSeconds",
+ (clock_->CurrentTime() - start_of_call_).seconds());
}
void Call::EnsureStarted() {
@@ -724,10 +859,10 @@ void Call::EnsureStarted() {
// This call seems to kick off a number of things, so probably better left
// off being kicked off on request rather than in the ctor.
- transport_send_ptr_->RegisterTargetTransferRateObserver(this);
+ transport_send_->RegisterTargetTransferRateObserver(this);
module_process_thread_->EnsureStarted();
- transport_send_ptr_->EnsureStarted();
+ transport_send_->EnsureStarted();
}
void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
@@ -735,52 +870,6 @@ void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
GetTransportControllerSend()->SetClientBitratePreferences(preferences);
}
-void Call::UpdateReceiveHistograms() {
- if (first_received_rtp_audio_ms_) {
- RTC_HISTOGRAM_COUNTS_100000(
- "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds",
- (*last_received_rtp_audio_ms_ - *first_received_rtp_audio_ms_) / 1000);
- }
- if (first_received_rtp_video_ms_) {
- RTC_HISTOGRAM_COUNTS_100000(
- "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds",
- (*last_received_rtp_video_ms_ - *first_received_rtp_video_ms_) / 1000);
- }
- const int kMinRequiredPeriodicSamples = 5;
- AggregatedStats video_bytes_per_sec =
- received_video_bytes_per_second_counter_.GetStats();
- if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps",
- video_bytes_per_sec.average * 8 / 1000);
- RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, "
- << video_bytes_per_sec.ToStringWithMultiplier(8);
- }
- AggregatedStats audio_bytes_per_sec =
- received_audio_bytes_per_second_counter_.GetStats();
- if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps",
- audio_bytes_per_sec.average * 8 / 1000);
- RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, "
- << audio_bytes_per_sec.ToStringWithMultiplier(8);
- }
- AggregatedStats rtcp_bytes_per_sec =
- received_rtcp_bytes_per_second_counter_.GetStats();
- if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps",
- rtcp_bytes_per_sec.average * 8);
- RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, "
- << rtcp_bytes_per_sec.ToStringWithMultiplier(8);
- }
- AggregatedStats recv_bytes_per_sec =
- received_bytes_per_second_counter_.GetStats();
- if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
- RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps",
- recv_bytes_per_sec.average * 8 / 1000);
- RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, "
- << recv_bytes_per_sec.ToStringWithMultiplier(8);
- }
-}
-
PacketReceiver* Call::Receiver() {
return this;
}
@@ -804,7 +893,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
AudioSendStream* send_stream = new AudioSendStream(
clock_, config, config_.audio_state, task_queue_factory_,
- module_process_thread_->process_thread(), transport_send_ptr_,
+ module_process_thread_->process_thread(), transport_send_.get(),
bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
suspended_rtp_state);
RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
@@ -814,7 +903,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
// TODO(bugs.webrtc.org/11993): call AssociateSendStream and
// UpdateAggregateNetworkState asynchronously on the network thread.
for (AudioReceiveStream* stream : audio_receive_streams_) {
- if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
+ if (stream->local_ssrc() == config.rtp.ssrc) {
stream->AssociateSendStream(send_stream);
}
}
@@ -842,7 +931,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
// TODO(bugs.webrtc.org/11993): call AssociateSendStream and
// UpdateAggregateNetworkState asynchronously on the network thread.
for (AudioReceiveStream* stream : audio_receive_streams_) {
- if (stream->config().rtp.local_ssrc == ssrc) {
+ if (stream->local_ssrc() == ssrc) {
stream->AssociateSendStream(nullptr);
}
}
@@ -860,20 +949,21 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config)));
- // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
- // and |audio_receiver_controller_| out of AudioReceiveStream construction and
- // set it up asynchronously on the network thread (the registration and
- // |audio_receiver_controller_| need to live on the network thread).
AudioReceiveStream* receive_stream = new AudioReceiveStream(
- clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
+ clock_, transport_send_->packet_router(),
module_process_thread_->process_thread(), config_.neteq_factory, config,
config_.audio_state, event_log_);
+ audio_receive_streams_.insert(receive_stream);
+
+ // TODO(bugs.webrtc.org/11993): Make the registration on the network thread
+ // (asynchronously). The registration and `audio_receiver_controller_` need
+ // to live on the network thread.
+ receive_stream->RegisterWithTransport(&audio_receiver_controller_);
// TODO(bugs.webrtc.org/11993): Update the below on the network thread.
// We could possibly set up the audio_receiver_controller_ association up
// as part of the async setup.
- receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
- audio_receive_streams_.insert(receive_stream);
+ receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream);
ConfigureSync(config.sync_group);
@@ -894,20 +984,22 @@ void Call::DestroyAudioReceiveStream(
webrtc::internal::AudioReceiveStream* audio_receive_stream =
static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
+ // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync
+ // and UpdateAggregateNetworkState on the network thread. The call to
+ // `UnregisterFromTransport` should also happen on the network thread.
+ audio_receive_stream->UnregisterFromTransport();
+
+ uint32_t ssrc = audio_receive_stream->remote_ssrc();
const AudioReceiveStream::Config& config = audio_receive_stream->config();
- uint32_t ssrc = config.rtp.remote_ssrc;
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
+ receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config.rtp))
->RemoveStream(ssrc);
- // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync
- // and UpdateAggregateNetworkState on the network thread.
audio_receive_streams_.erase(audio_receive_stream);
- const std::string& sync_group = audio_receive_stream->config().sync_group;
- const auto it = sync_stream_mapping_.find(sync_group);
+ const auto it = sync_stream_mapping_.find(config.sync_group);
if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
sync_stream_mapping_.erase(it);
- ConfigureSync(sync_group);
+ ConfigureSync(config.sync_group);
}
receive_rtp_config_.erase(ssrc);
@@ -942,7 +1034,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
VideoSendStream* send_stream = new VideoSendStream(
clock_, num_cpu_cores_, module_process_thread_->process_thread(),
- task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_,
+ task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_.get(),
bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_,
std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
suspended_video_payload_states_, std::move(fec_controller));
@@ -952,6 +1044,8 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
video_send_ssrcs_[ssrc] = send_stream;
}
video_send_streams_.insert(send_stream);
+ video_send_streams_empty_.store(false, std::memory_order_relaxed);
+
// Forward resources that were previously added to the call to the new stream.
for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
resource_forwarder->OnCreateVideoSendStream(send_stream);
@@ -965,6 +1059,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
webrtc::VideoSendStream* Call::CreateVideoSendStream(
webrtc::VideoSendStream::Config config,
VideoEncoderConfig encoder_config) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
if (config_.fec_controller_factory) {
RTC_LOG(LS_INFO) << "External FEC Controller will be used.";
}
@@ -981,9 +1076,12 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
RTC_DCHECK(send_stream != nullptr);
RTC_DCHECK_RUN_ON(worker_thread_);
- send_stream->Stop();
-
- VideoSendStream* send_stream_impl = nullptr;
+ VideoSendStream* send_stream_impl =
+ static_cast<VideoSendStream*>(send_stream);
+ VideoSendStream::RtpStateMap rtp_states;
+ VideoSendStream::RtpPayloadStateMap rtp_payload_states;
+ send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states,
+ &rtp_payload_states);
auto it = video_send_ssrcs_.begin();
while (it != video_send_ssrcs_.end()) {
@@ -994,18 +1092,15 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
++it;
}
}
+
// Stop forwarding resources to the stream being destroyed.
for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
resource_forwarder->OnDestroyVideoSendStream(send_stream_impl);
}
video_send_streams_.erase(send_stream_impl);
+ if (video_send_streams_.empty())
+ video_send_streams_empty_.store(true, std::memory_order_relaxed);
- RTC_CHECK(send_stream_impl != nullptr);
-
- VideoSendStream::RtpStateMap rtp_states;
- VideoSendStream::RtpPayloadStateMap rtp_payload_states;
- send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states,
- &rtp_payload_states);
for (const auto& kv : rtp_states) {
suspended_video_send_ssrcs_[kv.first] = kv.second;
}
@@ -1014,6 +1109,8 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
}
UpdateAggregateNetworkState();
+ // TODO(tommi): consider deleting on the same thread as runs
+ // StopPermanentlyAndGetRtpStates.
delete send_stream_impl;
}
@@ -1032,10 +1129,13 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
// and set it up asynchronously on the network thread (the registration and
// |video_receiver_controller_| need to live on the network thread).
VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
- task_queue_factory_, worker_thread_, &video_receiver_controller_,
- num_cpu_cores_, transport_send_ptr_->packet_router(),
- std::move(configuration), module_process_thread_->process_thread(),
- call_stats_.get(), clock_, new VCMTiming(clock_));
+ task_queue_factory_, this, num_cpu_cores_,
+ transport_send_->packet_router(), std::move(configuration),
+ module_process_thread_->process_thread(), call_stats_.get(), clock_,
+ new VCMTiming(clock_));
+ // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
+ // thread.
+ receive_stream->RegisterWithTransport(&video_receiver_controller_);
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
if (config.rtp.rtx_ssrc) {
@@ -1043,9 +1143,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
// that is unlikely to matter in practice.
- receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config));
+ receive_rtp_config_.emplace(config.rtp.rtx_ssrc, receive_stream);
}
- receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
+ receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream);
video_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
@@ -1063,6 +1163,9 @@ void Call::DestroyVideoReceiveStream(
RTC_DCHECK(receive_stream != nullptr);
VideoReceiveStream2* receive_stream_impl =
static_cast<VideoReceiveStream2*>(receive_stream);
+ // TODO(bugs.webrtc.org/11993): Unregister on the network thread.
+ receive_stream_impl->UnregisterFromTransport();
+
const VideoReceiveStream::Config& config = receive_stream_impl->config();
// Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
@@ -1074,7 +1177,7 @@ void Call::DestroyVideoReceiveStream(
video_receive_streams_.erase(receive_stream_impl);
ConfigureSync(config.sync_group);
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
+ receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config.rtp))
->RemoveStream(config.rtp.remote_ssrc);
UpdateAggregateNetworkState();
@@ -1097,12 +1200,16 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
// OnRtpPacket until the constructor is finished and the object is
// in a valid state, since OnRtpPacket runs on the same thread.
receive_stream = new FlexfecReceiveStreamImpl(
- clock_, &video_receiver_controller_, config, recovered_packet_receiver,
- call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
+ clock_, config, recovered_packet_receiver, call_stats_->AsRtcpRttStats(),
+ module_process_thread_->process_thread());
- RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
+ // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
+ // thread.
+ receive_stream->RegisterWithTransport(&video_receiver_controller_);
+
+ RTC_DCHECK(receive_rtp_config_.find(config.rtp.remote_ssrc) ==
receive_rtp_config_.end());
- receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));
+ receive_rtp_config_.emplace(config.rtp.remote_ssrc, receive_stream);
// TODO(brandtr): Store config in RtcEventLog here.
@@ -1113,15 +1220,19 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
RTC_DCHECK_RUN_ON(worker_thread_);
+ FlexfecReceiveStreamImpl* receive_stream_impl =
+ static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
+ // TODO(bugs.webrtc.org/11993): Unregister on the network thread.
+ receive_stream_impl->UnregisterFromTransport();
+
RTC_DCHECK(receive_stream != nullptr);
- const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
- uint32_t ssrc = config.remote_ssrc;
- receive_rtp_config_.erase(ssrc);
+ const FlexfecReceiveStream::RtpConfig& rtp = receive_stream->rtp_config();
+ receive_rtp_config_.erase(rtp.remote_ssrc);
// Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
// destroyed.
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
- ->RemoveStream(ssrc);
+ receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(rtp))
+ ->RemoveStream(rtp.remote_ssrc);
delete receive_stream;
}
@@ -1137,7 +1248,7 @@ void Call::AddAdaptationResource(rtc::scoped_refptr<Resource> resource) {
}
RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
- return transport_send_ptr_;
+ return transport_send_.get();
}
Call::Stats Call::GetStats() const {
@@ -1147,7 +1258,7 @@ Call::Stats Call::GetStats() const {
// TODO(srte): It is unclear if we only want to report queues if network is
// available.
stats.pacer_delay_ms =
- aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0;
+ aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0;
stats.rtt_ms = call_stats_->LastProcessedRtt();
@@ -1157,14 +1268,16 @@ Call::Stats Call::GetStats() const {
receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
&ssrcs, &recv_bandwidth);
stats.recv_bandwidth_bps = recv_bandwidth;
- stats.send_bandwidth_bps = last_bandwidth_bps_;
- stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
+ stats.send_bandwidth_bps =
+ last_bandwidth_bps_.load(std::memory_order_relaxed);
+ stats.max_padding_bitrate_bps =
+ configured_max_padding_bitrate_bps_.load(std::memory_order_relaxed);
return stats;
}
const WebRtcKeyValueConfig& Call::trials() const {
- return *config_.trials;
+ return trials_;
}
TaskQueueBase* Call::network_thread() const {
@@ -1246,7 +1359,28 @@ void Call::UpdateAggregateNetworkState() {
}
aggregate_network_up_ = aggregate_network_up;
- transport_send_ptr_->OnNetworkAvailability(aggregate_network_up);
+ transport_send_->OnNetworkAvailability(aggregate_network_up);
+}
+
+void Call::OnLocalSsrcUpdated(webrtc::AudioReceiveStream& stream,
+ uint32_t local_ssrc) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ webrtc::internal::AudioReceiveStream& receive_stream =
+ static_cast<webrtc::internal::AudioReceiveStream&>(stream);
+
+ receive_stream.SetLocalSsrc(local_ssrc);
+ auto it = audio_send_ssrcs_.find(local_ssrc);
+ receive_stream.AssociateSendStream(it != audio_send_ssrcs_.end() ? it->second
+ : nullptr);
+}
+
+void Call::OnUpdateSyncGroup(webrtc::AudioReceiveStream& stream,
+ const std::string& sync_group) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ webrtc::internal::AudioReceiveStream& receive_stream =
+ static_cast<webrtc::internal::AudioReceiveStream&>(stream);
+ receive_stream.SetSyncGroup(sync_group);
+ ConfigureSync(sync_group);
}
void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
@@ -1258,56 +1392,47 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
// implementations that either just do a PostTask or use locking.
video_send_delay_stats_->OnSentPacket(sent_packet.packet_id,
clock_->TimeInMilliseconds());
- transport_send_ptr_->OnSentPacket(sent_packet);
+ transport_send_->OnSentPacket(sent_packet);
}
void Call::OnStartRateUpdate(DataRate start_rate) {
- RTC_DCHECK_RUN_ON(send_transport_queue());
+ RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_);
bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>());
}
void Call::OnTargetTransferRate(TargetTransferRate msg) {
- RTC_DCHECK_RUN_ON(send_transport_queue());
+ RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_);
uint32_t target_bitrate_bps = msg.target_rate.bps();
// For controlling the rate of feedback messages.
receive_side_cc_.OnBitrateChanged(target_bitrate_bps);
bitrate_allocator_->OnNetworkEstimateChanged(msg);
- worker_thread_->PostTask(
- ToQueuedTask(task_safety_, [this, target_bitrate_bps]() {
- RTC_DCHECK_RUN_ON(worker_thread_);
- last_bandwidth_bps_ = target_bitrate_bps;
-
- // Ignore updates if bitrate is zero (the aggregate network state is
- // down) or if we're not sending video.
- if (target_bitrate_bps == 0 || video_send_streams_.empty()) {
- estimated_send_bitrate_kbps_counter_.ProcessAndPause();
- pacer_bitrate_kbps_counter_.ProcessAndPause();
- return;
- }
+ last_bandwidth_bps_.store(target_bitrate_bps, std::memory_order_relaxed);
- estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
- // Pacer bitrate may be higher than bitrate estimate if enforcing min
- // bitrate.
- uint32_t pacer_bitrate_bps =
- std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
- pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
- }));
+ // Ignore updates if bitrate is zero (the aggregate network state is
+ // down) or if we're not sending video.
+ // Using |video_send_streams_empty_| is racy but as the caller can't
+ // reasonably expect synchronize with changes in |video_send_streams_| (being
+ // on |send_transport_sequence_checker|), we can avoid a PostTask this way.
+ if (target_bitrate_bps == 0 ||
+ video_send_streams_empty_.load(std::memory_order_relaxed)) {
+ send_stats_.PauseSendAndPacerBitrateCounters();
+ } else {
+ send_stats_.AddTargetBitrateSample(target_bitrate_bps);
+ }
}
void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
- RTC_DCHECK_RUN_ON(send_transport_queue());
+ RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_);
transport_send_ptr_->SetAllocatedSendBitrateLimits(limits);
-
- worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() {
- RTC_DCHECK_RUN_ON(worker_thread_);
- min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
- configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
- }));
+ send_stats_.SetMinAllocatableRate(limits);
+ configured_max_padding_bitrate_bps_.store(limits.max_padding_rate.bps(),
+ std::memory_order_relaxed);
}
+// RTC_RUN_ON(worker_thread_)
void Call::ConfigureSync(const std::string& sync_group) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
// Set sync only if there was no previous one.
@@ -1359,56 +1484,62 @@ void Call::ConfigureSync(const std::string& sync_group) {
}
}
-PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type,
- const uint8_t* packet,
- size_t length) {
+// RTC_RUN_ON(network_thread_)
+void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) {
TRACE_EVENT0("webrtc", "Call::DeliverRtcp");
- // TODO(pbos): Make sure it's a valid packet.
- // Return DELIVERY_UNKNOWN_SSRC if it can be determined that
- // there's no receiver of the packet.
- if (received_bytes_per_second_counter_.HasSample()) {
- // First RTP packet has been received.
- received_bytes_per_second_counter_.Add(static_cast<int>(length));
- received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(length));
- }
- bool rtcp_delivered = false;
- if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
- for (VideoReceiveStream2* stream : video_receive_streams_) {
- if (stream->DeliverRtcp(packet, length))
- rtcp_delivered = true;
- }
- }
- if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
- for (AudioReceiveStream* stream : audio_receive_streams_) {
- stream->DeliverRtcp(packet, length);
- rtcp_delivered = true;
- }
- }
- if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
- for (VideoSendStream* stream : video_send_streams_) {
- stream->DeliverRtcp(packet, length);
- rtcp_delivered = true;
- }
- }
- if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
- for (auto& kv : audio_send_ssrcs_) {
- kv.second->DeliverRtcp(packet, length);
- rtcp_delivered = true;
- }
- }
- if (rtcp_delivered) {
- event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
- rtc::MakeArrayView(packet, length)));
- }
+ // TODO(bugs.webrtc.org/11993): This DCHECK is here just to maintain the
+ // invariant that currently the only call path to this function is via
+ // `PeerConnection::InitializeRtcpCallback()`. DeliverRtp on the other hand
+ // gets called via the channel classes and
+ // WebRtc[Audio|Video]Channel's `OnPacketReceived`. We'll remove the
+ // PeerConnection involvement as well as
+ // `JsepTransportController::OnRtcpPacketReceived_n` and `rtcp_handler`
+ // and make sure that the flow of packets is consistent from the
+ // `RtpTransport` class, via the *Channel and *Engine classes and into Call.
+ // This way we'll also know more about the context of the packet.
+ RTC_DCHECK_EQ(media_type, MediaType::ANY);
+
+ // TODO(bugs.webrtc.org/11993): This should execute directly on the network
+ // thread.
+ worker_thread_->PostTask(
+ ToQueuedTask(task_safety_, [this, packet = std::move(packet)]() {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+
+ receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
+ bool rtcp_delivered = false;
+ for (VideoReceiveStream2* stream : video_receive_streams_) {
+ if (stream->DeliverRtcp(packet.cdata(), packet.size()))
+ rtcp_delivered = true;
+ }
+
+ for (AudioReceiveStream* stream : audio_receive_streams_) {
+ stream->DeliverRtcp(packet.cdata(), packet.size());
+ rtcp_delivered = true;
+ }
+
+ for (VideoSendStream* stream : video_send_streams_) {
+ stream->DeliverRtcp(packet.cdata(), packet.size());
+ rtcp_delivered = true;
+ }
+
+ for (auto& kv : audio_send_ssrcs_) {
+ kv.second->DeliverRtcp(packet.cdata(), packet.size());
+ rtcp_delivered = true;
+ }
- return rtcp_delivered ? DELIVERY_OK : DELIVERY_PACKET_ERROR;
+ if (rtcp_delivered) {
+ event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
+ rtc::MakeArrayView(packet.cdata(), packet.size())));
+ }
+ }));
}
PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
TRACE_EVENT0("webrtc", "Call::DeliverRtp");
+ RTC_DCHECK_NE(media_type, MediaType::ANY);
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(std::move(packet)))
@@ -1421,9 +1552,9 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
}
- parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000);
+ parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us));
} else {
- parsed_packet.set_arrival_time_ms(clock_->TimeInMilliseconds());
+ parsed_packet.set_arrival_time(clock_->CurrentTime());
}
// We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
@@ -1446,7 +1577,8 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
return DELIVERY_UNKNOWN_SSRC;
}
- parsed_packet.IdentifyExtensions(it->second.extensions);
+ parsed_packet.IdentifyExtensions(
+ RtpHeaderExtensionMap(it->second->rtp_config().extensions));
NotifyBweOfReceivedPacket(parsed_packet, media_type);
@@ -1455,29 +1587,19 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
int length = static_cast<int>(parsed_packet.size());
if (media_type == MediaType::AUDIO) {
if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) {
- received_bytes_per_second_counter_.Add(length);
- received_audio_bytes_per_second_counter_.Add(length);
+ receive_stats_.AddReceivedAudioBytes(length,
+ parsed_packet.arrival_time());
event_log_->Log(
std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
- const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
- if (!first_received_rtp_audio_ms_) {
- first_received_rtp_audio_ms_.emplace(arrival_time_ms);
- }
- last_received_rtp_audio_ms_.emplace(arrival_time_ms);
return DELIVERY_OK;
}
} else if (media_type == MediaType::VIDEO) {
parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
- received_bytes_per_second_counter_.Add(length);
- received_video_bytes_per_second_counter_.Add(length);
+ receive_stats_.AddReceivedVideoBytes(length,
+ parsed_packet.arrival_time());
event_log_->Log(
std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
- const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
- if (!first_received_rtp_video_ms_) {
- first_received_rtp_video_ms_.emplace(arrival_time_ms);
- }
- last_received_rtp_video_ms_.emplace(arrival_time_ms);
return DELIVERY_OK;
}
}
@@ -1488,38 +1610,16 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
- RTC_DCHECK_RUN_ON(worker_thread_);
-
- if (IsRtcp(packet.cdata(), packet.size()))
- return DeliverRtcp(media_type, packet.cdata(), packet.size());
+ if (IsRtcp(packet.cdata(), packet.size())) {
+ RTC_DCHECK_RUN_ON(network_thread_);
+ DeliverRtcp(media_type, std::move(packet));
+ return DELIVERY_OK;
+ }
+ RTC_DCHECK_RUN_ON(worker_thread_);
return DeliverRtp(media_type, std::move(packet), packet_time_us);
}
-void Call::DeliverPacketAsync(MediaType media_type,
- rtc::CopyOnWriteBuffer packet,
- int64_t packet_time_us,
- PacketCallback callback) {
- RTC_DCHECK_RUN_ON(network_thread_);
-
- TaskQueueBase* network_thread = rtc::Thread::Current();
- RTC_DCHECK(network_thread);
-
- worker_thread_->PostTask(ToQueuedTask(
- task_safety_, [this, network_thread, media_type, p = std::move(packet),
- packet_time_us, cb = std::move(callback)] {
- RTC_DCHECK_RUN_ON(worker_thread_);
- DeliveryStatus status = DeliverPacket(media_type, p, packet_time_us);
- if (cb) {
- network_thread->PostTask(
- ToQueuedTask([cb = std::move(cb), status, media_type,
- p = std::move(p), packet_time_us]() {
- cb(status, media_type, std::move(p), packet_time_us);
- }));
- }
- }));
-}
-
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
// TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
// This method is called synchronously via |OnRtpPacket()| (see DeliverRtp)
@@ -1543,29 +1643,31 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
// which is being torn down.
return;
}
- parsed_packet.IdentifyExtensions(it->second.extensions);
+ parsed_packet.IdentifyExtensions(
+ RtpHeaderExtensionMap(it->second->rtp_config().extensions));
// TODO(brandtr): Update here when we support protecting audio packets too.
parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
video_receiver_controller_.OnRtpPacket(parsed_packet);
}
+// RTC_RUN_ON(worker_thread_)
void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type) {
auto it = receive_rtp_config_.find(packet.Ssrc());
- bool use_send_side_bwe =
- (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe;
+ bool use_send_side_bwe = (it != receive_rtp_config_.end()) &&
+ UseSendSideBwe(it->second->rtp_config());
RTPHeader header;
packet.GetHeader(&header);
ReceivedPacket packet_msg;
packet_msg.size = DataSize::Bytes(packet.payload_size());
- packet_msg.receive_time = Timestamp::Millis(packet.arrival_time_ms());
+ packet_msg.receive_time = packet.arrival_time();
if (header.extension.hasAbsoluteSendTime) {
packet_msg.send_time = header.extension.GetAbsoluteSendTimestamp();
}
- transport_send_ptr_->OnReceivedPacket(packet_msg);
+ transport_send_->OnReceivedPacket(packet_msg);
if (!use_send_side_bwe && header.extension.hasTransportSequenceNumber) {
// Inconsistent configuration of send side BWE. Do nothing.
@@ -1581,8 +1683,8 @@ void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
if (media_type == MediaType::VIDEO ||
(use_send_side_bwe && header.extension.hasTransportSequenceNumber)) {
receive_side_cc_.OnReceivedPacket(
- packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(),
- header);
+ packet.arrival_time().ms(),
+ packet.payload_size() + packet.padding_size(), header);
}
}