summaryrefslogtreecommitdiff
path: root/src/storage
diff options
context:
space:
mode:
Diffstat (limited to 'src/storage')
-rw-r--r--src/storage/assembler.rs750
-rw-r--r--src/storage/mod.rs31
-rw-r--r--src/storage/packet_buffer.rs402
-rw-r--r--src/storage/ring_buffer.rs803
4 files changed, 1986 insertions, 0 deletions
diff --git a/src/storage/assembler.rs b/src/storage/assembler.rs
new file mode 100644
index 0000000..365a1e0
--- /dev/null
+++ b/src/storage/assembler.rs
@@ -0,0 +1,750 @@
+use core::fmt;
+
+use crate::config::ASSEMBLER_MAX_SEGMENT_COUNT;
+
/// Error returned when an operation would need to record more
/// discontinuities (holes) than the assembler has room to track.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TooManyHolesError;

impl fmt::Display for TooManyHolesError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The exact message is part of the Display contract; keep it stable.
        f.write_str("too many holes")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for TooManyHolesError {}
+
/// A contiguous chunk of absent data, followed by a contiguous chunk of present data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Contig {
    hole_size: usize,
    data_size: usize,
}

impl fmt::Display for Contig {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Render as "(hole)", "data", "(hole) data", or nothing at all,
        // depending on which parts are present.
        match (self.has_hole(), self.has_data()) {
            (true, true) => write!(f, "({}) {}", self.hole_size, self.data_size),
            (true, false) => write!(f, "({})", self.hole_size),
            (false, true) => write!(f, "{}", self.data_size),
            (false, false) => Ok(()),
        }
    }
}

#[cfg(feature = "defmt")]
impl defmt::Format for Contig {
    fn format(&self, fmt: defmt::Formatter) {
        // Mirrors the Display impl above for defmt targets.
        if self.has_hole() {
            defmt::write!(fmt, "({})", self.hole_size);
        }
        if self.has_hole() && self.has_data() {
            defmt::write!(fmt, " ");
        }
        if self.has_data() {
            defmt::write!(fmt, "{}", self.data_size);
        }
    }
}

impl Contig {
    /// A contig with no hole and no data (an unused slot).
    const fn empty() -> Contig {
        Contig {
            hole_size: 0,
            data_size: 0,
        }
    }

    /// A hole of `hole_size` bytes followed by `data_size` bytes of data.
    fn hole_and_data(hole_size: usize, data_size: usize) -> Contig {
        Contig {
            hole_size,
            data_size,
        }
    }

    fn has_hole(&self) -> bool {
        self.hole_size != 0
    }

    fn has_data(&self) -> bool {
        self.data_size != 0
    }

    /// Total span covered by this contig (hole plus data).
    fn total_size(&self) -> usize {
        self.hole_size + self.data_size
    }

    /// Shrink the hole from the front; the data size is unchanged,
    /// so the total span shrinks by `size`.
    fn shrink_hole_by(&mut self, size: usize) {
        self.hole_size -= size;
    }

    /// Shrink the hole to exactly `size` bytes; the freed span becomes
    /// data, so the total span is preserved.
    fn shrink_hole_to(&mut self, size: usize) {
        debug_assert!(self.hole_size >= size);

        let total = self.total_size();
        self.hole_size = size;
        self.data_size = total - size;
    }
}
+
+/// A buffer (re)assembler.
+///
+/// Currently, up to a hardcoded limit of `ASSEMBLER_MAX_SEGMENT_COUNT` (4 or 32) holes can be
+/// tracked in the buffer.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct Assembler {
+ // Data-bearing contigs are packed at the front of this array; the
+ // invariant is spelled out in the comment below, before `impl Assembler`.
+ contigs: [Contig; ASSEMBLER_MAX_SEGMENT_COUNT],
+}
+
+impl fmt::Display for Assembler {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "[ ")?;
+ // Only the data-bearing prefix is printed; trailing slots are unused.
+ for contig in self.contigs.iter() {
+ if !contig.has_data() {
+ break;
+ }
+ write!(f, "{contig} ")?;
+ }
+ write!(f, "]")?;
+ Ok(())
+ }
+}
+
+#[cfg(feature = "defmt")]
+impl defmt::Format for Assembler {
+ // Mirrors the Display impl above for defmt targets.
+ fn format(&self, fmt: defmt::Formatter) {
+ defmt::write!(fmt, "[ ");
+ for contig in self.contigs.iter() {
+ if !contig.has_data() {
+ break;
+ }
+ defmt::write!(fmt, "{} ", contig);
+ }
+ defmt::write!(fmt, "]");
+ }
+}
+
+// Invariant on Assembler::contigs:
+// - There's an index `i` where all contigs before have data, and all contigs after don't (are unused).
+// - All contigs with data must have hole_size != 0, except the first.
+
+impl Assembler {
+ /// Create a new buffer assembler.
+ pub const fn new() -> Assembler {
+ const EMPTY: Contig = Contig::empty();
+ Assembler {
+ contigs: [EMPTY; ASSEMBLER_MAX_SEGMENT_COUNT],
+ }
+ }
+
+ /// Remove all data, resetting the assembler to the empty state.
+ pub fn clear(&mut self) {
+ self.contigs.fill(Contig::empty());
+ }
+
+ // First contig (copied); a hole here is the gap at the very front.
+ fn front(&self) -> Contig {
+ self.contigs[0]
+ }
+
+ /// Return length of the front contiguous range without removing it from the assembler
+ pub fn peek_front(&self) -> usize {
+ let front = self.front();
+ if front.has_hole() {
+ 0
+ } else {
+ front.data_size
+ }
+ }
+
+ // Last contig slot; if it holds data, the assembler is at capacity.
+ fn back(&self) -> Contig {
+ self.contigs[self.contigs.len() - 1]
+ }
+
+ /// Return whether the assembler contains no data.
+ pub fn is_empty(&self) -> bool {
+ !self.front().has_data()
+ }
+
+ /// Remove a contig at the given index.
+ fn remove_contig_at(&mut self, at: usize) {
+ debug_assert!(self.contigs[at].has_data());
+
+ // Shift the data-bearing suffix left by one slot.
+ for i in at..self.contigs.len() - 1 {
+ if !self.contigs[i].has_data() {
+ return;
+ }
+ self.contigs[i] = self.contigs[i + 1];
+ }
+
+ // Removing the last one.
+ self.contigs[self.contigs.len() - 1] = Contig::empty();
+ }
+
+ /// Add a contig at the given index, and return a pointer to it.
+ fn add_contig_at(&mut self, at: usize) -> Result<&mut Contig, TooManyHolesError> {
+ if self.back().has_data() {
+ return Err(TooManyHolesError);
+ }
+
+ // Shift everything at or after `at` right by one slot to make room.
+ for i in (at + 1..self.contigs.len()).rev() {
+ self.contigs[i] = self.contigs[i - 1];
+ }
+
+ self.contigs[at] = Contig::empty();
+ Ok(&mut self.contigs[at])
+ }
+
+ /// Add a new contiguous range to the assembler,
+ /// or return `Err(TooManyHolesError)` if too many discontinuities are already recorded.
+ pub fn add(&mut self, mut offset: usize, size: usize) -> Result<(), TooManyHolesError> {
+ if size == 0 {
+ return Ok(());
+ }
+
+ let mut i = 0;
+
+ // Find index of the contig containing the start of the range.
+ // `offset` is rebased to be relative to contig `i` as we walk.
+ loop {
+ if i == self.contigs.len() {
+ // The new range is after all the previous ranges, but there's no space to add it.
+ return Err(TooManyHolesError);
+ }
+ let contig = &mut self.contigs[i];
+ if !contig.has_data() {
+ // The new range is after all the previous ranges. Add it.
+ *contig = Contig::hole_and_data(offset, size);
+ return Ok(());
+ }
+ if offset <= contig.total_size() {
+ break;
+ }
+ offset -= contig.total_size();
+ i += 1;
+ }
+
+ let contig = &mut self.contigs[i];
+ if offset < contig.hole_size {
+ // Range starts within the hole.
+
+ if offset + size < contig.hole_size {
+ // Range also ends within the hole.
+ let new_contig = self.add_contig_at(i)?;
+ new_contig.hole_size = offset;
+ new_contig.data_size = size;
+
+ // Previous contigs[index] got moved to contigs[index+1]
+ self.contigs[i + 1].shrink_hole_by(offset + size);
+ return Ok(());
+ }
+
+ // The range being added covers both a part of the hole and a part of the data
+ // in this contig, shrink the hole in this contig.
+ contig.shrink_hole_to(offset);
+ }
+
+ // coalesce contigs to the right.
+ let mut j = i + 1;
+ while j < self.contigs.len()
+ && self.contigs[j].has_data()
+ && offset + size >= self.contigs[i].total_size() + self.contigs[j].hole_size
+ {
+ self.contigs[i].data_size += self.contigs[j].total_size();
+ j += 1;
+ }
+ // Close the gap left by the contigs absorbed into contig `i`.
+ let shift = j - i - 1;
+ if shift != 0 {
+ for x in i + 1..self.contigs.len() {
+ if !self.contigs[x].has_data() {
+ break;
+ }
+
+ self.contigs[x] = self
+ .contigs
+ .get(x + shift)
+ .copied()
+ .unwrap_or_else(Contig::empty);
+ }
+ }
+
+ if offset + size > self.contigs[i].total_size() {
+ // The added range still extends beyond the current contig. Increase data size.
+ let left = offset + size - self.contigs[i].total_size();
+ self.contigs[i].data_size += left;
+
+ // Decrease hole size of the next, if any.
+ if i + 1 < self.contigs.len() && self.contigs[i + 1].has_data() {
+ self.contigs[i + 1].hole_size -= left;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Remove a contiguous range from the front of the assembler.
+ /// If no such range, return 0.
+ pub fn remove_front(&mut self) -> usize {
+ let front = self.front();
+ if front.has_hole() || !front.has_data() {
+ 0
+ } else {
+ self.remove_contig_at(0);
+ debug_assert!(front.data_size > 0);
+ front.data_size
+ }
+ }
+
+ /// Add a segment, then remove_front.
+ ///
+ /// This is equivalent to calling `add` then `remove_front` individually,
+ /// except it's guaranteed to not fail when offset = 0.
+ /// This is required for TCP: we must never drop the next expected segment, or
+ /// the protocol might get stuck.
+ pub fn add_then_remove_front(
+ &mut self,
+ offset: usize,
+ size: usize,
+ ) -> Result<usize, TooManyHolesError> {
+ // This is the only case where a segment at offset=0 would cause the
+ // total amount of contigs to rise (and therefore can potentially cause
+ // a TooManyHolesError). Handle it in a way that is guaranteed to succeed.
+ if offset == 0 && size < self.contigs[0].hole_size {
+ self.contigs[0].hole_size -= size;
+ return Ok(size);
+ }
+
+ self.add(offset, size)?;
+ Ok(self.remove_front())
+ }
+
+ /// Iterate over all of the contiguous data ranges.
+ ///
+ /// This is used in calculating what data ranges have been received. The offset indicates the
+ /// number of bytes of contiguous data received before the beginnings of this Assembler.
+ ///
+ /// Data Hole Data
+ /// |--- 100 ---|--- 200 ---|--- 100 ---|
+ ///
+ /// An offset of 1500 would return the ranges: ``(1500, 1600), (1800, 1900)``
+ pub fn iter_data(&self, first_offset: usize) -> AssemblerIter {
+ AssemblerIter::new(self, first_offset)
+ }
+}
+
+/// Iterator over the contiguous data ranges of an [`Assembler`], created by
+/// [`Assembler::iter_data`]. Yields `(start, end)` byte offsets (end exclusive),
+/// each shifted by the base offset the iterator was created with.
+pub struct AssemblerIter<'a> {
+ assembler: &'a Assembler,
+ // Constant shift applied to every yielded range.
+ offset: usize,
+ // Index of the next contig to inspect.
+ index: usize,
+ // Running absolute start/end of the current data range (before `offset` is applied).
+ left: usize,
+ right: usize,
+}
+
+impl<'a> AssemblerIter<'a> {
+ fn new(assembler: &'a Assembler, offset: usize) -> AssemblerIter<'a> {
+ AssemblerIter {
+ assembler,
+ offset,
+ index: 0,
+ left: 0,
+ right: 0,
+ }
+ }
+}
+
+impl<'a> Iterator for AssemblerIter<'a> {
+ type Item = (usize, usize);
+
+ fn next(&mut self) -> Option<(usize, usize)> {
+ let mut data_range = None;
+ // Walk contigs, accumulating hole sizes, until a non-empty data range
+ // is found or the contig array is exhausted.
+ while data_range.is_none() && self.index < self.assembler.contigs.len() {
+ let contig = self.assembler.contigs[self.index];
+ self.left += contig.hole_size;
+ self.right = self.left + contig.data_size;
+ data_range = if self.left < self.right {
+ let data_range = (self.left + self.offset, self.right + self.offset);
+ self.left = self.right;
+ Some(data_range)
+ } else {
+ None
+ };
+ self.index += 1;
+ }
+ data_range
+ }
+}
+
+#[cfg(test)]
+mod test {
+ // Unit tests: assembler states are written as literal contig lists via the
+ // `contigs![(hole, data), ...]` macro and compared with `assert_eq!`.
+ use super::*;
+ use std::vec::Vec;
+
+ impl From<Vec<(usize, usize)>> for Assembler {
+ fn from(vec: Vec<(usize, usize)>) -> Assembler {
+ const EMPTY: Contig = Contig::empty();
+
+ let mut contigs = [EMPTY; ASSEMBLER_MAX_SEGMENT_COUNT];
+ for (i, &(hole_size, data_size)) in vec.iter().enumerate() {
+ contigs[i] = Contig {
+ hole_size,
+ data_size,
+ };
+ }
+ Assembler { contigs }
+ }
+ }
+
+ macro_rules! contigs {
+ [$( $x:expr ),*] => ({
+ Assembler::from(vec![$( $x ),*])
+ })
+ }
+
+ #[test]
+ fn test_new() {
+ let assr = Assembler::new();
+ assert_eq!(assr, contigs![]);
+ }
+
+ #[test]
+ fn test_empty_add_full() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(0, 16), Ok(()));
+ assert_eq!(assr, contigs![(0, 16)]);
+ }
+
+ #[test]
+ fn test_empty_add_front() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(0, 4), Ok(()));
+ assert_eq!(assr, contigs![(0, 4)]);
+ }
+
+ #[test]
+ fn test_empty_add_back() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(12, 4), Ok(()));
+ assert_eq!(assr, contigs![(12, 4)]);
+ }
+
+ #[test]
+ fn test_empty_add_mid() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(4, 8), Ok(()));
+ assert_eq!(assr, contigs![(4, 8)]);
+ }
+
+ #[test]
+ fn test_partial_add_front() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(0, 4), Ok(()));
+ assert_eq!(assr, contigs![(0, 12)]);
+ }
+
+ #[test]
+ fn test_partial_add_back() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(12, 4), Ok(()));
+ assert_eq!(assr, contigs![(4, 12)]);
+ }
+
+ #[test]
+ fn test_partial_add_front_overlap() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(0, 8), Ok(()));
+ assert_eq!(assr, contigs![(0, 12)]);
+ }
+
+ #[test]
+ fn test_partial_add_front_overlap_split() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(2, 6), Ok(()));
+ assert_eq!(assr, contigs![(2, 10)]);
+ }
+
+ #[test]
+ fn test_partial_add_back_overlap() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(8, 8), Ok(()));
+ assert_eq!(assr, contigs![(4, 12)]);
+ }
+
+ #[test]
+ fn test_partial_add_back_overlap_split() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(10, 4), Ok(()));
+ assert_eq!(assr, contigs![(4, 10)]);
+ }
+
+ #[test]
+ fn test_partial_add_both_overlap() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(0, 16), Ok(()));
+ assert_eq!(assr, contigs![(0, 16)]);
+ }
+
+ #[test]
+ fn test_partial_add_both_overlap_split() {
+ let mut assr = contigs![(4, 8)];
+ assert_eq!(assr.add(2, 12), Ok(()));
+ assert_eq!(assr, contigs![(2, 12)]);
+ }
+
+ #[test]
+ fn test_rejected_add_keeps_state() {
+ let mut assr = Assembler::new();
+ for c in 1..=ASSEMBLER_MAX_SEGMENT_COUNT {
+ assert_eq!(assr.add(c * 10, 3), Ok(()));
+ }
+ // Maximum of allowed holes is reached
+ let assr_before = assr.clone();
+ assert_eq!(assr.add(1, 3), Err(TooManyHolesError));
+ assert_eq!(assr_before, assr);
+ }
+
+ #[test]
+ fn test_empty_remove_front() {
+ let mut assr = contigs![];
+ assert_eq!(assr.remove_front(), 0);
+ }
+
+ #[test]
+ fn test_trailing_hole_remove_front() {
+ let mut assr = contigs![(0, 4)];
+ assert_eq!(assr.remove_front(), 4);
+ assert_eq!(assr, contigs![]);
+ }
+
+ #[test]
+ fn test_trailing_data_remove_front() {
+ let mut assr = contigs![(0, 4), (4, 4)];
+ assert_eq!(assr.remove_front(), 4);
+ assert_eq!(assr, contigs![(4, 4)]);
+ }
+
+ #[test]
+ fn test_boundary_case_remove_front() {
+ let mut vec = vec![(1, 1); ASSEMBLER_MAX_SEGMENT_COUNT];
+ vec[0] = (0, 2);
+ let mut assr = Assembler::from(vec);
+ assert_eq!(assr.remove_front(), 2);
+ let mut vec = vec![(1, 1); ASSEMBLER_MAX_SEGMENT_COUNT];
+ vec[ASSEMBLER_MAX_SEGMENT_COUNT - 1] = (0, 0);
+ let exp_assr = Assembler::from(vec);
+ assert_eq!(assr, exp_assr);
+ }
+
+ #[test]
+ fn test_shrink_next_hole() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(100, 10), Ok(()));
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add(40, 30), Ok(()));
+ assert_eq!(assr, contigs![(40, 30), (30, 10)]);
+ }
+
+ #[test]
+ fn test_join_two() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(10, 10), Ok(()));
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add(15, 40), Ok(()));
+ assert_eq!(assr, contigs![(10, 50)]);
+ }
+
+ #[test]
+ fn test_join_two_reversed() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add(10, 10), Ok(()));
+ assert_eq!(assr.add(15, 40), Ok(()));
+ assert_eq!(assr, contigs![(10, 50)]);
+ }
+
+ #[test]
+ fn test_join_two_overlong() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add(10, 10), Ok(()));
+ assert_eq!(assr.add(15, 60), Ok(()));
+ assert_eq!(assr, contigs![(10, 65)]);
+ }
+
+ #[test]
+ fn test_iter_empty() {
+ let assr = Assembler::new();
+ let segments: Vec<_> = assr.iter_data(10).collect();
+ assert_eq!(segments, vec![]);
+ }
+
+ #[test]
+ fn test_iter_full() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(0, 16), Ok(()));
+ let segments: Vec<_> = assr.iter_data(10).collect();
+ assert_eq!(segments, vec![(10, 26)]);
+ }
+
+ #[test]
+ fn test_iter_offset() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(0, 16), Ok(()));
+ let segments: Vec<_> = assr.iter_data(100).collect();
+ assert_eq!(segments, vec![(100, 116)]);
+ }
+
+ #[test]
+ fn test_iter_one_front() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(0, 4), Ok(()));
+ let segments: Vec<_> = assr.iter_data(10).collect();
+ assert_eq!(segments, vec![(10, 14)]);
+ }
+
+ #[test]
+ fn test_iter_one_back() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(12, 4), Ok(()));
+ let segments: Vec<_> = assr.iter_data(10).collect();
+ assert_eq!(segments, vec![(22, 26)]);
+ }
+
+ #[test]
+ fn test_iter_one_mid() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(4, 8), Ok(()));
+ let segments: Vec<_> = assr.iter_data(10).collect();
+ assert_eq!(segments, vec![(14, 22)]);
+ }
+
+ #[test]
+ fn test_iter_one_trailing_gap() {
+ let assr = contigs![(4, 8)];
+ let segments: Vec<_> = assr.iter_data(100).collect();
+ assert_eq!(segments, vec![(104, 112)]);
+ }
+
+ #[test]
+ fn test_iter_two_split() {
+ let assr = contigs![(2, 6), (4, 1)];
+ let segments: Vec<_> = assr.iter_data(100).collect();
+ assert_eq!(segments, vec![(102, 108), (112, 113)]);
+ }
+
+ #[test]
+ fn test_iter_three_split() {
+ let assr = contigs![(2, 6), (2, 1), (2, 2)];
+ let segments: Vec<_> = assr.iter_data(100).collect();
+ assert_eq!(segments, vec![(102, 108), (110, 111), (113, 115)]);
+ }
+
+ #[test]
+ fn test_issue_694() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(0, 1), Ok(()));
+ assert_eq!(assr.add(2, 1), Ok(()));
+ assert_eq!(assr.add(1, 1), Ok(()));
+ }
+
+ #[test]
+ fn test_add_then_remove_front() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add_then_remove_front(10, 10), Ok(0));
+ assert_eq!(assr, contigs![(10, 10), (30, 10)]);
+ }
+
+ #[test]
+ fn test_add_then_remove_front_at_front() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add_then_remove_front(0, 10), Ok(10));
+ assert_eq!(assr, contigs![(40, 10)]);
+ }
+
+ #[test]
+ fn test_add_then_remove_front_at_front_touch() {
+ let mut assr = Assembler::new();
+ assert_eq!(assr.add(50, 10), Ok(()));
+ assert_eq!(assr.add_then_remove_front(0, 50), Ok(60));
+ assert_eq!(assr, contigs![]);
+ }
+
+ #[test]
+ fn test_add_then_remove_front_at_front_full() {
+ let mut assr = Assembler::new();
+ for c in 1..=ASSEMBLER_MAX_SEGMENT_COUNT {
+ assert_eq!(assr.add(c * 10, 3), Ok(()));
+ }
+ // Maximum of allowed holes is reached
+ let assr_before = assr.clone();
+ assert_eq!(assr.add_then_remove_front(1, 3), Err(TooManyHolesError));
+ assert_eq!(assr_before, assr);
+ }
+
+ #[test]
+ fn test_add_then_remove_front_at_front_full_offset_0() {
+ let mut assr = Assembler::new();
+ for c in 1..=ASSEMBLER_MAX_SEGMENT_COUNT {
+ assert_eq!(assr.add(c * 10, 3), Ok(()));
+ }
+ assert_eq!(assr.add_then_remove_front(0, 3), Ok(3));
+ }
+
+ // Test against an obviously-correct but inefficient bitmap impl.
+ #[test]
+ fn test_random() {
+ use rand::Rng;
+
+ const MAX_INDEX: usize = 256;
+
+ for max_size in [2, 5, 10, 100] {
+ for _ in 0..300 {
+ //println!("===");
+ let mut assr = Assembler::new();
+ let mut map = [false; MAX_INDEX];
+
+ for _ in 0..60 {
+ let offset = rand::thread_rng().gen_range(0..MAX_INDEX - max_size - 1);
+ let size = rand::thread_rng().gen_range(1..=max_size);
+
+ //println!("add {}..{} {}", offset, offset + size, size);
+ // Real impl
+ let res = assr.add(offset, size);
+
+ // Bitmap impl
+ let mut map2 = map;
+ map2[offset..][..size].fill(true);
+
+ let mut contigs = vec![];
+ let mut hole: usize = 0;
+ let mut data: usize = 0;
+ for b in map2 {
+ if b {
+ data += 1;
+ } else {
+ if data != 0 {
+ contigs.push((hole, data));
+ hole = 0;
+ data = 0;
+ }
+ hole += 1;
+ }
+ }
+
+ // Compare.
+ let wanted_res = if contigs.len() > ASSEMBLER_MAX_SEGMENT_COUNT {
+ Err(TooManyHolesError)
+ } else {
+ Ok(())
+ };
+ assert_eq!(res, wanted_res);
+ if res.is_ok() {
+ map = map2;
+ assert_eq!(assr, Assembler::from(contigs));
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
new file mode 100644
index 0000000..b03de71
--- /dev/null
+++ b/src/storage/mod.rs
@@ -0,0 +1,31 @@
+/*! Specialized containers.
+
+The `storage` module provides containers for use in other modules.
+The containers support both pre-allocated memory, without the `std`
+or `alloc` crates being available, and heap-allocated memory.
+*/
+
+mod assembler;
+mod packet_buffer;
+mod ring_buffer;
+
+pub use self::assembler::Assembler;
+pub use self::packet_buffer::{PacketBuffer, PacketMetadata};
+pub use self::ring_buffer::RingBuffer;
+
+/// A trait for setting a value to a known state.
+///
+/// In-place analog of Default.
+pub trait Resettable {
+ /// Reset `self` in place to its known initial state.
+ fn reset(&mut self);
+}
+
+/// Error returned when enqueuing into a full buffer.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub struct Full;
+
+/// Error returned when dequeuing from an empty buffer.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub struct Empty;
diff --git a/src/storage/packet_buffer.rs b/src/storage/packet_buffer.rs
new file mode 100644
index 0000000..28119fa
--- /dev/null
+++ b/src/storage/packet_buffer.rs
@@ -0,0 +1,402 @@
+use managed::ManagedSlice;
+
+use crate::storage::{Full, RingBuffer};
+
+use super::Empty;
+
/// Size and header of a packet.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct PacketMetadata<H> {
    size: usize,
    header: Option<H>,
}

impl<H> PacketMetadata<H> {
    /// Empty packet description.
    pub const EMPTY: PacketMetadata<H> = PacketMetadata {
        size: 0,
        header: None,
    };

    /// Metadata describing `size` bytes of padding. Padding carries no
    /// header and is skipped when dequeuing.
    fn padding(size: usize) -> PacketMetadata<H> {
        PacketMetadata { size, header: None }
    }

    /// Metadata describing a real packet of `size` payload bytes with the
    /// given header.
    fn packet(size: usize, header: H) -> PacketMetadata<H> {
        PacketMetadata {
            size,
            header: Some(header),
        }
    }

    /// Padding entries are exactly those without a header.
    fn is_padding(&self) -> bool {
        self.header.is_none()
    }
}
+
+/// A UDP packet ring buffer.
+#[derive(Debug)]
+pub struct PacketBuffer<'a, H: 'a> {
+ // One metadata entry per stored packet (or per run of padding), in FIFO order.
+ metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
+ // Raw payload bytes; each packet occupies a contiguous span, with padding
+ // inserted at the wrap point when needed.
+ payload_ring: RingBuffer<'a, u8>,
+}
+
+impl<'a, H> PacketBuffer<'a, H> {
+ /// Create a new packet buffer with the provided metadata and payload storage.
+ ///
+ /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
+ /// storage limits the maximum _total size_ of packets.
+ pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
+ where
+ MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
+ PS: Into<ManagedSlice<'a, u8>>,
+ {
+ PacketBuffer {
+ metadata_ring: RingBuffer::new(metadata_storage),
+ payload_ring: RingBuffer::new(payload_storage),
+ }
+ }
+
+ /// Query whether the buffer is empty.
+ pub fn is_empty(&self) -> bool {
+ self.metadata_ring.is_empty()
+ }
+
+ /// Query whether the buffer is full.
+ pub fn is_full(&self) -> bool {
+ self.metadata_ring.is_full()
+ }
+
+ // There is currently no enqueue_with() because of the complexity of managing padding
+ // in case of failure.
+
+ /// Enqueue a single packet with the given header into the buffer, and
+ /// return a reference to its payload, or return `Err(Full)`
+ /// if the buffer is full.
+ pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> {
+ if self.payload_ring.capacity() < size || self.metadata_ring.is_full() {
+ return Err(Full);
+ }
+
+ // Ring is currently empty. Clear it (resetting `read_at`) to maximize
+ // for contiguous space.
+ if self.payload_ring.is_empty() {
+ self.payload_ring.clear();
+ }
+
+ let window = self.payload_ring.window();
+ let contig_window = self.payload_ring.contiguous_window();
+
+ if window < size {
+ return Err(Full);
+ } else if contig_window < size {
+ if window - contig_window < size {
+ // The buffer length is larger than the current contiguous window
+ // and is larger than the contiguous window will be after adding
+ // the padding necessary to circle around to the beginning of the
+ // ring buffer.
+ return Err(Full);
+ } else {
+ // Add padding to the end of the ring buffer so that the
+ // contiguous window is at the beginning of the ring buffer.
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
+ // note(discard): function does not write to the result
+ // enqueued padding buffer location
+ let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
+ }
+ }
+
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
+
+ let payload_buf = self.payload_ring.enqueue_many(size);
+ debug_assert!(payload_buf.len() == size);
+ Ok(payload_buf)
+ }
+
+ /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet
+ /// is shrunk to the size returned from `f` and enqueued into the buffer.
+ pub fn enqueue_with_infallible<'b, F>(
+ &'b mut self,
+ max_size: usize,
+ header: H,
+ f: F,
+ ) -> Result<usize, Full>
+ where
+ F: FnOnce(&'b mut [u8]) -> usize,
+ {
+ if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() {
+ return Err(Full);
+ }
+
+ let window = self.payload_ring.window();
+ let contig_window = self.payload_ring.contiguous_window();
+
+ if window < max_size {
+ return Err(Full);
+ } else if contig_window < max_size {
+ if window - contig_window < max_size {
+ // The buffer length is larger than the current contiguous window
+ // and is larger than the contiguous window will be after adding
+ // the padding necessary to circle around to the beginning of the
+ // ring buffer.
+ return Err(Full);
+ } else {
+ // Add padding to the end of the ring buffer so that the
+ // contiguous window is at the beginning of the ring buffer.
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
+ // note(discard): function does not write to the result
+ // enqueued padding buffer location
+ let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
+ }
+ }
+
+ let (size, _) = self
+ .payload_ring
+ .enqueue_many_with(|data| (f(&mut data[..max_size]), ()));
+
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
+
+ Ok(size)
+ }
+
+ /// Drop a leading padding entry (and its payload bytes), if any, so that
+ /// the next metadata entry describes a real packet.
+ fn dequeue_padding(&mut self) {
+ let _ = self.metadata_ring.dequeue_one_with(|metadata| {
+ if metadata.is_padding() {
+ // note(discard): function does not use value of dequeued padding bytes
+ let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size);
+ Ok(()) // dequeue metadata
+ } else {
+ Err(()) // don't dequeue metadata
+ }
+ });
+ }
+
+ /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
+ /// returns successfully, or return `Err(Empty)` if the buffer is empty.
+ pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty>
+ where
+ F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,
+ {
+ self.dequeue_padding();
+
+ self.metadata_ring.dequeue_one_with(|metadata| {
+ self.payload_ring
+ .dequeue_many_with(|payload_buf| {
+ debug_assert!(payload_buf.len() >= metadata.size);
+
+ // On error, dequeue 0 payload bytes so the packet stays queued.
+ match f(
+ metadata.header.as_mut().unwrap(),
+ &mut payload_buf[..metadata.size],
+ ) {
+ Ok(val) => (metadata.size, Ok(val)),
+ Err(err) => (0, Err(err)),
+ }
+ })
+ .1
+ })
+ }
+
+ /// Dequeue a single packet from the buffer, and return a reference to its payload
+ /// as well as its header, or return `Err(Empty)` if the buffer is empty.
+ pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> {
+ self.dequeue_padding();
+
+ let meta = self.metadata_ring.dequeue_one()?;
+
+ let payload_buf = self.payload_ring.dequeue_many(meta.size);
+ debug_assert!(payload_buf.len() == meta.size);
+ Ok((meta.header.take().unwrap(), payload_buf))
+ }
+
+ /// Peek at a single packet from the buffer without removing it, and return a reference to
+ /// its payload as well as its header, or return `Err(Empty)` if the buffer is empty.
+ ///
+ /// This function otherwise behaves identically to [dequeue](#method.dequeue).
+ pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> {
+ self.dequeue_padding();
+
+ if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
+ Ok((
+ metadata.header.as_ref().unwrap(),
+ self.payload_ring.get_allocated(0, metadata.size),
+ ))
+ } else {
+ Err(Empty)
+ }
+ }
+
+ /// Return the maximum number of packets that can be stored.
+ pub fn packet_capacity(&self) -> usize {
+ self.metadata_ring.capacity()
+ }
+
+ /// Return the maximum number of bytes in the payload ring buffer.
+ pub fn payload_capacity(&self) -> usize {
+ self.payload_ring.capacity()
+ }
+
+ /// Reset the packet buffer and clear any staged packets.
+ #[allow(unused)]
+ pub(crate) fn reset(&mut self) {
+ self.payload_ring.clear();
+ self.metadata_ring.clear();
+ }
+}
+
+#[cfg(test)]
+mod test {
+ // Unit tests using a small fixture buffer: 4 metadata slots, 16 payload bytes.
+ use super::*;
+
+ fn buffer() -> PacketBuffer<'static, ()> {
+ PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
+ }
+
+ #[test]
+ fn test_simple() {
+ let mut buffer = buffer();
+ buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
+ assert_eq!(buffer.enqueue(16, ()), Err(Full));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
+ assert_eq!(buffer.dequeue(), Err(Empty));
+ }
+
+ #[test]
+ fn test_peek() {
+ let mut buffer = buffer();
+ assert_eq!(buffer.peek(), Err(Empty));
+ buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]);
+ assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
+ assert_eq!(buffer.peek(), Err(Empty));
+ }
+
+ #[test]
+ fn test_padding() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(6, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
+ assert_eq!(buffer.metadata_ring.len(), 3);
+ assert!(buffer.dequeue().is_ok());
+
+ assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
+ assert_eq!(buffer.metadata_ring.len(), 0);
+ }
+
+ #[test]
+ fn test_padding_with_large_payload() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(12, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ buffer
+ .enqueue(12, ())
+ .unwrap()
+ .copy_from_slice(b"abcdefghijkl");
+ }
+
+ #[test]
+ fn test_dequeue_with() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(6, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
+ assert_eq!(buffer.metadata_ring.len(), 3);
+ assert!(buffer.dequeue().is_ok());
+
+ assert!(matches!(
+ buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)),
+ Ok(Err(_))
+ ));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+
+ assert!(buffer
+ .dequeue_with(|&mut (), payload| {
+ assert_eq!(payload, &b"abcd"[..]);
+ Result::<(), ()>::Ok(())
+ })
+ .is_ok());
+ assert_eq!(buffer.metadata_ring.len(), 0);
+ }
+
+ #[test]
+ fn test_metadata_full_empty() {
+ let mut buffer = buffer();
+ assert!(buffer.is_empty());
+ assert!(!buffer.is_full());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(!buffer.is_empty());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(!buffer.is_full());
+ assert!(!buffer.is_empty());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(buffer.is_full());
+ assert!(!buffer.is_empty());
+ assert_eq!(buffer.metadata_ring.len(), 4);
+ assert_eq!(buffer.enqueue(1, ()), Err(Full));
+ }
+
+ #[test]
+ fn test_window_too_small() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(4, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert_eq!(buffer.enqueue(16, ()), Err(Full));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ }
+
+ #[test]
+ fn test_contiguous_window_too_small() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(4, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert_eq!(buffer.enqueue(8, ()), Err(Full));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ }
+
+ #[test]
+ fn test_contiguous_window_wrap() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(15, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert!(buffer.enqueue(16, ()).is_ok());
+ }
+
+ #[test]
+ fn test_capacity_too_small() {
+ let mut buffer = buffer();
+ assert_eq!(buffer.enqueue(32, ()), Err(Full));
+ }
+
+ #[test]
+ fn test_contig_window_prioritized() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(4, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert!(buffer.enqueue(5, ()).is_ok());
+ }
+
+ #[test]
+ fn clear() {
+ let mut buffer = buffer();
+
+ // Ensure enqueuing data in the buffer fills it somewhat.
+ assert!(buffer.is_empty());
+ assert!(buffer.enqueue(6, ()).is_ok());
+
+ // Ensure that resetting the buffer causes it to be empty.
+ assert!(!buffer.is_empty());
+ buffer.reset();
+ assert!(buffer.is_empty());
+ }
+}
diff --git a/src/storage/ring_buffer.rs b/src/storage/ring_buffer.rs
new file mode 100644
index 0000000..7d461b6
--- /dev/null
+++ b/src/storage/ring_buffer.rs
@@ -0,0 +1,803 @@
+// Some of the functions in this ring buffer are marked #[must_use]. The
+// attribute (introduced by [RFC 1940]) warns callers that silently discard the
+// returned value, which for these functions almost certainly indicates a bug.
+// [RFC 1940]: https://github.com/rust-lang/rust/issues/43302
+
+use core::cmp;
+use managed::ManagedSlice;
+
+use crate::storage::Resettable;
+
+use super::{Empty, Full};
+
+/// A ring buffer.
+///
+/// This ring buffer implementation provides many ways to interact with it:
+///
+/// * Enqueueing or dequeueing one element from corresponding side of the buffer;
+/// * Enqueueing or dequeueing a slice of elements from corresponding side of the buffer;
+/// * Accessing allocated and unallocated areas directly.
+///
+/// It is also zero-copy; all methods provide references into the buffer's storage.
+/// Note that all references are mutable; it is considered more important to allow
+/// in-place processing than to protect from accidental mutation.
+///
+/// This implementation is suitable for both simple uses such as a FIFO queue
+/// of UDP packets, and advanced ones such as a TCP reassembly buffer.
+#[derive(Debug)]
+pub struct RingBuffer<'a, T: 'a> {
+ storage: ManagedSlice<'a, T>,
+ read_at: usize,
+ length: usize,
+}
+
+impl<'a, T: 'a> RingBuffer<'a, T> {
+ /// Create a ring buffer with the given storage.
+ ///
+ /// During creation, every element in `storage` is reset.
+ pub fn new<S>(storage: S) -> RingBuffer<'a, T>
+ where
+ S: Into<ManagedSlice<'a, T>>,
+ {
+ RingBuffer {
+ storage: storage.into(),
+ read_at: 0,
+ length: 0,
+ }
+ }
+
+ /// Clear the ring buffer.
+ pub fn clear(&mut self) {
+ self.read_at = 0;
+ self.length = 0;
+ }
+
+ /// Return the maximum number of elements in the ring buffer.
+ pub fn capacity(&self) -> usize {
+ self.storage.len()
+ }
+
+ /// Clear the ring buffer, and reset every element.
+ pub fn reset(&mut self)
+ where
+ T: Resettable,
+ {
+ self.clear();
+ for elem in self.storage.iter_mut() {
+ elem.reset();
+ }
+ }
+
+ /// Return the current number of elements in the ring buffer.
+ pub fn len(&self) -> usize {
+ self.length
+ }
+
+ /// Return the number of elements that can be added to the ring buffer.
+ pub fn window(&self) -> usize {
+ self.capacity() - self.len()
+ }
+
+ /// Return the largest number of elements that can be added to the buffer
+ /// without wrapping around (i.e. in a single `enqueue_many` call).
+ pub fn contiguous_window(&self) -> usize {
+ cmp::min(self.window(), self.capacity() - self.get_idx(self.length))
+ }
+
+ /// Query whether the buffer is empty.
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Query whether the buffer is full.
+ pub fn is_full(&self) -> bool {
+ self.window() == 0
+ }
+
+ /// Shorthand for `(self.read + idx) % self.capacity()` with an
+ /// additional check to ensure that the capacity is not zero.
+ fn get_idx(&self, idx: usize) -> usize {
+ let len = self.capacity();
+ if len > 0 {
+ (self.read_at + idx) % len
+ } else {
+ 0
+ }
+ }
+
+ /// Shorthand for `(self.read + idx) % self.capacity()` with no
+ /// additional checks to ensure the capacity is not zero.
+ fn get_idx_unchecked(&self, idx: usize) -> usize {
+ (self.read_at + idx) % self.capacity()
+ }
+}
+
+/// This is the "discrete" ring buffer interface: it operates with single elements,
+/// and boundary conditions (empty/full) are errors.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+ /// Call `f` with a single buffer element, and enqueue the element if `f`
+ /// returns successfully, or return `Err(Full)` if the buffer is full.
+ pub fn enqueue_one_with<'b, R, E, F>(&'b mut self, f: F) -> Result<Result<R, E>, Full>
+ where
+ F: FnOnce(&'b mut T) -> Result<R, E>,
+ {
+ if self.is_full() {
+ return Err(Full);
+ }
+
+ let index = self.get_idx_unchecked(self.length);
+ let res = f(&mut self.storage[index]);
+ if res.is_ok() {
+ self.length += 1;
+ }
+ Ok(res)
+ }
+
+ /// Enqueue a single element into the buffer, and return a reference to it,
+ /// or return `Err(Full)` if the buffer is full.
+ ///
+ /// This function is a shortcut for `ring_buf.enqueue_one_with(Ok)`.
+ pub fn enqueue_one(&mut self) -> Result<&mut T, Full> {
+ self.enqueue_one_with(Ok)?
+ }
+
+ /// Call `f` with a single buffer element, and dequeue the element if `f`
+ /// returns successfully, or return `Err(Empty)` if the buffer is empty.
+ pub fn dequeue_one_with<'b, R, E, F>(&'b mut self, f: F) -> Result<Result<R, E>, Empty>
+ where
+ F: FnOnce(&'b mut T) -> Result<R, E>,
+ {
+ if self.is_empty() {
+ return Err(Empty);
+ }
+
+ let next_at = self.get_idx_unchecked(1);
+ let res = f(&mut self.storage[self.read_at]);
+
+ if res.is_ok() {
+ self.length -= 1;
+ self.read_at = next_at;
+ }
+ Ok(res)
+ }
+
+ /// Dequeue an element from the buffer, and return a reference to it,
+ /// or return `Err(Empty)` if the buffer is empty.
+ ///
+ /// This function is a shortcut for `ring_buf.dequeue_one_with(Ok)`.
+ pub fn dequeue_one(&mut self) -> Result<&mut T, Empty> {
+ self.dequeue_one_with(Ok)?
+ }
+}
+
+/// This is the "continuous" ring buffer interface: it operates with element slices,
+/// and boundary conditions (empty/full) simply result in empty slices.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+ /// Call `f` with the largest contiguous slice of unallocated buffer elements,
+ /// and enqueue the amount of elements returned by `f`.
+ ///
+ /// # Panics
+ /// This function panics if the amount of elements returned by `f` is larger
+ /// than the size of the slice passed into it.
+ pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
+ where
+ F: FnOnce(&'b mut [T]) -> (usize, R),
+ {
+ if self.length == 0 {
+ // Ring is currently empty. Reset `read_at` to optimize
+ // for contiguous space.
+ self.read_at = 0;
+ }
+
+ let write_at = self.get_idx(self.length);
+ let max_size = self.contiguous_window();
+ let (size, result) = f(&mut self.storage[write_at..write_at + max_size]);
+ assert!(size <= max_size);
+ self.length += size;
+ (size, result)
+ }
+
+ /// Enqueue a slice of elements up to the given size into the buffer,
+ /// and return a reference to them.
+ ///
+ /// This function may return a slice smaller than the given size
+ /// if the free space in the buffer is not contiguous.
+ #[must_use]
+ pub fn enqueue_many(&mut self, size: usize) -> &mut [T] {
+ self.enqueue_many_with(|buf| {
+ let size = cmp::min(size, buf.len());
+ (size, &mut buf[..size])
+ })
+ .1
+ }
+
+ /// Enqueue as many elements from the given slice into the buffer as possible,
+ /// and return the amount of elements that could fit.
+ #[must_use]
+ pub fn enqueue_slice(&mut self, data: &[T]) -> usize
+ where
+ T: Copy,
+ {
+ let (size_1, data) = self.enqueue_many_with(|buf| {
+ let size = cmp::min(buf.len(), data.len());
+ buf[..size].copy_from_slice(&data[..size]);
+ (size, &data[size..])
+ });
+ let (size_2, ()) = self.enqueue_many_with(|buf| {
+ let size = cmp::min(buf.len(), data.len());
+ buf[..size].copy_from_slice(&data[..size]);
+ (size, ())
+ });
+ size_1 + size_2
+ }
+
+ /// Call `f` with the largest contiguous slice of allocated buffer elements,
+ /// and dequeue the amount of elements returned by `f`.
+ ///
+ /// # Panics
+ /// This function panics if the amount of elements returned by `f` is larger
+ /// than the size of the slice passed into it.
+ pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
+ where
+ F: FnOnce(&'b mut [T]) -> (usize, R),
+ {
+ let capacity = self.capacity();
+ let max_size = cmp::min(self.len(), capacity - self.read_at);
+ let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]);
+ assert!(size <= max_size);
+ self.read_at = if capacity > 0 {
+ (self.read_at + size) % capacity
+ } else {
+ 0
+ };
+ self.length -= size;
+ (size, result)
+ }
+
+ /// Dequeue a slice of elements up to the given size from the buffer,
+ /// and return a reference to them.
+ ///
+ /// This function may return a slice smaller than the given size
+ /// if the allocated space in the buffer is not contiguous.
+ #[must_use]
+ pub fn dequeue_many(&mut self, size: usize) -> &mut [T] {
+ self.dequeue_many_with(|buf| {
+ let size = cmp::min(size, buf.len());
+ (size, &mut buf[..size])
+ })
+ .1
+ }
+
+ /// Dequeue as many elements from the buffer into the given slice as possible,
+ /// and return the amount of elements that could fit.
+ #[must_use]
+ pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize
+ where
+ T: Copy,
+ {
+ let (size_1, data) = self.dequeue_many_with(|buf| {
+ let size = cmp::min(buf.len(), data.len());
+ data[..size].copy_from_slice(&buf[..size]);
+ (size, &mut data[size..])
+ });
+ let (size_2, ()) = self.dequeue_many_with(|buf| {
+ let size = cmp::min(buf.len(), data.len());
+ data[..size].copy_from_slice(&buf[..size]);
+ (size, ())
+ });
+ size_1 + size_2
+ }
+}
+
+/// This is the "random access" ring buffer interface: it operates with element slices,
+/// and allows to access elements of the buffer that are not adjacent to its head or tail.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+ /// Return the largest contiguous slice of unallocated buffer elements starting
+ /// at the given offset past the last allocated element, and up to the given size.
+ #[must_use]
+ pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
+ let start_at = self.get_idx(self.length + offset);
+ // We can't access past the end of unallocated data.
+ if offset > self.window() {
+ return &mut [];
+ }
+ // We can't enqueue more than there is free space.
+ let clamped_window = self.window() - offset;
+ if size > clamped_window {
+ size = clamped_window
+ }
+ // We can't contiguously enqueue past the end of the storage.
+ let until_end = self.capacity() - start_at;
+ if size > until_end {
+ size = until_end
+ }
+
+ &mut self.storage[start_at..start_at + size]
+ }
+
+ /// Write as many elements from the given slice into unallocated buffer elements
+ /// starting at the given offset past the last allocated element, and return
+ /// the amount written.
+ #[must_use]
+ pub fn write_unallocated(&mut self, offset: usize, data: &[T]) -> usize
+ where
+ T: Copy,
+ {
+ let (size_1, offset, data) = {
+ let slice = self.get_unallocated(offset, data.len());
+ let slice_len = slice.len();
+ slice.copy_from_slice(&data[..slice_len]);
+ (slice_len, offset + slice_len, &data[slice_len..])
+ };
+ let size_2 = {
+ let slice = self.get_unallocated(offset, data.len());
+ let slice_len = slice.len();
+ slice.copy_from_slice(&data[..slice_len]);
+ slice_len
+ };
+ size_1 + size_2
+ }
+
+ /// Enqueue the given number of unallocated buffer elements.
+ ///
+ /// # Panics
+ /// Panics if the number of elements given exceeds the number of unallocated elements.
+ pub fn enqueue_unallocated(&mut self, count: usize) {
+ assert!(count <= self.window());
+ self.length += count;
+ }
+
+ /// Return the largest contiguous slice of allocated buffer elements starting
+ /// at the given offset past the first allocated element, and up to the given size.
+ #[must_use]
+ pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
+ let start_at = self.get_idx(offset);
+ // We can't read past the end of the allocated data.
+ if offset > self.length {
+ return &mut [];
+ }
+ // We can't read more than we have allocated.
+ let clamped_length = self.length - offset;
+ if size > clamped_length {
+ size = clamped_length
+ }
+ // We can't contiguously dequeue past the end of the storage.
+ let until_end = self.capacity() - start_at;
+ if size > until_end {
+ size = until_end
+ }
+
+ &self.storage[start_at..start_at + size]
+ }
+
+ /// Read as many elements from allocated buffer elements into the given slice
+ /// starting at the given offset past the first allocated element, and return
+ /// the amount read.
+ #[must_use]
+ pub fn read_allocated(&mut self, offset: usize, data: &mut [T]) -> usize
+ where
+ T: Copy,
+ {
+ let (size_1, offset, data) = {
+ let slice = self.get_allocated(offset, data.len());
+ data[..slice.len()].copy_from_slice(slice);
+ (slice.len(), offset + slice.len(), &mut data[slice.len()..])
+ };
+ let size_2 = {
+ let slice = self.get_allocated(offset, data.len());
+ data[..slice.len()].copy_from_slice(slice);
+ slice.len()
+ };
+ size_1 + size_2
+ }
+
+ /// Dequeue the given number of allocated buffer elements.
+ ///
+ /// # Panics
+ /// Panics if the number of elements given exceeds the number of allocated elements.
+ pub fn dequeue_allocated(&mut self, count: usize) {
+ assert!(count <= self.len());
+ self.length -= count;
+ self.read_at = self.get_idx(count);
+ }
+}
+
+impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
+ fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
+ RingBuffer::new(slice)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_buffer_length_changes() {
+ let mut ring = RingBuffer::new(vec![0; 2]);
+ assert!(ring.is_empty());
+ assert!(!ring.is_full());
+ assert_eq!(ring.len(), 0);
+ assert_eq!(ring.capacity(), 2);
+ assert_eq!(ring.window(), 2);
+
+ ring.length = 1;
+ assert!(!ring.is_empty());
+ assert!(!ring.is_full());
+ assert_eq!(ring.len(), 1);
+ assert_eq!(ring.capacity(), 2);
+ assert_eq!(ring.window(), 1);
+
+ ring.length = 2;
+ assert!(!ring.is_empty());
+ assert!(ring.is_full());
+ assert_eq!(ring.len(), 2);
+ assert_eq!(ring.capacity(), 2);
+ assert_eq!(ring.window(), 0);
+ }
+
+ #[test]
+ fn test_buffer_enqueue_dequeue_one_with() {
+ let mut ring = RingBuffer::new(vec![0; 5]);
+ assert_eq!(
+ ring.dequeue_one_with(|_| -> Result::<(), ()> { unreachable!() }),
+ Err(Empty)
+ );
+
+ ring.enqueue_one_with(Ok::<_, ()>).unwrap().unwrap();
+ assert!(!ring.is_empty());
+ assert!(!ring.is_full());
+
+ for i in 1..5 {
+ ring.enqueue_one_with(|e| Ok::<_, ()>(*e = i))
+ .unwrap()
+ .unwrap();
+ assert!(!ring.is_empty());
+ }
+ assert!(ring.is_full());
+ assert_eq!(
+ ring.enqueue_one_with(|_| -> Result::<(), ()> { unreachable!() }),
+ Err(Full)
+ );
+
+ for i in 0..5 {
+ assert_eq!(
+ ring.dequeue_one_with(|e| Ok::<_, ()>(*e)).unwrap().unwrap(),
+ i
+ );
+ assert!(!ring.is_full());
+ }
+ assert_eq!(
+ ring.dequeue_one_with(|_| -> Result::<(), ()> { unreachable!() }),
+ Err(Empty)
+ );
+ assert!(ring.is_empty());
+ }
+
+ #[test]
+ fn test_buffer_enqueue_dequeue_one() {
+ let mut ring = RingBuffer::new(vec![0; 5]);
+ assert_eq!(ring.dequeue_one(), Err(Empty));
+
+ ring.enqueue_one().unwrap();
+ assert!(!ring.is_empty());
+ assert!(!ring.is_full());
+
+ for i in 1..5 {
+ *ring.enqueue_one().unwrap() = i;
+ assert!(!ring.is_empty());
+ }
+ assert!(ring.is_full());
+ assert_eq!(ring.enqueue_one(), Err(Full));
+
+ for i in 0..5 {
+ assert_eq!(*ring.dequeue_one().unwrap(), i);
+ assert!(!ring.is_full());
+ }
+ assert_eq!(ring.dequeue_one(), Err(Empty));
+ assert!(ring.is_empty());
+ }
+
+ #[test]
+ fn test_buffer_enqueue_many_with() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(
+ ring.enqueue_many_with(|buf| {
+ assert_eq!(buf.len(), 12);
+ buf[0..2].copy_from_slice(b"ab");
+ (2, true)
+ }),
+ (2, true)
+ );
+ assert_eq!(ring.len(), 2);
+ assert_eq!(&ring.storage[..], b"ab..........");
+
+ ring.enqueue_many_with(|buf| {
+ assert_eq!(buf.len(), 12 - 2);
+ buf[0..4].copy_from_slice(b"cdXX");
+ (2, ())
+ });
+ assert_eq!(ring.len(), 4);
+ assert_eq!(&ring.storage[..], b"abcdXX......");
+
+ ring.enqueue_many_with(|buf| {
+ assert_eq!(buf.len(), 12 - 4);
+ buf[0..4].copy_from_slice(b"efgh");
+ (4, ())
+ });
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"abcdefgh....");
+
+ for _ in 0..4 {
+ *ring.dequeue_one().unwrap() = b'.';
+ }
+ assert_eq!(ring.len(), 4);
+ assert_eq!(&ring.storage[..], b"....efgh....");
+
+ ring.enqueue_many_with(|buf| {
+ assert_eq!(buf.len(), 12 - 8);
+ buf[0..4].copy_from_slice(b"ijkl");
+ (4, ())
+ });
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"....efghijkl");
+
+ ring.enqueue_many_with(|buf| {
+ assert_eq!(buf.len(), 4);
+ buf[0..4].copy_from_slice(b"abcd");
+ (4, ())
+ });
+ assert_eq!(ring.len(), 12);
+ assert_eq!(&ring.storage[..], b"abcdefghijkl");
+
+ for _ in 0..4 {
+ *ring.dequeue_one().unwrap() = b'.';
+ }
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"abcd....ijkl");
+ }
+
+ #[test]
+ fn test_buffer_enqueue_many() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"abcdefgh....");
+
+ ring.enqueue_many(8).copy_from_slice(b"ijkl");
+ assert_eq!(ring.len(), 12);
+ assert_eq!(&ring.storage[..], b"abcdefghijkl");
+ }
+
+ #[test]
+ fn test_buffer_enqueue_slice() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"abcdefgh....");
+
+ for _ in 0..4 {
+ *ring.dequeue_one().unwrap() = b'.';
+ }
+ assert_eq!(ring.len(), 4);
+ assert_eq!(&ring.storage[..], b"....efgh....");
+
+ assert_eq!(ring.enqueue_slice(b"ijklabcd"), 8);
+ assert_eq!(ring.len(), 12);
+ assert_eq!(&ring.storage[..], b"abcdefghijkl");
+ }
+
+ #[test]
+ fn test_buffer_dequeue_many_with() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
+
+ assert_eq!(
+ ring.dequeue_many_with(|buf| {
+ assert_eq!(buf.len(), 12);
+ assert_eq!(buf, b"abcdefghijkl");
+ buf[..4].copy_from_slice(b"....");
+ (4, true)
+ }),
+ (4, true)
+ );
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"....efghijkl");
+
+ ring.dequeue_many_with(|buf| {
+ assert_eq!(buf, b"efghijkl");
+ buf[..4].copy_from_slice(b"....");
+ (4, ())
+ });
+ assert_eq!(ring.len(), 4);
+ assert_eq!(&ring.storage[..], b"........ijkl");
+
+ assert_eq!(ring.enqueue_slice(b"abcd"), 4);
+ assert_eq!(ring.len(), 8);
+
+ ring.dequeue_many_with(|buf| {
+ assert_eq!(buf, b"ijkl");
+ buf[..4].copy_from_slice(b"....");
+ (4, ())
+ });
+ ring.dequeue_many_with(|buf| {
+ assert_eq!(buf, b"abcd");
+ buf[..4].copy_from_slice(b"....");
+ (4, ())
+ });
+ assert_eq!(ring.len(), 0);
+ assert_eq!(&ring.storage[..], b"............");
+ }
+
+ #[test]
+ fn test_buffer_dequeue_many() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
+
+ {
+ let buf = ring.dequeue_many(8);
+ assert_eq!(buf, b"abcdefgh");
+ buf.copy_from_slice(b"........");
+ }
+ assert_eq!(ring.len(), 4);
+ assert_eq!(&ring.storage[..], b"........ijkl");
+
+ {
+ let buf = ring.dequeue_many(8);
+ assert_eq!(buf, b"ijkl");
+ buf.copy_from_slice(b"....");
+ }
+ assert_eq!(ring.len(), 0);
+ assert_eq!(&ring.storage[..], b"............");
+ }
+
+ #[test]
+ fn test_buffer_dequeue_slice() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
+
+ {
+ let mut buf = [0; 8];
+ assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
+ assert_eq!(&buf[..], b"abcdefgh");
+ assert_eq!(ring.len(), 4);
+ }
+
+ assert_eq!(ring.enqueue_slice(b"abcd"), 4);
+
+ {
+ let mut buf = [0; 8];
+ assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
+ assert_eq!(&buf[..], b"ijklabcd");
+ assert_eq!(ring.len(), 0);
+ }
+ }
+
+ #[test]
+ fn test_buffer_get_unallocated() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(ring.get_unallocated(16, 4), b"");
+
+ {
+ let buf = ring.get_unallocated(0, 4);
+ buf.copy_from_slice(b"abcd");
+ }
+ assert_eq!(&ring.storage[..], b"abcd........");
+
+ let buf_enqueued = ring.enqueue_many(4);
+ assert_eq!(buf_enqueued.len(), 4);
+ assert_eq!(ring.len(), 4);
+
+ {
+ let buf = ring.get_unallocated(4, 8);
+ buf.copy_from_slice(b"ijkl");
+ }
+ assert_eq!(&ring.storage[..], b"abcd....ijkl");
+
+ ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
+ ring.dequeue_many(4).copy_from_slice(b"abcd");
+ assert_eq!(ring.len(), 8);
+ assert_eq!(&ring.storage[..], b"abcdEFGHIJKL");
+
+ {
+ let buf = ring.get_unallocated(0, 8);
+ buf.copy_from_slice(b"ABCD");
+ }
+ assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL");
+ }
+
+ #[test]
+ fn test_buffer_write_unallocated() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+ ring.enqueue_many(6).copy_from_slice(b"abcdef");
+ ring.dequeue_many(6).copy_from_slice(b"ABCDEF");
+
+ assert_eq!(ring.write_unallocated(0, b"ghi"), 3);
+ assert_eq!(ring.get_unallocated(0, 3), b"ghi");
+
+ assert_eq!(ring.write_unallocated(3, b"jklmno"), 6);
+ assert_eq!(ring.get_unallocated(3, 3), b"jkl");
+
+ assert_eq!(ring.write_unallocated(9, b"pqrstu"), 3);
+ assert_eq!(ring.get_unallocated(9, 3), b"pqr");
+ }
+
+ #[test]
+ fn test_buffer_get_allocated() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+ assert_eq!(ring.get_allocated(16, 4), b"");
+ assert_eq!(ring.get_allocated(0, 4), b"");
+
+ let len_enqueued = ring.enqueue_slice(b"abcd");
+ assert_eq!(ring.get_allocated(0, 8), b"abcd");
+ assert_eq!(len_enqueued, 4);
+
+ let len_enqueued = ring.enqueue_slice(b"efghijkl");
+ ring.dequeue_many(4).copy_from_slice(b"....");
+ assert_eq!(ring.get_allocated(4, 8), b"ijkl");
+ assert_eq!(len_enqueued, 8);
+
+ let len_enqueued = ring.enqueue_slice(b"abcd");
+ assert_eq!(ring.get_allocated(4, 8), b"ijkl");
+ assert_eq!(len_enqueued, 4);
+ }
+
+ #[test]
+ fn test_buffer_read_allocated() {
+ let mut ring = RingBuffer::new(vec![b'.'; 12]);
+ ring.enqueue_many(12).copy_from_slice(b"abcdefghijkl");
+
+ let mut data = [0; 6];
+ assert_eq!(ring.read_allocated(0, &mut data[..]), 6);
+ assert_eq!(&data[..], b"abcdef");
+
+ ring.dequeue_many(6).copy_from_slice(b"ABCDEF");
+ ring.enqueue_many(3).copy_from_slice(b"mno");
+
+ let mut data = [0; 6];
+ assert_eq!(ring.read_allocated(3, &mut data[..]), 6);
+ assert_eq!(&data[..], b"jklmno");
+
+ let mut data = [0; 6];
+ assert_eq!(ring.read_allocated(6, &mut data[..]), 3);
+ assert_eq!(&data[..], b"mno\x00\x00\x00");
+ }
+
+ #[test]
+ fn test_buffer_with_no_capacity() {
+ let mut no_capacity: RingBuffer<u8> = RingBuffer::new(vec![]);
+
+ // Call all functions that calculate the remainder against rx_buffer.capacity()
+ // with a backing storage with a length of 0.
+ assert_eq!(no_capacity.get_unallocated(0, 0), &[]);
+ assert_eq!(no_capacity.get_allocated(0, 0), &[]);
+ no_capacity.dequeue_allocated(0);
+ assert_eq!(no_capacity.enqueue_many(0), &[]);
+ assert_eq!(no_capacity.enqueue_one(), Err(Full));
+ assert_eq!(no_capacity.contiguous_window(), 0);
+ }
+
+ /// Use the buffer a bit. Then empty it and put in an item of
+ /// maximum size. By detecting a length of 0, the implementation
+ /// can reset the current buffer position.
+ #[test]
+ fn test_buffer_write_wholly() {
+ let mut ring = RingBuffer::new(vec![b'.'; 8]);
+ ring.enqueue_many(2).copy_from_slice(b"ab");
+ ring.enqueue_many(2).copy_from_slice(b"cd");
+ assert_eq!(ring.len(), 4);
+ let buf_dequeued = ring.dequeue_many(4);
+ assert_eq!(buf_dequeued, b"abcd");
+ assert_eq!(ring.len(), 0);
+
+ let large = ring.enqueue_many(8);
+ assert_eq!(large.len(), 8);
+ }
+}