diff options
Diffstat (limited to 'webrtc/modules/pacing')
-rw-r--r-- | webrtc/modules/pacing/BUILD.gn | 33 | ||||
-rw-r--r-- | webrtc/modules/pacing/OWNERS | 10 | ||||
-rw-r--r-- | webrtc/modules/pacing/bitrate_prober.cc | 129 | ||||
-rw-r--r-- | webrtc/modules/pacing/bitrate_prober.h | 61 | ||||
-rw-r--r-- | webrtc/modules/pacing/bitrate_prober_unittest.cc | 51 | ||||
-rw-r--r-- | webrtc/modules/pacing/include/mock/mock_paced_sender.h | 38 | ||||
-rw-r--r-- | webrtc/modules/pacing/include/paced_sender.h | 153 | ||||
-rw-r--r-- | webrtc/modules/pacing/include/packet_router.h | 66 | ||||
-rw-r--r-- | webrtc/modules/pacing/paced_sender.cc | 398 | ||||
-rw-r--r-- | webrtc/modules/pacing/paced_sender_unittest.cc | 834 | ||||
-rw-r--r-- | webrtc/modules/pacing/pacing.gypi | 29 | ||||
-rw-r--r-- | webrtc/modules/pacing/packet_router.cc | 103 | ||||
-rw-r--r-- | webrtc/modules/pacing/packet_router_unittest.cc | 172 |
13 files changed, 2077 insertions, 0 deletions
diff --git a/webrtc/modules/pacing/BUILD.gn b/webrtc/modules/pacing/BUILD.gn new file mode 100644 index 0000000000..3e478c1e76 --- /dev/null +++ b/webrtc/modules/pacing/BUILD.gn @@ -0,0 +1,33 @@ +# Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +source_set("pacing") { + sources = [ + "bitrate_prober.cc", + "bitrate_prober.h", + "include/paced_sender.h", + "include/packet_router.h", + "paced_sender.cc", + "packet_router.cc", + ] + + configs += [ "../..:common_config" ] + public_configs = [ "../..:common_inherited_config" ] + + if (is_clang) { + # Suppress warnings from Chrome's Clang plugins. + # See http://code.google.com/p/webrtc/issues/detail?id=163 for details. + configs -= [ "//build/config/clang:find_bad_constructs" ] + } + + deps = [ + "../../system_wrappers", + "../bitrate_controller", + "../rtp_rtcp", + ] +} diff --git a/webrtc/modules/pacing/OWNERS b/webrtc/modules/pacing/OWNERS new file mode 100644 index 0000000000..bde04e2c20 --- /dev/null +++ b/webrtc/modules/pacing/OWNERS @@ -0,0 +1,10 @@ +stefan@webrtc.org +mflodman@webrtc.org +asapersson@webrtc.org + +# These are for the common case of adding or renaming files. If you're doing +# structural changes, please get a review from a reviewer in this file. +per-file *.gyp=* +per-file *.gypi=* + +per-file BUILD.gn=kjellander@webrtc.org diff --git a/webrtc/modules/pacing/bitrate_prober.cc b/webrtc/modules/pacing/bitrate_prober.cc new file mode 100644 index 0000000000..bbbe54f54e --- /dev/null +++ b/webrtc/modules/pacing/bitrate_prober.cc @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/modules/pacing/bitrate_prober.h" + +#include <assert.h> +#include <algorithm> +#include <limits> +#include <sstream> + +#include "webrtc/modules/pacing/include/paced_sender.h" +#include "webrtc/system_wrappers/include/logging.h" + +namespace webrtc { + +namespace { +int ComputeDeltaFromBitrate(size_t packet_size, int bitrate_bps) { + assert(bitrate_bps > 0); + // Compute the time delta needed to send packet_size bytes at bitrate_bps + // bps. Result is in milliseconds. + return static_cast<int>(1000ll * static_cast<int64_t>(packet_size) * 8ll / + bitrate_bps); +} +} // namespace + +BitrateProber::BitrateProber() + : probing_state_(kDisabled), + packet_size_last_send_(0), + time_last_send_ms_(-1) { +} + +void BitrateProber::SetEnabled(bool enable) { + if (enable) { + if (probing_state_ == kDisabled) { + probing_state_ = kAllowedToProbe; + LOG(LS_INFO) << "Initial bandwidth probing enabled"; + } + } else { + probing_state_ = kDisabled; + LOG(LS_INFO) << "Initial bandwidth probing disabled"; + } +} + +bool BitrateProber::IsProbing() const { + return probing_state_ == kProbing; +} + +void BitrateProber::MaybeInitializeProbe(int bitrate_bps) { + if (probing_state_ != kAllowedToProbe) + return; + probe_bitrates_.clear(); + // Max number of packets used for probing. + const int kMaxNumProbes = 2; + const int kPacketsPerProbe = 5; + const float kProbeBitrateMultipliers[kMaxNumProbes] = {3, 6}; + int bitrates_bps[kMaxNumProbes]; + std::stringstream bitrate_log; + bitrate_log << "Start probing for bandwidth, bitrates:"; + for (int i = 0; i < kMaxNumProbes; ++i) { + bitrates_bps[i] = kProbeBitrateMultipliers[i] * bitrate_bps; + bitrate_log << " " << bitrates_bps[i]; + // We need one extra to get 5 deltas for the first probe. + if (i == 0) + probe_bitrates_.push_back(bitrates_bps[i]); + for (int j = 0; j < kPacketsPerProbe; ++j) + probe_bitrates_.push_back(bitrates_bps[i]); + } + bitrate_log << ", num packets: " << probe_bitrates_.size(); + LOG(LS_INFO) << bitrate_log.str().c_str(); + probing_state_ = kProbing; +} + +int BitrateProber::TimeUntilNextProbe(int64_t now_ms) { + if (probing_state_ != kDisabled && probe_bitrates_.empty()) { + probing_state_ = kWait; + } + if (probe_bitrates_.empty()) { + // No probe started, or waiting for next probe. + return -1; + } + int64_t elapsed_time_ms = now_ms - time_last_send_ms_; + // We will send the first probe packet immediately if no packet has been + // sent before. + int time_until_probe_ms = 0; + if (packet_size_last_send_ > PacedSender::kMinProbePacketSize && + probing_state_ == kProbing) { + int next_delta_ms = ComputeDeltaFromBitrate(packet_size_last_send_, + probe_bitrates_.front()); + time_until_probe_ms = next_delta_ms - elapsed_time_ms; + // There is no point in trying to probe with less than 1 ms between packets + // as it essentially means trying to probe at infinite bandwidth. + const int kMinProbeDeltaMs = 1; + // If we have waited more than 3 ms for a new packet to probe with we will + // consider this probing session over. + const int kMaxProbeDelayMs = 3; + if (next_delta_ms < kMinProbeDeltaMs || + time_until_probe_ms < -kMaxProbeDelayMs) { + // We currently disable probing after the first probe, as we only want + // to probe at the beginning of a connection. We should set this to + // kWait if we later want to probe periodically. + probing_state_ = kWait; + LOG(LS_INFO) << "Next delta too small, stop probing."; + time_until_probe_ms = 0; + } + } + return std::max(time_until_probe_ms, 0); +} + +size_t BitrateProber::RecommendedPacketSize() const { + return packet_size_last_send_; +} + +void BitrateProber::PacketSent(int64_t now_ms, size_t packet_size) { + assert(packet_size > 0); + packet_size_last_send_ = packet_size; + time_last_send_ms_ = now_ms; + if (probing_state_ != kProbing) + return; + if (!probe_bitrates_.empty()) + probe_bitrates_.pop_front(); +} +} // namespace webrtc diff --git a/webrtc/modules/pacing/bitrate_prober.h b/webrtc/modules/pacing/bitrate_prober.h new file mode 100644 index 0000000000..b3f52afeb6 --- /dev/null +++ b/webrtc/modules/pacing/bitrate_prober.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_MODULES_PACING_BITRATE_PROBER_H_ +#define WEBRTC_MODULES_PACING_BITRATE_PROBER_H_ + +#include <cstddef> +#include <list> + +#include "webrtc/typedefs.h" + +namespace webrtc { + +// Note that this class isn't thread-safe by itself and therefore relies +// on being protected by the caller. +class BitrateProber { + public: + BitrateProber(); + + void SetEnabled(bool enable); + + // Returns true if the prober is in a probing session, i.e., it currently + // wants packets to be sent out according to the time returned by + // TimeUntilNextProbe(). + bool IsProbing() const; + + // Initializes a new probing session if the prober is allowed to probe. + void MaybeInitializeProbe(int bitrate_bps); + + // Returns the number of milliseconds until the next packet should be sent to + // get accurate probing. + int TimeUntilNextProbe(int64_t now_ms); + + // Returns the number of bytes that the prober recommends for the next probe + // packet. + size_t RecommendedPacketSize() const; + + // Called to report to the prober that a packet has been sent, which helps the + // prober know when to move to the next packet in a probe. + void PacketSent(int64_t now_ms, size_t packet_size); + + private: + enum ProbingState { kDisabled, kAllowedToProbe, kProbing, kWait }; + + ProbingState probing_state_; + // Probe bitrate per packet. These are used to compute the delta relative to + // the previous probe packet based on the size and time when that packet was + // sent. + std::list<int> probe_bitrates_; + size_t packet_size_last_send_; + int64_t time_last_send_ms_; +}; +} // namespace webrtc +#endif // WEBRTC_MODULES_PACING_BITRATE_PROBER_H_ diff --git a/webrtc/modules/pacing/bitrate_prober_unittest.cc b/webrtc/modules/pacing/bitrate_prober_unittest.cc new file mode 100644 index 0000000000..c966f5cfa8 --- /dev/null +++ b/webrtc/modules/pacing/bitrate_prober_unittest.cc @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include <limits> + +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/modules/pacing/bitrate_prober.h" + +namespace webrtc { + +TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) { + BitrateProber prober; + EXPECT_FALSE(prober.IsProbing()); + int64_t now_ms = 0; + EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + + prober.SetEnabled(true); + EXPECT_FALSE(prober.IsProbing()); + + prober.MaybeInitializeProbe(300000); + EXPECT_TRUE(prober.IsProbing()); + + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + prober.PacketSent(now_ms, 1000); + + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(8, prober.TimeUntilNextProbe(now_ms)); + now_ms += 4; + EXPECT_EQ(4, prober.TimeUntilNextProbe(now_ms)); + now_ms += 4; + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + prober.PacketSent(now_ms, 1000); + } + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(4, prober.TimeUntilNextProbe(now_ms)); + now_ms += 4; + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + prober.PacketSent(now_ms, 1000); + } + + EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + EXPECT_FALSE(prober.IsProbing()); +} +} // namespace webrtc diff --git a/webrtc/modules/pacing/include/mock/mock_paced_sender.h b/webrtc/modules/pacing/include/mock/mock_paced_sender.h new file mode 100644 index 0000000000..b2cefdff8b --- /dev/null +++ b/webrtc/modules/pacing/include/mock/mock_paced_sender.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_ +#define WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_ + +#include "testing/gmock/include/gmock/gmock.h" + +#include <vector> + +#include "webrtc/modules/pacing/include/paced_sender.h" +#include "webrtc/system_wrappers/include/clock.h" + +namespace webrtc { + +class MockPacedSender : public PacedSender { + public: + MockPacedSender() : PacedSender(Clock::GetRealTimeClock(), NULL, 0, 0, 0) {} + MOCK_METHOD6(SendPacket, bool(Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission)); + MOCK_CONST_METHOD0(QueueInMs, int64_t()); + MOCK_CONST_METHOD0(QueueInPackets, int()); +}; + +} // namespace webrtc + +#endif // WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_ diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h new file mode 100644 index 0000000000..f142f55173 --- /dev/null +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_ +#define WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_ + +#include <list> +#include <set> + +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/thread_annotations.h" +#include "webrtc/modules/interface/module.h" +#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h" +#include "webrtc/typedefs.h" + +namespace webrtc { +class BitrateProber; +class Clock; +class CriticalSectionWrapper; + +namespace paced_sender { +class IntervalBudget; +struct Packet; +class PacketQueue; +} // namespace paced_sender + +class PacedSender : public Module, public RtpPacketSender { + public: + class Callback { + public: + // Note: packets sent as a result of a callback should not pass by this + // module again. + // Called when it's time to send a queued packet. + // Returns false if packet cannot be sent. + virtual bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission) = 0; + // Called when it's a good time to send a padding data. + // Returns the number of bytes sent. + virtual size_t TimeToSendPadding(size_t bytes) = 0; + + protected: + virtual ~Callback() {} + }; + + static const int64_t kDefaultMaxQueueLengthMs = 2000; + // Pace in kbits/s until we receive first estimate. + static const int kDefaultInitialPaceKbps = 2000; + // Pacing-rate relative to our target send rate. + // Multiplicative factor that is applied to the target bitrate to calculate + // the number of bytes that can be transmitted per interval. + // Increasing this factor will result in lower delays in cases of bitrate + // overshoots from the encoder. + static const float kDefaultPaceMultiplier; + + static const size_t kMinProbePacketSize = 200; + + PacedSender(Clock* clock, + Callback* callback, + int bitrate_kbps, + int max_bitrate_kbps, + int min_bitrate_kbps); + + virtual ~PacedSender(); + + // Temporarily pause all sending. + void Pause(); + + // Resume sending packets. + void Resume(); + + // Enable bitrate probing. Enabled by default, mostly here to simplify + // testing. Must be called before any packets are being sent to have an + // effect. + void SetProbingEnabled(bool enabled); + + // Set target bitrates for the pacer. + // We will pace out bursts of packets at a bitrate of |max_bitrate_kbps|. + // |bitrate_kbps| is our estimate of what we are allowed to send on average. + // Padding packets will be utilized to reach |min_bitrate| unless enough media + // packets are available. + void UpdateBitrate(int bitrate_kbps, + int max_bitrate_kbps, + int min_bitrate_kbps); + + // Returns true if we send the packet now, else it will add the packet + // information to the queue and call TimeToSendPacket when it's time to send. + void InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission) override; + + // Returns the time since the oldest queued packet was enqueued. + virtual int64_t QueueInMs() const; + + virtual size_t QueueSizePackets() const; + + // Returns the number of milliseconds it will take to send the current + // packets in the queue, given the current size and bitrate, ignoring prio. + virtual int64_t ExpectedQueueTimeMs() const; + + // Returns the number of milliseconds until the module want a worker thread + // to call Process. + int64_t TimeUntilNextProcess() override; + + // Process any pending packets in the queue(s). + int32_t Process() override; + + private: + // Updates the number of bytes that can be sent for the next time interval. + void UpdateBytesPerInterval(int64_t delta_time_in_ms) + EXCLUSIVE_LOCKS_REQUIRED(critsect_); + + bool SendPacket(const paced_sender::Packet& packet) + EXCLUSIVE_LOCKS_REQUIRED(critsect_); + void SendPadding(size_t padding_needed) EXCLUSIVE_LOCKS_REQUIRED(critsect_); + + Clock* const clock_; + Callback* const callback_; + + rtc::scoped_ptr<CriticalSectionWrapper> critsect_; + bool paused_ GUARDED_BY(critsect_); + bool probing_enabled_; + // This is the media budget, keeping track of how many bits of media + // we can pace out during the current interval. + rtc::scoped_ptr<paced_sender::IntervalBudget> media_budget_ + GUARDED_BY(critsect_); + // This is the padding budget, keeping track of how many bits of padding we're + // allowed to send out during the current interval. This budget will be + // utilized when there's no media to send. + rtc::scoped_ptr<paced_sender::IntervalBudget> padding_budget_ + GUARDED_BY(critsect_); + + rtc::scoped_ptr<BitrateProber> prober_ GUARDED_BY(critsect_); + int bitrate_bps_ GUARDED_BY(critsect_); + + int64_t time_last_update_us_ GUARDED_BY(critsect_); + + rtc::scoped_ptr<paced_sender::PacketQueue> packets_ GUARDED_BY(critsect_); + uint64_t packet_counter_; +}; +} // namespace webrtc +#endif // WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_ diff --git a/webrtc/modules/pacing/include/packet_router.h b/webrtc/modules/pacing/include/packet_router.h new file mode 100644 index 0000000000..9d461d13a9 --- /dev/null +++ b/webrtc/modules/pacing/include/packet_router.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_MODULES_PACING_INCLUDE_PACKET_ROUTER_H_ +#define WEBRTC_MODULES_PACING_INCLUDE_PACKET_ROUTER_H_ + +#include <list> + +#include "webrtc/base/constructormagic.h" +#include "webrtc/base/criticalsection.h" +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/thread_annotations.h" +#include "webrtc/common_types.h" +#include "webrtc/modules/pacing/include/paced_sender.h" +#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h" + +namespace webrtc { + +class RtpRtcp; +namespace rtcp { +class TransportFeedback; +} // namespace rtcp + +// PacketRouter routes outgoing data to the correct sending RTP module, based +// on the simulcast layer in RTPVideoHeader. +class PacketRouter : public PacedSender::Callback, + public TransportSequenceNumberAllocator { + public: + PacketRouter(); + virtual ~PacketRouter(); + + void AddRtpModule(RtpRtcp* rtp_module); + void RemoveRtpModule(RtpRtcp* rtp_module); + + // Implements PacedSender::Callback. + bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission) override; + + size_t TimeToSendPadding(size_t bytes) override; + + void SetTransportWideSequenceNumber(uint16_t sequence_number); + uint16_t AllocateSequenceNumber() override; + + // Send transport feedback packet to send-side. + virtual bool SendFeedback(rtcp::TransportFeedback* packet); + + private: + rtc::CriticalSection modules_lock_; + // Map from ssrc to sending rtp module. + std::list<RtpRtcp*> rtp_modules_ GUARDED_BY(modules_lock_); + + volatile int transport_seq_; + + RTC_DISALLOW_COPY_AND_ASSIGN(PacketRouter); +}; +} // namespace webrtc +#endif // WEBRTC_MODULES_PACING_INCLUDE_PACKET_ROUTER_H_ diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc new file mode 100644 index 0000000000..5d7ae17b23 --- /dev/null +++ b/webrtc/modules/pacing/paced_sender.cc @@ -0,0 +1,398 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/modules/pacing/include/paced_sender.h" + +#include <assert.h> + +#include <map> +#include <queue> +#include <set> + +#include "webrtc/modules/interface/module_common_types.h" +#include "webrtc/modules/pacing/bitrate_prober.h" +#include "webrtc/system_wrappers/include/clock.h" +#include "webrtc/system_wrappers/include/critical_section_wrapper.h" +#include "webrtc/system_wrappers/include/field_trial.h" +#include "webrtc/system_wrappers/include/logging.h" + +namespace { +// Time limit in milliseconds between packet bursts. +const int64_t kMinPacketLimitMs = 5; + +// Upper cap on process interval, in case process has not been called in a long +// time. +const int64_t kMaxIntervalTimeMs = 30; + +} // namespace + +namespace webrtc { +namespace paced_sender { +struct Packet { + Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), + sequence_number(seq_number), + capture_time_ms(capture_time_ms), + enqueue_time_ms(enqueue_time_ms), + bytes(length_in_bytes), + retransmission(retransmission), + enqueue_order(enqueue_order) {} + + RtpPacketSender::Priority priority; + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; + int64_t enqueue_time_ms; + size_t bytes; + bool retransmission; + uint64_t enqueue_order; + std::list<Packet>::iterator this_it; +}; + +// Used by priority queue to sort packets. +struct Comparator { + bool operator()(const Packet* first, const Packet* second) { + // Highest prio = 0. + if (first->priority != second->priority) + return first->priority > second->priority; + + // Retransmissions go first. + if (second->retransmission && !first->retransmission) + return true; + + // Older frames have higher prio. + if (first->capture_time_ms != second->capture_time_ms) + return first->capture_time_ms > second->capture_time_ms; + + return first->enqueue_order > second->enqueue_order; + } +}; + +// Class encapsulating a priority queue with some extensions. +class PacketQueue { + public: + PacketQueue() : bytes_(0) {} + virtual ~PacketQueue() {} + + void Push(const Packet& packet) { + if (!AddToDupeSet(packet)) { + return; + } + // Store packet in list, use pointers in priority queue for cheaper moves. + // Packets have a handle to its own iterator in the list, for easy removal + // when popping from queue. + packet_list_.push_front(packet); + std::list<Packet>::iterator it = packet_list_.begin(); + it->this_it = it; // Handle for direct removal from list. + prio_queue_.push(&(*it)); // Pointer into list. + bytes_ += packet.bytes; + } + + const Packet& BeginPop() { + const Packet& packet = *prio_queue_.top(); + prio_queue_.pop(); + return packet; + } + + void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } + + void FinalizePop(const Packet& packet) { + RemoveFromDupeSet(packet); + bytes_ -= packet.bytes; + packet_list_.erase(packet.this_it); + } + + bool Empty() const { return prio_queue_.empty(); } + + size_t SizeInPackets() const { return prio_queue_.size(); } + + uint64_t SizeInBytes() const { return bytes_; } + + int64_t OldestEnqueueTime() const { + std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin(); + if (it == packet_list_.rend()) + return 0; + return it->enqueue_time_ms; + } + + private: + // Try to add a packet to the set of ssrc/seqno identifiers currently in the + // queue. Return true if inserted, false if this is a duplicate. + bool AddToDupeSet(const Packet& packet) { + SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); + if (it == dupe_map_.end()) { + // First for this ssrc, just insert. + dupe_map_[packet.ssrc].insert(packet.sequence_number); + return true; + } + + // Insert returns a pair, where second is a bool set to true if new element. + return it->second.insert(packet.sequence_number).second; + } + + void RemoveFromDupeSet(const Packet& packet) { + SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); + assert(it != dupe_map_.end()); + it->second.erase(packet.sequence_number); + if (it->second.empty()) { + dupe_map_.erase(it); + } + } + + // List of packets, in the order the were enqueued. Since dequeueing may + // occur out of order, use list instead of vector. + std::list<Packet> packet_list_; + // Priority queue of the packets, sorted according to Comparator. + // Use pointers into list, to avoid moving whole struct within heap. + std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_; + // Total number of bytes in the queue. + uint64_t bytes_; + // Map<ssrc, set<seq_no> >, for checking duplicates. + typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap; + SsrcSeqNoMap dupe_map_; +}; + +class IntervalBudget { + public: + explicit IntervalBudget(int initial_target_rate_kbps) + : target_rate_kbps_(initial_target_rate_kbps), + bytes_remaining_(0) {} + + void set_target_rate_kbps(int target_rate_kbps) { + target_rate_kbps_ = target_rate_kbps; + bytes_remaining_ = + std::max(-kWindowMs * target_rate_kbps_ / 8, bytes_remaining_); + } + + void IncreaseBudget(int64_t delta_time_ms) { + int64_t bytes = target_rate_kbps_ * delta_time_ms / 8; + if (bytes_remaining_ < 0) { + // We overused last interval, compensate this interval. + bytes_remaining_ = bytes_remaining_ + bytes; + } else { + // If we underused last interval we can't use it this interval. + bytes_remaining_ = bytes; + } + } + + void UseBudget(size_t bytes) { + bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes), + -kWindowMs * target_rate_kbps_ / 8); + } + + size_t bytes_remaining() const { + return static_cast<size_t>(std::max(0, bytes_remaining_)); + } + + int target_rate_kbps() const { return target_rate_kbps_; } + + private: + static const int kWindowMs = 500; + + int target_rate_kbps_; + int bytes_remaining_; +}; +} // namespace paced_sender + +const float PacedSender::kDefaultPaceMultiplier = 2.5f; + +PacedSender::PacedSender(Clock* clock, + Callback* callback, + int bitrate_kbps, + int max_bitrate_kbps, + int min_bitrate_kbps) + : clock_(clock), + callback_(callback), + critsect_(CriticalSectionWrapper::CreateCriticalSection()), + paused_(false), + probing_enabled_(true), + media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)), + padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), + prober_(new BitrateProber()), + bitrate_bps_(1000 * bitrate_kbps), + time_last_update_us_(clock->TimeInMicroseconds()), + packets_(new paced_sender::PacketQueue()), + packet_counter_(0) { + UpdateBytesPerInterval(kMinPacketLimitMs); +} + +PacedSender::~PacedSender() {} + +void PacedSender::Pause() { + CriticalSectionScoped cs(critsect_.get()); + paused_ = true; +} + +void PacedSender::Resume() { + CriticalSectionScoped cs(critsect_.get()); + paused_ = false; +} + +void PacedSender::SetProbingEnabled(bool enabled) { + assert(packet_counter_ == 0); + probing_enabled_ = enabled; +} + +void PacedSender::UpdateBitrate(int bitrate_kbps, + int max_bitrate_kbps, + int min_bitrate_kbps) { + CriticalSectionScoped cs(critsect_.get()); + media_budget_->set_target_rate_kbps(max_bitrate_kbps); + padding_budget_->set_target_rate_kbps(min_bitrate_kbps); + bitrate_bps_ = 1000 * bitrate_kbps; +} + +void PacedSender::InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission) { + CriticalSectionScoped cs(critsect_.get()); + + if (probing_enabled_ && !prober_->IsProbing()) { + prober_->SetEnabled(true); + } + prober_->MaybeInitializeProbe(bitrate_bps_); + + if (capture_time_ms < 0) { + capture_time_ms = clock_->TimeInMilliseconds(); + } + + packets_->Push(paced_sender::Packet( + priority, ssrc, sequence_number, capture_time_ms, + clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++)); +} + +int64_t PacedSender::ExpectedQueueTimeMs() const { + CriticalSectionScoped cs(critsect_.get()); + int target_rate = media_budget_->target_rate_kbps(); + assert(target_rate > 0); + return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate); +} + +size_t PacedSender::QueueSizePackets() const { + CriticalSectionScoped cs(critsect_.get()); + return packets_->SizeInPackets(); +} + +int64_t PacedSender::QueueInMs() const { + CriticalSectionScoped cs(critsect_.get()); + + int64_t oldest_packet = packets_->OldestEnqueueTime(); + if (oldest_packet == 0) + return 0; + + return clock_->TimeInMilliseconds() - oldest_packet; +} + +int64_t PacedSender::TimeUntilNextProcess() { + CriticalSectionScoped cs(critsect_.get()); + if (prober_->IsProbing()) { + int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); + if (ret >= 0) { + return ret; + } + } + int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; + int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; + return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); +} + +int32_t PacedSender::Process() { + int64_t now_us = clock_->TimeInMicroseconds(); + CriticalSectionScoped cs(critsect_.get()); + int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; + time_last_update_us_ = now_us; + if (paused_) + return 0; + if (elapsed_time_ms > 0) { + int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); + UpdateBytesPerInterval(delta_time_ms); + } + while (!packets_->Empty()) { + if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) { + return 0; + } + + // Since we need to release the lock in order to send, we first pop the + // element from the priority queue but keep it in storage, so that we can + // reinsert it if send fails. + const paced_sender::Packet& packet = packets_->BeginPop(); + if (SendPacket(packet)) { + // Send succeeded, remove it from the queue. + packets_->FinalizePop(packet); + if (prober_->IsProbing()) { + return 0; + } + } else { + // Send failed, put it back into the queue. + packets_->CancelPop(packet); + return 0; + } + } + + if (!packets_->Empty()) + return 0; + + size_t padding_needed; + if (prober_->IsProbing()) + padding_needed = prober_->RecommendedPacketSize(); + else + padding_needed = padding_budget_->bytes_remaining(); + + if (padding_needed > 0) + SendPadding(static_cast<size_t>(padding_needed)); + return 0; +} + +bool PacedSender::SendPacket(const paced_sender::Packet& packet) { + critsect_->Leave(); + const bool success = callback_->TimeToSendPacket(packet.ssrc, + packet.sequence_number, + packet.capture_time_ms, + packet.retransmission); + critsect_->Enter(); + + if (success) { + // Update media bytes sent. + prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes); + media_budget_->UseBudget(packet.bytes); + padding_budget_->UseBudget(packet.bytes); + } + + return success; +} + +void PacedSender::SendPadding(size_t padding_needed) { + critsect_->Leave(); + size_t bytes_sent = callback_->TimeToSendPadding(padding_needed); + critsect_->Enter(); + + if (bytes_sent > 0) { + prober_->PacketSent(clock_->TimeInMilliseconds(), bytes_sent); + media_budget_->UseBudget(bytes_sent); + padding_budget_->UseBudget(bytes_sent); + } +} + +void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) { + media_budget_->IncreaseBudget(delta_time_ms); + padding_budget_->IncreaseBudget(delta_time_ms); +} +} // namespace webrtc diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc new file mode 100644 index 0000000000..c27444c5ac --- /dev/null +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -0,0 +1,834 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include <list> + +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/modules/pacing/include/paced_sender.h" +#include "webrtc/system_wrappers/include/clock.h" + +using testing::_; +using testing::Return; + +namespace webrtc { +namespace test { + +static const int kTargetBitrate = 800; +static const float kPaceMultiplier = 1.5f; + +class MockPacedSenderCallback : public PacedSender::Callback { + public: + MOCK_METHOD4(TimeToSendPacket, + bool(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission)); + MOCK_METHOD1(TimeToSendPadding, + size_t(size_t bytes)); +}; + +class PacedSenderPadding : public PacedSender::Callback { + public: + PacedSenderPadding() : padding_sent_(0) {} + + bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission) { + return true; + } + + size_t TimeToSendPadding(size_t bytes) { + const size_t kPaddingPacketSize = 224; + size_t num_packets = (bytes + kPaddingPacketSize - 1) / kPaddingPacketSize; + padding_sent_ += kPaddingPacketSize * num_packets; + return kPaddingPacketSize * num_packets; + } + + size_t padding_sent() { return padding_sent_; } + + private: + size_t padding_sent_; +}; + +class PacedSenderProbing : public PacedSender::Callback { + public: + PacedSenderProbing(const std::list<int>& expected_deltas, Clock* clock) + : prev_packet_time_ms_(-1), + expected_deltas_(expected_deltas), + packets_sent_(0), + clock_(clock) {} + + bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission) { + ExpectAndCountPacket(); + return true; + } + + size_t TimeToSendPadding(size_t bytes) { + ExpectAndCountPacket(); + return bytes; + } + + void ExpectAndCountPacket() { + ++packets_sent_; + EXPECT_FALSE(expected_deltas_.empty()); + if (expected_deltas_.empty()) + return; + int64_t now_ms = clock_->TimeInMilliseconds(); + if (prev_packet_time_ms_ >= 0) { + EXPECT_EQ(expected_deltas_.front(), now_ms - prev_packet_time_ms_); + expected_deltas_.pop_front(); + } + prev_packet_time_ms_ = now_ms; + } + + int packets_sent() const { return packets_sent_; } + + private: + int64_t prev_packet_time_ms_; + std::list<int> expected_deltas_; + int packets_sent_; + Clock* clock_; +}; + +class PacedSenderTest : public ::testing::Test { + protected: + PacedSenderTest() : clock_(123456) { + srand(0); + // Need to initialize PacedSender after we initialize clock. + send_bucket_.reset(new PacedSender(&clock_, + &callback_, + kTargetBitrate, + kPaceMultiplier * kTargetBitrate, + 0)); + // Default to bitrate probing disabled for testing purposes. Probing tests + // have to enable probing, either by creating a new PacedSender instance or + // by calling SetProbingEnabled(true). + send_bucket_->SetProbingEnabled(false); + } + + void SendAndExpectPacket(PacedSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t size, + bool retransmission) { + send_bucket_->InsertPacket(priority, ssrc, sequence_number, capture_time_ms, + size, retransmission); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false)) + .Times(1) + .WillRepeatedly(Return(true)); + } + + SimulatedClock clock_; + MockPacedSenderCallback callback_; + rtc::scoped_ptr<PacedSender> send_bucket_; +}; + +TEST_F(PacedSenderTest, QueuePacket) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + // Due to the multiplicative factor we can send 3 packets not 2 packets. + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + int64_t queued_packet_timestamp = clock_.TimeInMilliseconds(); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, queued_packet_timestamp, 250, + false); + send_bucket_->Process(); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + clock_.AdvanceTimeMilliseconds(4); + EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(1); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL( + callback_, + TimeToSendPacket(ssrc, sequence_number++, queued_packet_timestamp, false)) + .Times(1) + .WillRepeatedly(Return(true)); + send_bucket_->Process(); + sequence_number++; + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + 250, false); + send_bucket_->Process(); +} + +TEST_F(PacedSenderTest, PaceQueuedPackets) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + for (int i = 0; i < 3; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + } + for (int j = 0; j < 30; ++j) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + 250, false); + } + send_bucket_->Process(); + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + for (int k = 0; k < 10; ++k) { + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false)) + .Times(3) + .WillRepeatedly(Return(true)); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + } + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, clock_.TimeInMilliseconds(), 250, + false); + send_bucket_->Process(); +} + +TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + uint16_t queued_sequence_number; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + for (int i = 0; i < 3; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + } + queued_sequence_number = sequence_number; + + for (int j = 0; j < 30; ++j) { + // Send in duplicate packets. + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, clock_.TimeInMilliseconds(), + 250, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + 250, false); + } + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + send_bucket_->Process(); + for (int k = 0; k < 10; ++k) { + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + + for (int i = 0; i < 3; ++i) { + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, queued_sequence_number++, _, false)) + .Times(1) + .WillRepeatedly(Return(true)); + } + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + } + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + 250, false); + send_bucket_->Process(); +} + +TEST_F(PacedSenderTest, CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number, + clock_.TimeInMilliseconds(), + 250, + false); + + // Expect packet on second ssrc to be queued and sent as well. + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc + 1, + sequence_number, + clock_.TimeInMilliseconds(), + 250, + false); + + clock_.AdvanceTimeMilliseconds(1000); + send_bucket_->Process(); +} + +TEST_F(PacedSenderTest, Padding) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + send_bucket_->UpdateBitrate( + kTargetBitrate, kPaceMultiplier * kTargetBitrate, kTargetBitrate); + // Due to the multiplicative factor we can send 3 packets not 2 packets. + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + clock_.TimeInMilliseconds(), + 250, + false); + // No padding is expected since we have sent too much already. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + + // 5 milliseconds later we have enough budget to send some padding. + EXPECT_CALL(callback_, TimeToSendPadding(250)).Times(1). + WillOnce(Return(250)); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); +} + +TEST_F(PacedSenderTest, VerifyPaddingUpToBitrate) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + const int kTimeStep = 5; + const int64_t kBitrateWindow = 100; + send_bucket_->UpdateBitrate( + kTargetBitrate, kPaceMultiplier * kTargetBitrate, kTargetBitrate); + int64_t start_time = clock_.TimeInMilliseconds(); + while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + clock_.AdvanceTimeMilliseconds(kTimeStep); + EXPECT_CALL(callback_, TimeToSendPadding(250)).Times(1). + WillOnce(Return(250)); + send_bucket_->Process(); + } +} + +TEST_F(PacedSenderTest, VerifyAverageBitrateVaryingMediaPayload) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + const int kTimeStep = 5; + const int64_t kBitrateWindow = 10000; + PacedSenderPadding callback; + send_bucket_.reset(new PacedSender( + &clock_, &callback, kTargetBitrate, kPaceMultiplier * kTargetBitrate, 0)); + send_bucket_->SetProbingEnabled(false); + send_bucket_->UpdateBitrate( + kTargetBitrate, kPaceMultiplier * kTargetBitrate, kTargetBitrate); + int64_t start_time = clock_.TimeInMilliseconds(); + size_t media_bytes = 0; + while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + size_t media_payload = rand() % 100 + 200; // [200, 300] bytes. + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, + media_payload, false); + media_bytes += media_payload; + clock_.AdvanceTimeMilliseconds(kTimeStep); + send_bucket_->Process(); + } + EXPECT_NEAR(kTargetBitrate, + static_cast<int>(8 * (media_bytes + callback.padding_sent()) / + kBitrateWindow), 1); +} + +TEST_F(PacedSenderTest, Priority) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + int64_t capture_time_ms_low_priority = 1234567; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + SendAndExpectPacket(PacedSender::kLowPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + send_bucket_->Process(); + + // Expect normal and low priority to be queued and high to pass through. + send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority, + sequence_number++, capture_time_ms_low_priority, + 250, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + + // Expect all high and normal priority to be sent out first. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, false)) + .Times(3) + .WillRepeatedly(Return(true)); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + + EXPECT_CALL(callback_, + TimeToSendPacket( + ssrc_low_priority, _, capture_time_ms_low_priority, false)) + .Times(1) + .WillRepeatedly(Return(true)); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); +} + +TEST_F(PacedSenderTest, Pause) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + SendAndExpectPacket(PacedSender::kLowPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + send_bucket_->Process(); + + send_bucket_->Pause(); + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + + clock_.AdvanceTimeMilliseconds(10000); + int64_t second_capture_time_ms = clock_.TimeInMilliseconds(); + + // Expect everything to be queued. + send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority, + sequence_number++, second_capture_time_ms, 250, + false); + + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, + send_bucket_->QueueInMs()); + + // Expect no packet to come out while paused. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _)).Times(0); + + for (int i = 0; i < 10; ++i) { + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + } + // Expect high prio packets to come out first followed by all packets in the + // way they were added. + EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms, false)) + .Times(3) + .WillRepeatedly(Return(true)); + send_bucket_->Resume(); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + + EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false)) + .Times(1) + .WillRepeatedly(Return(true)); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + +TEST_F(PacedSenderTest, ResendPacket) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, capture_time_ms, 250, false); + clock_.AdvanceTimeMilliseconds(1); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number + 1, capture_time_ms + 1, 250, + false); + clock_.AdvanceTimeMilliseconds(9999); + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, + send_bucket_->QueueInMs()); + // Fails to send first packet so only one call. + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false)) + .Times(1) + .WillOnce(Return(false)); + clock_.AdvanceTimeMilliseconds(10000); + send_bucket_->Process(); + + // Queue remains unchanged. + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, + send_bucket_->QueueInMs()); + + // Fails to send second packet. + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false)) + .Times(1) + .WillOnce(Return(true)); + EXPECT_CALL( + callback_, + TimeToSendPacket(ssrc, sequence_number + 1, capture_time_ms + 1, false)) + .Times(1) + .WillOnce(Return(false)); + clock_.AdvanceTimeMilliseconds(10000); + send_bucket_->Process(); + + // Queue is reduced by 1 packet. + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms - 1, + send_bucket_->QueueInMs()); + + // Send second packet and queue becomes empty. + EXPECT_CALL( + callback_, + TimeToSendPacket(ssrc, sequence_number + 1, capture_time_ms + 1, false)) + .Times(1) + .WillOnce(Return(true)); + clock_.AdvanceTimeMilliseconds(10000); + send_bucket_->Process(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + +TEST_F(PacedSenderTest, ExpectedQueueTimeMs) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kNumPackets = 60; + const size_t kPacketSize = 1200; + const int32_t kMaxBitrate = kPaceMultiplier * 30; + EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + + send_bucket_->UpdateBitrate(30, kMaxBitrate, 0); + for (size_t i = 0; i < kNumPackets; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + } + + // Queue in ms = 1000 * (bytes in queue) / (kbit per second * 1000 / 8) + int64_t queue_in_ms = + static_cast<int64_t>(kNumPackets * kPacketSize * 8 / kMaxBitrate); + EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs()); + + int64_t time_start = clock_.TimeInMilliseconds(); + while (send_bucket_->QueueSizePackets() > 0) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + int64_t duration = clock_.TimeInMilliseconds() - time_start; + + EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + + // Allow for aliasing, duration should be in [expected(n - 1), expected(n)]. + EXPECT_LE(duration, queue_in_ms); + EXPECT_GE(duration, + queue_in_ms - static_cast<int64_t>(kPacketSize * 8 / kMaxBitrate)); +} + +TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + send_bucket_->UpdateBitrate(30, kPaceMultiplier * 30, 0); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number, + clock_.TimeInMilliseconds(), + 1200, + false); + + clock_.AdvanceTimeMilliseconds(500); + EXPECT_EQ(500, send_bucket_->QueueInMs()); + send_bucket_->Process(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + +TEST_F(PacedSenderTest, ProbingWithInitialFrame) { + const int kNumPackets = 11; + const int kNumDeltas = kNumPackets - 1; + const size_t kPacketSize = 1200; + const int kInitialBitrateKbps = 300; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const int expected_deltas[kNumDeltas] = { + 10, 10, 10, 10, 10, 5, 5, 5, 5, 5}; + std::list<int> expected_deltas_list(expected_deltas, + expected_deltas + kNumPackets - 1); + PacedSenderProbing callback(expected_deltas_list, &clock_); + send_bucket_.reset( + new PacedSender(&clock_, + &callback, + kInitialBitrateKbps, + kPaceMultiplier * kInitialBitrateKbps, + 0)); + + for (int i = 0; i < kNumPackets; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + } + while (callback.packets_sent() < kNumPackets) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } +} + +TEST_F(PacedSenderTest, ProbingWithTooSmallInitialFrame) { + const int kNumPackets = 11; + const int kNumDeltas = kNumPackets - 1; + const size_t kPacketSize = 1200; + const int kInitialBitrateKbps = 300; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const int expected_deltas[kNumDeltas] = {10, 10, 10, 10, 10, 5, 5, 5, 5, 5}; + std::list<int> expected_deltas_list(expected_deltas, + expected_deltas + kNumPackets - 1); + PacedSenderProbing callback(expected_deltas_list, &clock_); + send_bucket_.reset(new PacedSender(&clock_, &callback, kInitialBitrateKbps, + kPaceMultiplier * kInitialBitrateKbps, 0)); + + for (int i = 0; i < kNumPackets - 5; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + } + while (callback.packets_sent() < kNumPackets) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + + // Process one more time and make sure we don't send any more probes. + int time_until_process = send_bucket_->TimeUntilNextProcess(); + clock_.AdvanceTimeMilliseconds(time_until_process); + send_bucket_->Process(); + EXPECT_EQ(kNumPackets, callback.packets_sent()); +} + +TEST_F(PacedSenderTest, PriorityInversion) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + send_bucket_->InsertPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 3, + clock_.TimeInMilliseconds() + 33, kPacketSize, true); + + send_bucket_->InsertPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 2, + clock_.TimeInMilliseconds() + 33, kPacketSize, true); + + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, sequence_number, + clock_.TimeInMilliseconds(), kPacketSize, true); + + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number + 1, clock_.TimeInMilliseconds(), + kPacketSize, true); + + // Packets from earlier frames should be sent first. + { + ::testing::InSequence sequence; + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + clock_.TimeInMilliseconds(), true)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1, + clock_.TimeInMilliseconds(), true)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 3, + clock_.TimeInMilliseconds() + 33, + true)).WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 2, + clock_.TimeInMilliseconds() + 33, + true)).WillOnce(Return(true)); + + while (send_bucket_->QueueSizePackets() > 0) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + } +} + +TEST_F(PacedSenderTest, PaddingOveruse) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + // Min bitrate 0 => no padding, padding budget will stay at 0. + send_bucket_->UpdateBitrate(60, 90, 0); + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + send_bucket_->Process(); + + // Add 30kbit padding. When increasing budget, media budget will increase from + // negative (overuse) while padding budget will increase form 0. + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->UpdateBitrate(60, 90, 30); + + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + + // Don't send padding if queue is non-empty, even if padding budget > 0. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + send_bucket_->Process(); +} + +} // namespace test +} // namespace webrtc diff --git a/webrtc/modules/pacing/pacing.gypi b/webrtc/modules/pacing/pacing.gypi new file mode 100644 index 0000000000..faa97841c1 --- /dev/null +++ b/webrtc/modules/pacing/pacing.gypi @@ -0,0 +1,29 @@ +# Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +{ + 'targets': [ + { + 'target_name': 'paced_sender', + 'type': 'static_library', + 'dependencies': [ + '<(webrtc_root)/system_wrappers/system_wrappers.gyp:system_wrappers', + '<(webrtc_root)/modules/modules.gyp:bitrate_controller', + '<(webrtc_root)/modules/modules.gyp:rtp_rtcp', + ], + 'sources': [ + 'include/paced_sender.h', + 'include/packet_router.h', + 'bitrate_prober.cc', + 'bitrate_prober.h', + 'paced_sender.cc', + 'packet_router.cc', + ], + }, + ], # targets +} diff --git a/webrtc/modules/pacing/packet_router.cc b/webrtc/modules/pacing/packet_router.cc new file mode 100644 index 0000000000..563773b41f --- /dev/null +++ b/webrtc/modules/pacing/packet_router.cc @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/modules/pacing/include/packet_router.h" + +#include "webrtc/base/atomicops.h" +#include "webrtc/base/checks.h" +#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h" +#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h" +#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" + +namespace webrtc { + +PacketRouter::PacketRouter() : transport_seq_(0) { +} + +PacketRouter::~PacketRouter() { + RTC_DCHECK(rtp_modules_.empty()); +} + +void PacketRouter::AddRtpModule(RtpRtcp* rtp_module) { + rtc::CritScope cs(&modules_lock_); + RTC_DCHECK(std::find(rtp_modules_.begin(), rtp_modules_.end(), rtp_module) == + rtp_modules_.end()); + rtp_modules_.push_back(rtp_module); +} + +void PacketRouter::RemoveRtpModule(RtpRtcp* rtp_module) { + rtc::CritScope cs(&modules_lock_); + auto it = std::find(rtp_modules_.begin(), rtp_modules_.end(), rtp_module); + RTC_DCHECK(it != rtp_modules_.end()); + rtp_modules_.erase(it); +} + +bool PacketRouter::TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission) { + rtc::CritScope cs(&modules_lock_); + for (auto* rtp_module : rtp_modules_) { + if (rtp_module->SendingMedia() && ssrc == rtp_module->SSRC()) { + return rtp_module->TimeToSendPacket(ssrc, sequence_number, + capture_timestamp, retransmission); + } + } + return true; +} + +size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send) { + size_t total_bytes_sent = 0; + rtc::CritScope cs(&modules_lock_); + for (RtpRtcp* module : rtp_modules_) { + if (module->SendingMedia()) { + size_t bytes_sent = + module->TimeToSendPadding(bytes_to_send - total_bytes_sent); + total_bytes_sent += bytes_sent; + if (total_bytes_sent >= bytes_to_send) + break; + } + } + return total_bytes_sent; +} + +void PacketRouter::SetTransportWideSequenceNumber(uint16_t sequence_number) { + rtc::AtomicOps::ReleaseStore(&transport_seq_, sequence_number); +} + +uint16_t PacketRouter::AllocateSequenceNumber() { + int prev_seq = rtc::AtomicOps::AcquireLoad(&transport_seq_); + int desired_prev_seq; + int new_seq; + do { + desired_prev_seq = prev_seq; + new_seq = (desired_prev_seq + 1) & 0xFFFF; + // Note: CompareAndSwap returns the actual value of transport_seq at the + // time the CAS operation was executed. Thus, if prev_seq is returned, the + // operation was successful - otherwise we need to retry. Saving the + // return value saves us a load on retry. + prev_seq = rtc::AtomicOps::CompareAndSwap(&transport_seq_, desired_prev_seq, + new_seq); + } while (prev_seq != desired_prev_seq); + + return new_seq; +} + +bool PacketRouter::SendFeedback(rtcp::TransportFeedback* packet) { + rtc::CritScope cs(&modules_lock_); + for (auto* rtp_module : rtp_modules_) { + packet->WithPacketSenderSsrc(rtp_module->SSRC()); + if (rtp_module->SendFeedbackPacket(*packet)) + return true; + } + return false; +} + +} // namespace webrtc diff --git a/webrtc/modules/pacing/packet_router_unittest.cc b/webrtc/modules/pacing/packet_router_unittest.cc new file mode 100644 index 0000000000..eecb13757c --- /dev/null +++ b/webrtc/modules/pacing/packet_router_unittest.cc @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include <list> + +#include "webrtc/base/checks.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/modules/pacing/include/packet_router.h" +#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h" +#include "webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h" +#include "webrtc/base/scoped_ptr.h" + +using ::testing::_; +using ::testing::AnyNumber; +using ::testing::NiceMock; +using ::testing::Return; + +namespace webrtc { + +class PacketRouterTest : public ::testing::Test { + public: + PacketRouterTest() : packet_router_(new PacketRouter()) {} + protected: + const rtc::scoped_ptr<PacketRouter> packet_router_; +}; + +TEST_F(PacketRouterTest, TimeToSendPacket) { + MockRtpRtcp rtp_1; + MockRtpRtcp rtp_2; + packet_router_->AddRtpModule(&rtp_1); + packet_router_->AddRtpModule(&rtp_2); + + const uint16_t kSsrc1 = 1234; + uint16_t sequence_number = 17; + uint64_t timestamp = 7890; + bool retransmission = false; + + // Send on the first module by letting rtp_1 be sending with correct ssrc. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, SSRC()).Times(1).WillOnce(Return(kSsrc1)); + EXPECT_CALL(rtp_1, TimeToSendPacket(kSsrc1, sequence_number, timestamp, + retransmission)) + .Times(1) + .WillOnce(Return(true)); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number, + timestamp, retransmission)); + + // Send on the second module by letting rtp_2 be sending, but not rtp_1. + ++sequence_number; + timestamp += 30; + retransmission = true; + const uint16_t kSsrc2 = 4567; + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2)); + EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_CALL(rtp_2, TimeToSendPacket(kSsrc2, sequence_number, timestamp, + retransmission)) + .Times(1) + .WillOnce(Return(true)); + EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc2, sequence_number, + timestamp, retransmission)); + + // No module is sending, hence no packet should be sent. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number, + timestamp, retransmission)); + + // Add a packet with incorrect ssrc and test it's dropped in the router. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, SSRC()).Times(1).WillOnce(Return(kSsrc1)); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2)); + EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1 + kSsrc2, sequence_number, + timestamp, retransmission)); + + packet_router_->RemoveRtpModule(&rtp_1); + + // rtp_1 has been removed, try sending a packet on that ssrc and make sure + // it is dropped as expected by not expecting any calls to rtp_1. + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2)); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _)).Times(0); + EXPECT_TRUE(packet_router_->TimeToSendPacket(kSsrc1, sequence_number, + timestamp, retransmission)); + + packet_router_->RemoveRtpModule(&rtp_2); +} + +TEST_F(PacketRouterTest, TimeToSendPadding) { + const uint16_t kSsrc1 = 1234; + const uint16_t kSsrc2 = 4567; + + MockRtpRtcp rtp_1; + EXPECT_CALL(rtp_1, SSRC()).WillRepeatedly(Return(kSsrc1)); + MockRtpRtcp rtp_2; + EXPECT_CALL(rtp_2, SSRC()).WillRepeatedly(Return(kSsrc2)); + packet_router_->AddRtpModule(&rtp_1); + packet_router_->AddRtpModule(&rtp_2); + + // Default configuration, sending padding on all modules sending media, + // ordered by SSRC. + const size_t requested_padding_bytes = 1000; + const size_t sent_padding_bytes = 890; + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes)) + .Times(1) + .WillOnce(Return(sent_padding_bytes)); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, + TimeToSendPadding(requested_padding_bytes - sent_padding_bytes)) + .Times(1) + .WillOnce(Return(requested_padding_bytes - sent_padding_bytes)); + EXPECT_EQ(requested_padding_bytes, + packet_router_->TimeToSendPadding(requested_padding_bytes)); + + // Let only the second module be sending and verify the padding request is + // routed there. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes)).Times(0); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, TimeToSendPadding(_)) + .Times(1) + .WillOnce(Return(sent_padding_bytes)); + EXPECT_EQ(sent_padding_bytes, + packet_router_->TimeToSendPadding(requested_padding_bytes)); + + // No sending module at all. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes)).Times(0); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, TimeToSendPadding(_)).Times(0); + EXPECT_EQ(0u, packet_router_->TimeToSendPadding(requested_padding_bytes)); + + packet_router_->RemoveRtpModule(&rtp_1); + + // rtp_1 has been removed, try sending padding and make sure rtp_1 isn't asked + // to send by not expecting any calls. Instead verify rtp_2 is called. + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, TimeToSendPadding(requested_padding_bytes)).Times(1); + EXPECT_EQ(0u, packet_router_->TimeToSendPadding(requested_padding_bytes)); + + packet_router_->RemoveRtpModule(&rtp_2); +} + +TEST_F(PacketRouterTest, AllocateSequenceNumbers) { + const uint16_t kStartSeq = 0xFFF0; + const size_t kNumPackets = 32; + + packet_router_->SetTransportWideSequenceNumber(kStartSeq - 1); + + for (size_t i = 0; i < kNumPackets; ++i) { + uint16_t seq = packet_router_->AllocateSequenceNumber(); + uint32_t expected_unwrapped_seq = static_cast<uint32_t>(kStartSeq) + i; + EXPECT_EQ(static_cast<uint16_t>(expected_unwrapped_seq & 0xFFFF), seq); + } +} +} // namespace webrtc |