diff options
Diffstat (limited to 'webrtc/modules/pacing/paced_sender.cc')
-rw-r--r-- | webrtc/modules/pacing/paced_sender.cc | 141 |
1 files changed, 102 insertions, 39 deletions
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 5d7ae17b23..121f860c7d 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -8,20 +8,19 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "webrtc/modules/pacing/include/paced_sender.h" - -#include <assert.h> +#include "webrtc/modules/pacing/paced_sender.h" #include <map> #include <queue> #include <set> -#include "webrtc/modules/interface/module_common_types.h" +#include "webrtc/base/checks.h" +#include "webrtc/base/logging.h" +#include "webrtc/modules/include/module_common_types.h" #include "webrtc/modules/pacing/bitrate_prober.h" #include "webrtc/system_wrappers/include/clock.h" #include "webrtc/system_wrappers/include/critical_section_wrapper.h" #include "webrtc/system_wrappers/include/field_trial.h" -#include "webrtc/system_wrappers/include/logging.h" namespace { // Time limit in milliseconds between packet bursts. @@ -33,6 +32,9 @@ const int64_t kMaxIntervalTimeMs = 30; } // namespace +// TODO(sprang): Move at least PacketQueue and MediaBudget out to separate +// files, so that we can more easily test them. + namespace webrtc { namespace paced_sender { struct Packet { @@ -86,13 +88,19 @@ struct Comparator { // Class encapsulating a priority queue with some extensions. class PacketQueue { public: - PacketQueue() : bytes_(0) {} + explicit PacketQueue(Clock* clock) + : bytes_(0), + clock_(clock), + queue_time_sum_(0), + time_last_updated_(clock_->TimeInMilliseconds()) {} virtual ~PacketQueue() {} void Push(const Packet& packet) { - if (!AddToDupeSet(packet)) { + if (!AddToDupeSet(packet)) return; - } + + UpdateQueueTime(packet.enqueue_time_ms); + // Store packet in list, use pointers in priority queue for cheaper moves. // Packets have a handle to its own iterator in the list, for easy removal // when popping from queue. @@ -114,7 +122,11 @@ class PacketQueue { void FinalizePop(const Packet& packet) { RemoveFromDupeSet(packet); bytes_ -= packet.bytes; + queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); packet_list_.erase(packet.this_it); + RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); + if (packet_list_.empty()) + RTC_DCHECK_EQ(0u, queue_time_sum_); } bool Empty() const { return prio_queue_.empty(); } @@ -123,13 +135,29 @@ class PacketQueue { uint64_t SizeInBytes() const { return bytes_; } - int64_t OldestEnqueueTime() const { - std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin(); + int64_t OldestEnqueueTimeMs() const { + auto it = packet_list_.rbegin(); if (it == packet_list_.rend()) return 0; return it->enqueue_time_ms; } + void UpdateQueueTime(int64_t timestamp_ms) { + RTC_DCHECK_GE(timestamp_ms, time_last_updated_); + int64_t delta = timestamp_ms - time_last_updated_; + // Use packet packet_list_.size() not prio_queue_.size() here, as there + // might be an outstanding element popped from prio_queue_ currently in the + // SendPacket() call, while packet_list_ will always be correct. + queue_time_sum_ += delta * packet_list_.size(); + time_last_updated_ = timestamp_ms; + } + + int64_t AverageQueueTimeMs() const { + if (prio_queue_.empty()) + return 0; + return queue_time_sum_ / packet_list_.size(); + } + private: // Try to add a packet to the set of ssrc/seqno identifiers currently in the // queue. Return true if inserted, false if this is a duplicate. @@ -147,7 +175,7 @@ class PacketQueue { void RemoveFromDupeSet(const Packet& packet) { SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); - assert(it != dupe_map_.end()); + RTC_DCHECK(it != dupe_map_.end()); it->second.erase(packet.sequence_number); if (it->second.empty()) { dupe_map_.erase(it); @@ -165,6 +193,9 @@ class PacketQueue { // Map<ssrc, set<seq_no> >, for checking duplicates. typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap; SsrcSeqNoMap dupe_map_; + Clock* const clock_; + int64_t queue_time_sum_; + int64_t time_last_updated_; }; class IntervalBudget { @@ -209,6 +240,7 @@ class IntervalBudget { }; } // namespace paced_sender +const int64_t PacedSender::kMaxQueueLengthMs = 2000; const float PacedSender::kDefaultPaceMultiplier = 2.5f; PacedSender::PacedSender(Clock* clock, @@ -225,8 +257,9 @@ PacedSender::PacedSender(Clock* clock, padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), prober_(new BitrateProber()), bitrate_bps_(1000 * bitrate_kbps), + max_bitrate_kbps_(max_bitrate_kbps), time_last_update_us_(clock->TimeInMicroseconds()), - packets_(new paced_sender::PacketQueue()), + packets_(new paced_sender::PacketQueue(clock)), packet_counter_(0) { UpdateBytesPerInterval(kMinPacketLimitMs); } @@ -244,7 +277,7 @@ void PacedSender::Resume() { } void PacedSender::SetProbingEnabled(bool enabled) { - assert(packet_counter_ == 0); + RTC_CHECK_EQ(0u, packet_counter_); probing_enabled_ = enabled; } @@ -252,9 +285,12 @@ void PacedSender::UpdateBitrate(int bitrate_kbps, int max_bitrate_kbps, int min_bitrate_kbps) { CriticalSectionScoped cs(critsect_.get()); - media_budget_->set_target_rate_kbps(max_bitrate_kbps); + // Don't set media bitrate here as it may be boosted in order to meet max + // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_ + // be updated in Process(). padding_budget_->set_target_rate_kbps(min_bitrate_kbps); bitrate_bps_ = 1000 * bitrate_kbps; + max_bitrate_kbps_ = max_bitrate_kbps; } void PacedSender::InsertPacket(RtpPacketSender::Priority priority, @@ -265,25 +301,23 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, bool retransmission) { CriticalSectionScoped cs(critsect_.get()); - if (probing_enabled_ && !prober_->IsProbing()) { + if (probing_enabled_ && !prober_->IsProbing()) prober_->SetEnabled(true); - } prober_->MaybeInitializeProbe(bitrate_bps_); - if (capture_time_ms < 0) { - capture_time_ms = clock_->TimeInMilliseconds(); - } + int64_t now_ms = clock_->TimeInMilliseconds(); + if (capture_time_ms < 0) + capture_time_ms = now_ms; - packets_->Push(paced_sender::Packet( - priority, ssrc, sequence_number, capture_time_ms, - clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++)); + packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number, + capture_time_ms, now_ms, bytes, + retransmission, packet_counter_++)); } int64_t PacedSender::ExpectedQueueTimeMs() const { CriticalSectionScoped cs(critsect_.get()); - int target_rate = media_budget_->target_rate_kbps(); - assert(target_rate > 0); - return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate); + RTC_DCHECK_GT(max_bitrate_kbps_, 0); + return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_); } size_t PacedSender::QueueSizePackets() const { @@ -294,20 +328,25 @@ size_t PacedSender::QueueSizePackets() const { int64_t PacedSender::QueueInMs() const { CriticalSectionScoped cs(critsect_.get()); - int64_t oldest_packet = packets_->OldestEnqueueTime(); + int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); if (oldest_packet == 0) return 0; return clock_->TimeInMilliseconds() - oldest_packet; } +int64_t PacedSender::AverageQueueTimeMs() { + CriticalSectionScoped cs(critsect_.get()); + packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); + return packets_->AverageQueueTimeMs(); +} + int64_t PacedSender::TimeUntilNextProcess() { CriticalSectionScoped cs(critsect_.get()); if (prober_->IsProbing()) { int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); - if (ret >= 0) { + if (ret >= 0) return ret; - } } int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; @@ -319,27 +358,42 @@ int32_t PacedSender::Process() { CriticalSectionScoped cs(critsect_.get()); int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; time_last_update_us_ = now_us; - if (paused_) - return 0; - if (elapsed_time_ms > 0) { + int target_bitrate_kbps = max_bitrate_kbps_; + // TODO(holmer): Remove the !paused_ check when issue 5307 has been fixed. + if (!paused_ && elapsed_time_ms > 0) { + size_t queue_size_bytes = packets_->SizeInBytes(); + if (queue_size_bytes > 0) { + // Assuming equal size packets and input/output rate, the average packet + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if + // time constraint shall be met. Determine bitrate needed for that. + packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); + int64_t avg_time_left_ms = std::max<int64_t>( + 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); + int min_bitrate_needed_kbps = + static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); + if (min_bitrate_needed_kbps > target_bitrate_kbps) + target_bitrate_kbps = min_bitrate_needed_kbps; + } + + media_budget_->set_target_rate_kbps(target_bitrate_kbps); + int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); UpdateBytesPerInterval(delta_time_ms); } while (!packets_->Empty()) { - if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) { + if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) return 0; - } // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can // reinsert it if send fails. const paced_sender::Packet& packet = packets_->BeginPop(); + if (SendPacket(packet)) { // Send succeeded, remove it from the queue. packets_->FinalizePop(packet); - if (prober_->IsProbing()) { + if (prober_->IsProbing()) return 0; - } } else { // Send failed, put it back into the queue. packets_->CancelPop(packet); @@ -347,14 +401,16 @@ int32_t PacedSender::Process() { } } - if (!packets_->Empty()) + // TODO(holmer): Remove the paused_ check when issue 5307 has been fixed. + if (paused_ || !packets_->Empty()) return 0; size_t padding_needed; - if (prober_->IsProbing()) + if (prober_->IsProbing()) { padding_needed = prober_->RecommendedPacketSize(); - else + } else { padding_needed = padding_budget_->bytes_remaining(); + } if (padding_needed > 0) SendPadding(static_cast<size_t>(padding_needed)); @@ -362,6 +418,11 @@ int32_t PacedSender::Process() { } bool PacedSender::SendPacket(const paced_sender::Packet& packet) { + // TODO(holmer): Because of this bug issue 5307 we have to send audio + // packets even when the pacer is paused. Here we assume audio packets are + // always high priority and that they are the only high priority packets. + if (paused_ && packet.priority != kHighPriority) + return false; critsect_->Leave(); const bool success = callback_->TimeToSendPacket(packet.ssrc, packet.sequence_number, @@ -369,7 +430,9 @@ bool PacedSender::SendPacket(const paced_sender::Packet& packet) { packet.retransmission); critsect_->Enter(); - if (success) { + // TODO(holmer): High priority packets should only be accounted for if we are + // allocating bandwidth for audio. + if (success && packet.priority != kHighPriority) { // Update media bytes sent. prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes); media_budget_->UseBudget(packet.bytes); |