diff options
Diffstat (limited to 'webrtc/modules/remote_bitrate_estimator/test/packet_sender.cc')
-rw-r--r-- | webrtc/modules/remote_bitrate_estimator/test/packet_sender.cc | 494 |
1 files changed, 494 insertions, 0 deletions
diff --git a/webrtc/modules/remote_bitrate_estimator/test/packet_sender.cc b/webrtc/modules/remote_bitrate_estimator/test/packet_sender.cc new file mode 100644 index 0000000000..f1faa49d7e --- /dev/null +++ b/webrtc/modules/remote_bitrate_estimator/test/packet_sender.cc @@ -0,0 +1,494 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/modules/remote_bitrate_estimator/test/packet_sender.h" + +#include <algorithm> +#include <list> +#include <sstream> + +#include "webrtc/base/checks.h" +#include "webrtc/modules/interface/module_common_types.h" +#include "webrtc/modules/remote_bitrate_estimator/test/bwe.h" +#include "webrtc/modules/remote_bitrate_estimator/test/metric_recorder.h" + +namespace webrtc { +namespace testing { +namespace bwe { + +void PacketSender::Pause() { + running_ = false; + if (metric_recorder_ != nullptr) { + metric_recorder_->PauseFlow(); + } +} + +void PacketSender::Resume(int64_t paused_time_ms) { + running_ = true; + if (metric_recorder_ != nullptr) { + metric_recorder_->ResumeFlow(paused_time_ms); + } +} + +void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) { + metric_recorder_ = metric_recorder; +} + +void PacketSender::RecordBitrate() { + if (metric_recorder_ != nullptr) { + BWE_TEST_LOGGING_CONTEXT("Sender"); + BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin()); + metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds()); + metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps()); + } +} + +std::list<FeedbackPacket*> GetFeedbackPackets(Packets* in_out, + int64_t end_time_ms, + int flow_id) { + std::list<FeedbackPacket*> fb_packets; + for (auto it = in_out->begin(); it != in_out->end();) { + if ((*it)->send_time_us() > 1000 * end_time_ms) + break; + if ((*it)->GetPacketType() == Packet::kFeedback && + flow_id == (*it)->flow_id()) { + fb_packets.push_back(static_cast<FeedbackPacket*>(*it)); + it = in_out->erase(it); + } else { + ++it; + } + } + return fb_packets; +} + +VideoSender::VideoSender(PacketProcessorListener* listener, + VideoSource* source, + BandwidthEstimatorType estimator_type) + : PacketSender(listener, source->flow_id()), + source_(source), + bwe_(CreateBweSender(estimator_type, + source_->bits_per_second() / 1000, + this, + &clock_)), + previous_sending_bitrate_(0) { + modules_.push_back(bwe_.get()); +} + +VideoSender::~VideoSender() { +} + +void VideoSender::Pause() { + previous_sending_bitrate_ = TargetBitrateKbps(); + PacketSender::Pause(); +} + +void VideoSender::Resume(int64_t paused_time_ms) { + source_->SetBitrateBps(previous_sending_bitrate_); + PacketSender::Resume(paused_time_ms); +} + +void VideoSender::RunFor(int64_t time_ms, Packets* in_out) { + std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets( + in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id()); + ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out); +} + +void VideoSender::ProcessFeedbackAndGeneratePackets( + int64_t time_ms, + std::list<FeedbackPacket*>* feedbacks, + Packets* packets) { + do { + // Make sure to at least run Process() below every 100 ms. + int64_t time_to_run_ms = std::min<int64_t>(time_ms, 100); + if (!feedbacks->empty()) { + int64_t time_until_feedback_ms = + feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds(); + time_to_run_ms = + std::max<int64_t>(std::min(time_ms, time_until_feedback_ms), 0); + } + + if (!running_) { + source_->SetBitrateBps(0); + } + + Packets generated; + source_->RunFor(time_to_run_ms, &generated); + bwe_->OnPacketsSent(generated); + packets->merge(generated, DereferencingComparator<Packet>); + + clock_.AdvanceTimeMilliseconds(time_to_run_ms); + + if (!feedbacks->empty()) { + bwe_->GiveFeedback(*feedbacks->front()); + delete feedbacks->front(); + feedbacks->pop_front(); + } + + bwe_->Process(); + + time_ms -= time_to_run_ms; + } while (time_ms > 0); + assert(feedbacks->empty()); +} + +int VideoSender::GetFeedbackIntervalMs() const { + return bwe_->GetFeedbackIntervalMs(); +} + +void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps, + uint8_t fraction_lost, + int64_t rtt) { + source_->SetBitrateBps(target_bitrate_bps); + RecordBitrate(); +} + +uint32_t VideoSender::TargetBitrateKbps() { + return (source_->bits_per_second() + 500) / 1000; +} + +PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener, + VideoSource* source, + BandwidthEstimatorType estimator) + : VideoSender(listener, source, estimator), + pacer_(&clock_, + this, + source->bits_per_second() / 1000, + PacedSender::kDefaultPaceMultiplier * source->bits_per_second() / + 1000, + 0) { + modules_.push_back(&pacer_); +} + +PacedVideoSender::~PacedVideoSender() { + for (Packet* packet : pacer_queue_) + delete packet; + for (Packet* packet : queue_) + delete packet; +} + +void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) { + int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms; + // Run process periodically to allow the packets to be paced out. + std::list<FeedbackPacket*> feedbacks = + GetFeedbackPackets(in_out, end_time_ms, source_->flow_id()); + int64_t last_run_time_ms = -1; + BWE_TEST_LOGGING_CONTEXT("Sender"); + BWE_TEST_LOGGING_CONTEXT(source_->flow_id()); + do { + int64_t time_until_process_ms = TimeUntilNextProcess(modules_); + int64_t time_until_feedback_ms = time_ms; + if (!feedbacks.empty()) + time_until_feedback_ms = std::max<int64_t>( + feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0); + + int64_t time_until_next_event_ms = + std::min(time_until_feedback_ms, time_until_process_ms); + + time_until_next_event_ms = + std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms); + + // Never run for longer than we have been asked for. + if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms) + time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds(); + + // Make sure we don't get stuck if an event doesn't trigger. This typically + // happens if the prober wants to probe, but there's no packet to send. + if (time_until_next_event_ms == 0 && last_run_time_ms == 0) + time_until_next_event_ms = 1; + last_run_time_ms = time_until_next_event_ms; + + Packets generated_packets; + source_->RunFor(time_until_next_event_ms, &generated_packets); + if (!generated_packets.empty()) { + for (Packet* packet : generated_packets) { + MediaPacket* media_packet = static_cast<MediaPacket*>(packet); + pacer_.InsertPacket( + PacedSender::kNormalPriority, media_packet->header().ssrc, + media_packet->header().sequenceNumber, media_packet->send_time_ms(), + media_packet->payload_size(), false); + pacer_queue_.push_back(packet); + assert(pacer_queue_.size() < 10000); + } + } + + clock_.AdvanceTimeMilliseconds(time_until_next_event_ms); + + if (time_until_next_event_ms == time_until_feedback_ms) { + if (!feedbacks.empty()) { + bwe_->GiveFeedback(*feedbacks.front()); + delete feedbacks.front(); + feedbacks.pop_front(); + } + bwe_->Process(); + } + + if (time_until_next_event_ms == time_until_process_ms) { + CallProcess(modules_); + } + } while (clock_.TimeInMilliseconds() < end_time_ms); + QueuePackets(in_out, end_time_ms * 1000); +} + +int64_t PacedVideoSender::TimeUntilNextProcess( + const std::list<Module*>& modules) { + int64_t time_until_next_process_ms = 10; + for (Module* module : modules) { + int64_t next_process_ms = module->TimeUntilNextProcess(); + if (next_process_ms < time_until_next_process_ms) + time_until_next_process_ms = next_process_ms; + } + if (time_until_next_process_ms < 0) + time_until_next_process_ms = 0; + return time_until_next_process_ms; +} + +void PacedVideoSender::CallProcess(const std::list<Module*>& modules) { + for (Module* module : modules) { + if (module->TimeUntilNextProcess() <= 0) { + module->Process(); + } + } +} + +void PacedVideoSender::QueuePackets(Packets* batch, + int64_t end_of_batch_time_us) { + queue_.merge(*batch, DereferencingComparator<Packet>); + if (queue_.empty()) { + return; + } + Packets::iterator it = queue_.begin(); + for (; it != queue_.end(); ++it) { + if ((*it)->send_time_us() > end_of_batch_time_us) { + break; + } + } + Packets to_transfer; + to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it); + for (Packet* packet : to_transfer) + packet->set_paced(true); + bwe_->OnPacketsSent(to_transfer); + batch->merge(to_transfer, DereferencingComparator<Packet>); +} + +bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission) { + for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end(); + ++it) { + MediaPacket* media_packet = static_cast<MediaPacket*>(*it); + if (media_packet->header().sequenceNumber == sequence_number) { + int64_t pace_out_time_ms = clock_.TimeInMilliseconds(); + + // Make sure a packet is never paced out earlier than when it was put into + // the pacer. + assert(pace_out_time_ms >= media_packet->send_time_ms()); + + media_packet->SetAbsSendTimeMs(pace_out_time_ms); + media_packet->set_send_time_us(1000 * pace_out_time_ms); + media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms); + queue_.push_back(media_packet); + pacer_queue_.erase(it); + return true; + } + } + return false; +} + +size_t PacedVideoSender::TimeToSendPadding(size_t bytes) { + return 0; +} + +void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps, + uint8_t fraction_lost, + int64_t rtt) { + VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt); + pacer_.UpdateBitrate( + target_bitrate_bps / 1000, + PacedSender::kDefaultPaceMultiplier * target_bitrate_bps / 1000, 0); +} + +const int kNoLimit = std::numeric_limits<int>::max(); +const int kPacketSizeBytes = 1200; + +TcpSender::TcpSender(PacketProcessorListener* listener, + int flow_id, + int64_t offset_ms) + : TcpSender(listener, flow_id, offset_ms, kNoLimit) { +} + +TcpSender::TcpSender(PacketProcessorListener* listener, + int flow_id, + int64_t offset_ms, + int send_limit_bytes) + : PacketSender(listener, flow_id), + cwnd_(10), + ssthresh_(kNoLimit), + ack_received_(false), + last_acked_seq_num_(0), + next_sequence_number_(0), + offset_ms_(offset_ms), + last_reduction_time_ms_(-1), + last_rtt_ms_(0), + total_sent_bytes_(0), + send_limit_bytes_(send_limit_bytes), + last_generated_packets_ms_(0), + num_recent_sent_packets_(0), + bitrate_kbps_(0) { +} + +void TcpSender::RunFor(int64_t time_ms, Packets* in_out) { + if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) { + clock_.AdvanceTimeMilliseconds(time_ms); + if (running_) { + Pause(); + } + return; + } + + if (!running_ && total_sent_bytes_ == 0) { + Resume(offset_ms_); + } + + int64_t start_time_ms = clock_.TimeInMilliseconds(); + + std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets( + in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin()); + // The number of packets which are sent in during time_ms depends on the + // number of packets in_flight_ and the max number of packets in flight + // (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms. + for (FeedbackPacket* fb : feedbacks) { + clock_.AdvanceTimeMilliseconds(fb->send_time_ms() - + clock_.TimeInMilliseconds()); + last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms(); + UpdateCongestionControl(fb); + SendPackets(in_out); + } + + for (auto it = in_flight_.begin(); it != in_flight_.end();) { + if (it->time_ms < clock_.TimeInMilliseconds() - 1000) + in_flight_.erase(it++); + else + ++it; + } + + clock_.AdvanceTimeMilliseconds(time_ms - + (clock_.TimeInMilliseconds() - start_time_ms)); + SendPackets(in_out); +} + +void TcpSender::SendPackets(Packets* in_out) { + int cwnd = ceil(cwnd_); + int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0); + int timed_out = TriggerTimeouts(); + if (timed_out > 0) { + HandleLoss(); + } + if (packets_to_send > 0) { + Packets generated = GeneratePackets(packets_to_send); + for (Packet* packet : generated) + in_flight_.insert(InFlight(*static_cast<MediaPacket*>(packet))); + + in_out->merge(generated, DereferencingComparator<Packet>); + } +} + +void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) { + const TcpFeedback* tcp_fb = static_cast<const TcpFeedback*>(fb); + RTC_DCHECK(!tcp_fb->acked_packets().empty()); + ack_received_ = true; + + uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_; + uint16_t missing = + expected - static_cast<uint16_t>(tcp_fb->acked_packets().size()); + + for (uint16_t ack_seq_num : tcp_fb->acked_packets()) + in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds())); + + if (missing > 0) { + HandleLoss(); + } else if (cwnd_ <= ssthresh_) { + cwnd_ += tcp_fb->acked_packets().size(); + } else { + cwnd_ += 1.0f / cwnd_; + } + + last_acked_seq_num_ = + LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_); +} + +int TcpSender::TriggerTimeouts() { + int timed_out = 0; + for (auto it = in_flight_.begin(); it != in_flight_.end();) { + if (it->time_ms < clock_.TimeInMilliseconds() - 1000) { + in_flight_.erase(it++); + ++timed_out; + } else { + ++it; + } + } + return timed_out; +} + +void TcpSender::HandleLoss() { + if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_) + return; + last_reduction_time_ms_ = clock_.TimeInMilliseconds(); + ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2); + cwnd_ = ssthresh_; +} + +Packets TcpSender::GeneratePackets(size_t num_packets) { + Packets generated; + + UpdateSendBitrateEstimate(num_packets); + + for (size_t i = 0; i < num_packets; ++i) { + if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) { + if (running_) { + Pause(); + } + break; + } + generated.push_back( + new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(), + kPacketSizeBytes, next_sequence_number_++)); + generated.back()->set_sender_timestamp_us( + 1000 * clock_.TimeInMilliseconds()); + + total_sent_bytes_ += kPacketSizeBytes; + } + + return generated; +} + +void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) { + const int kTimeWindowMs = 500; + num_recent_sent_packets_ += num_packets; + + int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_; + if (delta_ms >= kTimeWindowMs) { + bitrate_kbps_ = + static_cast<uint32_t>(8 * num_recent_sent_packets_ * kPacketSizeBytes) / + delta_ms; + last_generated_packets_ms_ = clock_.TimeInMilliseconds(); + num_recent_sent_packets_ = 0; + } + + RecordBitrate(); +} + +uint32_t TcpSender::TargetBitrateKbps() { + return bitrate_kbps_; +} + +} // namespace bwe +} // namespace testing +} // namespace webrtc |