aboutsummaryrefslogtreecommitdiff
path: root/src/recovery/delivery_rate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/recovery/delivery_rate.rs')
-rw-r--r--src/recovery/delivery_rate.rs452
1 files changed, 262 insertions, 190 deletions
diff --git a/src/recovery/delivery_rate.rs b/src/recovery/delivery_rate.rs
index 97edeba..dcc3e48 100644
--- a/src/recovery/delivery_rate.rs
+++ b/src/recovery/delivery_rate.rs
@@ -1,4 +1,4 @@
-// Copyright (C) 2020, Cloudflare, Inc.
+// Copyright (C) 2020-2022, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -27,127 +27,164 @@
//! Delivery rate estimation.
//!
//! This implements the algorithm for estimating delivery rate as described in
-//! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-00>
-
-use std::cmp;
+//! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-01>
use std::time::Duration;
use std::time::Instant;
+use crate::recovery::Acked;
use crate::recovery::Sent;
-#[derive(Default)]
+#[derive(Debug)]
pub struct Rate {
delivered: usize,
- delivered_time: Option<Instant>,
+ delivered_time: Instant,
+
+ first_sent_time: Instant,
- recent_delivered_packet_sent_time: Option<Instant>,
+ // Packet number of the last sent packet with app limited.
+ end_of_app_limited: u64,
- app_limited_at_pkt: usize,
+ // Packet number of the last sent packet.
+ last_sent_packet: u64,
+ // Packet number of the largest acked packet.
+ largest_acked: u64,
+
+ // Sample of rate estimation.
rate_sample: RateSample,
}
-impl Rate {
- pub fn on_packet_sent(&mut self, pkt: &mut Sent, now: Instant) {
- if self.delivered_time.is_none() {
- self.delivered_time = Some(now);
+impl Default for Rate {
+ fn default() -> Self {
+ let now = Instant::now();
+
+ Rate {
+ delivered: 0,
+
+ delivered_time: now,
+
+ first_sent_time: now,
+
+ end_of_app_limited: 0,
+
+ last_sent_packet: 0,
+
+ largest_acked: 0,
+
+ rate_sample: RateSample::default(),
}
+ }
+}
- if self.recent_delivered_packet_sent_time.is_none() {
- self.recent_delivered_packet_sent_time = Some(now);
+impl Rate {
+ pub fn on_packet_sent(
+ &mut self, pkt: &mut Sent, bytes_in_flight: usize, now: Instant,
+ ) {
+ // No packets in flight yet?
+ if bytes_in_flight == 0 {
+ self.first_sent_time = now;
+ self.delivered_time = now;
}
+ pkt.first_sent_time = self.first_sent_time;
+ pkt.delivered_time = self.delivered_time;
pkt.delivered = self.delivered;
- pkt.delivered_time = self.delivered_time.unwrap();
-
- pkt.recent_delivered_packet_sent_time =
- self.recent_delivered_packet_sent_time.unwrap();
+ pkt.is_app_limited = self.app_limited();
- pkt.is_app_limited = self.app_limited_at_pkt > 0;
+ self.last_sent_packet = pkt.pkt_num;
}
- pub fn on_packet_acked(&mut self, pkt: &Sent, now: Instant) {
- self.rate_sample.prior_time = Some(pkt.delivered_time);
-
+ // Update the delivery rate sample when a packet is acked.
+ pub fn update_rate_sample(&mut self, pkt: &Acked, now: Instant) {
self.delivered += pkt.size;
- self.delivered_time = Some(now);
+ self.delivered_time = now;
- if pkt.delivered > self.rate_sample.prior_delivered {
+ // Update info using the newest packet. If rate_sample is not yet
+ // initialized, initialize with the first packet.
+ if self.rate_sample.prior_time.is_none() ||
+ pkt.delivered > self.rate_sample.prior_delivered
+ {
self.rate_sample.prior_delivered = pkt.delivered;
-
+ self.rate_sample.prior_time = Some(pkt.delivered_time);
+ self.rate_sample.is_app_limited = pkt.is_app_limited;
self.rate_sample.send_elapsed =
- pkt.time_sent - pkt.recent_delivered_packet_sent_time;
-
+ pkt.time_sent.saturating_duration_since(pkt.first_sent_time);
+ self.rate_sample.rtt = pkt.rtt;
self.rate_sample.ack_elapsed = self
.delivered_time
- .unwrap()
- .duration_since(pkt.delivered_time);
+ .saturating_duration_since(pkt.delivered_time);
- self.recent_delivered_packet_sent_time = Some(pkt.time_sent);
+ self.first_sent_time = pkt.time_sent;
}
- }
- pub fn estimate(&mut self) {
- if (self.app_limited_at_pkt > 0) &&
- (self.delivered > self.app_limited_at_pkt)
- {
- self.app_limited_at_pkt = 0;
- }
+ self.largest_acked = self.largest_acked.max(pkt.pkt_num);
+ }
- match self.rate_sample.prior_time {
- Some(_) => {
- self.rate_sample.delivered =
- self.delivered - self.rate_sample.prior_delivered;
-
- self.rate_sample.interval = cmp::max(
- self.rate_sample.send_elapsed,
- self.rate_sample.ack_elapsed,
- );
- },
- None => return,
+ pub fn generate_rate_sample(&mut self, min_rtt: Duration) {
+ // End app-limited phase if bubble is ACKed and gone.
+ if self.app_limited() && self.largest_acked > self.end_of_app_limited {
+ self.update_app_limited(false);
}
- if self.rate_sample.interval.as_secs_f64() > 0.0 {
- self.rate_sample.delivery_rate = (self.rate_sample.delivered as f64 /
- self.rate_sample.interval.as_secs_f64())
- as u64;
+ if self.rate_sample.prior_time.is_some() {
+ let interval = self
+ .rate_sample
+ .send_elapsed
+ .max(self.rate_sample.ack_elapsed);
+
+ self.rate_sample.delivered =
+ self.delivered - self.rate_sample.prior_delivered;
+ self.rate_sample.interval = interval;
+
+ if interval < min_rtt {
+ self.rate_sample.interval = Duration::ZERO;
+
+ // No reliable sample.
+ return;
+ }
+
+ if !interval.is_zero() {
+ // Fill in rate_sample with a rate sample.
+ self.rate_sample.delivery_rate =
+ (self.rate_sample.delivered as f64 / interval.as_secs_f64())
+ as u64;
+ }
}
}
- pub fn check_app_limited(&mut self, bytes_in_flight: usize) {
- let limited = self.delivered + bytes_in_flight;
- self.app_limited_at_pkt = if limited > 0 { limited } else { 1 };
+ pub fn update_app_limited(&mut self, v: bool) {
+ self.end_of_app_limited = if v { self.last_sent_packet.max(1) } else { 0 }
}
- pub fn delivery_rate(&self) -> u64 {
- self.rate_sample.delivery_rate
+ pub fn app_limited(&mut self) -> bool {
+ self.end_of_app_limited != 0
}
-}
-
-impl std::fmt::Debug for Rate {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "delivered={:?} ", self.delivered)?;
- if let Some(t) = self.delivered_time {
- write!(f, "delivered_time={:?} ", t.elapsed())?;
- }
+ pub fn _delivered(&self) -> usize {
+ self.delivered
+ }
- if let Some(t) = self.recent_delivered_packet_sent_time {
- write!(f, "recent_delivered_packet_sent_time={:?} ", t.elapsed())?;
- }
+ pub fn sample_delivery_rate(&self) -> u64 {
+ self.rate_sample.delivery_rate
+ }
- write!(f, "app_limited_at_pkt={:?} ", self.app_limited_at_pkt)?;
+ pub fn _sample_rtt(&self) -> Duration {
+ self.rate_sample.rtt
+ }
- Ok(())
+ pub fn _sample_is_app_limited(&self) -> bool {
+ self.rate_sample.is_app_limited
}
}
-#[derive(Default)]
+#[derive(Default, Debug)]
struct RateSample {
delivery_rate: u64,
+ is_app_limited: bool,
+
interval: Duration,
delivered: usize,
@@ -159,22 +196,8 @@ struct RateSample {
send_elapsed: Duration,
ack_elapsed: Duration,
-}
-impl std::fmt::Debug for RateSample {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "delivery_rate={:?} ", self.delivery_rate)?;
- write!(f, "interval={:?} ", self.interval)?;
- write!(f, "delivered={:?} ", self.delivered)?;
- write!(f, "prior_delivered={:?} ", self.prior_delivered)?;
- write!(f, "send_elapsed={:?} ", self.send_elapsed)?;
- if let Some(t) = self.prior_time {
- write!(f, "prior_time={:?} ", t.elapsed())?;
- }
- write!(f, "ack_elapsed={:?}", self.ack_elapsed)?;
-
- Ok(())
- }
+ rtt: Duration,
}
#[cfg(test)]
@@ -186,114 +209,163 @@ mod tests {
#[test]
fn rate_check() {
let config = Config::new(0xbabababa).unwrap();
- let mut recovery = Recovery::new(&config);
-
- let mut pkt_1 = Sent {
- pkt_num: 0,
- frames: vec![],
- time_sent: Instant::now(),
- time_acked: None,
- time_lost: None,
- size: 1200,
- ack_eliciting: true,
- in_flight: true,
- delivered: 0,
- delivered_time: Instant::now(),
- recent_delivered_packet_sent_time: Instant::now(),
- is_app_limited: false,
- has_data: false,
- };
-
- recovery
- .delivery_rate
- .on_packet_sent(&mut pkt_1, Instant::now());
- std::thread::sleep(Duration::from_millis(50));
- recovery
- .delivery_rate
- .on_packet_acked(&pkt_1, Instant::now());
-
- let mut pkt_2 = Sent {
- pkt_num: 1,
- frames: vec![],
- time_sent: Instant::now(),
- time_acked: None,
- time_lost: None,
- size: 1200,
- ack_eliciting: true,
- in_flight: true,
- delivered: 0,
- delivered_time: Instant::now(),
- recent_delivered_packet_sent_time: Instant::now(),
- is_app_limited: false,
- has_data: false,
- };
-
- recovery
- .delivery_rate
- .on_packet_sent(&mut pkt_2, Instant::now());
- std::thread::sleep(Duration::from_millis(50));
- recovery
- .delivery_rate
- .on_packet_acked(&pkt_2, Instant::now());
- recovery.delivery_rate.estimate();
-
- assert!(recovery.delivery_rate() > 0);
+ let mut r = Recovery::new(&config);
+
+ let now = Instant::now();
+ let mss = r.max_datagram_size();
+
+ // Send 2 packets.
+ for pn in 0..2 {
+ let pkt = Sent {
+ pkt_num: pn,
+ frames: vec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: mss,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ pkt,
+ packet::EPOCH_APPLICATION,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+ }
+
+ let rtt = Duration::from_millis(50);
+ let now = now + rtt;
+
+ // Ack 2 packets.
+ for pn in 0..2 {
+ let acked = Acked {
+ pkt_num: pn,
+ time_sent: now,
+ size: mss,
+ rtt,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now - rtt,
+ is_app_limited: false,
+ };
+
+ r.delivery_rate.update_rate_sample(&acked, now);
+ }
+
+ // Update rate sample after 1 rtt.
+ r.delivery_rate.generate_rate_sample(rtt);
+
+ // Bytes acked so far.
+ assert_eq!(r.delivery_rate._delivered(), 2400);
+
+ // Estimated delivery rate = (1200 x 2) / 0.05s = 48000.
+ assert_eq!(r.delivery_rate(), 48000);
+ }
+
+ #[test]
+ fn app_limited_cwnd_full() {
+ let config = Config::new(0xbabababa).unwrap();
+ let mut r = Recovery::new(&config);
+
+ let now = Instant::now();
+ let mss = r.max_datagram_size();
+
+ // Send 10 packets to fill cwnd.
+ for pn in 0..10 {
+ let pkt = Sent {
+ pkt_num: pn,
+ frames: vec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: mss,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ pkt,
+ packet::EPOCH_APPLICATION,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+ }
+
+ assert_eq!(r.app_limited(), false);
+ assert_eq!(r.delivery_rate._sample_is_app_limited(), false);
}
#[test]
fn app_limited_check() {
let config = Config::new(0xbabababa).unwrap();
- let mut recvry = Recovery::new(&config);
-
- let mut pkt_1 = Sent {
- pkt_num: 0,
- frames: vec![],
- time_sent: Instant::now(),
- time_acked: None,
- time_lost: None,
- size: 1200,
- ack_eliciting: true,
- in_flight: true,
- delivered: 0,
- delivered_time: Instant::now(),
- recent_delivered_packet_sent_time: Instant::now(),
- is_app_limited: false,
- has_data: false,
- };
-
- recvry
- .delivery_rate
- .on_packet_sent(&mut pkt_1, Instant::now());
- std::thread::sleep(Duration::from_millis(50));
- recvry.delivery_rate.on_packet_acked(&pkt_1, Instant::now());
-
- let mut pkt_2 = Sent {
- pkt_num: 1,
- frames: vec![],
- time_sent: Instant::now(),
- time_acked: None,
- time_lost: None,
- size: 1200,
- ack_eliciting: true,
- in_flight: true,
- delivered: 0,
- delivered_time: Instant::now(),
- recent_delivered_packet_sent_time: Instant::now(),
- is_app_limited: false,
- has_data: false,
- };
-
- recvry.app_limited = true;
- recvry
- .delivery_rate
- .check_app_limited(recvry.bytes_in_flight);
- recvry
- .delivery_rate
- .on_packet_sent(&mut pkt_2, Instant::now());
- std::thread::sleep(Duration::from_millis(50));
- recvry.delivery_rate.on_packet_acked(&pkt_2, Instant::now());
- recvry.delivery_rate.estimate();
-
- assert_eq!(recvry.delivery_rate.app_limited_at_pkt, 0);
+ let mut r = Recovery::new(&config);
+
+ let now = Instant::now();
+ let mss = r.max_datagram_size();
+
+ // Send 5 packets.
+ for pn in 0..5 {
+ let pkt = Sent {
+ pkt_num: pn,
+ frames: vec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: mss,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ pkt,
+ packet::EPOCH_APPLICATION,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+ }
+
+ let rtt = Duration::from_millis(50);
+ let now = now + rtt;
+
+ let mut acked = ranges::RangeSet::default();
+ acked.insert(0..5);
+
+ assert_eq!(
+ r.on_ack_received(
+ &acked,
+ 25,
+ packet::EPOCH_APPLICATION,
+ HandshakeStatus::default(),
+ now,
+ "",
+ ),
+ Ok(()),
+ );
+
+ assert_eq!(r.app_limited(), true);
+
+ // Rate sample is not app limited (all acked).
+ assert_eq!(r.delivery_rate._sample_is_app_limited(), false);
+ assert_eq!(r.delivery_rate._sample_rtt(), rtt);
}
}