aboutsummaryrefslogtreecommitdiff
path: root/webrtc/modules/pacing
diff options
context:
space:
mode:
Diffstat (limited to 'webrtc/modules/pacing')
-rw-r--r--webrtc/modules/pacing/BUILD.gn33
-rw-r--r--webrtc/modules/pacing/OWNERS10
-rw-r--r--webrtc/modules/pacing/bitrate_prober.cc129
-rw-r--r--webrtc/modules/pacing/bitrate_prober.h61
-rw-r--r--webrtc/modules/pacing/bitrate_prober_unittest.cc51
-rw-r--r--webrtc/modules/pacing/include/mock/mock_paced_sender.h38
-rw-r--r--webrtc/modules/pacing/include/paced_sender.h153
-rw-r--r--webrtc/modules/pacing/include/packet_router.h66
-rw-r--r--webrtc/modules/pacing/paced_sender.cc398
-rw-r--r--webrtc/modules/pacing/paced_sender_unittest.cc834
-rw-r--r--webrtc/modules/pacing/pacing.gypi29
-rw-r--r--webrtc/modules/pacing/packet_router.cc103
-rw-r--r--webrtc/modules/pacing/packet_router_unittest.cc172
13 files changed, 2077 insertions, 0 deletions
diff --git a/webrtc/modules/pacing/BUILD.gn b/webrtc/modules/pacing/BUILD.gn
new file mode 100644
index 0000000000..3e478c1e76
--- /dev/null
+++ b/webrtc/modules/pacing/BUILD.gn
@@ -0,0 +1,33 @@
+# Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
+#
+# Use of this source code is governed by a BSD-style license
+# that can be found in the LICENSE file in the root of the source
+# tree. An additional intellectual property rights grant can be found
+# in the file PATENTS. All contributing project authors may
+# be found in the AUTHORS file in the root of the source tree.
+
+source_set("pacing") {
+ sources = [
+ "bitrate_prober.cc",
+ "bitrate_prober.h",
+ "include/paced_sender.h",
+ "include/packet_router.h",
+ "paced_sender.cc",
+ "packet_router.cc",
+ ]
+
+ configs += [ "../..:common_config" ]
+ public_configs = [ "../..:common_inherited_config" ]
+
+ if (is_clang) {
+ # Suppress warnings from Chrome's Clang plugins.
+ # See http://code.google.com/p/webrtc/issues/detail?id=163 for details.
+ configs -= [ "//build/config/clang:find_bad_constructs" ]
+ }
+
+ deps = [
+ "../../system_wrappers",
+ "../bitrate_controller",
+ "../rtp_rtcp",
+ ]
+}
diff --git a/webrtc/modules/pacing/OWNERS b/webrtc/modules/pacing/OWNERS
new file mode 100644
index 0000000000..bde04e2c20
--- /dev/null
+++ b/webrtc/modules/pacing/OWNERS
@@ -0,0 +1,10 @@
+stefan@webrtc.org
+mflodman@webrtc.org
+asapersson@webrtc.org
+
+# These are for the common case of adding or renaming files. If you're doing
+# structural changes, please get a review from a reviewer in this file.
+per-file *.gyp=*
+per-file *.gypi=*
+
+per-file BUILD.gn=kjellander@webrtc.org
diff --git a/webrtc/modules/pacing/bitrate_prober.cc b/webrtc/modules/pacing/bitrate_prober.cc
new file mode 100644
index 0000000000..bbbe54f54e
--- /dev/null
+++ b/webrtc/modules/pacing/bitrate_prober.cc
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/modules/pacing/bitrate_prober.h"
+
+#include <assert.h>
+#include <algorithm>
+#include <limits>
+#include <sstream>
+
+#include "webrtc/modules/pacing/include/paced_sender.h"
+#include "webrtc/system_wrappers/include/logging.h"
+
+namespace webrtc {
+
+namespace {
+int ComputeDeltaFromBitrate(size_t packet_size, int bitrate_bps) {
+ assert(bitrate_bps > 0);
+ // Compute the time delta needed to send packet_size bytes at bitrate_bps
+ // bps. Result is in milliseconds.
+ return static_cast<int>(1000ll * static_cast<int64_t>(packet_size) * 8ll /
+ bitrate_bps);
+}
+} // namespace
+
+BitrateProber::BitrateProber()
+ : probing_state_(kDisabled),
+ packet_size_last_send_(0),
+ time_last_send_ms_(-1) {
+}
+
+void BitrateProber::SetEnabled(bool enable) {
+ if (enable) {
+ if (probing_state_ == kDisabled) {
+ probing_state_ = kAllowedToProbe;
+ LOG(LS_INFO) << "Initial bandwidth probing enabled";
+ }
+ } else {
+ probing_state_ = kDisabled;
+ LOG(LS_INFO) << "Initial bandwidth probing disabled";
+ }
+}
+
+bool BitrateProber::IsProbing() const {
+ return probing_state_ == kProbing;
+}
+
+void BitrateProber::MaybeInitializeProbe(int bitrate_bps) {
+ if (probing_state_ != kAllowedToProbe)
+ return;
+ probe_bitrates_.clear();
+ // Max number of packets used for probing.
+ const int kMaxNumProbes = 2;
+ const int kPacketsPerProbe = 5;
+ const float kProbeBitrateMultipliers[kMaxNumProbes] = {3, 6};
+ int bitrates_bps[kMaxNumProbes];
+ std::stringstream bitrate_log;
+ bitrate_log << "Start probing for bandwidth, bitrates:";
+ for (int i = 0; i < kMaxNumProbes; ++i) {
+ bitrates_bps[i] = kProbeBitrateMultipliers[i] * bitrate_bps;
+ bitrate_log << " " << bitrates_bps[i];
+ // We need one extra to get 5 deltas for the first probe.
+ if (i == 0)
+ probe_bitrates_.push_back(bitrates_bps[i]);
+ for (int j = 0; j < kPacketsPerProbe; ++j)
+ probe_bitrates_.push_back(bitrates_bps[i]);
+ }
+ bitrate_log << ", num packets: " << probe_bitrates_.size();
+ LOG(LS_INFO) << bitrate_log.str().c_str();
+ probing_state_ = kProbing;
+}
+
+int BitrateProber::TimeUntilNextProbe(int64_t now_ms) {
+ if (probing_state_ != kDisabled && probe_bitrates_.empty()) {
+ probing_state_ = kWait;
+ }
+ if (probe_bitrates_.empty()) {
+ // No probe started, or waiting for next probe.
+ return -1;
+ }
+ int64_t elapsed_time_ms = now_ms - time_last_send_ms_;
+ // We will send the first probe packet immediately if no packet has been
+ // sent before.
+ int time_until_probe_ms = 0;
+ if (packet_size_last_send_ > PacedSender::kMinProbePacketSize &&
+ probing_state_ == kProbing) {
+ int next_delta_ms = ComputeDeltaFromBitrate(packet_size_last_send_,
+ probe_bitrates_.front());
+ time_until_probe_ms = next_delta_ms - elapsed_time_ms;
+ // There is no point in trying to probe with less than 1 ms between packets
+ // as it essentially means trying to probe at infinite bandwidth.
+ const int kMinProbeDeltaMs = 1;
+ // If we have waited more than 3 ms for a new packet to probe with we will
+ // consider this probing session over.
+ const int kMaxProbeDelayMs = 3;
+ if (next_delta_ms < kMinProbeDeltaMs ||
+ time_until_probe_ms < -kMaxProbeDelayMs) {
+ // We currently disable probing after the first probe, as we only want
+ // to probe at the beginning of a connection. We should set this to
+ // kWait if we later want to probe periodically.
+ probing_state_ = kWait;
+ LOG(LS_INFO) << "Next delta too small, stop probing.";
+ time_until_probe_ms = 0;
+ }
+ }
+ return std::max(time_until_probe_ms, 0);
+}
+
+size_t BitrateProber::RecommendedPacketSize() const {
+ return packet_size_last_send_;
+}
+
+void BitrateProber::PacketSent(int64_t now_ms, size_t packet_size) {
+ assert(packet_size > 0);
+ packet_size_last_send_ = packet_size;
+ time_last_send_ms_ = now_ms;
+ if (probing_state_ != kProbing)
+ return;
+ if (!probe_bitrates_.empty())
+ probe_bitrates_.pop_front();
+}
+} // namespace webrtc
diff --git a/webrtc/modules/pacing/bitrate_prober.h b/webrtc/modules/pacing/bitrate_prober.h
new file mode 100644
index 0000000000..b3f52afeb6
--- /dev/null
+++ b/webrtc/modules/pacing/bitrate_prober.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_MODULES_PACING_BITRATE_PROBER_H_
+#define WEBRTC_MODULES_PACING_BITRATE_PROBER_H_
+
+#include <cstddef>
+#include <list>
+
+#include "webrtc/typedefs.h"
+
+namespace webrtc {
+
+// Note that this class isn't thread-safe by itself and therefore relies
+// on being protected by the caller.
+class BitrateProber {
+ public:
+ BitrateProber();
+
+ void SetEnabled(bool enable);
+
+ // Returns true if the prober is in a probing session, i.e., it currently
+ // wants packets to be sent out according to the time returned by
+ // TimeUntilNextProbe().
+ bool IsProbing() const;
+
+ // Initializes a new probing session if the prober is allowed to probe.
+ void MaybeInitializeProbe(int bitrate_bps);
+
+ // Returns the number of milliseconds until the next packet should be sent to
+ // get accurate probing.
+ int TimeUntilNextProbe(int64_t now_ms);
+
+ // Returns the number of bytes that the prober recommends for the next probe
+ // packet.
+ size_t RecommendedPacketSize() const;
+
+ // Called to report to the prober that a packet has been sent, which helps the
+ // prober know when to move to the next packet in a probe.
+ void PacketSent(int64_t now_ms, size_t packet_size);
+
+ private:
+ enum ProbingState { kDisabled, kAllowedToProbe, kProbing, kWait };
+
+ ProbingState probing_state_;
+ // Probe bitrate per packet. These are used to compute the delta relative to
+ // the previous probe packet based on the size and time when that packet was
+ // sent.
+ std::list<int> probe_bitrates_;
+ size_t packet_size_last_send_;
+ int64_t time_last_send_ms_;
+};
+} // namespace webrtc
+#endif // WEBRTC_MODULES_PACING_BITRATE_PROBER_H_
diff --git a/webrtc/modules/pacing/bitrate_prober_unittest.cc b/webrtc/modules/pacing/bitrate_prober_unittest.cc
new file mode 100644
index 0000000000..c966f5cfa8
--- /dev/null
+++ b/webrtc/modules/pacing/bitrate_prober_unittest.cc
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <limits>
+
+#include "testing/gtest/include/gtest/gtest.h"
+#include "webrtc/modules/pacing/bitrate_prober.h"
+
+namespace webrtc {
+
+TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
+ BitrateProber prober;
+ EXPECT_FALSE(prober.IsProbing());
+ int64_t now_ms = 0;
+ EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
+
+ prober.SetEnabled(true);
+ EXPECT_FALSE(prober.IsProbing());
+
+ prober.MaybeInitializeProbe(300000);
+ EXPECT_TRUE(prober.IsProbing());
+
+ EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
+ prober.PacketSent(now_ms, 1000);
+
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(8, prober.TimeUntilNextProbe(now_ms));
+ now_ms += 4;
+ EXPECT_EQ(4, prober.TimeUntilNextProbe(now_ms));
+ now_ms += 4;
+ EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
+ prober.PacketSent(now_ms, 1000);
+ }
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(4, prober.TimeUntilNextProbe(now_ms));
+ now_ms += 4;
+ EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
+ prober.PacketSent(now_ms, 1000);
+ }
+
+ EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
+ EXPECT_FALSE(prober.IsProbing());
+}
+} // namespace webrtc
diff --git a/webrtc/modules/pacing/include/mock/mock_paced_sender.h b/webrtc/modules/pacing/include/mock/mock_paced_sender.h
new file mode 100644
index 0000000000..b2cefdff8b
--- /dev/null
+++ b/webrtc/modules/pacing/include/mock/mock_paced_sender.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_
+#define WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_
+
+#include "testing/gmock/include/gmock/gmock.h"
+
+#include <vector>
+
+#include "webrtc/modules/pacing/include/paced_sender.h"
+#include "webrtc/system_wrappers/include/clock.h"
+
+namespace webrtc {
+
+class MockPacedSender : public PacedSender {
+ public:
+ MockPacedSender() : PacedSender(Clock::GetRealTimeClock(), NULL, 0, 0, 0) {}
+ MOCK_METHOD6(SendPacket, bool(Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t bytes,
+ bool retransmission));
+ MOCK_CONST_METHOD0(QueueInMs, int64_t());
+ MOCK_CONST_METHOD0(QueueInPackets, int());
+};
+
+} // namespace webrtc
+
+#endif // WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_
diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h
new file mode 100644
index 0000000000..f142f55173
--- /dev/null
+++ b/webrtc/modules/pacing/include/paced_sender.h
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_
+#define WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_
+
+#include <list>
+#include <set>
+
+#include "webrtc/base/scoped_ptr.h"
+#include "webrtc/base/thread_annotations.h"
+#include "webrtc/modules/interface/module.h"
+#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h"
+#include "webrtc/typedefs.h"
+
+namespace webrtc {
+class BitrateProber;
+class Clock;
+class CriticalSectionWrapper;
+
+namespace paced_sender {
+class IntervalBudget;
+struct Packet;
+class PacketQueue;
+} // namespace paced_sender
+
+class PacedSender : public Module, public RtpPacketSender {
+ public:
+ class Callback {
+ public:
+ // Note: packets sent as a result of a callback should not pass by this
+ // module again.
+ // Called when it's time to send a queued packet.
+ // Returns false if packet cannot be sent.
+ virtual bool TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission) = 0;
+ // Called when it's a good time to send a padding data.
+ // Returns the number of bytes sent.
+ virtual size_t TimeToSendPadding(size_t bytes) = 0;
+
+ protected:
+ virtual ~Callback() {}
+ };
+
+ static const int64_t kDefaultMaxQueueLengthMs = 2000;
+ // Pace in kbits/s until we receive first estimate.
+ static const int kDefaultInitialPaceKbps = 2000;
+ // Pacing-rate relative to our target send rate.
+ // Multiplicative factor that is applied to the target bitrate to calculate
+ // the number of bytes that can be transmitted per interval.
+ // Increasing this factor will result in lower delays in cases of bitrate
+ // overshoots from the encoder.
+ static const float kDefaultPaceMultiplier;
+
+ static const size_t kMinProbePacketSize = 200;
+
+ PacedSender(Clock* clock,
+ Callback* callback,
+ int bitrate_kbps,
+ int max_bitrate_kbps,
+ int min_bitrate_kbps);
+
+ virtual ~PacedSender();
+
+ // Temporarily pause all sending.
+ void Pause();
+
+ // Resume sending packets.
+ void Resume();
+
+ // Enable bitrate probing. Enabled by default, mostly here to simplify
+ // testing. Must be called before any packets are being sent to have an
+ // effect.
+ void SetProbingEnabled(bool enabled);
+
+ // Set target bitrates for the pacer.
+ // We will pace out bursts of packets at a bitrate of |max_bitrate_kbps|.
+ // |bitrate_kbps| is our estimate of what we are allowed to send on average.
+ // Padding packets will be utilized to reach |min_bitrate| unless enough media
+ // packets are available.
+ void UpdateBitrate(int bitrate_kbps,
+ int max_bitrate_kbps,
+ int min_bitrate_kbps);
+
+ // Returns true if we send the packet now, else it will add the packet
+ // information to the queue and call TimeToSendPacket when it's time to send.
+ void InsertPacket(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t bytes,
+ bool retransmission) override;
+
+ // Returns the time since the oldest queued packet was enqueued.
+ virtual int64_t QueueInMs() const;
+
+ virtual size_t QueueSizePackets() const;
+
+ // Returns the number of milliseconds it will take to send the current
+ // packets in the queue, given the current size and bitrate, ignoring prio.
+ virtual int64_t ExpectedQueueTimeMs() const;
+
+ // Returns the number of milliseconds until the module want a worker thread
+ // to call Process.
+ int64_t TimeUntilNextProcess() override;
+
+ // Process any pending packets in the queue(s).
+ int32_t Process() override;
+
+ private:
+ // Updates the number of bytes that can be sent for the next time interval.
+ void UpdateBytesPerInterval(int64_t delta_time_in_ms)
+ EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+
+ bool SendPacket(const paced_sender::Packet& packet)
+ EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+ void SendPadding(size_t padding_needed) EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+
+ Clock* const clock_;
+ Callback* const callback_;
+
+ rtc::scoped_ptr<CriticalSectionWrapper> critsect_;
+ bool paused_ GUARDED_BY(critsect_);
+ bool probing_enabled_;
+ // This is the media budget, keeping track of how many bits of media
+ // we can pace out during the current interval.
+ rtc::scoped_ptr<paced_sender::IntervalBudget> media_budget_
+ GUARDED_BY(critsect_);
+ // This is the padding budget, keeping track of how many bits of padding we're
+ // allowed to send out during the current interval. This budget will be
+ // utilized when there's no media to send.
+ rtc::scoped_ptr<paced_sender::IntervalBudget> padding_budget_
+ GUARDED_BY(critsect_);
+
+ rtc::scoped_ptr<BitrateProber> prober_ GUARDED_BY(critsect_);
+ int bitrate_bps_ GUARDED_BY(critsect_);
+
+ int64_t time_last_update_us_ GUARDED_BY(critsect_);
+
+ rtc::scoped_ptr<paced_sender::PacketQueue> packets_ GUARDED_BY(critsect_);
+ uint64_t packet_counter_;
+};
+} // namespace webrtc
+#endif // WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_
diff --git a/webrtc/modules/pacing/include/packet_router.h b/webrtc/modules/pacing/include/packet_router.h
new file mode 100644
index 0000000000..9d461d13a9
--- /dev/null
+++ b/webrtc/modules/pacing/include/packet_router.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_MODULES_PACING_INCLUDE_PACKET_ROUTER_H_
+#define WEBRTC_MODULES_PACING_INCLUDE_PACKET_ROUTER_H_
+
+#include <list>
+
+#include "webrtc/base/constructormagic.h"
+#include "webrtc/base/criticalsection.h"
+#include "webrtc/base/scoped_ptr.h"
+#include "webrtc/base/thread_annotations.h"
+#include "webrtc/common_types.h"
+#include "webrtc/modules/pacing/include/paced_sender.h"
+#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h"
+
+namespace webrtc {
+
+class RtpRtcp;
+namespace rtcp {
+class TransportFeedback;
+} // namespace rtcp
+
+// PacketRouter routes outgoing data to the correct sending RTP module, based
+// on the simulcast layer in RTPVideoHeader.
+class PacketRouter : public PacedSender::Callback,
+ public TransportSequenceNumberAllocator {
+ public:
+ PacketRouter();
+ virtual ~PacketRouter();
+
+ void AddRtpModule(RtpRtcp* rtp_module);
+ void RemoveRtpModule(RtpRtcp* rtp_module);
+
+ // Implements PacedSender::Callback.
+ bool TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission) override;
+
+ size_t TimeToSendPadding(size_t bytes) override;
+
+ void SetTransportWideSequenceNumber(uint16_t sequence_number);
+ uint16_t AllocateSequenceNumber() override;
+
+ // Send transport feedback packet to send-side.
+ virtual bool SendFeedback(rtcp::TransportFeedback* packet);
+
+ private:
+ rtc::CriticalSection modules_lock_;
+ // Map from ssrc to sending rtp module.
+ std::list<RtpRtcp*> rtp_modules_ GUARDED_BY(modules_lock_);
+
+ volatile int transport_seq_;
+
+ RTC_DISALLOW_COPY_AND_ASSIGN(PacketRouter);
+};
+} // namespace webrtc
+#endif // WEBRTC_MODULES_PACING_INCLUDE_PACKET_ROUTER_H_
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
new file mode 100644
index 0000000000..5d7ae17b23
--- /dev/null
+++ b/webrtc/modules/pacing/paced_sender.cc
@@ -0,0 +1,398 @@
+/*
+ * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/modules/pacing/include/paced_sender.h"
+
+#include <assert.h>
+
+#include <map>
+#include <queue>
+#include <set>
+
+#include "webrtc/modules/interface/module_common_types.h"
+#include "webrtc/modules/pacing/bitrate_prober.h"
+#include "webrtc/system_wrappers/include/clock.h"
+#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
+#include "webrtc/system_wrappers/include/field_trial.h"
+#include "webrtc/system_wrappers/include/logging.h"
+
+namespace {
+// Time limit in milliseconds between packet bursts.
+const int64_t kMinPacketLimitMs = 5;
+
+// Upper cap on process interval, in case process has not been called in a long
+// time.
+const int64_t kMaxIntervalTimeMs = 30;
+
+} // namespace
+
+namespace webrtc {
+namespace paced_sender {
+struct Packet {
+ Packet(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order)
+ : priority(priority),
+ ssrc(ssrc),
+ sequence_number(seq_number),
+ capture_time_ms(capture_time_ms),
+ enqueue_time_ms(enqueue_time_ms),
+ bytes(length_in_bytes),
+ retransmission(retransmission),
+ enqueue_order(enqueue_order) {}
+
+ RtpPacketSender::Priority priority;
+ uint32_t ssrc;
+ uint16_t sequence_number;
+ int64_t capture_time_ms;
+ int64_t enqueue_time_ms;
+ size_t bytes;
+ bool retransmission;
+ uint64_t enqueue_order;
+ std::list<Packet>::iterator this_it;
+};
+
+// Used by priority queue to sort packets.
+struct Comparator {
+ bool operator()(const Packet* first, const Packet* second) {
+ // Highest prio = 0.
+ if (first->priority != second->priority)
+ return first->priority > second->priority;
+
+ // Retransmissions go first.
+ if (second->retransmission && !first->retransmission)
+ return true;
+
+ // Older frames have higher prio.
+ if (first->capture_time_ms != second->capture_time_ms)
+ return first->capture_time_ms > second->capture_time_ms;
+
+ return first->enqueue_order > second->enqueue_order;
+ }
+};
+
+// Class encapsulating a priority queue with some extensions.
+class PacketQueue {
+ public:
+ PacketQueue() : bytes_(0) {}
+ virtual ~PacketQueue() {}
+
+ void Push(const Packet& packet) {
+ if (!AddToDupeSet(packet)) {
+ return;
+ }
+ // Store packet in list, use pointers in priority queue for cheaper moves.
+ // Packets have a handle to its own iterator in the list, for easy removal
+ // when popping from queue.
+ packet_list_.push_front(packet);
+ std::list<Packet>::iterator it = packet_list_.begin();
+ it->this_it = it; // Handle for direct removal from list.
+ prio_queue_.push(&(*it)); // Pointer into list.
+ bytes_ += packet.bytes;
+ }
+
+ const Packet& BeginPop() {
+ const Packet& packet = *prio_queue_.top();
+ prio_queue_.pop();
+ return packet;
+ }
+
+ void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
+
+ void FinalizePop(const Packet& packet) {
+ RemoveFromDupeSet(packet);
+ bytes_ -= packet.bytes;
+ packet_list_.erase(packet.this_it);
+ }
+
+ bool Empty() const { return prio_queue_.empty(); }
+
+ size_t SizeInPackets() const { return prio_queue_.size(); }
+
+ uint64_t SizeInBytes() const { return bytes_; }
+
+ int64_t OldestEnqueueTime() const {
+ std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
+ if (it == packet_list_.rend())
+ return 0;
+ return it->enqueue_time_ms;
+ }
+
+ private:
+ // Try to add a packet to the set of ssrc/seqno identifiers currently in the
+ // queue. Return true if inserted, false if this is a duplicate.
+ bool AddToDupeSet(const Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ if (it == dupe_map_.end()) {
+ // First for this ssrc, just insert.
+ dupe_map_[packet.ssrc].insert(packet.sequence_number);
+ return true;
+ }
+
+ // Insert returns a pair, where second is a bool set to true if new element.
+ return it->second.insert(packet.sequence_number).second;
+ }
+
+ void RemoveFromDupeSet(const Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ assert(it != dupe_map_.end());
+ it->second.erase(packet.sequence_number);
+ if (it->second.empty()) {
+ dupe_map_.erase(it);
+ }
+ }
+
+ // List of packets, in the order the were enqueued. Since dequeueing may
+ // occur out of order, use list instead of vector.
+ std::list<Packet> packet_list_;
+ // Priority queue of the packets, sorted according to Comparator.
+ // Use pointers into list, to avoid moving whole struct within heap.
+ std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
+ // Total number of bytes in the queue.
+ uint64_t bytes_;
+ // Map<ssrc, set<seq_no> >, for checking duplicates.
+ typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
+ SsrcSeqNoMap dupe_map_;
+};
+
+class IntervalBudget {
+ public:
+ explicit IntervalBudget(int initial_target_rate_kbps)
+ : target_rate_kbps_(initial_target_rate_kbps),
+ bytes_remaining_(0) {}
+
+ void set_target_rate_kbps(int target_rate_kbps) {
+ target_rate_kbps_ = target_rate_kbps;
+ bytes_remaining_ =
+ std::max(-kWindowMs * target_rate_kbps_ / 8, bytes_remaining_);
+ }
+
+ void IncreaseBudget(int64_t delta_time_ms) {
+ int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
+ if (bytes_remaining_ < 0) {
+ // We overused last interval, compensate this interval.
+ bytes_remaining_ = bytes_remaining_ + bytes;
+ } else {
+ // If we underused last interval we can't use it this interval.
+ bytes_remaining_ = bytes;
+ }
+ }
+
+ void UseBudget(size_t bytes) {
+ bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),
+ -kWindowMs * target_rate_kbps_ / 8);
+ }
+
+ size_t bytes_remaining() const {
+ return static_cast<size_t>(std::max(0, bytes_remaining_));
+ }
+
+ int target_rate_kbps() const { return target_rate_kbps_; }
+
+ private:
+ static const int kWindowMs = 500;
+
+ int target_rate_kbps_;
+ int bytes_remaining_;
+};
+} // namespace paced_sender
+
+const float PacedSender::kDefaultPaceMultiplier = 2.5f;
+
+PacedSender::PacedSender(Clock* clock,
+ Callback* callback,
+ int bitrate_kbps,
+ int max_bitrate_kbps,
+ int min_bitrate_kbps)
+ : clock_(clock),
+ callback_(callback),
+ critsect_(CriticalSectionWrapper::CreateCriticalSection()),
+ paused_(false),
+ probing_enabled_(true),
+ media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
+ padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
+ prober_(new BitrateProber()),
+ bitrate_bps_(1000 * bitrate_kbps),
+ time_last_update_us_(clock->TimeInMicroseconds()),
+ packets_(new paced_sender::PacketQueue()),
+ packet_counter_(0) {
+ UpdateBytesPerInterval(kMinPacketLimitMs);
+}
+
+PacedSender::~PacedSender() {}
+
+void PacedSender::Pause() {
+ CriticalSectionScoped cs(critsect_.get());
+ paused_ = true;
+}
+
+void PacedSender::Resume() {
+ CriticalSectionScoped cs(critsect_.get());
+ paused_ = false;
+}
+
+void PacedSender::SetProbingEnabled(bool enabled) {
+ assert(packet_counter_ == 0);
+ probing_enabled_ = enabled;
+}
+
+void PacedSender::UpdateBitrate(int bitrate_kbps,
+ int max_bitrate_kbps,
+ int min_bitrate_kbps) {
+ CriticalSectionScoped cs(critsect_.get());
+ media_budget_->set_target_rate_kbps(max_bitrate_kbps);
+ padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
+ bitrate_bps_ = 1000 * bitrate_kbps;
+}
+
+void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t bytes,
+ bool retransmission) {
+ CriticalSectionScoped cs(critsect_.get());
+
+ if (probing_enabled_ && !prober_->IsProbing()) {
+ prober_->SetEnabled(true);
+ }
+ prober_->MaybeInitializeProbe(bitrate_bps_);
+
+ if (capture_time_ms < 0) {
+ capture_time_ms = clock_->TimeInMilliseconds();
+ }
+
+ packets_->Push(paced_sender::Packet(
+ priority, ssrc, sequence_number, capture_time_ms,
+ clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++));
+}
+
+int64_t PacedSender::ExpectedQueueTimeMs() const {
+ CriticalSectionScoped cs(critsect_.get());
+ int target_rate = media_budget_->target_rate_kbps();
+ assert(target_rate > 0);
+ return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate);
+}
+
+size_t PacedSender::QueueSizePackets() const {
+ CriticalSectionScoped cs(critsect_.get());
+ return packets_->SizeInPackets();
+}
+
+int64_t PacedSender::QueueInMs() const {
+ CriticalSectionScoped cs(critsect_.get());
+
+ int64_t oldest_packet = packets_->OldestEnqueueTime();
+ if (oldest_packet == 0)
+ return 0;
+
+ return clock_->TimeInMilliseconds() - oldest_packet;
+}
+
+int64_t PacedSender::TimeUntilNextProcess() {
+ CriticalSectionScoped cs(critsect_.get());
+ if (prober_->IsProbing()) {
+ int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
+ if (ret >= 0) {
+ return ret;
+ }
+ }
+ int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
+ int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
+ return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
+}
+
+int32_t PacedSender::Process() {
+ int64_t now_us = clock_->TimeInMicroseconds();
+ CriticalSectionScoped cs(critsect_.get());
+ int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
+ time_last_update_us_ = now_us;
+ if (paused_)
+ return 0;
+ if (elapsed_time_ms > 0) {
+ int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
+ UpdateBytesPerInterval(delta_time_ms);
+ }
+ while (!packets_->Empty()) {
+ if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) {
+ return 0;
+ }
+
+ // Since we need to release the lock in order to send, we first pop the
+ // element from the priority queue but keep it in storage, so that we can
+ // reinsert it if send fails.
+ const paced_sender::Packet& packet = packets_->BeginPop();
+ if (SendPacket(packet)) {
+ // Send succeeded, remove it from the queue.
+ packets_->FinalizePop(packet);
+ if (prober_->IsProbing()) {
+ return 0;
+ }
+ } else {
+ // Send failed, put it back into the queue.
+ packets_->CancelPop(packet);
+ return 0;
+ }
+ }
+
+ if (!packets_->Empty())
+ return 0;
+
+ size_t padding_needed;
+ if (prober_->IsProbing())
+ padding_needed = prober_->RecommendedPacketSize();
+ else
+ padding_needed = padding_budget_->bytes_remaining();
+
+ if (padding_needed > 0)
+ SendPadding(static_cast<size_t>(padding_needed));
+ return 0;
+}
+
+bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
+ critsect_->Leave();
+ const bool success = callback_->TimeToSendPacket(packet.ssrc,
+ packet.sequence_number,
+ packet.capture_time_ms,
+ packet.retransmission);
+ critsect_->Enter();
+
+ if (success) {
+ // Update media bytes sent.
+ prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
+ media_budget_->UseBudget(packet.bytes);
+ padding_budget_->UseBudget(packet.bytes);
+ }
+
+ return success;
+}
+
+void PacedSender::SendPadding(size_t padding_needed) {
+ critsect_->Leave();
+ size_t bytes_sent = callback_->TimeToSendPadding(padding_needed);
+ critsect_->Enter();
+
+ if (bytes_sent > 0) {
+ prober_->PacketSent(clock_->TimeInMilliseconds(), bytes_sent);
+ media_budget_->UseBudget(bytes_sent);
+ padding_budget_->UseBudget(bytes_sent);
+ }
+}
+
+void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) {
+ media_budget_->IncreaseBudget(delta_time_ms);
+ padding_budget_->IncreaseBudget(delta_time_ms);
+}
+} // namespace webrtc
diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc
new file mode 100644
index 0000000000..c27444c5ac
--- /dev/null
+++ b/webrtc/modules/pacing/paced_sender_unittest.cc
@@ -0,0 +1,834 @@
+/*
+ * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <list>
+
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "webrtc/modules/pacing/include/paced_sender.h"
+#include "webrtc/system_wrappers/include/clock.h"
+
+using testing::_;
+using testing::Return;
+
+namespace webrtc {
+namespace test {
+
+static const int kTargetBitrate = 800;
+static const float kPaceMultiplier = 1.5f;
+
+class MockPacedSenderCallback : public PacedSender::Callback {
+ public:
+ MOCK_METHOD4(TimeToSendPacket,
+ bool(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission));
+ MOCK_METHOD1(TimeToSendPadding,
+ size_t(size_t bytes));
+};
+
+class PacedSenderPadding : public PacedSender::Callback {
+ public:
+ PacedSenderPadding() : padding_sent_(0) {}
+
+ bool TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission) {
+ return true;
+ }
+
+ size_t TimeToSendPadding(size_t bytes) {
+ const size_t kPaddingPacketSize = 224;
+ size_t num_packets = (bytes + kPaddingPacketSize - 1) / kPaddingPacketSize;
+ padding_sent_ += kPaddingPacketSize * num_packets;
+ return kPaddingPacketSize * num_packets;
+ }
+
+ size_t padding_sent() { return padding_sent_; }
+
+ private:
+ size_t padding_sent_;
+};
+
+class PacedSenderProbing : public PacedSender::Callback {
+ public:
+ PacedSenderProbing(const std::list<int>& expected_deltas, Clock* clock)
+ : prev_packet_time_ms_(-1),
+ expected_deltas_(expected_deltas),
+ packets_sent_(0),
+ clock_(clock) {}
+
+ bool TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission) {
+ ExpectAndCountPacket();
+ return true;
+ }
+
+ size_t TimeToSendPadding(size_t bytes) {
+ ExpectAndCountPacket();
+ return bytes;
+ }
+
+ void ExpectAndCountPacket() {
+ ++packets_sent_;
+ EXPECT_FALSE(expected_deltas_.empty());
+ if (expected_deltas_.empty())
+ return;
+ int64_t now_ms = clock_->TimeInMilliseconds();
+ if (prev_packet_time_ms_ >= 0) {
+ EXPECT_EQ(expected_deltas_.front(), now_ms - prev_packet_time_ms_);
+ expected_deltas_.pop_front();
+ }
+ prev_packet_time_ms_ = now_ms;
+ }
+
+ int packets_sent() const { return packets_sent_; }
+
+ private:
+ int64_t prev_packet_time_ms_;
+ std::list<int> expected_deltas_;
+ int packets_sent_;
+ Clock* clock_;
+};
+
+class PacedSenderTest : public ::testing::Test {
+ protected:
+ PacedSenderTest() : clock_(123456) {
+ srand(0);
+ // Need to initialize PacedSender after we initialize clock.
+ send_bucket_.reset(new PacedSender(&clock_,
+ &callback_,
+ kTargetBitrate,
+ kPaceMultiplier * kTargetBitrate,
+ 0));
+ // Default to bitrate probing disabled for testing purposes. Probing tests
+ // have to enable probing, either by creating a new PacedSender instance or
+ // by calling SetProbingEnabled(true).
+ send_bucket_->SetProbingEnabled(false);
+ }
+
+ void SendAndExpectPacket(PacedSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t size,
+ bool retransmission) {
+ send_bucket_->InsertPacket(priority, ssrc, sequence_number, capture_time_ms,
+ size, retransmission);
+ EXPECT_CALL(callback_,
+ TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false))
+ .Times(1)
+ .WillRepeatedly(Return(true));
+ }
+
+ SimulatedClock clock_;
+ MockPacedSenderCallback callback_;
+ rtc::scoped_ptr<PacedSender> send_bucket_;
+};
+
+TEST_F(PacedSenderTest, QueuePacket) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ // Due to the multiplicative factor we can send 3 packets not 2 packets.
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ int64_t queued_packet_timestamp = clock_.TimeInMilliseconds();
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number, queued_packet_timestamp, 250,
+ false);
+ send_bucket_->Process();
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ clock_.AdvanceTimeMilliseconds(4);
+ EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(1);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_CALL(
+ callback_,
+ TimeToSendPacket(ssrc, sequence_number++, queued_packet_timestamp, false))
+ .Times(1)
+ .WillRepeatedly(Return(true));
+ send_bucket_->Process();
+ sequence_number++;
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ 250, false);
+ send_bucket_->Process();
+}
+
+TEST_F(PacedSenderTest, PaceQueuedPackets) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+
+ // Due to the multiplicative factor we can send 3 packets not 2 packets.
+ for (int i = 0; i < 3; ++i) {
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ }
+ for (int j = 0; j < 30; ++j) {
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ 250, false);
+ }
+ send_bucket_->Process();
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ for (int k = 0; k < 10; ++k) {
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false))
+ .Times(3)
+ .WillRepeatedly(Return(true));
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+ }
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number, clock_.TimeInMilliseconds(), 250,
+ false);
+ send_bucket_->Process();
+}
+
+TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ uint16_t queued_sequence_number;
+
+ // Due to the multiplicative factor we can send 3 packets not 2 packets.
+ for (int i = 0; i < 3; ++i) {
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ }
+ queued_sequence_number = sequence_number;
+
+ for (int j = 0; j < 30; ++j) {
+ // Send in duplicate packets.
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number, clock_.TimeInMilliseconds(),
+ 250, false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ 250, false);
+ }
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ send_bucket_->Process();
+ for (int k = 0; k < 10; ++k) {
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+
+ for (int i = 0; i < 3; ++i) {
+ EXPECT_CALL(callback_,
+ TimeToSendPacket(ssrc, queued_sequence_number++, _, false))
+ .Times(1)
+ .WillRepeatedly(Return(true));
+ }
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+ }
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ 250, false);
+ send_bucket_->Process();
+}
+
+TEST_F(PacedSenderTest, CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+
+ // Expect packet on second ssrc to be queued and sent as well.
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc + 1,
+ sequence_number,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+
+ clock_.AdvanceTimeMilliseconds(1000);
+ send_bucket_->Process();
+}
+
+TEST_F(PacedSenderTest, Padding) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+
+ send_bucket_->UpdateBitrate(
+ kTargetBitrate, kPaceMultiplier * kTargetBitrate, kTargetBitrate);
+ // Due to the multiplicative factor we can send 3 packets not 2 packets.
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ clock_.TimeInMilliseconds(),
+ 250,
+ false);
+ // No padding is expected since we have sent too much already.
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+
+ // 5 milliseconds later we have enough budget to send some padding.
+ EXPECT_CALL(callback_, TimeToSendPadding(250)).Times(1).
+ WillOnce(Return(250));
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+}
+
+TEST_F(PacedSenderTest, VerifyPaddingUpToBitrate) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+ const int kTimeStep = 5;
+ const int64_t kBitrateWindow = 100;
+ send_bucket_->UpdateBitrate(
+ kTargetBitrate, kPaceMultiplier * kTargetBitrate, kTargetBitrate);
+ int64_t start_time = clock_.TimeInMilliseconds();
+ while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) {
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ clock_.AdvanceTimeMilliseconds(kTimeStep);
+ EXPECT_CALL(callback_, TimeToSendPadding(250)).Times(1).
+ WillOnce(Return(250));
+ send_bucket_->Process();
+ }
+}
+
+TEST_F(PacedSenderTest, VerifyAverageBitrateVaryingMediaPayload) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+ const int kTimeStep = 5;
+ const int64_t kBitrateWindow = 10000;
+ PacedSenderPadding callback;
+ send_bucket_.reset(new PacedSender(
+ &clock_, &callback, kTargetBitrate, kPaceMultiplier * kTargetBitrate, 0));
+ send_bucket_->SetProbingEnabled(false);
+ send_bucket_->UpdateBitrate(
+ kTargetBitrate, kPaceMultiplier * kTargetBitrate, kTargetBitrate);
+ int64_t start_time = clock_.TimeInMilliseconds();
+ size_t media_bytes = 0;
+ while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) {
+ size_t media_payload = rand() % 100 + 200; // [200, 300] bytes.
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, capture_time_ms,
+ media_payload, false);
+ media_bytes += media_payload;
+ clock_.AdvanceTimeMilliseconds(kTimeStep);
+ send_bucket_->Process();
+ }
+ EXPECT_NEAR(kTargetBitrate,
+ static_cast<int>(8 * (media_bytes + callback.padding_sent()) /
+ kBitrateWindow), 1);
+}
+
+TEST_F(PacedSenderTest, Priority) {
+ uint32_t ssrc_low_priority = 12345;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+ int64_t capture_time_ms_low_priority = 1234567;
+
+ // Due to the multiplicative factor we can send 3 packets not 2 packets.
+ SendAndExpectPacket(PacedSender::kLowPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ send_bucket_->Process();
+
+ // Expect normal and low priority to be queued and high to pass through.
+ send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority,
+ sequence_number++, capture_time_ms_low_priority,
+ 250, false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, capture_time_ms, 250, false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, capture_time_ms, 250, false);
+ send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc,
+ sequence_number++, capture_time_ms, 250, false);
+
+ // Expect all high and normal priority to be sent out first.
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, false))
+ .Times(3)
+ .WillRepeatedly(Return(true));
+
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+
+ EXPECT_CALL(callback_,
+ TimeToSendPacket(
+ ssrc_low_priority, _, capture_time_ms_low_priority, false))
+ .Times(1)
+ .WillRepeatedly(Return(true));
+
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+}
+
+TEST_F(PacedSenderTest, Pause) {
+ uint32_t ssrc_low_priority = 12345;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = clock_.TimeInMilliseconds();
+
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+
+ // Due to the multiplicative factor we can send 3 packets not 2 packets.
+ SendAndExpectPacket(PacedSender::kLowPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number++,
+ capture_time_ms,
+ 250,
+ false);
+ send_bucket_->Process();
+
+ send_bucket_->Pause();
+
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, capture_time_ms, 250, false);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, capture_time_ms, 250, false);
+ send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc,
+ sequence_number++, capture_time_ms, 250, false);
+
+ clock_.AdvanceTimeMilliseconds(10000);
+ int64_t second_capture_time_ms = clock_.TimeInMilliseconds();
+
+ // Expect everything to be queued.
+ send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority,
+ sequence_number++, second_capture_time_ms, 250,
+ false);
+
+ EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms,
+ send_bucket_->QueueInMs());
+
+ // Expect no packet to come out while paused.
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _)).Times(0);
+
+ for (int i = 0; i < 10; ++i) {
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+ }
+ // Expect high prio packets to come out first followed by all packets in the
+ // way they were added.
+ EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms, false))
+ .Times(3)
+ .WillRepeatedly(Return(true));
+ send_bucket_->Resume();
+
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+
+ EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false))
+ .Times(1)
+ .WillRepeatedly(Return(true));
+ EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
+ EXPECT_EQ(0, send_bucket_->Process());
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+}
+
+TEST_F(PacedSenderTest, ResendPacket) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = clock_.TimeInMilliseconds();
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number, capture_time_ms, 250, false);
+ clock_.AdvanceTimeMilliseconds(1);
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number + 1, capture_time_ms + 1, 250,
+ false);
+ clock_.AdvanceTimeMilliseconds(9999);
+ EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms,
+ send_bucket_->QueueInMs());
+ // Fails to send first packet so only one call.
+ EXPECT_CALL(callback_,
+ TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false))
+ .Times(1)
+ .WillOnce(Return(false));
+ clock_.AdvanceTimeMilliseconds(10000);
+ send_bucket_->Process();
+
+ // Queue remains unchanged.
+ EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms,
+ send_bucket_->QueueInMs());
+
+ // Fails to send second packet.
+ EXPECT_CALL(callback_,
+ TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false))
+ .Times(1)
+ .WillOnce(Return(true));
+ EXPECT_CALL(
+ callback_,
+ TimeToSendPacket(ssrc, sequence_number + 1, capture_time_ms + 1, false))
+ .Times(1)
+ .WillOnce(Return(false));
+ clock_.AdvanceTimeMilliseconds(10000);
+ send_bucket_->Process();
+
+ // Queue is reduced by 1 packet.
+ EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms - 1,
+ send_bucket_->QueueInMs());
+
+ // Send second packet and queue becomes empty.
+ EXPECT_CALL(
+ callback_,
+ TimeToSendPacket(ssrc, sequence_number + 1, capture_time_ms + 1, false))
+ .Times(1)
+ .WillOnce(Return(true));
+ clock_.AdvanceTimeMilliseconds(10000);
+ send_bucket_->Process();
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+}
+
+TEST_F(PacedSenderTest, ExpectedQueueTimeMs) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const size_t kNumPackets = 60;
+ const size_t kPacketSize = 1200;
+ const int32_t kMaxBitrate = kPaceMultiplier * 30;
+ EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
+
+ send_bucket_->UpdateBitrate(30, kMaxBitrate, 0);
+ for (size_t i = 0; i < kNumPackets; ++i) {
+ SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize, false);
+ }
+
+ // Queue in ms = 1000 * (bytes in queue) / (kbit per second * 1000 / 8)
+ int64_t queue_in_ms =
+ static_cast<int64_t>(kNumPackets * kPacketSize * 8 / kMaxBitrate);
+ EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs());
+
+ int64_t time_start = clock_.TimeInMilliseconds();
+ while (send_bucket_->QueueSizePackets() > 0) {
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ if (time_until_process <= 0) {
+ send_bucket_->Process();
+ } else {
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ }
+ }
+ int64_t duration = clock_.TimeInMilliseconds() - time_start;
+
+ EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
+
+ // Allow for aliasing, duration should be in [expected(n - 1), expected(n)].
+ EXPECT_LE(duration, queue_in_ms);
+ EXPECT_GE(duration,
+ queue_in_ms - static_cast<int64_t>(kPacketSize * 8 / kMaxBitrate));
+}
+
+TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+
+ send_bucket_->UpdateBitrate(30, kPaceMultiplier * 30, 0);
+ SendAndExpectPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number,
+ clock_.TimeInMilliseconds(),
+ 1200,
+ false);
+
+ clock_.AdvanceTimeMilliseconds(500);
+ EXPECT_EQ(500, send_bucket_->QueueInMs());
+ send_bucket_->Process();
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+}
+
+TEST_F(PacedSenderTest, ProbingWithInitialFrame) {
+ const int kNumPackets = 11;
+ const int kNumDeltas = kNumPackets - 1;
+ const size_t kPacketSize = 1200;
+ const int kInitialBitrateKbps = 300;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const int expected_deltas[kNumDeltas] = {
+ 10, 10, 10, 10, 10, 5, 5, 5, 5, 5};
+ std::list<int> expected_deltas_list(expected_deltas,
+ expected_deltas + kNumPackets - 1);
+ PacedSenderProbing callback(expected_deltas_list, &clock_);
+ send_bucket_.reset(
+ new PacedSender(&clock_,
+ &callback,
+ kInitialBitrateKbps,
+ kPaceMultiplier * kInitialBitrateKbps,
+ 0));
+
+ for (int i = 0; i < kNumPackets; ++i) {
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ kPacketSize, false);
+ }
+ while (callback.packets_sent() < kNumPackets) {
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ if (time_until_process <= 0) {
+ send_bucket_->Process();
+ } else {
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ }
+ }
+}
+
+TEST_F(PacedSenderTest, ProbingWithTooSmallInitialFrame) {
+ const int kNumPackets = 11;
+ const int kNumDeltas = kNumPackets - 1;
+ const size_t kPacketSize = 1200;
+ const int kInitialBitrateKbps = 300;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const int expected_deltas[kNumDeltas] = {10, 10, 10, 10, 10, 5, 5, 5, 5, 5};
+ std::list<int> expected_deltas_list(expected_deltas,
+ expected_deltas + kNumPackets - 1);
+ PacedSenderProbing callback(expected_deltas_list, &clock_);
+ send_bucket_.reset(new PacedSender(&clock_, &callback, kInitialBitrateKbps,
+ kPaceMultiplier * kInitialBitrateKbps, 0));
+
+ for (int i = 0; i < kNumPackets - 5; ++i) {
+ send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ kPacketSize, false);
+ }
+ while (callback.packets_sent() < kNumPackets) {
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ if (time_until_process <= 0) {
+ send_bucket_->Process();
+ } else {
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ }
+ }
+
+ // Process one more time and make sure we don't send any more probes.
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ send_bucket_->Process();
+ EXPECT_EQ(kNumPackets, callback.packets_sent());
+}
+
+TEST_F(PacedSenderTest, PriorityInversion) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const size_t kPacketSize = 1200;
+
+ send_bucket_->InsertPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number + 3,
+ clock_.TimeInMilliseconds() + 33, kPacketSize, true);
+
+ send_bucket_->InsertPacket(
+ PacedSender::kHighPriority, ssrc, sequence_number + 2,
+ clock_.TimeInMilliseconds() + 33, kPacketSize, true);
+
+ send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), kPacketSize, true);
+
+ send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc,
+ sequence_number + 1, clock_.TimeInMilliseconds(),
+ kPacketSize, true);
+
+ // Packets from earlier frames should be sent first.
+ {
+ ::testing::InSequence sequence;
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), true))
+ .WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1,
+ clock_.TimeInMilliseconds(), true))
+ .WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 3,
+ clock_.TimeInMilliseconds() + 33,
+ true)).WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 2,
+ clock_.TimeInMilliseconds() + 33,
+ true)).WillOnce(Return(true));
+
+ while (send_bucket_->QueueSizePackets() > 0) {
+ int time_until_process = send_bucket_->TimeUntilNextProcess();
+ if (time_until_process <= 0) {
+ send_bucket_->Process();
+ } else {
+ clock_.AdvanceTimeMilliseconds(time_until_process);
+ }
+ }
+ }
+}
+
+TEST_F(PacedSenderTest, PaddingOveruse) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const size_t kPacketSize = 1200;
+
+ // Min bitrate 0 => no padding, padding budget will stay at 0.
+ send_bucket_->UpdateBitrate(60, 90, 0);
+ SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize, false);
+ send_bucket_->Process();
+
+ // Add 30kbit padding. When increasing budget, media budget will increase from
+ // negative (overuse) while padding budget will increase form 0.
+ clock_.AdvanceTimeMilliseconds(5);
+ send_bucket_->UpdateBitrate(60, 90, 30);
+
+ send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(),
+ kPacketSize, false);
+
+ // Don't send padding if queue is non-empty, even if padding budget > 0.
+ EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
+ send_bucket_->Process();
+}
+
+} // namespace test
+} // namespace webrtc
diff --git a/webrtc/modules/pacing/pacing.gypi b/webrtc/modules/pacing/pacing.gypi
new file mode 100644
index 0000000000..faa97841c1
--- /dev/null
+++ b/webrtc/modules/pacing/pacing.gypi
@@ -0,0 +1,29 @@
+# Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
+#
+# Use of this source code is governed by a BSD-style license
+# that can be found in the LICENSE file in the root of the source
+# tree. An additional intellectual property rights grant can be found
+# in the file PATENTS. All contributing project authors may
+# be found in the AUTHORS file in the root of the source tree.
+
+{
+ 'targets': [
+ {
+ 'target_name': 'paced_sender',
+ 'type': 'static_library',
+ 'dependencies': [
+ '<(webrtc_root)/system_wrappers/system_wrappers.gyp:system_wrappers',
+ '<(webrtc_root)/modules/modules.gyp:bitrate_controller',
+ '<(webrtc_root)/modules/modules.gyp:rtp_rtcp',
+ ],
+ 'sources': [
+ 'include/paced_sender.h',
+ 'include/packet_router.h',
+ 'bitrate_prober.cc',
+ 'bitrate_prober.h',
+ 'paced_sender.cc',
+ 'packet_router.cc',
+ ],
+ },
+ ], # targets
+}
diff --git a/webrtc/modules/pacing/packet_router.cc b/webrtc/modules/pacing/packet_router.cc
new file mode 100644
index 0000000000..563773b41f
--- /dev/null
+++ b/webrtc/modules/pacing/packet_router.cc
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/modules/pacing/include/packet_router.h"
+
+#include "webrtc/base/atomicops.h"
+#include "webrtc/base/checks.h"
+#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h"
+#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
+
+namespace webrtc {
+
+PacketRouter::PacketRouter() : transport_seq_(0) {
+}
+
+PacketRouter::~PacketRouter() {
+ RTC_DCHECK(rtp_modules_.empty());
+}
+
+void PacketRouter::AddRtpModule(RtpRtcp* rtp_module) {
+ rtc::CritScope cs(&modules_lock_);
+ RTC_DCHECK(std::find(rtp_modules_.begin(), rtp_modules_.end(), rtp_module) ==
+ rtp_modules_.end());
+ rtp_modules_.push_back(rtp_module);
+}
+
+void PacketRouter::RemoveRtpModule(RtpRtcp* rtp_module) {
+ rtc::CritScope cs(&modules_lock_);
+ auto it = std::find(rtp_modules_.begin(), rtp_modules_.end(), rtp_module);
+ RTC_DCHECK(it != rtp_modules_.end());
+ rtp_modules_.erase(it);
+}
+
+bool PacketRouter::TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission) {
+ rtc::CritScope cs(&modules_lock_);
+ for (auto* rtp_module : rtp_modules_) {
+ if (rtp_module->SendingMedia() && ssrc == rtp_module->SSRC()) {
+ return rtp_module->TimeToSendPacket(ssrc, sequence_number,
+ capture_timestamp, retransmission);
+ }
+ }
+ return true;
+}
+
+size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send) {
+ size_t total_bytes_sent = 0;
+ rtc::CritScope cs(&modules_lock_);
+ for (RtpRtcp* module : rtp_modules_) {
+ if (module->SendingMedia()) {
+ size_t bytes_sent =
+ module->TimeToSendPadding(bytes_to_send - total_bytes_sent);
+ total_bytes_sent += bytes_sent;
+ if (total_bytes_sent >= bytes_to_send)
+ break;
+ }
+ }
+ return total_bytes_sent;
+}
+
+void PacketRouter::SetTransportWideSequenceNumber(uint16_t sequence_number) {
+ rtc::AtomicOps::ReleaseStore(&transport_seq_, sequence_number);
+}
+
+uint16_t PacketRouter::AllocateSequenceNumber() {
+ int prev_seq = rtc::AtomicOps::AcquireLoad(&transport_seq_);
+ int desired_prev_seq;
+ int new_seq;
+ do {
+ desired_prev_seq = prev_seq;
+ new_seq = (desired_prev_seq + 1) & 0xFFFF;
+ // Note: CompareAndSwap returns the actual value of transport_seq at the
+ // time the CAS operation was executed. Thus, if prev_seq is returned, the
+ // operation was successful - otherwise we need to retry. Saving the
+ // return value saves us a load on retry.
+ prev_seq = rtc::AtomicOps::CompareAndSwap(&transport_seq_, desired_prev_seq,
+ new_seq);
+ } while (prev_seq != desired_prev_seq);
+
+ return new_seq;
+}
+
+bool PacketRouter::SendFeedback(rtcp::TransportFeedback* packet) {
+ rtc::CritScope cs(&modules_lock_);
+ for (auto* rtp_module : rtp_modules_) {
+ packet->WithPacketSenderSsrc(rtp_module->SSRC());
+ if (rtp_module->SendFeedbackPacket(*packet))
+ return true;
+ }
+ return false;
+}
+
+} // namespace webrtc
diff --git a/webrtc/modules/pacing/packet_router_unittest.cc b/webrtc/modules/pacing/packet_router_unittest.cc
new file mode 100644
index 0000000000..eecb13757c
--- /dev/null
+++ b/webrtc/modules/pacing/packet_router_unittest.cc
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <list>
+
+#include "webrtc/base/checks.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "webrtc/modules/pacing/include/packet_router.h"
+#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h"
+#include "webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h"
+#include "webrtc/base/scoped_ptr.h"
+
+using ::testing::_;
+using ::testing::AnyNumber;
+using ::testing::NiceMock;
+using ::testing::Return;
+
+namespace webrtc {
+
+class PacketRouterTest : public ::testing::Test {
+ public:
+ PacketRouterTest() : packet_router_(new PacketRouter()) {}
+ protected:
+ const rtc::scoped_ptr<PacketRouter> packet_router_;
+};
+
+TEST_F(PacketRouterTest, TimeToSendPacket) {
+ MockRtpRtcp rtp_1;
+ MockRtpRtcp rtp_2;
+ packet_router_->AddRtpModule(&rtp_1);
+ packet_router_->AddRtpModule(&rtp_2);
+
+ const uint16_t kSsrc1 = 1234;
+ uint16_t sequence_number = 17;
+ uint64_t timestamp = 7890;
+ bool retransmission = false;
+
+ // Send on the first module by letting rtp_1 be sending with correct ssrc.
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_1, SSRC()).Times(1).WillOnce(Return(kSsrc1));
+ EXPECT_CALL(rtp_1, TimeToSendPacket(kSsrc1, sequence_number, timestamp,
+ retransmission))
+ .Times(1)
+ .WillOnce(Return(true));
+ EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number,
+ timestamp, retransmission));
+
+ // Send on the second module by letting rtp_2 be sending, but not rtp_1.
+ ++sequence_number;
+ timestamp += 30;
+ retransmission = true;
+ const uint16_t kSsrc2 = 4567;
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false));
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2));
+ EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_CALL(rtp_2, TimeToSendPacket(kSsrc2, sequence_number, timestamp,
+ retransmission))
+ .Times(1)
+ .WillOnce(Return(true));
+ EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc2, sequence_number,
+ timestamp, retransmission));
+
+ // No module is sending, hence no packet should be sent.
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false));
+ EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false));
+ EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number,
+ timestamp, retransmission));
+
+ // Add a packet with incorrect ssrc and test it's dropped in the router.
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_1, SSRC()).Times(1).WillOnce(Return(kSsrc1));
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2));
+ EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1 + kSsrc2, sequence_number,
+ timestamp, retransmission));
+
+ packet_router_->RemoveRtpModule(&rtp_1);
+
+ // rtp_1 has been removed, try sending a packet on that ssrc and make sure
+ // it is dropped as expected by not expecting any calls to rtp_1.
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2));
+ EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0);
+ EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number,
+ timestamp, retransmission));
+
+ packet_router_->RemoveRtpModule(&rtp_2);
+}
+
+TEST_F(PacketRouterTest, TimeToSendPadding) {
+ const uint16_t kSsrc1 = 1234;
+ const uint16_t kSsrc2 = 4567;
+
+ MockRtpRtcp rtp_1;
+ EXPECT_CALL(rtp_1, SSRC()).WillRepeatedly(Return(kSsrc1));
+ MockRtpRtcp rtp_2;
+ EXPECT_CALL(rtp_2, SSRC()).WillRepeatedly(Return(kSsrc2));
+ packet_router_->AddRtpModule(&rtp_1);
+ packet_router_->AddRtpModule(&rtp_2);
+
+ // Default configuration, sending padding on all modules sending media,
+ // ordered by SSRC.
+ const size_t requested_padding_bytes = 1000;
+ const size_t sent_padding_bytes = 890;
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes))
+ .Times(1)
+ .WillOnce(Return(sent_padding_bytes));
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_2,
+ TimeToSendPadding(requested_padding_bytes - sent_padding_bytes))
+ .Times(1)
+ .WillOnce(Return(requested_padding_bytes - sent_padding_bytes));
+ EXPECT_EQ(requested_padding_bytes,
+ packet_router_->TimeToSendPadding(requested_padding_bytes));
+
+ // Let only the second module be sending and verify the padding request is
+ // routed there.
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false));
+ EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes)).Times(0);
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_2, TimeToSendPadding(_))
+ .Times(1)
+ .WillOnce(Return(sent_padding_bytes));
+ EXPECT_EQ(sent_padding_bytes,
+ packet_router_->TimeToSendPadding(requested_padding_bytes));
+
+ // No sending module at all.
+ EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false));
+ EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes)).Times(0);
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false));
+ EXPECT_CALL(rtp_2, TimeToSendPadding(_)).Times(0);
+ EXPECT_EQ(0u, packet_router_->TimeToSendPadding(requested_padding_bytes));
+
+ packet_router_->RemoveRtpModule(&rtp_1);
+
+ // rtp_1 has been removed, try sending padding and make sure rtp_1 isn't asked
+ // to send by not expecting any calls. Instead verify rtp_2 is called.
+ EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true));
+ EXPECT_CALL(rtp_2, TimeToSendPadding(requested_padding_bytes)).Times(1);
+ EXPECT_EQ(0u, packet_router_->TimeToSendPadding(requested_padding_bytes));
+
+ packet_router_->RemoveRtpModule(&rtp_2);
+}
+
+TEST_F(PacketRouterTest, AllocateSequenceNumbers) {
+ const uint16_t kStartSeq = 0xFFF0;
+ const size_t kNumPackets = 32;
+
+ packet_router_->SetTransportWideSequenceNumber(kStartSeq - 1);
+
+ for (size_t i = 0; i < kNumPackets; ++i) {
+ uint16_t seq = packet_router_->AllocateSequenceNumber();
+ uint32_t expected_unwrapped_seq = static_cast<uint32_t>(kStartSeq) + i;
+ EXPECT_EQ(static_cast<uint16_t>(expected_unwrapped_seq & 0xFFFF), seq);
+ }
+}
+} // namespace webrtc