/* * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "webrtc/modules/remote_bitrate_estimator/test/packet_sender.h" #include #include #include #include "webrtc/base/checks.h" #include "webrtc/modules/include/module_common_types.h" #include "webrtc/modules/remote_bitrate_estimator/test/bwe.h" #include "webrtc/modules/remote_bitrate_estimator/test/metric_recorder.h" namespace webrtc { namespace testing { namespace bwe { void PacketSender::Pause() { running_ = false; if (metric_recorder_ != nullptr) { metric_recorder_->PauseFlow(); } } void PacketSender::Resume(int64_t paused_time_ms) { running_ = true; if (metric_recorder_ != nullptr) { metric_recorder_->ResumeFlow(paused_time_ms); } } void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) { metric_recorder_ = metric_recorder; } void PacketSender::RecordBitrate() { if (metric_recorder_ != nullptr) { BWE_TEST_LOGGING_CONTEXT("Sender"); BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin()); metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds()); metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps()); } } std::list GetFeedbackPackets(Packets* in_out, int64_t end_time_ms, int flow_id) { std::list fb_packets; for (auto it = in_out->begin(); it != in_out->end();) { if ((*it)->send_time_us() > 1000 * end_time_ms) break; if ((*it)->GetPacketType() == Packet::kFeedback && flow_id == (*it)->flow_id()) { fb_packets.push_back(static_cast(*it)); it = in_out->erase(it); } else { ++it; } } return fb_packets; } VideoSender::VideoSender(PacketProcessorListener* listener, VideoSource* source, BandwidthEstimatorType estimator_type) : PacketSender(listener, source->flow_id()), source_(source), bwe_(CreateBweSender(estimator_type, source_->bits_per_second() / 1000, this, &clock_)), previous_sending_bitrate_(0) { modules_.push_back(bwe_.get()); } VideoSender::~VideoSender() { } void VideoSender::Pause() { previous_sending_bitrate_ = TargetBitrateKbps(); PacketSender::Pause(); } void VideoSender::Resume(int64_t paused_time_ms) { source_->SetBitrateBps(previous_sending_bitrate_); PacketSender::Resume(paused_time_ms); } void VideoSender::RunFor(int64_t time_ms, Packets* in_out) { std::list feedbacks = GetFeedbackPackets( in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id()); ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out); } void VideoSender::ProcessFeedbackAndGeneratePackets( int64_t time_ms, std::list* feedbacks, Packets* packets) { do { // Make sure to at least run Process() below every 100 ms. int64_t time_to_run_ms = std::min(time_ms, 100); if (!feedbacks->empty()) { int64_t time_until_feedback_ms = feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds(); time_to_run_ms = std::max(std::min(time_ms, time_until_feedback_ms), 0); } if (!running_) { source_->SetBitrateBps(0); } Packets generated; source_->RunFor(time_to_run_ms, &generated); bwe_->OnPacketsSent(generated); packets->merge(generated, DereferencingComparator); clock_.AdvanceTimeMilliseconds(time_to_run_ms); if (!feedbacks->empty()) { bwe_->GiveFeedback(*feedbacks->front()); delete feedbacks->front(); feedbacks->pop_front(); } bwe_->Process(); time_ms -= time_to_run_ms; } while (time_ms > 0); assert(feedbacks->empty()); } int VideoSender::GetFeedbackIntervalMs() const { return bwe_->GetFeedbackIntervalMs(); } void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps, uint8_t fraction_lost, int64_t rtt) { source_->SetBitrateBps(target_bitrate_bps); RecordBitrate(); } uint32_t VideoSender::TargetBitrateKbps() { return (source_->bits_per_second() + 500) / 1000; } PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener, VideoSource* source, BandwidthEstimatorType estimator) : VideoSender(listener, source, estimator), pacer_(&clock_, this, source->bits_per_second() / 1000, PacedSender::kDefaultPaceMultiplier * source->bits_per_second() / 1000, 0) { modules_.push_back(&pacer_); } PacedVideoSender::~PacedVideoSender() { for (Packet* packet : pacer_queue_) delete packet; for (Packet* packet : queue_) delete packet; } void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) { int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms; // Run process periodically to allow the packets to be paced out. std::list feedbacks = GetFeedbackPackets(in_out, end_time_ms, source_->flow_id()); int64_t last_run_time_ms = -1; BWE_TEST_LOGGING_CONTEXT("Sender"); BWE_TEST_LOGGING_CONTEXT(source_->flow_id()); do { int64_t time_until_process_ms = TimeUntilNextProcess(modules_); int64_t time_until_feedback_ms = time_ms; if (!feedbacks.empty()) time_until_feedback_ms = std::max( feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0); int64_t time_until_next_event_ms = std::min(time_until_feedback_ms, time_until_process_ms); time_until_next_event_ms = std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms); // Never run for longer than we have been asked for. if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms) time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds(); // Make sure we don't get stuck if an event doesn't trigger. This typically // happens if the prober wants to probe, but there's no packet to send. if (time_until_next_event_ms == 0 && last_run_time_ms == 0) time_until_next_event_ms = 1; last_run_time_ms = time_until_next_event_ms; Packets generated_packets; source_->RunFor(time_until_next_event_ms, &generated_packets); if (!generated_packets.empty()) { for (Packet* packet : generated_packets) { MediaPacket* media_packet = static_cast(packet); pacer_.InsertPacket( PacedSender::kNormalPriority, media_packet->header().ssrc, media_packet->header().sequenceNumber, media_packet->send_time_ms(), media_packet->payload_size(), false); pacer_queue_.push_back(packet); assert(pacer_queue_.size() < 10000); } } clock_.AdvanceTimeMilliseconds(time_until_next_event_ms); if (time_until_next_event_ms == time_until_feedback_ms) { if (!feedbacks.empty()) { bwe_->GiveFeedback(*feedbacks.front()); delete feedbacks.front(); feedbacks.pop_front(); } bwe_->Process(); } if (time_until_next_event_ms == time_until_process_ms) { CallProcess(modules_); } } while (clock_.TimeInMilliseconds() < end_time_ms); QueuePackets(in_out, end_time_ms * 1000); } int64_t PacedVideoSender::TimeUntilNextProcess( const std::list& modules) { int64_t time_until_next_process_ms = 10; for (Module* module : modules) { int64_t next_process_ms = module->TimeUntilNextProcess(); if (next_process_ms < time_until_next_process_ms) time_until_next_process_ms = next_process_ms; } if (time_until_next_process_ms < 0) time_until_next_process_ms = 0; return time_until_next_process_ms; } void PacedVideoSender::CallProcess(const std::list& modules) { for (Module* module : modules) { if (module->TimeUntilNextProcess() <= 0) { module->Process(); } } } void PacedVideoSender::QueuePackets(Packets* batch, int64_t end_of_batch_time_us) { queue_.merge(*batch, DereferencingComparator); if (queue_.empty()) { return; } Packets::iterator it = queue_.begin(); for (; it != queue_.end(); ++it) { if ((*it)->send_time_us() > end_of_batch_time_us) { break; } } Packets to_transfer; to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it); for (Packet* packet : to_transfer) packet->set_paced(true); bwe_->OnPacketsSent(to_transfer); batch->merge(to_transfer, DereferencingComparator); } bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms, bool retransmission) { for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end(); ++it) { MediaPacket* media_packet = static_cast(*it); if (media_packet->header().sequenceNumber == sequence_number) { int64_t pace_out_time_ms = clock_.TimeInMilliseconds(); // Make sure a packet is never paced out earlier than when it was put into // the pacer. assert(pace_out_time_ms >= media_packet->send_time_ms()); media_packet->SetAbsSendTimeMs(pace_out_time_ms); media_packet->set_send_time_us(1000 * pace_out_time_ms); media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms); queue_.push_back(media_packet); pacer_queue_.erase(it); return true; } } return false; } size_t PacedVideoSender::TimeToSendPadding(size_t bytes) { return 0; } void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps, uint8_t fraction_lost, int64_t rtt) { VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt); pacer_.UpdateBitrate( target_bitrate_bps / 1000, PacedSender::kDefaultPaceMultiplier * target_bitrate_bps / 1000, 0); } const int kNoLimit = std::numeric_limits::max(); const int kPacketSizeBytes = 1200; TcpSender::TcpSender(PacketProcessorListener* listener, int flow_id, int64_t offset_ms) : TcpSender(listener, flow_id, offset_ms, kNoLimit) { } TcpSender::TcpSender(PacketProcessorListener* listener, int flow_id, int64_t offset_ms, int send_limit_bytes) : PacketSender(listener, flow_id), cwnd_(10), ssthresh_(kNoLimit), ack_received_(false), last_acked_seq_num_(0), next_sequence_number_(0), offset_ms_(offset_ms), last_reduction_time_ms_(-1), last_rtt_ms_(0), total_sent_bytes_(0), send_limit_bytes_(send_limit_bytes), last_generated_packets_ms_(0), num_recent_sent_packets_(0), bitrate_kbps_(0) { } void TcpSender::RunFor(int64_t time_ms, Packets* in_out) { if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) { clock_.AdvanceTimeMilliseconds(time_ms); if (running_) { Pause(); } return; } if (!running_ && total_sent_bytes_ == 0) { Resume(offset_ms_); } int64_t start_time_ms = clock_.TimeInMilliseconds(); std::list feedbacks = GetFeedbackPackets( in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin()); // The number of packets which are sent in during time_ms depends on the // number of packets in_flight_ and the max number of packets in flight // (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms. for (FeedbackPacket* fb : feedbacks) { clock_.AdvanceTimeMilliseconds(fb->send_time_ms() - clock_.TimeInMilliseconds()); last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms(); UpdateCongestionControl(fb); SendPackets(in_out); } for (auto it = in_flight_.begin(); it != in_flight_.end();) { if (it->time_ms < clock_.TimeInMilliseconds() - 1000) in_flight_.erase(it++); else ++it; } clock_.AdvanceTimeMilliseconds(time_ms - (clock_.TimeInMilliseconds() - start_time_ms)); SendPackets(in_out); } void TcpSender::SendPackets(Packets* in_out) { int cwnd = ceil(cwnd_); int packets_to_send = std::max(cwnd - static_cast(in_flight_.size()), 0); int timed_out = TriggerTimeouts(); if (timed_out > 0) { HandleLoss(); } if (packets_to_send > 0) { Packets generated = GeneratePackets(packets_to_send); for (Packet* packet : generated) in_flight_.insert(InFlight(*static_cast(packet))); in_out->merge(generated, DereferencingComparator); } } void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) { const TcpFeedback* tcp_fb = static_cast(fb); RTC_DCHECK(!tcp_fb->acked_packets().empty()); ack_received_ = true; uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_; uint16_t missing = expected - static_cast(tcp_fb->acked_packets().size()); for (uint16_t ack_seq_num : tcp_fb->acked_packets()) in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds())); if (missing > 0) { HandleLoss(); } else if (cwnd_ <= ssthresh_) { cwnd_ += tcp_fb->acked_packets().size(); } else { cwnd_ += 1.0f / cwnd_; } last_acked_seq_num_ = LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_); } int TcpSender::TriggerTimeouts() { int timed_out = 0; for (auto it = in_flight_.begin(); it != in_flight_.end();) { if (it->time_ms < clock_.TimeInMilliseconds() - 1000) { in_flight_.erase(it++); ++timed_out; } else { ++it; } } return timed_out; } void TcpSender::HandleLoss() { if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_) return; last_reduction_time_ms_ = clock_.TimeInMilliseconds(); ssthresh_ = std::max(static_cast(in_flight_.size() / 2), 2); cwnd_ = ssthresh_; } Packets TcpSender::GeneratePackets(size_t num_packets) { Packets generated; UpdateSendBitrateEstimate(num_packets); for (size_t i = 0; i < num_packets; ++i) { if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) { if (running_) { Pause(); } break; } generated.push_back( new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(), kPacketSizeBytes, next_sequence_number_++)); generated.back()->set_sender_timestamp_us( 1000 * clock_.TimeInMilliseconds()); total_sent_bytes_ += kPacketSizeBytes; } return generated; } void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) { const int kTimeWindowMs = 500; num_recent_sent_packets_ += num_packets; int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_; if (delta_ms >= kTimeWindowMs) { bitrate_kbps_ = static_cast(8 * num_recent_sent_packets_ * kPacketSizeBytes) / delta_ms; last_generated_packets_ms_ = clock_.TimeInMilliseconds(); num_recent_sent_packets_ = 0; } RecordBitrate(); } uint32_t TcpSender::TargetBitrateKbps() { return bitrate_kbps_; } } // namespace bwe } // namespace testing } // namespace webrtc