diff options
Diffstat (limited to 'src/recovery/delivery_rate.rs')
-rw-r--r-- | src/recovery/delivery_rate.rs | 452 |
1 files changed, 262 insertions, 190 deletions
diff --git a/src/recovery/delivery_rate.rs b/src/recovery/delivery_rate.rs index 97edeba..dcc3e48 100644 --- a/src/recovery/delivery_rate.rs +++ b/src/recovery/delivery_rate.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2020, Cloudflare, Inc. +// Copyright (C) 2020-2022, Cloudflare, Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -27,127 +27,164 @@ //! Delivery rate estimation. //! //! This implements the algorithm for estimating delivery rate as described in -//! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-00> - -use std::cmp; +//! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-01> use std::time::Duration; use std::time::Instant; +use crate::recovery::Acked; use crate::recovery::Sent; -#[derive(Default)] +#[derive(Debug)] pub struct Rate { delivered: usize, - delivered_time: Option<Instant>, + delivered_time: Instant, + + first_sent_time: Instant, - recent_delivered_packet_sent_time: Option<Instant>, + // Packet number of the last sent packet with app limited. + end_of_app_limited: u64, - app_limited_at_pkt: usize, + // Packet number of the last sent packet. + last_sent_packet: u64, + // Packet number of the largest acked packet. + largest_acked: u64, + + // Sample of rate estimation. rate_sample: RateSample, } -impl Rate { - pub fn on_packet_sent(&mut self, pkt: &mut Sent, now: Instant) { - if self.delivered_time.is_none() { - self.delivered_time = Some(now); +impl Default for Rate { + fn default() -> Self { + let now = Instant::now(); + + Rate { + delivered: 0, + + delivered_time: now, + + first_sent_time: now, + + end_of_app_limited: 0, + + last_sent_packet: 0, + + largest_acked: 0, + + rate_sample: RateSample::default(), } + } +} - if self.recent_delivered_packet_sent_time.is_none() { - self.recent_delivered_packet_sent_time = Some(now); +impl Rate { + pub fn on_packet_sent( + &mut self, pkt: &mut Sent, bytes_in_flight: usize, now: Instant, + ) { + // No packets in flight yet? + if bytes_in_flight == 0 { + self.first_sent_time = now; + self.delivered_time = now; } + pkt.first_sent_time = self.first_sent_time; + pkt.delivered_time = self.delivered_time; pkt.delivered = self.delivered; - pkt.delivered_time = self.delivered_time.unwrap(); - - pkt.recent_delivered_packet_sent_time = - self.recent_delivered_packet_sent_time.unwrap(); + pkt.is_app_limited = self.app_limited(); - pkt.is_app_limited = self.app_limited_at_pkt > 0; + self.last_sent_packet = pkt.pkt_num; } - pub fn on_packet_acked(&mut self, pkt: &Sent, now: Instant) { - self.rate_sample.prior_time = Some(pkt.delivered_time); - + // Update the delivery rate sample when a packet is acked. + pub fn update_rate_sample(&mut self, pkt: &Acked, now: Instant) { self.delivered += pkt.size; - self.delivered_time = Some(now); + self.delivered_time = now; - if pkt.delivered > self.rate_sample.prior_delivered { + // Update info using the newest packet. If rate_sample is not yet + // initialized, initialize with the first packet. + if self.rate_sample.prior_time.is_none() || + pkt.delivered > self.rate_sample.prior_delivered + { self.rate_sample.prior_delivered = pkt.delivered; - + self.rate_sample.prior_time = Some(pkt.delivered_time); + self.rate_sample.is_app_limited = pkt.is_app_limited; self.rate_sample.send_elapsed = - pkt.time_sent - pkt.recent_delivered_packet_sent_time; - + pkt.time_sent.saturating_duration_since(pkt.first_sent_time); + self.rate_sample.rtt = pkt.rtt; self.rate_sample.ack_elapsed = self .delivered_time - .unwrap() - .duration_since(pkt.delivered_time); + .saturating_duration_since(pkt.delivered_time); - self.recent_delivered_packet_sent_time = Some(pkt.time_sent); + self.first_sent_time = pkt.time_sent; } - } - pub fn estimate(&mut self) { - if (self.app_limited_at_pkt > 0) && - (self.delivered > self.app_limited_at_pkt) - { - self.app_limited_at_pkt = 0; - } + self.largest_acked = self.largest_acked.max(pkt.pkt_num); + } - match self.rate_sample.prior_time { - Some(_) => { - self.rate_sample.delivered = - self.delivered - self.rate_sample.prior_delivered; - - self.rate_sample.interval = cmp::max( - self.rate_sample.send_elapsed, - self.rate_sample.ack_elapsed, - ); - }, - None => return, + pub fn generate_rate_sample(&mut self, min_rtt: Duration) { + // End app-limited phase if bubble is ACKed and gone. + if self.app_limited() && self.largest_acked > self.end_of_app_limited { + self.update_app_limited(false); } - if self.rate_sample.interval.as_secs_f64() > 0.0 { - self.rate_sample.delivery_rate = (self.rate_sample.delivered as f64 / - self.rate_sample.interval.as_secs_f64()) - as u64; + if self.rate_sample.prior_time.is_some() { + let interval = self + .rate_sample + .send_elapsed + .max(self.rate_sample.ack_elapsed); + + self.rate_sample.delivered = + self.delivered - self.rate_sample.prior_delivered; + self.rate_sample.interval = interval; + + if interval < min_rtt { + self.rate_sample.interval = Duration::ZERO; + + // No reliable sample. + return; + } + + if !interval.is_zero() { + // Fill in rate_sample with a rate sample. + self.rate_sample.delivery_rate = + (self.rate_sample.delivered as f64 / interval.as_secs_f64()) + as u64; + } } } - pub fn check_app_limited(&mut self, bytes_in_flight: usize) { - let limited = self.delivered + bytes_in_flight; - self.app_limited_at_pkt = if limited > 0 { limited } else { 1 }; + pub fn update_app_limited(&mut self, v: bool) { + self.end_of_app_limited = if v { self.last_sent_packet.max(1) } else { 0 } } - pub fn delivery_rate(&self) -> u64 { - self.rate_sample.delivery_rate + pub fn app_limited(&mut self) -> bool { + self.end_of_app_limited != 0 } -} - -impl std::fmt::Debug for Rate { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "delivered={:?} ", self.delivered)?; - if let Some(t) = self.delivered_time { - write!(f, "delivered_time={:?} ", t.elapsed())?; - } + pub fn _delivered(&self) -> usize { + self.delivered + } - if let Some(t) = self.recent_delivered_packet_sent_time { - write!(f, "recent_delivered_packet_sent_time={:?} ", t.elapsed())?; - } + pub fn sample_delivery_rate(&self) -> u64 { + self.rate_sample.delivery_rate + } - write!(f, "app_limited_at_pkt={:?} ", self.app_limited_at_pkt)?; + pub fn _sample_rtt(&self) -> Duration { + self.rate_sample.rtt + } - Ok(()) + pub fn _sample_is_app_limited(&self) -> bool { + self.rate_sample.is_app_limited } } -#[derive(Default)] +#[derive(Default, Debug)] struct RateSample { delivery_rate: u64, + is_app_limited: bool, + interval: Duration, delivered: usize, @@ -159,22 +196,8 @@ struct RateSample { send_elapsed: Duration, ack_elapsed: Duration, -} -impl std::fmt::Debug for RateSample { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "delivery_rate={:?} ", self.delivery_rate)?; - write!(f, "interval={:?} ", self.interval)?; - write!(f, "delivered={:?} ", self.delivered)?; - write!(f, "prior_delivered={:?} ", self.prior_delivered)?; - write!(f, "send_elapsed={:?} ", self.send_elapsed)?; - if let Some(t) = self.prior_time { - write!(f, "prior_time={:?} ", t.elapsed())?; - } - write!(f, "ack_elapsed={:?}", self.ack_elapsed)?; - - Ok(()) - } + rtt: Duration, } #[cfg(test)] @@ -186,114 +209,163 @@ mod tests { #[test] fn rate_check() { let config = Config::new(0xbabababa).unwrap(); - let mut recovery = Recovery::new(&config); - - let mut pkt_1 = Sent { - pkt_num: 0, - frames: vec![], - time_sent: Instant::now(), - time_acked: None, - time_lost: None, - size: 1200, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: Instant::now(), - recent_delivered_packet_sent_time: Instant::now(), - is_app_limited: false, - has_data: false, - }; - - recovery - .delivery_rate - .on_packet_sent(&mut pkt_1, Instant::now()); - std::thread::sleep(Duration::from_millis(50)); - recovery - .delivery_rate - .on_packet_acked(&pkt_1, Instant::now()); - - let mut pkt_2 = Sent { - pkt_num: 1, - frames: vec![], - time_sent: Instant::now(), - time_acked: None, - time_lost: None, - size: 1200, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: Instant::now(), - recent_delivered_packet_sent_time: Instant::now(), - is_app_limited: false, - has_data: false, - }; - - recovery - .delivery_rate - .on_packet_sent(&mut pkt_2, Instant::now()); - std::thread::sleep(Duration::from_millis(50)); - recovery - .delivery_rate - .on_packet_acked(&pkt_2, Instant::now()); - recovery.delivery_rate.estimate(); - - assert!(recovery.delivery_rate() > 0); + let mut r = Recovery::new(&config); + + let now = Instant::now(); + let mss = r.max_datagram_size(); + + // Send 2 packets. + for pn in 0..2 { + let pkt = Sent { + pkt_num: pn, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: mss, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + pkt, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + } + + let rtt = Duration::from_millis(50); + let now = now + rtt; + + // Ack 2 packets. + for pn in 0..2 { + let acked = Acked { + pkt_num: pn, + time_sent: now, + size: mss, + rtt, + delivered: 0, + delivered_time: now, + first_sent_time: now - rtt, + is_app_limited: false, + }; + + r.delivery_rate.update_rate_sample(&acked, now); + } + + // Update rate sample after 1 rtt. + r.delivery_rate.generate_rate_sample(rtt); + + // Bytes acked so far. + assert_eq!(r.delivery_rate._delivered(), 2400); + + // Estimated delivery rate = (1200 x 2) / 0.05s = 48000. + assert_eq!(r.delivery_rate(), 48000); + } + + #[test] + fn app_limited_cwnd_full() { + let config = Config::new(0xbabababa).unwrap(); + let mut r = Recovery::new(&config); + + let now = Instant::now(); + let mss = r.max_datagram_size(); + + // Send 10 packets to fill cwnd. + for pn in 0..10 { + let pkt = Sent { + pkt_num: pn, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: mss, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + pkt, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + } + + assert_eq!(r.app_limited(), false); + assert_eq!(r.delivery_rate._sample_is_app_limited(), false); } #[test] fn app_limited_check() { let config = Config::new(0xbabababa).unwrap(); - let mut recvry = Recovery::new(&config); - - let mut pkt_1 = Sent { - pkt_num: 0, - frames: vec![], - time_sent: Instant::now(), - time_acked: None, - time_lost: None, - size: 1200, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: Instant::now(), - recent_delivered_packet_sent_time: Instant::now(), - is_app_limited: false, - has_data: false, - }; - - recvry - .delivery_rate - .on_packet_sent(&mut pkt_1, Instant::now()); - std::thread::sleep(Duration::from_millis(50)); - recvry.delivery_rate.on_packet_acked(&pkt_1, Instant::now()); - - let mut pkt_2 = Sent { - pkt_num: 1, - frames: vec![], - time_sent: Instant::now(), - time_acked: None, - time_lost: None, - size: 1200, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: Instant::now(), - recent_delivered_packet_sent_time: Instant::now(), - is_app_limited: false, - has_data: false, - }; - - recvry.app_limited = true; - recvry - .delivery_rate - .check_app_limited(recvry.bytes_in_flight); - recvry - .delivery_rate - .on_packet_sent(&mut pkt_2, Instant::now()); - std::thread::sleep(Duration::from_millis(50)); - recvry.delivery_rate.on_packet_acked(&pkt_2, Instant::now()); - recvry.delivery_rate.estimate(); - - assert_eq!(recvry.delivery_rate.app_limited_at_pkt, 0); + let mut r = Recovery::new(&config); + + let now = Instant::now(); + let mss = r.max_datagram_size(); + + // Send 5 packets. + for pn in 0..5 { + let pkt = Sent { + pkt_num: pn, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: mss, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + pkt, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + } + + let rtt = Duration::from_millis(50); + let now = now + rtt; + + let mut acked = ranges::RangeSet::default(); + acked.insert(0..5); + + assert_eq!( + r.on_ack_received( + &acked, + 25, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ), + Ok(()), + ); + + assert_eq!(r.app_limited(), true); + + // Rate sample is not app limited (all acked). + assert_eq!(r.delivery_rate._sample_is_app_limited(), false); + assert_eq!(r.delivery_rate._sample_rtt(), rtt); } } |