aboutsummaryrefslogtreecommitdiff
path: root/webrtc/modules/pacing/paced_sender.cc
diff options
context:
space:
mode:
Diffstat (limited to 'webrtc/modules/pacing/paced_sender.cc')
-rw-r--r--webrtc/modules/pacing/paced_sender.cc141
1 files changed, 102 insertions, 39 deletions
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
index 5d7ae17b23..121f860c7d 100644
--- a/webrtc/modules/pacing/paced_sender.cc
+++ b/webrtc/modules/pacing/paced_sender.cc
@@ -8,20 +8,19 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "webrtc/modules/pacing/include/paced_sender.h"
-
-#include <assert.h>
+#include "webrtc/modules/pacing/paced_sender.h"
#include <map>
#include <queue>
#include <set>
-#include "webrtc/modules/interface/module_common_types.h"
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/modules/include/module_common_types.h"
#include "webrtc/modules/pacing/bitrate_prober.h"
#include "webrtc/system_wrappers/include/clock.h"
#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
#include "webrtc/system_wrappers/include/field_trial.h"
-#include "webrtc/system_wrappers/include/logging.h"
namespace {
// Time limit in milliseconds between packet bursts.
@@ -33,6 +32,9 @@ const int64_t kMaxIntervalTimeMs = 30;
} // namespace
+// TODO(sprang): Move at least PacketQueue and MediaBudget out to separate
+// files, so that we can more easily test them.
+
namespace webrtc {
namespace paced_sender {
struct Packet {
@@ -86,13 +88,19 @@ struct Comparator {
// Class encapsulating a priority queue with some extensions.
class PacketQueue {
public:
- PacketQueue() : bytes_(0) {}
+ explicit PacketQueue(Clock* clock)
+ : bytes_(0),
+ clock_(clock),
+ queue_time_sum_(0),
+ time_last_updated_(clock_->TimeInMilliseconds()) {}
virtual ~PacketQueue() {}
void Push(const Packet& packet) {
- if (!AddToDupeSet(packet)) {
+ if (!AddToDupeSet(packet))
return;
- }
+
+ UpdateQueueTime(packet.enqueue_time_ms);
+
// Store packet in list, use pointers in priority queue for cheaper moves.
// Packets have a handle to its own iterator in the list, for easy removal
// when popping from queue.
@@ -114,7 +122,11 @@ class PacketQueue {
void FinalizePop(const Packet& packet) {
RemoveFromDupeSet(packet);
bytes_ -= packet.bytes;
+ queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
packet_list_.erase(packet.this_it);
+ RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
+ if (packet_list_.empty())
+ RTC_DCHECK_EQ(0u, queue_time_sum_);
}
bool Empty() const { return prio_queue_.empty(); }
@@ -123,13 +135,29 @@ class PacketQueue {
uint64_t SizeInBytes() const { return bytes_; }
- int64_t OldestEnqueueTime() const {
- std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
+ int64_t OldestEnqueueTimeMs() const {
+ auto it = packet_list_.rbegin();
if (it == packet_list_.rend())
return 0;
return it->enqueue_time_ms;
}
+ void UpdateQueueTime(int64_t timestamp_ms) {
+ RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
+ int64_t delta = timestamp_ms - time_last_updated_;
+ // Use packet packet_list_.size() not prio_queue_.size() here, as there
+ // might be an outstanding element popped from prio_queue_ currently in the
+ // SendPacket() call, while packet_list_ will always be correct.
+ queue_time_sum_ += delta * packet_list_.size();
+ time_last_updated_ = timestamp_ms;
+ }
+
+ int64_t AverageQueueTimeMs() const {
+ if (prio_queue_.empty())
+ return 0;
+ return queue_time_sum_ / packet_list_.size();
+ }
+
private:
// Try to add a packet to the set of ssrc/seqno identifiers currently in the
// queue. Return true if inserted, false if this is a duplicate.
@@ -147,7 +175,7 @@ class PacketQueue {
void RemoveFromDupeSet(const Packet& packet) {
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
- assert(it != dupe_map_.end());
+ RTC_DCHECK(it != dupe_map_.end());
it->second.erase(packet.sequence_number);
if (it->second.empty()) {
dupe_map_.erase(it);
@@ -165,6 +193,9 @@ class PacketQueue {
// Map<ssrc, set<seq_no> >, for checking duplicates.
typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
SsrcSeqNoMap dupe_map_;
+ Clock* const clock_;
+ int64_t queue_time_sum_;
+ int64_t time_last_updated_;
};
class IntervalBudget {
@@ -209,6 +240,7 @@ class IntervalBudget {
};
} // namespace paced_sender
+const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
PacedSender::PacedSender(Clock* clock,
@@ -225,8 +257,9 @@ PacedSender::PacedSender(Clock* clock,
padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
prober_(new BitrateProber()),
bitrate_bps_(1000 * bitrate_kbps),
+ max_bitrate_kbps_(max_bitrate_kbps),
time_last_update_us_(clock->TimeInMicroseconds()),
- packets_(new paced_sender::PacketQueue()),
+ packets_(new paced_sender::PacketQueue(clock)),
packet_counter_(0) {
UpdateBytesPerInterval(kMinPacketLimitMs);
}
@@ -244,7 +277,7 @@ void PacedSender::Resume() {
}
void PacedSender::SetProbingEnabled(bool enabled) {
- assert(packet_counter_ == 0);
+ RTC_CHECK_EQ(0u, packet_counter_);
probing_enabled_ = enabled;
}
@@ -252,9 +285,12 @@ void PacedSender::UpdateBitrate(int bitrate_kbps,
int max_bitrate_kbps,
int min_bitrate_kbps) {
CriticalSectionScoped cs(critsect_.get());
- media_budget_->set_target_rate_kbps(max_bitrate_kbps);
+ // Don't set media bitrate here as it may be boosted in order to meet max
+ // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_
+ // be updated in Process().
padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
bitrate_bps_ = 1000 * bitrate_kbps;
+ max_bitrate_kbps_ = max_bitrate_kbps;
}
void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
@@ -265,25 +301,23 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
bool retransmission) {
CriticalSectionScoped cs(critsect_.get());
- if (probing_enabled_ && !prober_->IsProbing()) {
+ if (probing_enabled_ && !prober_->IsProbing())
prober_->SetEnabled(true);
- }
prober_->MaybeInitializeProbe(bitrate_bps_);
- if (capture_time_ms < 0) {
- capture_time_ms = clock_->TimeInMilliseconds();
- }
+ int64_t now_ms = clock_->TimeInMilliseconds();
+ if (capture_time_ms < 0)
+ capture_time_ms = now_ms;
- packets_->Push(paced_sender::Packet(
- priority, ssrc, sequence_number, capture_time_ms,
- clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++));
+ packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
+ capture_time_ms, now_ms, bytes,
+ retransmission, packet_counter_++));
}
int64_t PacedSender::ExpectedQueueTimeMs() const {
CriticalSectionScoped cs(critsect_.get());
- int target_rate = media_budget_->target_rate_kbps();
- assert(target_rate > 0);
- return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate);
+ RTC_DCHECK_GT(max_bitrate_kbps_, 0);
+ return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_);
}
size_t PacedSender::QueueSizePackets() const {
@@ -294,20 +328,25 @@ size_t PacedSender::QueueSizePackets() const {
int64_t PacedSender::QueueInMs() const {
CriticalSectionScoped cs(critsect_.get());
- int64_t oldest_packet = packets_->OldestEnqueueTime();
+ int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
if (oldest_packet == 0)
return 0;
return clock_->TimeInMilliseconds() - oldest_packet;
}
+int64_t PacedSender::AverageQueueTimeMs() {
+ CriticalSectionScoped cs(critsect_.get());
+ packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
+ return packets_->AverageQueueTimeMs();
+}
+
int64_t PacedSender::TimeUntilNextProcess() {
CriticalSectionScoped cs(critsect_.get());
if (prober_->IsProbing()) {
int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
- if (ret >= 0) {
+ if (ret >= 0)
return ret;
- }
}
int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
@@ -319,27 +358,42 @@ int32_t PacedSender::Process() {
CriticalSectionScoped cs(critsect_.get());
int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
time_last_update_us_ = now_us;
- if (paused_)
- return 0;
- if (elapsed_time_ms > 0) {
+ int target_bitrate_kbps = max_bitrate_kbps_;
+ // TODO(holmer): Remove the !paused_ check when issue 5307 has been fixed.
+ if (!paused_ && elapsed_time_ms > 0) {
+ size_t queue_size_bytes = packets_->SizeInBytes();
+ if (queue_size_bytes > 0) {
+ // Assuming equal size packets and input/output rate, the average packet
+ // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
+ // time constraint shall be met. Determine bitrate needed for that.
+ packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
+ int64_t avg_time_left_ms = std::max<int64_t>(
+ 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
+ int min_bitrate_needed_kbps =
+ static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
+ if (min_bitrate_needed_kbps > target_bitrate_kbps)
+ target_bitrate_kbps = min_bitrate_needed_kbps;
+ }
+
+ media_budget_->set_target_rate_kbps(target_bitrate_kbps);
+
int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
}
while (!packets_->Empty()) {
- if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) {
+ if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing())
return 0;
- }
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
const paced_sender::Packet& packet = packets_->BeginPop();
+
if (SendPacket(packet)) {
// Send succeeded, remove it from the queue.
packets_->FinalizePop(packet);
- if (prober_->IsProbing()) {
+ if (prober_->IsProbing())
return 0;
- }
} else {
// Send failed, put it back into the queue.
packets_->CancelPop(packet);
@@ -347,14 +401,16 @@ int32_t PacedSender::Process() {
}
}
- if (!packets_->Empty())
+ // TODO(holmer): Remove the paused_ check when issue 5307 has been fixed.
+ if (paused_ || !packets_->Empty())
return 0;
size_t padding_needed;
- if (prober_->IsProbing())
+ if (prober_->IsProbing()) {
padding_needed = prober_->RecommendedPacketSize();
- else
+ } else {
padding_needed = padding_budget_->bytes_remaining();
+ }
if (padding_needed > 0)
SendPadding(static_cast<size_t>(padding_needed));
@@ -362,6 +418,11 @@ int32_t PacedSender::Process() {
}
bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
+ // TODO(holmer): Because of this bug issue 5307 we have to send audio
+ // packets even when the pacer is paused. Here we assume audio packets are
+ // always high priority and that they are the only high priority packets.
+ if (paused_ && packet.priority != kHighPriority)
+ return false;
critsect_->Leave();
const bool success = callback_->TimeToSendPacket(packet.ssrc,
packet.sequence_number,
@@ -369,7 +430,9 @@ bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
packet.retransmission);
critsect_->Enter();
- if (success) {
+ // TODO(holmer): High priority packets should only be accounted for if we are
+ // allocating bandwidth for audio.
+ if (success && packet.priority != kHighPriority) {
// Update media bytes sent.
prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
media_budget_->UseBudget(packet.bytes);