summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsprang@webrtc.org <sprang@webrtc.org>2014-11-04 16:27:16 +0000
committersprang@webrtc.org <sprang@webrtc.org>2014-11-04 16:27:16 +0000
commit9a8c28f10f329c5ce91e77057933e60224000627 (patch)
treed8f7d7681e46e4de5fc5a7a5725bb06f1226ed0f
parent8d28158fe5a9526c3b8138fc3c27bcea768eb6f5 (diff)
downloadwebrtc-9a8c28f10f329c5ce91e77057933e60224000627.tar.gz
Reworked paced sender queue
Packet queue in the paced sender is now based on a priority queue rather than having a separate fifo-queue per priority level. This allows more flexible sorting and cleaner usage. Packets with earlier capture times are now prioritized higher. In situations with high packet loss, the queue might contain packets from several subsequent frames. Retransmit packets from the earlier frames first, since the later ones will probably be dependent on these. Also, don't force sending of packets after a certain time of inactivity or when packets grow too old, since this was causing consistent overuse on poor connections. Instead, drop frames in vie encoder if pacer queue is too long. BUG= R=mflodman@webrtc.org, stefan@webrtc.org Review URL: https://webrtc-codereview.appspot.com/27869004 git-svn-id: http://webrtc.googlecode.com/svn/trunk/webrtc@7617 4adac7df-926f-26a2-2b94-8c16560cd09d
-rw-r--r--modules/pacing/include/paced_sender.h40
-rw-r--r--modules/pacing/paced_sender.cc354
-rw-r--r--modules/pacing/paced_sender_unittest.cc124
-rw-r--r--modules/rtp_rtcp/source/rtp_sender.cc36
-rw-r--r--modules/rtp_rtcp/source/rtp_sender.h2
-rw-r--r--video_engine/vie_encoder.cc37
-rw-r--r--video_engine/vie_encoder.h2
7 files changed, 323 insertions, 272 deletions
diff --git a/modules/pacing/include/paced_sender.h b/modules/pacing/include/paced_sender.h
index d3034466..d7efb8ea 100644
--- a/modules/pacing/include/paced_sender.h
+++ b/modules/pacing/include/paced_sender.h
@@ -27,7 +27,7 @@ class CriticalSectionWrapper;
namespace paced_sender {
class IntervalBudget;
struct Packet;
-class PacketList;
+class PacketQueue;
} // namespace paced_sender
class PacedSender : public Module {
@@ -105,15 +105,15 @@ class PacedSender : public Module {
int bytes,
bool retransmission);
- // Sets the max length of the pacer queue in milliseconds.
- // A negative queue size is interpreted as infinite.
- virtual void set_max_queue_length_ms(int max_queue_length_ms);
-
// Returns the time since the oldest queued packet was enqueued.
virtual int QueueInMs() const;
virtual size_t QueueSizePackets() const;
+ // Returns the number of milliseconds it will take to send the current
+ // packets in the queue, given the current size and bitrate, ignoring prio.
+ virtual int ExpectedQueueTimeMs() const;
+
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
virtual int32_t TimeUntilNextProcess() OVERRIDE;
@@ -125,24 +125,13 @@ class PacedSender : public Module {
virtual bool ProbingExperimentIsEnabled() const;
private:
- // Return true if next packet in line should be transmitted.
- // Return packet list that contains the next packet.
- bool ShouldSendNextPacket(paced_sender::PacketList** packet_list, bool probe)
- EXCLUSIVE_LOCKS_REQUIRED(critsect_);
-
- // Local helper function to GetNextPacket.
- paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets)
- EXCLUSIVE_LOCKS_REQUIRED(critsect_);
-
- bool SendPacketFromList(paced_sender::PacketList* packet_list)
- EXCLUSIVE_LOCKS_REQUIRED(critsect_);
-
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- // Updates the buffers with the number of bytes that we sent.
- void UpdateMediaBytesSent(int num_bytes) EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+ bool SendPacket(const paced_sender::Packet& packet)
+ EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+ void SendPadding(int padding_needed) EXCLUSIVE_LOCKS_REQUIRED(critsect_);
Clock* const clock_;
Callback* const callback_;
@@ -150,7 +139,6 @@ class PacedSender : public Module {
scoped_ptr<CriticalSectionWrapper> critsect_;
bool enabled_ GUARDED_BY(critsect_);
bool paused_ GUARDED_BY(critsect_);
- int max_queue_length_ms_ GUARDED_BY(critsect_);
// This is the media budget, keeping track of how many bits of media
// we can pace out during the current interval.
scoped_ptr<paced_sender::IntervalBudget> media_budget_ GUARDED_BY(critsect_);
@@ -164,17 +152,9 @@ class PacedSender : public Module {
int bitrate_bps_ GUARDED_BY(critsect_);
int64_t time_last_update_us_ GUARDED_BY(critsect_);
- // Only accessed via process thread.
- int64_t time_last_media_send_us_;
- int64_t capture_time_ms_last_queued_ GUARDED_BY(critsect_);
- int64_t capture_time_ms_last_sent_ GUARDED_BY(critsect_);
- scoped_ptr<paced_sender::PacketList> high_priority_packets_
- GUARDED_BY(critsect_);
- scoped_ptr<paced_sender::PacketList> normal_priority_packets_
- GUARDED_BY(critsect_);
- scoped_ptr<paced_sender::PacketList> low_priority_packets_
- GUARDED_BY(critsect_);
+ scoped_ptr<paced_sender::PacketQueue> packets_ GUARDED_BY(critsect_);
+ uint64_t packet_counter_ GUARDED_BY(critsect_);
};
} // namespace webrtc
#endif // WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 64b3eb1e..a071ffcc 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -13,6 +13,7 @@
#include <assert.h>
#include <map>
+#include <queue>
#include <set>
#include "webrtc/modules/interface/module_common_types.h"
@@ -31,80 +32,140 @@ const int kMinPacketLimitMs = 5;
// time.
const int kMaxIntervalTimeMs = 30;
-// Max time that the first packet in the queue can sit in the queue if no
-// packets are sent, regardless of buffer state. In practice only in effect at
-// low bitrates (less than 320 kbits/s).
-const int kMaxQueueTimeWithoutSendingUs = 30000;
-
} // namespace
namespace webrtc {
namespace paced_sender {
struct Packet {
- Packet(uint32_t ssrc,
+ Packet(PacedSender::Priority priority,
+ uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
int length_in_bytes,
- bool retransmission)
- : ssrc(ssrc),
+ bool retransmission,
+ uint64_t enqueue_order)
+ : priority(priority),
+ ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
bytes(length_in_bytes),
- retransmission(retransmission) {}
+ retransmission(retransmission),
+ enqueue_order(enqueue_order) {}
+
+ PacedSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
int64_t enqueue_time_ms;
int bytes;
bool retransmission;
+ uint64_t enqueue_order;
+ std::list<Packet>::iterator this_it;
+};
+
+// Used by priority queue to sort packets.
+struct Comparator {
+ bool operator()(const Packet* first, const Packet* second) {
+ // Highest prio = 0.
+ if (first->priority != second->priority)
+ return first->priority > second->priority;
+
+ // Retransmissions go first.
+ if (second->retransmission && !first->retransmission)
+ return true;
+
+ // Older frames have higher prio.
+ if (first->capture_time_ms != second->capture_time_ms)
+ return first->capture_time_ms > second->capture_time_ms;
+
+ return first->enqueue_order > second->enqueue_order;
+ }
};
-// STL list style class which prevents duplicates in the list.
-class PacketList {
+// Class encapsulating a priority queue with some extensions.
+class PacketQueue {
public:
- PacketList() {};
+ PacketQueue() : bytes_(0) {}
+ virtual ~PacketQueue() {}
+
+ void Push(const Packet& packet) {
+ if (!AddToDupeSet(packet))
+ return;
+
+ // Store packet in list, use pointers in priority queue for cheaper moves.
+ // Packets have a handle to its own iterator in the list, for easy removal
+ // when popping from queue.
+ packet_list_.push_front(packet);
+ std::list<Packet>::iterator it = packet_list_.begin();
+ it->this_it = it; // Handle for direct removal from list.
+ prio_queue_.push(&(*it)); // Pointer into list.
+ bytes_ += packet.bytes;
+ }
- bool empty() const {
- return packet_list_.empty();
+ const Packet& BeginPop() {
+ const Packet& packet = *prio_queue_.top();
+ prio_queue_.pop();
+ return packet;
}
- Packet front() const {
- return packet_list_.front();
+ void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
+
+ void FinalizePop(const Packet& packet) {
+ RemoveFromDupeSet(packet);
+ bytes_ -= packet.bytes;
+ packet_list_.erase(packet.this_it);
}
- size_t size() const {
- size_t sum = 0;
- for (std::map<uint32_t, std::set<uint16_t> >::const_iterator it =
- sequence_number_set_.begin();
- it != sequence_number_set_.end();
- ++it) {
- sum += it->second.size();
- }
- return sum;
+ bool Empty() const { return prio_queue_.empty(); }
+
+ size_t SizeInPackets() const { return prio_queue_.size(); }
+
+ uint32_t SizeInBytes() const { return bytes_; }
+
+ int64_t OldestEnqueueTime() const {
+ std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
+ if (it == packet_list_.rend())
+ return 0;
+ return it->enqueue_time_ms;
}
- void pop_front() {
- Packet& packet = packet_list_.front();
- uint16_t sequence_number = packet.sequence_number;
- uint32_t ssrc = packet.ssrc;
- packet_list_.pop_front();
- sequence_number_set_[ssrc].erase(sequence_number);
+ private:
+ // Try to add a packet to the set of ssrc/seqno identifiers currently in the
+ // queue. Return true if inserted, false if this is a duplicate.
+ bool AddToDupeSet(const Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ if (it == dupe_map_.end()) {
+ // First for this ssrc, just insert.
+ dupe_map_[packet.ssrc].insert(packet.sequence_number);
+ return true;
+ }
+
+ // Insert returns a pair, where second is a bool set to true if new element.
+ return it->second.insert(packet.sequence_number).second;
}
- void push_back(const Packet& packet) {
- if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
- sequence_number_set_[packet.ssrc].end()) {
- // Don't insert duplicates.
- packet_list_.push_back(packet);
- sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
+ void RemoveFromDupeSet(const Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ assert(it != dupe_map_.end());
+ it->second.erase(packet.sequence_number);
+ if (it->second.empty()) {
+ dupe_map_.erase(it);
}
}
- private:
+ // List of packets, in the order the were enqueued. Since dequeueing may
+ // occur out of order, use list instead of vector.
std::list<Packet> packet_list_;
- std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
+ // Priority queue of the packets, sorted according to Comparator.
+ // Use pointers into list, to avoid moving whole struct within heap.
+ std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
+ // Total number of bytes in the queue.
+ uint64_t bytes_;
+ // Map<ssrc, set<seq_no> >, for checking duplicates.
+ typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
+ SsrcSeqNoMap dupe_map_;
};
class IntervalBudget {
@@ -135,6 +196,8 @@ class IntervalBudget {
int bytes_remaining() const { return bytes_remaining_; }
+ int target_rate_kbps() const { return target_rate_kbps_; }
+
private:
int target_rate_kbps_;
int bytes_remaining_;
@@ -153,18 +216,13 @@ PacedSender::PacedSender(Clock* clock,
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
enabled_(true),
paused_(false),
- max_queue_length_ms_(kDefaultMaxQueueLengthMs),
media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
prober_(new BitrateProber()),
bitrate_bps_(1000 * bitrate_kbps),
time_last_update_us_(clock->TimeInMicroseconds()),
- time_last_media_send_us_(-1),
- capture_time_ms_last_queued_(0),
- capture_time_ms_last_sent_(0),
- high_priority_packets_(new paced_sender::PacketList),
- normal_priority_packets_(new paced_sender::PacketList),
- low_priority_packets_(new paced_sender::PacketList) {
+ packets_(new paced_sender::PacketQueue()),
+ packet_counter_(0) {
UpdateBytesPerInterval(kMinPacketLimitMs);
}
@@ -216,64 +274,33 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
if (capture_time_ms < 0) {
capture_time_ms = clock_->TimeInMilliseconds();
}
- if (priority != kHighPriority &&
- capture_time_ms > capture_time_ms_last_queued_) {
- capture_time_ms_last_queued_ = capture_time_ms;
- TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
- "capture_time_ms", capture_time_ms);
- }
- paced_sender::PacketList* packet_list = NULL;
- switch (priority) {
- case kHighPriority:
- packet_list = high_priority_packets_.get();
- break;
- case kNormalPriority:
- packet_list = normal_priority_packets_.get();
- break;
- case kLowPriority:
- packet_list = low_priority_packets_.get();
- break;
- }
- packet_list->push_back(paced_sender::Packet(ssrc,
- sequence_number,
- capture_time_ms,
- clock_->TimeInMilliseconds(),
- bytes,
- retransmission));
+
+ packets_->Push(paced_sender::Packet(
+ priority, ssrc, sequence_number, capture_time_ms,
+ clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++));
return false;
}
-void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
+int PacedSender::ExpectedQueueTimeMs() const {
CriticalSectionScoped cs(critsect_.get());
- max_queue_length_ms_ = max_queue_length_ms;
+ int target_rate = media_budget_->target_rate_kbps();
+ assert(target_rate > 0);
+ return packets_->SizeInBytes() * 8 / target_rate;
}
-int PacedSender::QueueInMs() const {
+size_t PacedSender::QueueSizePackets() const {
CriticalSectionScoped cs(critsect_.get());
- int64_t now_ms = clock_->TimeInMilliseconds();
- int64_t oldest_packet_enqueue_time = now_ms;
- if (!high_priority_packets_->empty()) {
- oldest_packet_enqueue_time =
- std::min(oldest_packet_enqueue_time,
- high_priority_packets_->front().enqueue_time_ms);
- }
- if (!normal_priority_packets_->empty()) {
- oldest_packet_enqueue_time =
- std::min(oldest_packet_enqueue_time,
- normal_priority_packets_->front().enqueue_time_ms);
- }
- if (!low_priority_packets_->empty()) {
- oldest_packet_enqueue_time =
- std::min(oldest_packet_enqueue_time,
- low_priority_packets_->front().enqueue_time_ms);
- }
- return now_ms - oldest_packet_enqueue_time;
+ return packets_->SizeInPackets();
}
-size_t PacedSender::QueueSizePackets() const {
+int PacedSender::QueueInMs() const {
CriticalSectionScoped cs(critsect_.get());
- return low_priority_packets_->size() + normal_priority_packets_->size() +
- high_priority_packets_->size();
+
+ int64_t oldest_packet = packets_->OldestEnqueueTime();
+ if (oldest_packet == 0)
+ return 0;
+
+ return clock_->TimeInMilliseconds() - oldest_packet;
}
int32_t PacedSender::TimeUntilNextProcess() {
@@ -303,127 +330,66 @@ int32_t PacedSender::Process() {
uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
}
- paced_sender::PacketList* packet_list;
- while (ShouldSendNextPacket(&packet_list, prober_->IsProbing())) {
- if (!SendPacketFromList(packet_list))
+
+ while (!packets_->Empty()) {
+ if (media_budget_->bytes_remaining() <= 0 && !prober_->IsProbing())
return 0;
- // Send one packet per Process() call when probing, so that we have
- // better control over the delta between packets.
- if (prober_->IsProbing())
+
+ // Since we need to release the lock in order to send, we first pop the
+ // element from the priority queue but keep it in storage, so that we can
+ // reinsert it if send fails.
+ const paced_sender::Packet& packet = packets_->BeginPop();
+ if (SendPacket(packet)) {
+ // Send succeeded, remove it from the queue.
+ packets_->FinalizePop(packet);
+ if (prober_->IsProbing())
+ return 0;
+ } else {
+ // Send failed, put it back into the queue.
+ packets_->CancelPop(packet);
return 0;
+ }
}
- if (high_priority_packets_->empty() && normal_priority_packets_->empty() &&
- low_priority_packets_->empty() &&
- padding_budget_->bytes_remaining() > 0) {
- int padding_needed = padding_budget_->bytes_remaining();
- critsect_->Leave();
- int bytes_sent = callback_->TimeToSendPadding(padding_needed);
- critsect_->Enter();
- media_budget_->UseBudget(bytes_sent);
- padding_budget_->UseBudget(bytes_sent);
+
+ int padding_needed = padding_budget_->bytes_remaining();
+ if (padding_needed > 0) {
+ SendPadding(padding_needed);
}
}
return 0;
}
-bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
- EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
- paced_sender::Packet packet = GetNextPacketFromList(packet_list);
+bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
critsect_->Leave();
-
const bool success = callback_->TimeToSendPacket(packet.ssrc,
packet.sequence_number,
packet.capture_time_ms,
packet.retransmission);
critsect_->Enter();
- // If packet cannot be sent then keep it in packet list and exit early.
- // There's no need to send more packets.
- if (!success) {
- return false;
- }
- packet_list->pop_front();
- const bool last_packet =
- packet_list->empty() ||
- packet_list->front().capture_time_ms > packet.capture_time_ms;
- if (packet_list != high_priority_packets_.get()) {
- if (packet.capture_time_ms > capture_time_ms_last_sent_) {
- capture_time_ms_last_sent_ = packet.capture_time_ms;
- } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
- last_packet) {
- TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
- }
+
+ if (success) {
+ // Update media bytes sent.
+ prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
+ media_budget_->UseBudget(packet.bytes);
+ padding_budget_->UseBudget(packet.bytes);
}
- return true;
-}
-void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
- media_budget_->IncreaseBudget(delta_time_ms);
- padding_budget_->IncreaseBudget(delta_time_ms);
+ return success;
}
-bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list,
- bool probe) {
- *packet_list = NULL;
- if (!probe && media_budget_->bytes_remaining() <= 0) {
- // All bytes consumed for this interval.
- // Check if we have not sent in a too long time.
- if (clock_->TimeInMicroseconds() - time_last_media_send_us_ >
- kMaxQueueTimeWithoutSendingUs) {
- if (!high_priority_packets_->empty()) {
- *packet_list = high_priority_packets_.get();
- return true;
- }
- if (!normal_priority_packets_->empty()) {
- *packet_list = normal_priority_packets_.get();
- return true;
- }
- }
- // Send any old packets to avoid queuing for too long.
- if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
- int64_t high_priority_capture_time = -1;
- if (!high_priority_packets_->empty()) {
- high_priority_capture_time =
- high_priority_packets_->front().capture_time_ms;
- *packet_list = high_priority_packets_.get();
- }
- if (!normal_priority_packets_->empty() &&
- (high_priority_capture_time == -1 ||
- high_priority_capture_time >
- normal_priority_packets_->front().capture_time_ms)) {
- *packet_list = normal_priority_packets_.get();
- }
- if (*packet_list)
- return true;
- }
- return false;
- }
- if (!high_priority_packets_->empty()) {
- *packet_list = high_priority_packets_.get();
- return true;
- }
- if (!normal_priority_packets_->empty()) {
- *packet_list = normal_priority_packets_.get();
- return true;
- }
- if (!low_priority_packets_->empty()) {
- *packet_list = low_priority_packets_.get();
- return true;
- }
- return false;
-}
+void PacedSender::SendPadding(int padding_needed) {
+ critsect_->Leave();
+ int bytes_sent = callback_->TimeToSendPadding(padding_needed);
+ critsect_->Enter();
-paced_sender::Packet PacedSender::GetNextPacketFromList(
- paced_sender::PacketList* packets) {
- paced_sender::Packet packet = packets->front();
- UpdateMediaBytesSent(packet.bytes);
- return packet;
+ // Update padding bytes sent.
+ media_budget_->UseBudget(bytes_sent);
+ padding_budget_->UseBudget(bytes_sent);
}
-void PacedSender::UpdateMediaBytesSent(int num_bytes) {
- prober_->PacketSent(clock_->TimeInMilliseconds(), num_bytes);
- time_last_media_send_us_ = clock_->TimeInMicroseconds();
- media_budget_->UseBudget(num_bytes);
- padding_budget_->UseBudget(num_bytes);
+void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
+ media_budget_->IncreaseBudget(delta_time_ms);
+ padding_budget_->IncreaseBudget(delta_time_ms);
}
bool PacedSender::ProbingExperimentIsEnabled() const {
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index f8028a91..34787d16 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -639,34 +639,40 @@ TEST_F(PacedSenderTest, ResendPacket) {
EXPECT_EQ(0, send_bucket_->QueueInMs());
}
-TEST_F(PacedSenderTest, MaxQueueLength) {
+TEST_F(PacedSenderTest, ExpectedQueueTimeMs) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
- EXPECT_EQ(0, send_bucket_->QueueInMs());
+ const int32_t kNumPackets = 60;
+ const int32_t kPacketSize = 1200;
+ const int32_t kMaxBitrate = kPaceMultiplier * 30;
+ EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
- send_bucket_->UpdateBitrate(30, kPaceMultiplier * 30, 0);
- for (int i = 0; i < 30; ++i) {
- SendAndExpectPacket(PacedSender::kNormalPriority,
- ssrc,
- sequence_number++,
- clock_.TimeInMilliseconds(),
- 1200,
- false);
+ send_bucket_->UpdateBitrate(30, kMaxBitrate, 0);
+ for (int i = 0; i < kNumPackets; ++i) {
+ SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize, false);
}
- clock_.AdvanceTimeMilliseconds(2001);
- SendAndExpectPacket(PacedSender::kNormalPriority,
- ssrc,
- sequence_number++,
- clock_.TimeInMilliseconds(),
- 1200,
- false);
- EXPECT_EQ(2001, send_bucket_->QueueInMs());
- send_bucket_->Process();
- EXPECT_EQ(0, send_bucket_->QueueInMs());
- clock_.AdvanceTimeMilliseconds(31);
+ // Queue in ms = 1000 * (bytes in queue) / (kbit per second * 1000 / 8)
+ int32_t queue_in_ms = kNumPackets * kPacketSize * 8 / kMaxBitrate;
+ EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs());
- send_bucket_->Process();
+ int64_t time_start = clock_.TimeInMilliseconds();
+ while (send_bucket_->QueueSizePackets() > 0) {
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ if (time_until_process <= 0) {
+ send_bucket_->Process();
+ } else {
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ }
+ }
+ int64_t duration = clock_.TimeInMilliseconds() - time_start;
+
+ EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
+
+ // Allow for aliasing, duration should be in [expected(n - 1), expected(n)].
+ EXPECT_LE(duration, queue_in_ms);
+ EXPECT_GE(duration, queue_in_ms - (kPacketSize * 8 / kMaxBitrate));
}
TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) {
@@ -738,5 +744,79 @@ TEST_F(PacedSenderTest, ProbingWithInitialFrame) {
}
}
}
+
+TEST_F(PacedSenderTest, PriorityInversion) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const int32_t kPacketSize = 1200;
+
+ EXPECT_FALSE(send_bucket_->SendPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number + 3,
+ clock_.TimeInMilliseconds() + 33, kPacketSize, true));
+
+ EXPECT_FALSE(send_bucket_->SendPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number + 2,
+ clock_.TimeInMilliseconds() + 33, kPacketSize, true));
+
+ EXPECT_FALSE(send_bucket_->SendPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), kPacketSize, true));
+
+ EXPECT_FALSE(send_bucket_->SendPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number + 1,
+ clock_.TimeInMilliseconds(), kPacketSize, true));
+
+ // Packets from earlier frames should be sent first.
+ {
+ ::testing::InSequence sequence;
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), true))
+ .WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1,
+ clock_.TimeInMilliseconds(), true))
+ .WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 3,
+ clock_.TimeInMilliseconds() + 33,
+ true)).WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 2,
+ clock_.TimeInMilliseconds() + 33,
+ true)).WillOnce(Return(true));
+
+ while (send_bucket_->QueueSizePackets() > 0) {
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ if (time_until_process <= 0) {
+ send_bucket_->Process();
+ } else {
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ }
+ }
+ }
+}
+
+TEST_F(PacedSenderTest, PaddingOveruse) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const int32_t kPacketSize = 1200;
+
+ // Min bitrate 0 => no padding, padding budget will stay at 0.
+ send_bucket_->UpdateBitrate(60, 90, 0);
+ SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize, false);
+ send_bucket_->Process();
+
+ // Add 30kbit padding. When increasing budget, media budget will increase from
+ // negative (overuse) while padding budget will increase form 0.
+ clock_.AdvanceTimeMilliseconds(5);
+ send_bucket_->UpdateBitrate(60, 90, 30);
+
+ EXPECT_FALSE(send_bucket_->SendPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize, false));
+
+ // Don't send padding if queue is non-empty, even if padding budget > 0.
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ send_bucket_->Process();
+}
+
} // namespace test
} // namespace webrtc
diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc
index d5412668..0438b9f7 100644
--- a/modules/rtp_rtcp/source/rtp_sender.cc
+++ b/modules/rtp_rtcp/source/rtp_sender.cc
@@ -50,12 +50,17 @@ RTPSender::RTPSender(const int32_t id,
FrameCountObserver* frame_count_observer,
SendSideDelayObserver* send_side_delay_observer)
: clock_(clock),
+ // TODO(holmer): Remove this conversion when we remove the use of
+ // TickTime.
+ clock_delta_ms_(clock_->TimeInMilliseconds() -
+ TickTime::MillisecondTimestamp()),
bitrate_sent_(clock, this),
id_(id),
audio_configured_(audio),
audio_(NULL),
video_(NULL),
paced_sender_(paced_sender),
+ last_capture_time_ms_sent_(0),
send_critsect_(CriticalSectionWrapper::CreateCriticalSection()),
transport_(transport),
sending_media_(true), // Default to sending media.
@@ -622,15 +627,10 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id, uint32_t min_resend_time) {
}
// Convert from TickTime to Clock since capture_time_ms is based on
// TickTime.
- // TODO(holmer): Remove this conversion when we remove the use of TickTime.
- int64_t clock_delta_ms = clock_->TimeInMilliseconds() -
- TickTime::MillisecondTimestamp();
- if (!paced_sender_->SendPacket(PacedSender::kHighPriority,
- header.ssrc,
- header.sequenceNumber,
- capture_time_ms + clock_delta_ms,
- length - header.headerLength,
- true)) {
+ int64_t corrected_capture_tims_ms = capture_time_ms + clock_delta_ms_;
+ if (!paced_sender_->SendPacket(
+ PacedSender::kHighPriority, header.ssrc, header.sequenceNumber,
+ corrected_capture_tims_ms, length - header.headerLength, true)) {
// We can't send the packet right now.
// We will be called when it is time.
return length;
@@ -819,6 +819,10 @@ bool RTPSender::PrepareAndSendPacket(uint8_t* buffer,
RtpUtility::RtpHeaderParser rtp_parser(buffer, length);
RTPHeader rtp_header;
rtp_parser.Parse(rtp_header);
+ if (!is_retransmit && rtp_header.markerBit) {
+ TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms);
+ }
+
TRACE_EVENT_INSTANT2("webrtc_rtp", "PrepareAndSendPacket",
"timestamp", rtp_header.timestamp,
"seqnum", rtp_header.sequenceNumber);
@@ -937,12 +941,18 @@ int32_t RTPSender::SendToNetwork(
}
if (paced_sender_ && storage != kDontStore) {
- int64_t clock_delta_ms = clock_->TimeInMilliseconds() -
- TickTime::MillisecondTimestamp();
+ // Correct offset between implementations of millisecond time stamps in
+ // TickTime and Clock.
+ int64_t corrected_time_ms = capture_time_ms + clock_delta_ms_;
if (!paced_sender_->SendPacket(priority, rtp_header.ssrc,
- rtp_header.sequenceNumber,
- capture_time_ms + clock_delta_ms,
+ rtp_header.sequenceNumber, corrected_time_ms,
payload_length, false)) {
+ if (last_capture_time_ms_sent_ == 0 ||
+ corrected_time_ms > last_capture_time_ms_sent_) {
+ last_capture_time_ms_sent_ = corrected_time_ms;
+ TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", corrected_time_ms,
+ "capture_time_ms", corrected_time_ms);
+ }
// We can't send the packet right now.
// We will be called when it is time.
return 0;
diff --git a/modules/rtp_rtcp/source/rtp_sender.h b/modules/rtp_rtcp/source/rtp_sender.h
index b2f2e0c4..780baa1f 100644
--- a/modules/rtp_rtcp/source/rtp_sender.h
+++ b/modules/rtp_rtcp/source/rtp_sender.h
@@ -336,6 +336,7 @@ class RTPSender : public RTPSenderInterface, public Bitrate::Observer {
bool IsFecPacket(const uint8_t* buffer, const RTPHeader& header) const;
Clock* clock_;
+ int64_t clock_delta_ms_;
Bitrate bitrate_sent_;
int32_t id_;
@@ -344,6 +345,7 @@ class RTPSender : public RTPSenderInterface, public Bitrate::Observer {
RTPSenderVideo *video_;
PacedSender *paced_sender_;
+ int64_t last_capture_time_ms_sent_;
CriticalSectionWrapper *send_critsect_;
Transport *transport_;
diff --git a/video_engine/vie_encoder.cc b/video_engine/vie_encoder.cc
index 9d6da977..3cb0ae70 100644
--- a/video_engine/vie_encoder.cc
+++ b/video_engine/vie_encoder.cc
@@ -470,9 +470,31 @@ bool ViEEncoder::EncoderPaused() const {
std::max(static_cast<int>(target_delay_ms_ * kEncoderPausePacerMargin),
kMinPacingDelayMs);
}
+ if (paced_sender_->ExpectedQueueTimeMs() >
+ PacedSender::kDefaultMaxQueueLengthMs) {
+ // Too much data in pacer queue, drop frame.
+ return true;
+ }
return !network_is_transmitting_;
}
+void ViEEncoder::TraceFrameDropStart() {
+ // Start trace event only on the first frame after encoder is paused.
+ if (!encoder_paused_and_dropped_frame_) {
+ TRACE_EVENT_ASYNC_BEGIN0("webrtc", "EncoderPaused", this);
+ }
+ encoder_paused_and_dropped_frame_ = true;
+ return;
+}
+
+void ViEEncoder::TraceFrameDropEnd() {
+ // End trace event on first frame after encoder resumes, if frame was dropped.
+ if (encoder_paused_and_dropped_frame_) {
+ TRACE_EVENT_ASYNC_END0("webrtc", "EncoderPaused", this);
+ }
+ encoder_paused_and_dropped_frame_ = false;
+}
+
RtpRtcp* ViEEncoder::SendRtpRtcpModule() {
return default_rtp_rtcp_.get();
}
@@ -489,16 +511,10 @@ void ViEEncoder::DeliverFrame(int id,
CriticalSectionScoped cs(data_cs_.get());
time_of_last_incoming_frame_ms_ = TickTime::MillisecondTimestamp();
if (EncoderPaused()) {
- if (!encoder_paused_and_dropped_frame_) {
- TRACE_EVENT_ASYNC_BEGIN0("webrtc", "EncoderPaused", this);
- }
- encoder_paused_and_dropped_frame_ = true;
+ TraceFrameDropStart();
return;
}
- if (encoder_paused_and_dropped_frame_) {
- TRACE_EVENT_ASYNC_END0("webrtc", "EncoderPaused", this);
- }
- encoder_paused_and_dropped_frame_ = false;
+ TraceFrameDropEnd();
}
// Convert render time, in ms, to RTP timestamp.
@@ -702,15 +718,10 @@ void ViEEncoder::SetSenderBufferingMode(int target_delay_ms) {
// Disable external frame-droppers.
vcm_.EnableFrameDropper(false);
vpm_.EnableTemporalDecimation(false);
- // We don't put any limits on the pacer queue when running in buffered mode
- // since the encoder will be paused if the queue grow too large.
- paced_sender_->set_max_queue_length_ms(-1);
} else {
// Real-time mode - enable frame droppers.
vpm_.EnableTemporalDecimation(true);
vcm_.EnableFrameDropper(true);
- paced_sender_->set_max_queue_length_ms(
- PacedSender::kDefaultMaxQueueLengthMs);
}
}
diff --git a/video_engine/vie_encoder.h b/video_engine/vie_encoder.h
index 36f87faa..1e358def 100644
--- a/video_engine/vie_encoder.h
+++ b/video_engine/vie_encoder.h
@@ -192,6 +192,8 @@ class ViEEncoder
int TimeToSendPadding(int bytes);
private:
bool EncoderPaused() const EXCLUSIVE_LOCKS_REQUIRED(data_cs_);
+ void TraceFrameDropStart() EXCLUSIVE_LOCKS_REQUIRED(data_cs_);
+ void TraceFrameDropEnd() EXCLUSIVE_LOCKS_REQUIRED(data_cs_);
void UpdateHistograms();