diff options
Diffstat (limited to 'src/storage')
-rw-r--r-- | src/storage/assembler.rs | 750 | ||||
-rw-r--r-- | src/storage/mod.rs | 31 | ||||
-rw-r--r-- | src/storage/packet_buffer.rs | 402 | ||||
-rw-r--r-- | src/storage/ring_buffer.rs | 803 |
4 files changed, 1986 insertions, 0 deletions
diff --git a/src/storage/assembler.rs b/src/storage/assembler.rs new file mode 100644 index 0000000..365a1e0 --- /dev/null +++ b/src/storage/assembler.rs @@ -0,0 +1,750 @@ +use core::fmt; + +use crate::config::ASSEMBLER_MAX_SEGMENT_COUNT; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TooManyHolesError; + +impl fmt::Display for TooManyHolesError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "too many holes") + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TooManyHolesError {} + +/// A contiguous chunk of absent data, followed by a contiguous chunk of present data. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct Contig { + hole_size: usize, + data_size: usize, +} + +impl fmt::Display for Contig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.has_hole() { + write!(f, "({})", self.hole_size)?; + } + if self.has_hole() && self.has_data() { + write!(f, " ")?; + } + if self.has_data() { + write!(f, "{}", self.data_size)?; + } + Ok(()) + } +} + +#[cfg(feature = "defmt")] +impl defmt::Format for Contig { + fn format(&self, fmt: defmt::Formatter) { + if self.has_hole() { + defmt::write!(fmt, "({})", self.hole_size); + } + if self.has_hole() && self.has_data() { + defmt::write!(fmt, " "); + } + if self.has_data() { + defmt::write!(fmt, "{}", self.data_size); + } + } +} + +impl Contig { + const fn empty() -> Contig { + Contig { + hole_size: 0, + data_size: 0, + } + } + + fn hole_and_data(hole_size: usize, data_size: usize) -> Contig { + Contig { + hole_size, + data_size, + } + } + + fn has_hole(&self) -> bool { + self.hole_size != 0 + } + + fn has_data(&self) -> bool { + self.data_size != 0 + } + + fn total_size(&self) -> usize { + self.hole_size + self.data_size + } + + fn shrink_hole_by(&mut self, size: usize) { + self.hole_size -= size; + } + + fn shrink_hole_to(&mut self, size: usize) { + debug_assert!(self.hole_size >= size); + + let total_size = self.total_size(); + self.hole_size = size; + self.data_size = total_size - size; + } +} + +/// A buffer (re)assembler. +/// +/// Currently, up to a hardcoded limit of 4 or 32 holes can be tracked in the buffer. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Assembler { + contigs: [Contig; ASSEMBLER_MAX_SEGMENT_COUNT], +} + +impl fmt::Display for Assembler { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "[ ")?; + for contig in self.contigs.iter() { + if !contig.has_data() { + break; + } + write!(f, "{contig} ")?; + } + write!(f, "]")?; + Ok(()) + } +} + +#[cfg(feature = "defmt")] +impl defmt::Format for Assembler { + fn format(&self, fmt: defmt::Formatter) { + defmt::write!(fmt, "[ "); + for contig in self.contigs.iter() { + if !contig.has_data() { + break; + } + defmt::write!(fmt, "{} ", contig); + } + defmt::write!(fmt, "]"); + } +} + +// Invariant on Assembler::contigs: +// - There's an index `i` where all contigs before have data, and all contigs after don't (are unused). +// - All contigs with data must have hole_size != 0, except the first. + +impl Assembler { + /// Create a new buffer assembler. + pub const fn new() -> Assembler { + const EMPTY: Contig = Contig::empty(); + Assembler { + contigs: [EMPTY; ASSEMBLER_MAX_SEGMENT_COUNT], + } + } + + pub fn clear(&mut self) { + self.contigs.fill(Contig::empty()); + } + + fn front(&self) -> Contig { + self.contigs[0] + } + + /// Return length of the front contiguous range without removing it from the assembler + pub fn peek_front(&self) -> usize { + let front = self.front(); + if front.has_hole() { + 0 + } else { + front.data_size + } + } + + fn back(&self) -> Contig { + self.contigs[self.contigs.len() - 1] + } + + /// Return whether the assembler contains no data. + pub fn is_empty(&self) -> bool { + !self.front().has_data() + } + + /// Remove a contig at the given index. + fn remove_contig_at(&mut self, at: usize) { + debug_assert!(self.contigs[at].has_data()); + + for i in at..self.contigs.len() - 1 { + if !self.contigs[i].has_data() { + return; + } + self.contigs[i] = self.contigs[i + 1]; + } + + // Removing the last one. + self.contigs[self.contigs.len() - 1] = Contig::empty(); + } + + /// Add a contig at the given index, and return a pointer to it. + fn add_contig_at(&mut self, at: usize) -> Result<&mut Contig, TooManyHolesError> { + if self.back().has_data() { + return Err(TooManyHolesError); + } + + for i in (at + 1..self.contigs.len()).rev() { + self.contigs[i] = self.contigs[i - 1]; + } + + self.contigs[at] = Contig::empty(); + Ok(&mut self.contigs[at]) + } + + /// Add a new contiguous range to the assembler, + /// or return `Err(TooManyHolesError)` if too many discontinuities are already recorded. + pub fn add(&mut self, mut offset: usize, size: usize) -> Result<(), TooManyHolesError> { + if size == 0 { + return Ok(()); + } + + let mut i = 0; + + // Find index of the contig containing the start of the range. + loop { + if i == self.contigs.len() { + // The new range is after all the previous ranges, but there/s no space to add it. + return Err(TooManyHolesError); + } + let contig = &mut self.contigs[i]; + if !contig.has_data() { + // The new range is after all the previous ranges. Add it. + *contig = Contig::hole_and_data(offset, size); + return Ok(()); + } + if offset <= contig.total_size() { + break; + } + offset -= contig.total_size(); + i += 1; + } + + let contig = &mut self.contigs[i]; + if offset < contig.hole_size { + // Range starts within the hole. + + if offset + size < contig.hole_size { + // Range also ends within the hole. + let new_contig = self.add_contig_at(i)?; + new_contig.hole_size = offset; + new_contig.data_size = size; + + // Previous contigs[index] got moved to contigs[index+1] + self.contigs[i + 1].shrink_hole_by(offset + size); + return Ok(()); + } + + // The range being added covers both a part of the hole and a part of the data + // in this contig, shrink the hole in this contig. + contig.shrink_hole_to(offset); + } + + // coalesce contigs to the right. + let mut j = i + 1; + while j < self.contigs.len() + && self.contigs[j].has_data() + && offset + size >= self.contigs[i].total_size() + self.contigs[j].hole_size + { + self.contigs[i].data_size += self.contigs[j].total_size(); + j += 1; + } + let shift = j - i - 1; + if shift != 0 { + for x in i + 1..self.contigs.len() { + if !self.contigs[x].has_data() { + break; + } + + self.contigs[x] = self + .contigs + .get(x + shift) + .copied() + .unwrap_or_else(Contig::empty); + } + } + + if offset + size > self.contigs[i].total_size() { + // The added range still extends beyond the current contig. Increase data size. + let left = offset + size - self.contigs[i].total_size(); + self.contigs[i].data_size += left; + + // Decrease hole size of the next, if any. + if i + 1 < self.contigs.len() && self.contigs[i + 1].has_data() { + self.contigs[i + 1].hole_size -= left; + } + } + + Ok(()) + } + + /// Remove a contiguous range from the front of the assembler. + /// If no such range, return 0. + pub fn remove_front(&mut self) -> usize { + let front = self.front(); + if front.has_hole() || !front.has_data() { + 0 + } else { + self.remove_contig_at(0); + debug_assert!(front.data_size > 0); + front.data_size + } + } + + /// Add a segment, then remove_front. + /// + /// This is equivalent to calling `add` then `remove_front` individually, + /// except it's guaranteed to not fail when offset = 0. + /// This is required for TCP: we must never drop the next expected segment, or + /// the protocol might get stuck. + pub fn add_then_remove_front( + &mut self, + offset: usize, + size: usize, + ) -> Result<usize, TooManyHolesError> { + // This is the only case where a segment at offset=0 would cause the + // total amount of contigs to rise (and therefore can potentially cause + // a TooManyHolesError). Handle it in a way that is guaranteed to succeed. + if offset == 0 && size < self.contigs[0].hole_size { + self.contigs[0].hole_size -= size; + return Ok(size); + } + + self.add(offset, size)?; + Ok(self.remove_front()) + } + + /// Iterate over all of the contiguous data ranges. + /// + /// This is used in calculating what data ranges have been received. The offset indicates the + /// number of bytes of contiguous data received before the beginnings of this Assembler. + /// + /// Data Hole Data + /// |--- 100 ---|--- 200 ---|--- 100 ---| + /// + /// An offset of 1500 would return the ranges: ``(1500, 1600), (1800, 1900)`` + pub fn iter_data(&self, first_offset: usize) -> AssemblerIter { + AssemblerIter::new(self, first_offset) + } +} + +pub struct AssemblerIter<'a> { + assembler: &'a Assembler, + offset: usize, + index: usize, + left: usize, + right: usize, +} + +impl<'a> AssemblerIter<'a> { + fn new(assembler: &'a Assembler, offset: usize) -> AssemblerIter<'a> { + AssemblerIter { + assembler, + offset, + index: 0, + left: 0, + right: 0, + } + } +} + +impl<'a> Iterator for AssemblerIter<'a> { + type Item = (usize, usize); + + fn next(&mut self) -> Option<(usize, usize)> { + let mut data_range = None; + while data_range.is_none() && self.index < self.assembler.contigs.len() { + let contig = self.assembler.contigs[self.index]; + self.left += contig.hole_size; + self.right = self.left + contig.data_size; + data_range = if self.left < self.right { + let data_range = (self.left + self.offset, self.right + self.offset); + self.left = self.right; + Some(data_range) + } else { + None + }; + self.index += 1; + } + data_range + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::vec::Vec; + + impl From<Vec<(usize, usize)>> for Assembler { + fn from(vec: Vec<(usize, usize)>) -> Assembler { + const EMPTY: Contig = Contig::empty(); + + let mut contigs = [EMPTY; ASSEMBLER_MAX_SEGMENT_COUNT]; + for (i, &(hole_size, data_size)) in vec.iter().enumerate() { + contigs[i] = Contig { + hole_size, + data_size, + }; + } + Assembler { contigs } + } + } + + macro_rules! contigs { + [$( $x:expr ),*] => ({ + Assembler::from(vec![$( $x ),*]) + }) + } + + #[test] + fn test_new() { + let assr = Assembler::new(); + assert_eq!(assr, contigs![]); + } + + #[test] + fn test_empty_add_full() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(0, 16), Ok(())); + assert_eq!(assr, contigs![(0, 16)]); + } + + #[test] + fn test_empty_add_front() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(0, 4), Ok(())); + assert_eq!(assr, contigs![(0, 4)]); + } + + #[test] + fn test_empty_add_back() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(12, 4), Ok(())); + assert_eq!(assr, contigs![(12, 4)]); + } + + #[test] + fn test_empty_add_mid() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(4, 8), Ok(())); + assert_eq!(assr, contigs![(4, 8)]); + } + + #[test] + fn test_partial_add_front() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(0, 4), Ok(())); + assert_eq!(assr, contigs![(0, 12)]); + } + + #[test] + fn test_partial_add_back() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(12, 4), Ok(())); + assert_eq!(assr, contigs![(4, 12)]); + } + + #[test] + fn test_partial_add_front_overlap() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(0, 8), Ok(())); + assert_eq!(assr, contigs![(0, 12)]); + } + + #[test] + fn test_partial_add_front_overlap_split() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(2, 6), Ok(())); + assert_eq!(assr, contigs![(2, 10)]); + } + + #[test] + fn test_partial_add_back_overlap() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(8, 8), Ok(())); + assert_eq!(assr, contigs![(4, 12)]); + } + + #[test] + fn test_partial_add_back_overlap_split() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(10, 4), Ok(())); + assert_eq!(assr, contigs![(4, 10)]); + } + + #[test] + fn test_partial_add_both_overlap() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(0, 16), Ok(())); + assert_eq!(assr, contigs![(0, 16)]); + } + + #[test] + fn test_partial_add_both_overlap_split() { + let mut assr = contigs![(4, 8)]; + assert_eq!(assr.add(2, 12), Ok(())); + assert_eq!(assr, contigs![(2, 12)]); + } + + #[test] + fn test_rejected_add_keeps_state() { + let mut assr = Assembler::new(); + for c in 1..=ASSEMBLER_MAX_SEGMENT_COUNT { + assert_eq!(assr.add(c * 10, 3), Ok(())); + } + // Maximum of allowed holes is reached + let assr_before = assr.clone(); + assert_eq!(assr.add(1, 3), Err(TooManyHolesError)); + assert_eq!(assr_before, assr); + } + + #[test] + fn test_empty_remove_front() { + let mut assr = contigs![]; + assert_eq!(assr.remove_front(), 0); + } + + #[test] + fn test_trailing_hole_remove_front() { + let mut assr = contigs![(0, 4)]; + assert_eq!(assr.remove_front(), 4); + assert_eq!(assr, contigs![]); + } + + #[test] + fn test_trailing_data_remove_front() { + let mut assr = contigs![(0, 4), (4, 4)]; + assert_eq!(assr.remove_front(), 4); + assert_eq!(assr, contigs![(4, 4)]); + } + + #[test] + fn test_boundary_case_remove_front() { + let mut vec = vec![(1, 1); ASSEMBLER_MAX_SEGMENT_COUNT]; + vec[0] = (0, 2); + let mut assr = Assembler::from(vec); + assert_eq!(assr.remove_front(), 2); + let mut vec = vec![(1, 1); ASSEMBLER_MAX_SEGMENT_COUNT]; + vec[ASSEMBLER_MAX_SEGMENT_COUNT - 1] = (0, 0); + let exp_assr = Assembler::from(vec); + assert_eq!(assr, exp_assr); + } + + #[test] + fn test_shrink_next_hole() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(100, 10), Ok(())); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add(40, 30), Ok(())); + assert_eq!(assr, contigs![(40, 30), (30, 10)]); + } + + #[test] + fn test_join_two() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(10, 10), Ok(())); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add(15, 40), Ok(())); + assert_eq!(assr, contigs![(10, 50)]); + } + + #[test] + fn test_join_two_reversed() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add(10, 10), Ok(())); + assert_eq!(assr.add(15, 40), Ok(())); + assert_eq!(assr, contigs![(10, 50)]); + } + + #[test] + fn test_join_two_overlong() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add(10, 10), Ok(())); + assert_eq!(assr.add(15, 60), Ok(())); + assert_eq!(assr, contigs![(10, 65)]); + } + + #[test] + fn test_iter_empty() { + let assr = Assembler::new(); + let segments: Vec<_> = assr.iter_data(10).collect(); + assert_eq!(segments, vec![]); + } + + #[test] + fn test_iter_full() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(0, 16), Ok(())); + let segments: Vec<_> = assr.iter_data(10).collect(); + assert_eq!(segments, vec![(10, 26)]); + } + + #[test] + fn test_iter_offset() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(0, 16), Ok(())); + let segments: Vec<_> = assr.iter_data(100).collect(); + assert_eq!(segments, vec![(100, 116)]); + } + + #[test] + fn test_iter_one_front() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(0, 4), Ok(())); + let segments: Vec<_> = assr.iter_data(10).collect(); + assert_eq!(segments, vec![(10, 14)]); + } + + #[test] + fn test_iter_one_back() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(12, 4), Ok(())); + let segments: Vec<_> = assr.iter_data(10).collect(); + assert_eq!(segments, vec![(22, 26)]); + } + + #[test] + fn test_iter_one_mid() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(4, 8), Ok(())); + let segments: Vec<_> = assr.iter_data(10).collect(); + assert_eq!(segments, vec![(14, 22)]); + } + + #[test] + fn test_iter_one_trailing_gap() { + let assr = contigs![(4, 8)]; + let segments: Vec<_> = assr.iter_data(100).collect(); + assert_eq!(segments, vec![(104, 112)]); + } + + #[test] + fn test_iter_two_split() { + let assr = contigs![(2, 6), (4, 1)]; + let segments: Vec<_> = assr.iter_data(100).collect(); + assert_eq!(segments, vec![(102, 108), (112, 113)]); + } + + #[test] + fn test_iter_three_split() { + let assr = contigs![(2, 6), (2, 1), (2, 2)]; + let segments: Vec<_> = assr.iter_data(100).collect(); + assert_eq!(segments, vec![(102, 108), (110, 111), (113, 115)]); + } + + #[test] + fn test_issue_694() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(0, 1), Ok(())); + assert_eq!(assr.add(2, 1), Ok(())); + assert_eq!(assr.add(1, 1), Ok(())); + } + + #[test] + fn test_add_then_remove_front() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add_then_remove_front(10, 10), Ok(0)); + assert_eq!(assr, contigs![(10, 10), (30, 10)]); + } + + #[test] + fn test_add_then_remove_front_at_front() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add_then_remove_front(0, 10), Ok(10)); + assert_eq!(assr, contigs![(40, 10)]); + } + + #[test] + fn test_add_then_remove_front_at_front_touch() { + let mut assr = Assembler::new(); + assert_eq!(assr.add(50, 10), Ok(())); + assert_eq!(assr.add_then_remove_front(0, 50), Ok(60)); + assert_eq!(assr, contigs![]); + } + + #[test] + fn test_add_then_remove_front_at_front_full() { + let mut assr = Assembler::new(); + for c in 1..=ASSEMBLER_MAX_SEGMENT_COUNT { + assert_eq!(assr.add(c * 10, 3), Ok(())); + } + // Maximum of allowed holes is reached + let assr_before = assr.clone(); + assert_eq!(assr.add_then_remove_front(1, 3), Err(TooManyHolesError)); + assert_eq!(assr_before, assr); + } + + #[test] + fn test_add_then_remove_front_at_front_full_offset_0() { + let mut assr = Assembler::new(); + for c in 1..=ASSEMBLER_MAX_SEGMENT_COUNT { + assert_eq!(assr.add(c * 10, 3), Ok(())); + } + assert_eq!(assr.add_then_remove_front(0, 3), Ok(3)); + } + + // Test against an obviously-correct but inefficient bitmap impl. + #[test] + fn test_random() { + use rand::Rng; + + const MAX_INDEX: usize = 256; + + for max_size in [2, 5, 10, 100] { + for _ in 0..300 { + //println!("==="); + let mut assr = Assembler::new(); + let mut map = [false; MAX_INDEX]; + + for _ in 0..60 { + let offset = rand::thread_rng().gen_range(0..MAX_INDEX - max_size - 1); + let size = rand::thread_rng().gen_range(1..=max_size); + + //println!("add {}..{} {}", offset, offset + size, size); + // Real impl + let res = assr.add(offset, size); + + // Bitmap impl + let mut map2 = map; + map2[offset..][..size].fill(true); + + let mut contigs = vec![]; + let mut hole: usize = 0; + let mut data: usize = 0; + for b in map2 { + if b { + data += 1; + } else { + if data != 0 { + contigs.push((hole, data)); + hole = 0; + data = 0; + } + hole += 1; + } + } + + // Compare. + let wanted_res = if contigs.len() > ASSEMBLER_MAX_SEGMENT_COUNT { + Err(TooManyHolesError) + } else { + Ok(()) + }; + assert_eq!(res, wanted_res); + if res.is_ok() { + map = map2; + assert_eq!(assr, Assembler::from(contigs)); + } + } + } + } + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..b03de71 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,31 @@ +/*! Specialized containers. + +The `storage` module provides containers for use in other modules. +The containers support both pre-allocated memory, without the `std` +or `alloc` crates being available, and heap-allocated memory. +*/ + +mod assembler; +mod packet_buffer; +mod ring_buffer; + +pub use self::assembler::Assembler; +pub use self::packet_buffer::{PacketBuffer, PacketMetadata}; +pub use self::ring_buffer::RingBuffer; + +/// A trait for setting a value to a known state. +/// +/// In-place analog of Default. +pub trait Resettable { + fn reset(&mut self); +} + +/// Error returned when enqueuing into a full buffer. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct Full; + +/// Error returned when dequeuing from an empty buffer. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct Empty; diff --git a/src/storage/packet_buffer.rs b/src/storage/packet_buffer.rs new file mode 100644 index 0000000..28119fa --- /dev/null +++ b/src/storage/packet_buffer.rs @@ -0,0 +1,402 @@ +use managed::ManagedSlice; + +use crate::storage::{Full, RingBuffer}; + +use super::Empty; + +/// Size and header of a packet. +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct PacketMetadata<H> { + size: usize, + header: Option<H>, +} + +impl<H> PacketMetadata<H> { + /// Empty packet description. + pub const EMPTY: PacketMetadata<H> = PacketMetadata { + size: 0, + header: None, + }; + + fn padding(size: usize) -> PacketMetadata<H> { + PacketMetadata { + size: size, + header: None, + } + } + + fn packet(size: usize, header: H) -> PacketMetadata<H> { + PacketMetadata { + size: size, + header: Some(header), + } + } + + fn is_padding(&self) -> bool { + self.header.is_none() + } +} + +/// An UDP packet ring buffer. +#[derive(Debug)] +pub struct PacketBuffer<'a, H: 'a> { + metadata_ring: RingBuffer<'a, PacketMetadata<H>>, + payload_ring: RingBuffer<'a, u8>, +} + +impl<'a, H> PacketBuffer<'a, H> { + /// Create a new packet buffer with the provided metadata and payload storage. + /// + /// Metadata storage limits the maximum _number_ of packets in the buffer and payload + /// storage limits the maximum _total size_ of packets. + pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H> + where + MS: Into<ManagedSlice<'a, PacketMetadata<H>>>, + PS: Into<ManagedSlice<'a, u8>>, + { + PacketBuffer { + metadata_ring: RingBuffer::new(metadata_storage), + payload_ring: RingBuffer::new(payload_storage), + } + } + + /// Query whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.metadata_ring.is_empty() + } + + /// Query whether the buffer is full. + pub fn is_full(&self) -> bool { + self.metadata_ring.is_full() + } + + // There is currently no enqueue_with() because of the complexity of managing padding + // in case of failure. + + /// Enqueue a single packet with the given header into the buffer, and + /// return a reference to its payload, or return `Err(Full)` + /// if the buffer is full. + pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> { + if self.payload_ring.capacity() < size || self.metadata_ring.is_full() { + return Err(Full); + } + + // Ring is currently empty. Clear it (resetting `read_at`) to maximize + // for contiguous space. + if self.payload_ring.is_empty() { + self.payload_ring.clear(); + } + + let window = self.payload_ring.window(); + let contig_window = self.payload_ring.contiguous_window(); + + if window < size { + return Err(Full); + } else if contig_window < size { + if window - contig_window < size { + // The buffer length is larger than the current contiguous window + // and is larger than the contiguous window will be after adding + // the padding necessary to circle around to the beginning of the + // ring buffer. + return Err(Full); + } else { + // Add padding to the end of the ring buffer so that the + // contiguous window is at the beginning of the ring buffer. + *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window); + // note(discard): function does not write to the result + // enqueued padding buffer location + let _buf_enqueued = self.payload_ring.enqueue_many(contig_window); + } + } + + *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); + + let payload_buf = self.payload_ring.enqueue_many(size); + debug_assert!(payload_buf.len() == size); + Ok(payload_buf) + } + + /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet + /// is shrunk to the size returned from `f` and enqueued into the buffer. + pub fn enqueue_with_infallible<'b, F>( + &'b mut self, + max_size: usize, + header: H, + f: F, + ) -> Result<usize, Full> + where + F: FnOnce(&'b mut [u8]) -> usize, + { + if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() { + return Err(Full); + } + + let window = self.payload_ring.window(); + let contig_window = self.payload_ring.contiguous_window(); + + if window < max_size { + return Err(Full); + } else if contig_window < max_size { + if window - contig_window < max_size { + // The buffer length is larger than the current contiguous window + // and is larger than the contiguous window will be after adding + // the padding necessary to circle around to the beginning of the + // ring buffer. + return Err(Full); + } else { + // Add padding to the end of the ring buffer so that the + // contiguous window is at the beginning of the ring buffer. + *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window); + // note(discard): function does not write to the result + // enqueued padding buffer location + let _buf_enqueued = self.payload_ring.enqueue_many(contig_window); + } + } + + let (size, _) = self + .payload_ring + .enqueue_many_with(|data| (f(&mut data[..max_size]), ())); + + *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); + + Ok(size) + } + + fn dequeue_padding(&mut self) { + let _ = self.metadata_ring.dequeue_one_with(|metadata| { + if metadata.is_padding() { + // note(discard): function does not use value of dequeued padding bytes + let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size); + Ok(()) // dequeue metadata + } else { + Err(()) // don't dequeue metadata + } + }); + } + + /// Call `f` with a single packet from the buffer, and dequeue the packet if `f` + /// returns successfully, or return `Err(EmptyError)` if the buffer is empty. + pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty> + where + F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>, + { + self.dequeue_padding(); + + self.metadata_ring.dequeue_one_with(|metadata| { + self.payload_ring + .dequeue_many_with(|payload_buf| { + debug_assert!(payload_buf.len() >= metadata.size); + + match f( + metadata.header.as_mut().unwrap(), + &mut payload_buf[..metadata.size], + ) { + Ok(val) => (metadata.size, Ok(val)), + Err(err) => (0, Err(err)), + } + }) + .1 + }) + } + + /// Dequeue a single packet from the buffer, and return a reference to its payload + /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty. + pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> { + self.dequeue_padding(); + + let meta = self.metadata_ring.dequeue_one()?; + + let payload_buf = self.payload_ring.dequeue_many(meta.size); + debug_assert!(payload_buf.len() == meta.size); + Ok((meta.header.take().unwrap(), payload_buf)) + } + + /// Peek at a single packet from the buffer without removing it, and return a reference to + /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty. + /// + /// This function otherwise behaves identically to [dequeue](#method.dequeue). + pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> { + self.dequeue_padding(); + + if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() { + Ok(( + metadata.header.as_ref().unwrap(), + self.payload_ring.get_allocated(0, metadata.size), + )) + } else { + Err(Empty) + } + } + + /// Return the maximum number packets that can be stored. + pub fn packet_capacity(&self) -> usize { + self.metadata_ring.capacity() + } + + /// Return the maximum number of bytes in the payload ring buffer. + pub fn payload_capacity(&self) -> usize { + self.payload_ring.capacity() + } + + /// Reset the packet buffer and clear any staged. + #[allow(unused)] + pub(crate) fn reset(&mut self) { + self.payload_ring.clear(); + self.metadata_ring.clear(); + } +} + +#[cfg(test)] +mod test { + use super::*; + + fn buffer() -> PacketBuffer<'static, ()> { + PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16]) + } + + #[test] + fn test_simple() { + let mut buffer = buffer(); + buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); + assert_eq!(buffer.enqueue(16, ()), Err(Full)); + assert_eq!(buffer.metadata_ring.len(), 1); + assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.dequeue(), Err(Empty)); + } + + #[test] + fn test_peek() { + let mut buffer = buffer(); + assert_eq!(buffer.peek(), Err(Empty)); + buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); + assert_eq!(buffer.metadata_ring.len(), 1); + assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.peek(), Err(Empty)); + } + + #[test] + fn test_padding() { + let mut buffer = buffer(); + assert!(buffer.enqueue(6, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); + assert_eq!(buffer.metadata_ring.len(), 3); + assert!(buffer.dequeue().is_ok()); + + assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]); + assert_eq!(buffer.metadata_ring.len(), 0); + } + + #[test] + fn test_padding_with_large_payload() { + let mut buffer = buffer(); + assert!(buffer.enqueue(12, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer + .enqueue(12, ()) + .unwrap() + .copy_from_slice(b"abcdefghijkl"); + } + + #[test] + fn test_dequeue_with() { + let mut buffer = buffer(); + assert!(buffer.enqueue(6, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); + assert_eq!(buffer.metadata_ring.len(), 3); + assert!(buffer.dequeue().is_ok()); + + assert!(matches!( + buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)), + Ok(Err(_)) + )); + assert_eq!(buffer.metadata_ring.len(), 1); + + assert!(buffer + .dequeue_with(|&mut (), payload| { + assert_eq!(payload, &b"abcd"[..]); + Result::<(), ()>::Ok(()) + }) + .is_ok()); + assert_eq!(buffer.metadata_ring.len(), 0); + } + + #[test] + fn test_metadata_full_empty() { + let mut buffer = buffer(); + assert!(buffer.is_empty()); + assert!(!buffer.is_full()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(!buffer.is_empty()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(!buffer.is_full()); + assert!(!buffer.is_empty()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(buffer.is_full()); + assert!(!buffer.is_empty()); + assert_eq!(buffer.metadata_ring.len(), 4); + assert_eq!(buffer.enqueue(1, ()), Err(Full)); + } + + #[test] + fn test_window_too_small() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert_eq!(buffer.enqueue(16, ()), Err(Full)); + assert_eq!(buffer.metadata_ring.len(), 1); + } + + #[test] + fn test_contiguous_window_too_small() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert_eq!(buffer.enqueue(8, ()), Err(Full)); + assert_eq!(buffer.metadata_ring.len(), 1); + } + + #[test] + fn test_contiguous_window_wrap() { + let mut buffer = buffer(); + assert!(buffer.enqueue(15, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert!(buffer.enqueue(16, ()).is_ok()); + } + + #[test] + fn test_capacity_too_small() { + let mut buffer = buffer(); + assert_eq!(buffer.enqueue(32, ()), Err(Full)); + } + + #[test] + fn test_contig_window_prioritized() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert!(buffer.enqueue(5, ()).is_ok()); + } + + #[test] + fn clear() { + let mut buffer = buffer(); + + // Ensure enqueuing data in the buffer fills it somewhat. + assert!(buffer.is_empty()); + assert!(buffer.enqueue(6, ()).is_ok()); + + // Ensure that resetting the buffer causes it to be empty. + assert!(!buffer.is_empty()); + buffer.reset(); + assert!(buffer.is_empty()); + } +} diff --git a/src/storage/ring_buffer.rs b/src/storage/ring_buffer.rs new file mode 100644 index 0000000..7d461b6 --- /dev/null +++ b/src/storage/ring_buffer.rs @@ -0,0 +1,803 @@ +// Some of the functions in ring buffer is marked as #[must_use]. It notes that +// these functions may have side effects, and it's implemented by [RFC 1940]. +// [RFC 1940]: https://github.com/rust-lang/rust/issues/43302 + +use core::cmp; +use managed::ManagedSlice; + +use crate::storage::Resettable; + +use super::{Empty, Full}; + +/// A ring buffer. +/// +/// This ring buffer implementation provides many ways to interact with it: +/// +/// * Enqueueing or dequeueing one element from corresponding side of the buffer; +/// * Enqueueing or dequeueing a slice of elements from corresponding side of the buffer; +/// * Accessing allocated and unallocated areas directly. +/// +/// It is also zero-copy; all methods provide references into the buffer's storage. +/// Note that all references are mutable; it is considered more important to allow +/// in-place processing than to protect from accidental mutation. +/// +/// This implementation is suitable for both simple uses such as a FIFO queue +/// of UDP packets, and advanced ones such as a TCP reassembly buffer. +#[derive(Debug)] +pub struct RingBuffer<'a, T: 'a> { + storage: ManagedSlice<'a, T>, + read_at: usize, + length: usize, +} + +impl<'a, T: 'a> RingBuffer<'a, T> { + /// Create a ring buffer with the given storage. + /// + /// During creation, every element in `storage` is reset. + pub fn new<S>(storage: S) -> RingBuffer<'a, T> + where + S: Into<ManagedSlice<'a, T>>, + { + RingBuffer { + storage: storage.into(), + read_at: 0, + length: 0, + } + } + + /// Clear the ring buffer. + pub fn clear(&mut self) { + self.read_at = 0; + self.length = 0; + } + + /// Return the maximum number of elements in the ring buffer. + pub fn capacity(&self) -> usize { + self.storage.len() + } + + /// Clear the ring buffer, and reset every element. + pub fn reset(&mut self) + where + T: Resettable, + { + self.clear(); + for elem in self.storage.iter_mut() { + elem.reset(); + } + } + + /// Return the current number of elements in the ring buffer. + pub fn len(&self) -> usize { + self.length + } + + /// Return the number of elements that can be added to the ring buffer. + pub fn window(&self) -> usize { + self.capacity() - self.len() + } + + /// Return the largest number of elements that can be added to the buffer + /// without wrapping around (i.e. in a single `enqueue_many` call). + pub fn contiguous_window(&self) -> usize { + cmp::min(self.window(), self.capacity() - self.get_idx(self.length)) + } + + /// Query whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Query whether the buffer is full. + pub fn is_full(&self) -> bool { + self.window() == 0 + } + + /// Shorthand for `(self.read + idx) % self.capacity()` with an + /// additional check to ensure that the capacity is not zero. + fn get_idx(&self, idx: usize) -> usize { + let len = self.capacity(); + if len > 0 { + (self.read_at + idx) % len + } else { + 0 + } + } + + /// Shorthand for `(self.read + idx) % self.capacity()` with no + /// additional checks to ensure the capacity is not zero. + fn get_idx_unchecked(&self, idx: usize) -> usize { + (self.read_at + idx) % self.capacity() + } +} + +/// This is the "discrete" ring buffer interface: it operates with single elements, +/// and boundary conditions (empty/full) are errors. +impl<'a, T: 'a> RingBuffer<'a, T> { + /// Call `f` with a single buffer element, and enqueue the element if `f` + /// returns successfully, or return `Err(Full)` if the buffer is full. + pub fn enqueue_one_with<'b, R, E, F>(&'b mut self, f: F) -> Result<Result<R, E>, Full> + where + F: FnOnce(&'b mut T) -> Result<R, E>, + { + if self.is_full() { + return Err(Full); + } + + let index = self.get_idx_unchecked(self.length); + let res = f(&mut self.storage[index]); + if res.is_ok() { + self.length += 1; + } + Ok(res) + } + + /// Enqueue a single element into the buffer, and return a reference to it, + /// or return `Err(Full)` if the buffer is full. + /// + /// This function is a shortcut for `ring_buf.enqueue_one_with(Ok)`. + pub fn enqueue_one(&mut self) -> Result<&mut T, Full> { + self.enqueue_one_with(Ok)? + } + + /// Call `f` with a single buffer element, and dequeue the element if `f` + /// returns successfully, or return `Err(Empty)` if the buffer is empty. + pub fn dequeue_one_with<'b, R, E, F>(&'b mut self, f: F) -> Result<Result<R, E>, Empty> + where + F: FnOnce(&'b mut T) -> Result<R, E>, + { + if self.is_empty() { + return Err(Empty); + } + + let next_at = self.get_idx_unchecked(1); + let res = f(&mut self.storage[self.read_at]); + + if res.is_ok() { + self.length -= 1; + self.read_at = next_at; + } + Ok(res) + } + + /// Dequeue an element from the buffer, and return a reference to it, + /// or return `Err(Empty)` if the buffer is empty. + /// + /// This function is a shortcut for `ring_buf.dequeue_one_with(Ok)`. + pub fn dequeue_one(&mut self) -> Result<&mut T, Empty> { + self.dequeue_one_with(Ok)? + } +} + +/// This is the "continuous" ring buffer interface: it operates with element slices, +/// and boundary conditions (empty/full) simply result in empty slices. +impl<'a, T: 'a> RingBuffer<'a, T> { + /// Call `f` with the largest contiguous slice of unallocated buffer elements, + /// and enqueue the amount of elements returned by `f`. + /// + /// # Panics + /// This function panics if the amount of elements returned by `f` is larger + /// than the size of the slice passed into it. + pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R) + where + F: FnOnce(&'b mut [T]) -> (usize, R), + { + if self.length == 0 { + // Ring is currently empty. Reset `read_at` to optimize + // for contiguous space. + self.read_at = 0; + } + + let write_at = self.get_idx(self.length); + let max_size = self.contiguous_window(); + let (size, result) = f(&mut self.storage[write_at..write_at + max_size]); + assert!(size <= max_size); + self.length += size; + (size, result) + } + + /// Enqueue a slice of elements up to the given size into the buffer, + /// and return a reference to them. + /// + /// This function may return a slice smaller than the given size + /// if the free space in the buffer is not contiguous. + #[must_use] + pub fn enqueue_many(&mut self, size: usize) -> &mut [T] { + self.enqueue_many_with(|buf| { + let size = cmp::min(size, buf.len()); + (size, &mut buf[..size]) + }) + .1 + } + + /// Enqueue as many elements from the given slice into the buffer as possible, + /// and return the amount of elements that could fit. + #[must_use] + pub fn enqueue_slice(&mut self, data: &[T]) -> usize + where + T: Copy, + { + let (size_1, data) = self.enqueue_many_with(|buf| { + let size = cmp::min(buf.len(), data.len()); + buf[..size].copy_from_slice(&data[..size]); + (size, &data[size..]) + }); + let (size_2, ()) = self.enqueue_many_with(|buf| { + let size = cmp::min(buf.len(), data.len()); + buf[..size].copy_from_slice(&data[..size]); + (size, ()) + }); + size_1 + size_2 + } + + /// Call `f` with the largest contiguous slice of allocated buffer elements, + /// and dequeue the amount of elements returned by `f`. + /// + /// # Panics + /// This function panics if the amount of elements returned by `f` is larger + /// than the size of the slice passed into it. + pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R) + where + F: FnOnce(&'b mut [T]) -> (usize, R), + { + let capacity = self.capacity(); + let max_size = cmp::min(self.len(), capacity - self.read_at); + let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]); + assert!(size <= max_size); + self.read_at = if capacity > 0 { + (self.read_at + size) % capacity + } else { + 0 + }; + self.length -= size; + (size, result) + } + + /// Dequeue a slice of elements up to the given size from the buffer, + /// and return a reference to them. + /// + /// This function may return a slice smaller than the given size + /// if the allocated space in the buffer is not contiguous. + #[must_use] + pub fn dequeue_many(&mut self, size: usize) -> &mut [T] { + self.dequeue_many_with(|buf| { + let size = cmp::min(size, buf.len()); + (size, &mut buf[..size]) + }) + .1 + } + + /// Dequeue as many elements from the buffer into the given slice as possible, + /// and return the amount of elements that could fit. + #[must_use] + pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize + where + T: Copy, + { + let (size_1, data) = self.dequeue_many_with(|buf| { + let size = cmp::min(buf.len(), data.len()); + data[..size].copy_from_slice(&buf[..size]); + (size, &mut data[size..]) + }); + let (size_2, ()) = self.dequeue_many_with(|buf| { + let size = cmp::min(buf.len(), data.len()); + data[..size].copy_from_slice(&buf[..size]); + (size, ()) + }); + size_1 + size_2 + } +} + +/// This is the "random access" ring buffer interface: it operates with element slices, +/// and allows to access elements of the buffer that are not adjacent to its head or tail. +impl<'a, T: 'a> RingBuffer<'a, T> { + /// Return the largest contiguous slice of unallocated buffer elements starting + /// at the given offset past the last allocated element, and up to the given size. + #[must_use] + pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] { + let start_at = self.get_idx(self.length + offset); + // We can't access past the end of unallocated data. + if offset > self.window() { + return &mut []; + } + // We can't enqueue more than there is free space. + let clamped_window = self.window() - offset; + if size > clamped_window { + size = clamped_window + } + // We can't contiguously enqueue past the end of the storage. + let until_end = self.capacity() - start_at; + if size > until_end { + size = until_end + } + + &mut self.storage[start_at..start_at + size] + } + + /// Write as many elements from the given slice into unallocated buffer elements + /// starting at the given offset past the last allocated element, and return + /// the amount written. + #[must_use] + pub fn write_unallocated(&mut self, offset: usize, data: &[T]) -> usize + where + T: Copy, + { + let (size_1, offset, data) = { + let slice = self.get_unallocated(offset, data.len()); + let slice_len = slice.len(); + slice.copy_from_slice(&data[..slice_len]); + (slice_len, offset + slice_len, &data[slice_len..]) + }; + let size_2 = { + let slice = self.get_unallocated(offset, data.len()); + let slice_len = slice.len(); + slice.copy_from_slice(&data[..slice_len]); + slice_len + }; + size_1 + size_2 + } + + /// Enqueue the given number of unallocated buffer elements. + /// + /// # Panics + /// Panics if the number of elements given exceeds the number of unallocated elements. + pub fn enqueue_unallocated(&mut self, count: usize) { + assert!(count <= self.window()); + self.length += count; + } + + /// Return the largest contiguous slice of allocated buffer elements starting + /// at the given offset past the first allocated element, and up to the given size. + #[must_use] + pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] { + let start_at = self.get_idx(offset); + // We can't read past the end of the allocated data. + if offset > self.length { + return &mut []; + } + // We can't read more than we have allocated. + let clamped_length = self.length - offset; + if size > clamped_length { + size = clamped_length + } + // We can't contiguously dequeue past the end of the storage. + let until_end = self.capacity() - start_at; + if size > until_end { + size = until_end + } + + &self.storage[start_at..start_at + size] + } + + /// Read as many elements from allocated buffer elements into the given slice + /// starting at the given offset past the first allocated element, and return + /// the amount read. + #[must_use] + pub fn read_allocated(&mut self, offset: usize, data: &mut [T]) -> usize + where + T: Copy, + { + let (size_1, offset, data) = { + let slice = self.get_allocated(offset, data.len()); + data[..slice.len()].copy_from_slice(slice); + (slice.len(), offset + slice.len(), &mut data[slice.len()..]) + }; + let size_2 = { + let slice = self.get_allocated(offset, data.len()); + data[..slice.len()].copy_from_slice(slice); + slice.len() + }; + size_1 + size_2 + } + + /// Dequeue the given number of allocated buffer elements. + /// + /// # Panics + /// Panics if the number of elements given exceeds the number of allocated elements. + pub fn dequeue_allocated(&mut self, count: usize) { + assert!(count <= self.len()); + self.length -= count; + self.read_at = self.get_idx(count); + } +} + +impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> { + fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> { + RingBuffer::new(slice) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_buffer_length_changes() { + let mut ring = RingBuffer::new(vec![0; 2]); + assert!(ring.is_empty()); + assert!(!ring.is_full()); + assert_eq!(ring.len(), 0); + assert_eq!(ring.capacity(), 2); + assert_eq!(ring.window(), 2); + + ring.length = 1; + assert!(!ring.is_empty()); + assert!(!ring.is_full()); + assert_eq!(ring.len(), 1); + assert_eq!(ring.capacity(), 2); + assert_eq!(ring.window(), 1); + + ring.length = 2; + assert!(!ring.is_empty()); + assert!(ring.is_full()); + assert_eq!(ring.len(), 2); + assert_eq!(ring.capacity(), 2); + assert_eq!(ring.window(), 0); + } + + #[test] + fn test_buffer_enqueue_dequeue_one_with() { + let mut ring = RingBuffer::new(vec![0; 5]); + assert_eq!( + ring.dequeue_one_with(|_| -> Result::<(), ()> { unreachable!() }), + Err(Empty) + ); + + ring.enqueue_one_with(Ok::<_, ()>).unwrap().unwrap(); + assert!(!ring.is_empty()); + assert!(!ring.is_full()); + + for i in 1..5 { + ring.enqueue_one_with(|e| Ok::<_, ()>(*e = i)) + .unwrap() + .unwrap(); + assert!(!ring.is_empty()); + } + assert!(ring.is_full()); + assert_eq!( + ring.enqueue_one_with(|_| -> Result::<(), ()> { unreachable!() }), + Err(Full) + ); + + for i in 0..5 { + assert_eq!( + ring.dequeue_one_with(|e| Ok::<_, ()>(*e)).unwrap().unwrap(), + i + ); + assert!(!ring.is_full()); + } + assert_eq!( + ring.dequeue_one_with(|_| -> Result::<(), ()> { unreachable!() }), + Err(Empty) + ); + assert!(ring.is_empty()); + } + + #[test] + fn test_buffer_enqueue_dequeue_one() { + let mut ring = RingBuffer::new(vec![0; 5]); + assert_eq!(ring.dequeue_one(), Err(Empty)); + + ring.enqueue_one().unwrap(); + assert!(!ring.is_empty()); + assert!(!ring.is_full()); + + for i in 1..5 { + *ring.enqueue_one().unwrap() = i; + assert!(!ring.is_empty()); + } + assert!(ring.is_full()); + assert_eq!(ring.enqueue_one(), Err(Full)); + + for i in 0..5 { + assert_eq!(*ring.dequeue_one().unwrap(), i); + assert!(!ring.is_full()); + } + assert_eq!(ring.dequeue_one(), Err(Empty)); + assert!(ring.is_empty()); + } + + #[test] + fn test_buffer_enqueue_many_with() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!( + ring.enqueue_many_with(|buf| { + assert_eq!(buf.len(), 12); + buf[0..2].copy_from_slice(b"ab"); + (2, true) + }), + (2, true) + ); + assert_eq!(ring.len(), 2); + assert_eq!(&ring.storage[..], b"ab.........."); + + ring.enqueue_many_with(|buf| { + assert_eq!(buf.len(), 12 - 2); + buf[0..4].copy_from_slice(b"cdXX"); + (2, ()) + }); + assert_eq!(ring.len(), 4); + assert_eq!(&ring.storage[..], b"abcdXX......"); + + ring.enqueue_many_with(|buf| { + assert_eq!(buf.len(), 12 - 4); + buf[0..4].copy_from_slice(b"efgh"); + (4, ()) + }); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"abcdefgh...."); + + for _ in 0..4 { + *ring.dequeue_one().unwrap() = b'.'; + } + assert_eq!(ring.len(), 4); + assert_eq!(&ring.storage[..], b"....efgh...."); + + ring.enqueue_many_with(|buf| { + assert_eq!(buf.len(), 12 - 8); + buf[0..4].copy_from_slice(b"ijkl"); + (4, ()) + }); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"....efghijkl"); + + ring.enqueue_many_with(|buf| { + assert_eq!(buf.len(), 4); + buf[0..4].copy_from_slice(b"abcd"); + (4, ()) + }); + assert_eq!(ring.len(), 12); + assert_eq!(&ring.storage[..], b"abcdefghijkl"); + + for _ in 0..4 { + *ring.dequeue_one().unwrap() = b'.'; + } + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"abcd....ijkl"); + } + + #[test] + fn test_buffer_enqueue_many() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + ring.enqueue_many(8).copy_from_slice(b"abcdefgh"); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"abcdefgh...."); + + ring.enqueue_many(8).copy_from_slice(b"ijkl"); + assert_eq!(ring.len(), 12); + assert_eq!(&ring.storage[..], b"abcdefghijkl"); + } + + #[test] + fn test_buffer_enqueue_slice() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"abcdefgh...."); + + for _ in 0..4 { + *ring.dequeue_one().unwrap() = b'.'; + } + assert_eq!(ring.len(), 4); + assert_eq!(&ring.storage[..], b"....efgh...."); + + assert_eq!(ring.enqueue_slice(b"ijklabcd"), 8); + assert_eq!(ring.len(), 12); + assert_eq!(&ring.storage[..], b"abcdefghijkl"); + } + + #[test] + fn test_buffer_dequeue_many_with() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12); + + assert_eq!( + ring.dequeue_many_with(|buf| { + assert_eq!(buf.len(), 12); + assert_eq!(buf, b"abcdefghijkl"); + buf[..4].copy_from_slice(b"...."); + (4, true) + }), + (4, true) + ); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"....efghijkl"); + + ring.dequeue_many_with(|buf| { + assert_eq!(buf, b"efghijkl"); + buf[..4].copy_from_slice(b"...."); + (4, ()) + }); + assert_eq!(ring.len(), 4); + assert_eq!(&ring.storage[..], b"........ijkl"); + + assert_eq!(ring.enqueue_slice(b"abcd"), 4); + assert_eq!(ring.len(), 8); + + ring.dequeue_many_with(|buf| { + assert_eq!(buf, b"ijkl"); + buf[..4].copy_from_slice(b"...."); + (4, ()) + }); + ring.dequeue_many_with(|buf| { + assert_eq!(buf, b"abcd"); + buf[..4].copy_from_slice(b"...."); + (4, ()) + }); + assert_eq!(ring.len(), 0); + assert_eq!(&ring.storage[..], b"............"); + } + + #[test] + fn test_buffer_dequeue_many() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12); + + { + let buf = ring.dequeue_many(8); + assert_eq!(buf, b"abcdefgh"); + buf.copy_from_slice(b"........"); + } + assert_eq!(ring.len(), 4); + assert_eq!(&ring.storage[..], b"........ijkl"); + + { + let buf = ring.dequeue_many(8); + assert_eq!(buf, b"ijkl"); + buf.copy_from_slice(b"...."); + } + assert_eq!(ring.len(), 0); + assert_eq!(&ring.storage[..], b"............"); + } + + #[test] + fn test_buffer_dequeue_slice() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12); + + { + let mut buf = [0; 8]; + assert_eq!(ring.dequeue_slice(&mut buf[..]), 8); + assert_eq!(&buf[..], b"abcdefgh"); + assert_eq!(ring.len(), 4); + } + + assert_eq!(ring.enqueue_slice(b"abcd"), 4); + + { + let mut buf = [0; 8]; + assert_eq!(ring.dequeue_slice(&mut buf[..]), 8); + assert_eq!(&buf[..], b"ijklabcd"); + assert_eq!(ring.len(), 0); + } + } + + #[test] + fn test_buffer_get_unallocated() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!(ring.get_unallocated(16, 4), b""); + + { + let buf = ring.get_unallocated(0, 4); + buf.copy_from_slice(b"abcd"); + } + assert_eq!(&ring.storage[..], b"abcd........"); + + let buf_enqueued = ring.enqueue_many(4); + assert_eq!(buf_enqueued.len(), 4); + assert_eq!(ring.len(), 4); + + { + let buf = ring.get_unallocated(4, 8); + buf.copy_from_slice(b"ijkl"); + } + assert_eq!(&ring.storage[..], b"abcd....ijkl"); + + ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL"); + ring.dequeue_many(4).copy_from_slice(b"abcd"); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"abcdEFGHIJKL"); + + { + let buf = ring.get_unallocated(0, 8); + buf.copy_from_slice(b"ABCD"); + } + assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL"); + } + + #[test] + fn test_buffer_write_unallocated() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + ring.enqueue_many(6).copy_from_slice(b"abcdef"); + ring.dequeue_many(6).copy_from_slice(b"ABCDEF"); + + assert_eq!(ring.write_unallocated(0, b"ghi"), 3); + assert_eq!(ring.get_unallocated(0, 3), b"ghi"); + + assert_eq!(ring.write_unallocated(3, b"jklmno"), 6); + assert_eq!(ring.get_unallocated(3, 3), b"jkl"); + + assert_eq!(ring.write_unallocated(9, b"pqrstu"), 3); + assert_eq!(ring.get_unallocated(9, 3), b"pqr"); + } + + #[test] + fn test_buffer_get_allocated() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + + assert_eq!(ring.get_allocated(16, 4), b""); + assert_eq!(ring.get_allocated(0, 4), b""); + + let len_enqueued = ring.enqueue_slice(b"abcd"); + assert_eq!(ring.get_allocated(0, 8), b"abcd"); + assert_eq!(len_enqueued, 4); + + let len_enqueued = ring.enqueue_slice(b"efghijkl"); + ring.dequeue_many(4).copy_from_slice(b"...."); + assert_eq!(ring.get_allocated(4, 8), b"ijkl"); + assert_eq!(len_enqueued, 8); + + let len_enqueued = ring.enqueue_slice(b"abcd"); + assert_eq!(ring.get_allocated(4, 8), b"ijkl"); + assert_eq!(len_enqueued, 4); + } + + #[test] + fn test_buffer_read_allocated() { + let mut ring = RingBuffer::new(vec![b'.'; 12]); + ring.enqueue_many(12).copy_from_slice(b"abcdefghijkl"); + + let mut data = [0; 6]; + assert_eq!(ring.read_allocated(0, &mut data[..]), 6); + assert_eq!(&data[..], b"abcdef"); + + ring.dequeue_many(6).copy_from_slice(b"ABCDEF"); + ring.enqueue_many(3).copy_from_slice(b"mno"); + + let mut data = [0; 6]; + assert_eq!(ring.read_allocated(3, &mut data[..]), 6); + assert_eq!(&data[..], b"jklmno"); + + let mut data = [0; 6]; + assert_eq!(ring.read_allocated(6, &mut data[..]), 3); + assert_eq!(&data[..], b"mno\x00\x00\x00"); + } + + #[test] + fn test_buffer_with_no_capacity() { + let mut no_capacity: RingBuffer<u8> = RingBuffer::new(vec![]); + + // Call all functions that calculate the remainder against rx_buffer.capacity() + // with a backing storage with a length of 0. + assert_eq!(no_capacity.get_unallocated(0, 0), &[]); + assert_eq!(no_capacity.get_allocated(0, 0), &[]); + no_capacity.dequeue_allocated(0); + assert_eq!(no_capacity.enqueue_many(0), &[]); + assert_eq!(no_capacity.enqueue_one(), Err(Full)); + assert_eq!(no_capacity.contiguous_window(), 0); + } + + /// Use the buffer a bit. Then empty it and put in an item of + /// maximum size. By detecting a length of 0, the implementation + /// can reset the current buffer position. + #[test] + fn test_buffer_write_wholly() { + let mut ring = RingBuffer::new(vec![b'.'; 8]); + ring.enqueue_many(2).copy_from_slice(b"ab"); + ring.enqueue_many(2).copy_from_slice(b"cd"); + assert_eq!(ring.len(), 4); + let buf_dequeued = ring.dequeue_many(4); + assert_eq!(buf_dequeued, b"abcd"); + assert_eq!(ring.len(), 0); + + let large = ring.enqueue_many(8); + assert_eq!(large.len(), 8); + } +} |