// Copyright (C) 2018-2019, Cloudflare, Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::cmp; use std::collections::hash_map; use std::collections::BTreeMap; use std::collections::BinaryHeap; use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use crate::Error; use crate::Result; use crate::ranges; const DEFAULT_URGENCY: u8 = 127; /// Keeps track of QUIC streams and enforces stream limits. #[derive(Default)] pub struct StreamMap { /// Map of streams indexed by stream ID. streams: HashMap, /// Set of streams that were completed and garbage collected. 
/// /// Instead of keeping the full stream state forever, we collect completed /// streams to save memory, but we still need to keep track of previously /// created streams, to prevent peers from re-creating them. collected: HashSet, /// Peer's maximum bidirectional stream count limit. peer_max_streams_bidi: u64, /// Peer's maximum unidirectional stream count limit. peer_max_streams_uni: u64, /// The total number of bidirectional streams opened by the peer. peer_opened_streams_bidi: u64, /// The total number of unidirectional streams opened by the peer. peer_opened_streams_uni: u64, /// Local maximum bidirectional stream count limit. local_max_streams_bidi: u64, local_max_streams_bidi_next: u64, /// Local maximum unidirectional stream count limit. local_max_streams_uni: u64, local_max_streams_uni_next: u64, /// The total number of bidirectional streams opened by the local endpoint. local_opened_streams_bidi: u64, /// The total number of unidirectional streams opened by the local endpoint. local_opened_streams_uni: u64, /// Queue of stream IDs corresponding to streams that have buffered data /// ready to be sent to the peer. This also implies that the stream has /// enough flow control credits to send at least some of that data. /// /// Streams are grouped by their priority, where each urgency level has two /// queues, one for non-incremental streams and one for incremental ones. /// /// Streams with lower urgency level are scheduled first, and within the /// same urgency level Non-incremental streams are scheduled first, in the /// order of their stream IDs, and incremental streams are scheduled in a /// round-robin fashion after all non-incremental streams have been flushed. flushable: BTreeMap>, VecDeque)>, /// Set of stream IDs corresponding to streams that have outstanding data /// to read. This is used to generate a `StreamIter` of streams without /// having to iterate over the full list of streams. 
readable: HashSet, /// Set of stream IDs corresponding to streams that have enough flow control /// capacity to be written to, and is not finished. This is used to generate /// a `StreamIter` of streams without having to iterate over the full list /// of streams. writable: HashSet, /// Set of stream IDs corresponding to streams that are almost out of flow /// control credit and need to send MAX_STREAM_DATA. This is used to /// generate a `StreamIter` of streams without having to iterate over the /// full list of streams. almost_full: HashSet, /// Set of stream IDs corresponding to streams that are blocked. The value /// of the map elements represents the offset of the stream at which the /// blocking occurred. blocked: HashMap, } impl StreamMap { pub fn new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap { StreamMap { local_max_streams_bidi: max_streams_bidi, local_max_streams_bidi_next: max_streams_bidi, local_max_streams_uni: max_streams_uni, local_max_streams_uni_next: max_streams_uni, ..StreamMap::default() } } /// Returns the stream with the given ID if it exists. pub fn get(&self, id: u64) -> Option<&Stream> { self.streams.get(&id) } /// Returns the mutable stream with the given ID if it exists. pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> { self.streams.get_mut(&id) } /// Returns the mutable stream with the given ID if it exists, or creates /// a new one otherwise. /// /// The `local` parameter indicates whether the stream's creation was /// requested by the local application rather than the peer, and is /// used to validate the requested stream ID, and to select the initial /// flow control values from the local and remote transport parameters /// (also passed as arguments). /// /// This also takes care of enforcing both local and the peer's stream /// count limits. If one of these limits is violated, the `StreamLimit` /// error is returned. 
pub(crate) fn get_or_create( &mut self, id: u64, local_params: &crate::TransportParams, peer_params: &crate::TransportParams, local: bool, is_server: bool, ) -> Result<&mut Stream> { let stream = match self.streams.entry(id) { hash_map::Entry::Vacant(v) => { // Stream has already been closed and garbage collected. if self.collected.contains(&id) { return Err(Error::Done); } if local != is_local(id, is_server) { return Err(Error::InvalidStreamState); } let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) { // Locally-initiated bidirectional stream. (true, true) => ( local_params.initial_max_stream_data_bidi_local, peer_params.initial_max_stream_data_bidi_remote, ), // Locally-initiated unidirectional stream. (true, false) => (0, peer_params.initial_max_stream_data_uni), // Remotely-initiated bidirectional stream. (false, true) => ( local_params.initial_max_stream_data_bidi_remote, peer_params.initial_max_stream_data_bidi_local, ), // Remotely-initiated unidirectional stream. (false, false) => (local_params.initial_max_stream_data_uni, 0), }; // Enforce stream count limits. match (is_local(id, is_server), is_bidi(id)) { (true, true) => { if self.local_opened_streams_bidi >= self.peer_max_streams_bidi { return Err(Error::StreamLimit); } self.local_opened_streams_bidi += 1; }, (true, false) => { if self.local_opened_streams_uni >= self.peer_max_streams_uni { return Err(Error::StreamLimit); } self.local_opened_streams_uni += 1; }, (false, true) => { if self.peer_opened_streams_bidi >= self.local_max_streams_bidi { return Err(Error::StreamLimit); } self.peer_opened_streams_bidi += 1; }, (false, false) => { if self.peer_opened_streams_uni >= self.local_max_streams_uni { return Err(Error::StreamLimit); } self.peer_opened_streams_uni += 1; }, }; let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local); v.insert(s) }, hash_map::Entry::Occupied(v) => v.into_mut(), }; // Stream might already be writable due to initial flow control limits. 
if stream.is_writable() { self.writable.insert(id); } Ok(stream) } /// Pushes the stream ID to the back of the flushable streams queue with /// the specified urgency. /// /// Note that the caller is responsible for checking that the specified /// stream ID was not in the queue already before calling this. /// /// Queueing a stream multiple times simultaneously means that it might be /// unfairly scheduled more often than other streams, and might also cause /// spurious cycles through the queue, so it should be avoided. pub fn push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool) { // Push the element to the back of the queue corresponding to the given // urgency. If the queue doesn't exist yet, create it first. let queues = self .flushable .entry(urgency) .or_insert_with(|| (BinaryHeap::new(), VecDeque::new())); if !incr { // Non-incremental streams are scheduled in order of their stream ID. queues.0.push(std::cmp::Reverse(stream_id)) } else { // Incremental streams are scheduled in a round-robin fashion. queues.1.push_back(stream_id) }; } /// Removes and returns the first stream ID from the flushable streams /// queue with the specified urgency. /// /// Note that if the stream is still flushable after sending some of its /// outstanding data, it needs to be added back to the queue. pub fn pop_flushable(&mut self) -> Option { // Remove the first element from the queue corresponding to the lowest // urgency that has elements. let (node, clear) = if let Some((urgency, queues)) = self.flushable.iter_mut().next() { let node = if !queues.0.is_empty() { queues.0.pop().map(|x| x.0) } else { queues.1.pop_front() }; let clear = if queues.0.is_empty() && queues.1.is_empty() { Some(*urgency) } else { None }; (node, clear) } else { (None, None) }; // Remove the queue from the list of queues if it is now empty, so that // the next time `pop_flushable()` is called the next queue with elements // is used. 
if let Some(urgency) = &clear { self.flushable.remove(urgency); } node } /// Adds or removes the stream ID to/from the readable streams set. /// /// If the stream was already in the list, this does nothing. pub fn mark_readable(&mut self, stream_id: u64, readable: bool) { if readable { self.readable.insert(stream_id); } else { self.readable.remove(&stream_id); } } /// Adds or removes the stream ID to/from the writable streams set. /// /// This should also be called anytime a new stream is created, in addition /// to when an existing stream becomes writable (or stops being writable). /// /// If the stream was already in the list, this does nothing. pub fn mark_writable(&mut self, stream_id: u64, writable: bool) { if writable { self.writable.insert(stream_id); } else { self.writable.remove(&stream_id); } } /// Adds or removes the stream ID to/from the almost full streams set. /// /// If the stream was already in the list, this does nothing. pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) { if almost_full { self.almost_full.insert(stream_id); } else { self.almost_full.remove(&stream_id); } } /// Adds or removes the stream ID to/from the blocked streams set with the /// given offset value. /// /// If the stream was already in the list, this does nothing. pub fn mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64) { if blocked { self.blocked.insert(stream_id, off); } else { self.blocked.remove(&stream_id); } } /// Updates the peer's maximum bidirectional stream count limit. pub fn update_peer_max_streams_bidi(&mut self, v: u64) { self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v); } /// Updates the peer's maximum unidirectional stream count limit. pub fn update_peer_max_streams_uni(&mut self, v: u64) { self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v); } /// Commits the new max_streams_bidi limit. 
pub fn update_max_streams_bidi(&mut self) { self.local_max_streams_bidi = self.local_max_streams_bidi_next; } /// Returns the new max_streams_bidi limit. pub fn max_streams_bidi_next(&mut self) -> u64 { self.local_max_streams_bidi_next } /// Commits the new max_streams_uni limit. pub fn update_max_streams_uni(&mut self) { self.local_max_streams_uni = self.local_max_streams_uni_next; } /// Returns the new max_streams_uni limit. pub fn max_streams_uni_next(&mut self) -> u64 { self.local_max_streams_uni_next } /// Drops completed stream. /// /// This should only be called when Stream::is_complete() returns true for /// the given stream. pub fn collect(&mut self, stream_id: u64, local: bool) { if !local { // If the stream was created by the peer, give back a max streams // credit. if is_bidi(stream_id) { self.local_max_streams_bidi_next = self.local_max_streams_bidi_next.saturating_add(1); } else { self.local_max_streams_uni_next = self.local_max_streams_uni_next.saturating_add(1); } } self.streams.remove(&stream_id); self.collected.insert(stream_id); } /// Creates an iterator over streams that have outstanding data to read. pub fn readable(&self) -> StreamIter { StreamIter::from(&self.readable) } /// Creates an iterator over streams that can be written to. pub fn writable(&self) -> StreamIter { StreamIter::from(&self.writable) } /// Creates an iterator over streams that need to send MAX_STREAM_DATA. pub fn almost_full(&self) -> StreamIter { StreamIter::from(&self.almost_full) } /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED. pub fn blocked(&self) -> hash_map::Iter { self.blocked.iter() } /// Returns true if there are any streams that have data to write. pub fn has_flushable(&self) -> bool { !self.flushable.is_empty() } /// Returns true if there are any streams that need to update the local /// flow control limit. 
pub fn has_almost_full(&self) -> bool { !self.almost_full.is_empty() } /// Returns true if there are any streams that are blocked. pub fn has_blocked(&self) -> bool { !self.blocked.is_empty() } /// Returns true if the max bidirectional streams count needs to be updated /// by sending a MAX_STREAMS frame to the peer. pub fn should_update_max_streams_bidi(&self) -> bool { self.local_max_streams_bidi_next != self.local_max_streams_bidi && self.local_max_streams_bidi_next / 2 > self.local_max_streams_bidi - self.peer_opened_streams_bidi } /// Returns true if the max unidirectional streams count needs to be updated /// by sending a MAX_STREAMS frame to the peer. pub fn should_update_max_streams_uni(&self) -> bool { self.local_max_streams_uni_next != self.local_max_streams_uni && self.local_max_streams_uni_next / 2 > self.local_max_streams_uni - self.peer_opened_streams_uni } /// Returns the number of active streams in the map. #[cfg(test)] pub fn len(&self) -> usize { self.streams.len() } } /// A QUIC stream. #[derive(Default)] pub struct Stream { /// Receive-side stream buffer. pub recv: RecvBuf, /// Send-side stream buffer. pub send: SendBuf, /// Whether the stream is bidirectional. pub bidi: bool, /// Whether the stream was created by the local endpoint. pub local: bool, /// Application data. pub data: Option>, /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`. pub urgency: u8, /// Whether the stream can be flushed incrementally. Default is `true`. pub incremental: bool, } impl Stream { /// Creates a new stream with the given flow control limits. pub fn new( max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool, ) -> Stream { Stream { recv: RecvBuf::new(max_rx_data), send: SendBuf::new(max_tx_data), bidi, local, data: None, urgency: DEFAULT_URGENCY, incremental: true, } } /// Returns true if the stream has data to read. 
pub fn is_readable(&self) -> bool { self.recv.ready() } /// Returns true if the stream has enough flow control capacity to be /// written to, and is not finished. pub fn is_writable(&self) -> bool { !self.send.shutdown && !self.send.is_fin() && self.send.off < self.send.max_data } /// Returns true if the stream has data to send and is allowed to send at /// least some of it. pub fn is_flushable(&self) -> bool { self.send.ready() && self.send.off_front() < self.send.max_data } /// Returns true if the stream is complete. /// /// For bidirectional streams this happens when both the receive and send /// sides are complete. That is when all incoming data has been read by the /// application, and when all outgoing data has been acked by the peer. /// /// For unidirectional streams this happens when either the receive or send /// side is complete, depending on whether the stream was created locally /// or not. pub fn is_complete(&self) -> bool { match (self.bidi, self.local) { // For bidirectional streams we need to check both receive and send // sides for completion. (true, _) => self.recv.is_fin() && self.send.is_complete(), // For unidirectional streams generated locally, we only need to // check the send side for completion. (false, true) => self.send.is_complete(), // For unidirectional streams generated by the peer, we only need // to check the receive side for completion. (false, false) => self.recv.is_fin(), } } } /// Returns true if the stream was created locally. pub fn is_local(stream_id: u64, is_server: bool) -> bool { (stream_id & 0x1) == (is_server as u64) } /// Returns true if the stream is bidirectional. pub fn is_bidi(stream_id: u64) -> bool { (stream_id & 0x2) == 0 } /// An iterator over QUIC streams. 
#[derive(Default)] pub struct StreamIter { streams: Vec, } impl StreamIter { fn from(streams: &HashSet) -> Self { StreamIter { streams: streams.iter().copied().collect(), } } } impl Iterator for StreamIter { type Item = u64; fn next(&mut self) -> Option { self.streams.pop() } } impl ExactSizeIterator for StreamIter { fn len(&self) -> usize { self.streams.len() } } /// Receive-side stream buffer. /// /// Stream data received by the peer is buffered in a list of data chunks /// ordered by offset in ascending order. Contiguous data can then be read /// into a slice. #[derive(Debug, Default)] pub struct RecvBuf { /// Chunks of data received from the peer that have not yet been read by /// the application, ordered by offset. data: BinaryHeap, /// The lowest data offset that has yet to be read by the application. off: u64, /// The total length of data received on this stream. len: u64, /// The maximum offset the peer is allowed to send us. max_data: u64, /// The updated maximum offset the peer is allowed to send us. max_data_next: u64, /// The final stream offset received from the peer, if any. fin_off: Option, /// Whether incoming data is validated but not buffered. drain: bool, } impl RecvBuf { /// Creates a new receive buffer. fn new(max_data: u64) -> RecvBuf { RecvBuf { max_data, max_data_next: max_data, ..RecvBuf::default() } } /// Inserts the given chunk of data in the buffer. /// /// This also takes care of enforcing stream flow control limits, as well /// as handling incoming data that overlaps data that is already in the /// buffer. pub fn push(&mut self, buf: RangeBuf) -> Result<()> { if buf.max_off() > self.max_data { return Err(Error::FlowControl); } if let Some(fin_off) = self.fin_off { // Stream's size is known, forbid data beyond that point. if buf.max_off() > fin_off { return Err(Error::FinalSize); } // Stream's size is already known, forbid changing it. 
if buf.fin() && fin_off != buf.max_off() { return Err(Error::FinalSize); } } // Stream's known size is lower than data already received. if buf.fin() && buf.max_off() < self.len { return Err(Error::FinalSize); } // We already saved the final offset, so there's nothing else we // need to keep from the RangeBuf if it's empty. if self.fin_off.is_some() && buf.is_empty() { return Ok(()); } // No need to process an empty buffer with the fin flag, if we already // know the final size. if buf.fin() && buf.is_empty() && self.fin_off.is_some() { return Ok(()); } if buf.fin() { self.fin_off = Some(buf.max_off()); } // No need to store empty buffer that doesn't carry the fin flag. if !buf.fin() && buf.is_empty() { return Ok(()); } // Check if data is fully duplicate, that is the buffer's max offset is // lower or equal to the offset already stored in the recv buffer. if self.off >= buf.max_off() { // An exception is applied to empty range buffers, because an empty // buffer's max offset matches the max offset of the recv buffer. // // By this point all spurious empty buffers should have already been // discarded, so allowing empty buffers here should be safe. if !buf.is_empty() { return Ok(()); } } if self.drain { return Ok(()); } let mut tmp_buf = Some(buf); while let Some(mut buf) = tmp_buf { tmp_buf = None; // Discard incoming data below current stream offset. Bytes up to // `self.off` have already been received so we should not buffer // them again. This is also important to make sure `ready()` doesn't // get stuck when a buffer with lower offset than the stream's is // buffered. if self.off > buf.off() { buf = buf.split_off((self.off - buf.off()) as usize); } for b in &self.data { // New buffer is fully contained in existing buffer. if buf.off() >= b.off() && buf.max_off() <= b.max_off() { return Ok(()); } // New buffer's start overlaps existing buffer. 
if buf.off() >= b.off() && buf.off() < b.max_off() { buf = buf.split_off((b.max_off() - buf.off()) as usize); } // New buffer's end overlaps existing buffer. if buf.off() < b.off() && buf.max_off() > b.off() { tmp_buf = Some(buf.split_off((b.off() - buf.off()) as usize)); } } self.len = cmp::max(self.len, buf.max_off()); self.data.push(buf); } Ok(()) } /// Writes data from the receive buffer into the given output buffer. /// /// Only contiguous data is written to the output buffer, starting from /// offset 0. The offset is incremented as data is read out of the receive /// buffer into the application buffer. If there is no data at the expected /// read offset, the `Done` error is returned. /// /// On success the amount of data read, and a flag indicating if there is /// no more data in the buffer, are returned as a tuple. pub fn pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)> { let mut len = 0; let mut cap = out.len(); if !self.ready() { return Err(Error::Done); } while cap > 0 && self.ready() { let mut buf = match self.data.peek_mut() { Some(v) => v, None => break, }; let buf_len = cmp::min(buf.len(), cap); out[len..len + buf_len].copy_from_slice(&buf[..buf_len]); self.off += buf_len as u64; len += buf_len; cap -= buf_len; if buf_len < buf.len() { buf.consume(buf_len); // We reached the maximum capacity, so end here. break; } std::collections::binary_heap::PeekMut::pop(buf); } self.max_data_next = self.max_data_next.saturating_add(len as u64); Ok((len, self.is_fin())) } /// Resets the stream at the given offset. pub fn reset(&mut self, final_size: u64) -> Result { // Stream's size is already known, forbid changing it. if let Some(fin_off) = self.fin_off { if fin_off != final_size { return Err(Error::FinalSize); } } // Stream's known size is lower than data already received. if final_size < self.len { return Err(Error::FinalSize); } self.fin_off = Some(final_size); // Return how many bytes need to be removed from the connection flow // control. 
Ok((final_size - self.len) as usize) } /// Commits the new max_data limit. pub fn update_max_data(&mut self) { self.max_data = self.max_data_next; } /// Return the new max_data limit. pub fn max_data_next(&mut self) -> u64 { self.max_data_next } /// Shuts down receiving data. pub fn shutdown(&mut self) -> Result<()> { if self.drain { return Err(Error::Done); } self.drain = true; self.data.clear(); Ok(()) } /// Returns the lowest offset of data buffered. #[allow(dead_code)] pub fn off_front(&self) -> u64 { self.off } /// Returns true if we need to update the local flow control limit. pub fn almost_full(&self) -> bool { // Send MAX_STREAM_DATA when the new limit is at least double the // amount of data that can be received before blocking. self.fin_off.is_none() && self.max_data_next != self.max_data && self.max_data_next / 2 > self.max_data - self.len } /// Returns the largest offset ever received. pub fn max_off(&self) -> u64 { self.len } /// Returns true if the receive-side of the stream is complete. /// /// This happens when the stream's receive final size is known, and the /// application has read all data from the stream. pub fn is_fin(&self) -> bool { if self.fin_off == Some(self.off) { return true; } false } /// Returns true if the stream has data to be read. fn ready(&self) -> bool { let buf = match self.data.peek() { Some(v) => v, None => return false, }; buf.off() == self.off } } /// Send-side stream buffer. /// /// Stream data scheduled to be sent to the peer is buffered in a list of data /// chunks ordered by offset in ascending order. Contiguous data can then be /// read into a slice. /// /// By default, new data is appended at the end of the stream, but data can be /// inserted at the start of the buffer (this is to allow data that needs to be /// retransmitted to be re-buffered). #[derive(Debug, Default)] pub struct SendBuf { /// Chunks of data to be sent, ordered by offset. data: BinaryHeap, /// The maximum offset of data buffered in the stream. 
off: u64, /// The amount of data that was ever written to this stream. len: u64, /// The maximum offset we are allowed to send to the peer. max_data: u64, /// The final stream offset written to the stream, if any. fin_off: Option, /// Whether the stream's send-side has been shut down. shutdown: bool, /// Ranges of data offsets that have been acked. acked: ranges::RangeSet, } impl SendBuf { /// Creates a new send buffer. fn new(max_data: u64) -> SendBuf { SendBuf { max_data, ..SendBuf::default() } } /// Inserts the given slice of data at the end of the buffer. /// /// The number of bytes that were actually stored in the buffer is returned /// (this may be lower than the size of the input buffer, in case of partial /// writes). pub fn push_slice( &mut self, mut data: &[u8], mut fin: bool, ) -> Result { if self.shutdown { // Since we won't write any more data anyway, pretend that we sent // all data that was passed in. return Ok(data.len()); } if data.is_empty() { // Create a dummy range buffer, in order to propagate the `fin` flag // into `RangeBuf::push()`. This will be discarded later on. let buf = RangeBuf::from(&[], self.off, fin); return self.push(buf).map(|_| 0); } if data.len() > self.cap() { // Truncate the input buffer according to the stream's capacity. let len = self.cap(); data = &data[..len]; // We are not buffering the full input, so clear the fin flag. fin = false; } let buf = RangeBuf::from(data, self.off, fin); self.push(buf)?; self.off += data.len() as u64; Ok(data.len()) } /// Inserts the given chunk of data in the buffer. pub fn push(&mut self, buf: RangeBuf) -> Result<()> { if let Some(fin_off) = self.fin_off { // Can't write past final offset. if buf.max_off() > fin_off { return Err(Error::FinalSize); } // Can't "undo" final offset. if buf.max_off() == fin_off && !buf.fin() { return Err(Error::FinalSize); } } if self.shutdown { return Ok(()); } if buf.fin() { self.fin_off = Some(buf.max_off()); } // Don't queue data that was already fully acked. 
if self.ack_off() >= buf.max_off() { return Ok(()); } self.len += buf.len() as u64; // We already recorded the final offset, so we can just discard the // empty buffer now. if buf.is_empty() { return Ok(()); } self.data.push(buf); Ok(()) } /// Returns contiguous data from the send buffer as a single `RangeBuf`. pub fn pop(&mut self, max_data: usize) -> Result { let mut out = RangeBuf::default(); out.data = Vec::with_capacity(cmp::min(max_data as u64, self.len) as usize); out.off = self.off; let mut out_len = max_data; let mut out_off = self.data.peek().map_or_else(|| out.off, RangeBuf::off); while out_len > 0 && self.ready() && self.off_front() == out_off && self.off_front() < self.max_data { let mut buf = match self.data.peek_mut() { Some(v) => v, None => break, }; let buf_len = cmp::min(buf.len(), out_len); if out.is_empty() { out.off = buf.off(); } self.len -= buf_len as u64; out_len -= buf_len; out_off = buf.off() + buf_len as u64; out.data.extend_from_slice(&buf[..buf_len]); if buf_len < buf.len() { buf.consume(buf_len); // We reached the maximum capacity, so end here. break; } std::collections::binary_heap::PeekMut::pop(buf); } // Override the `fin` flag set for the output buffer by matching the // buffer's maximum offset against the stream's final offset (if known). // // This is more efficient than tracking `fin` using the range buffers // themselves, and lets us avoid queueing empty buffers just so we can // propagate the final size. out.fin = self.fin_off == Some(out.max_off()); Ok(out) } /// Updates the max_data limit to the given value. pub fn update_max_data(&mut self, max_data: u64) { self.max_data = cmp::max(self.max_data, max_data); } /// Increments the acked data offset. pub fn ack(&mut self, off: u64, len: usize) { self.acked.insert(off..off + len as u64); } /// Shuts down sending data. 
pub fn shutdown(&mut self) -> Result<()> { if self.shutdown { return Err(Error::Done); } self.shutdown = true; self.data.clear(); Ok(()) } /// Returns the largest offset of data buffered. #[allow(dead_code)] pub fn off_back(&self) -> u64 { self.off } /// Returns the lowest offset of data buffered. pub fn off_front(&self) -> u64 { match self.data.peek() { Some(v) => v.off(), None => self.off, } } /// The maximum offset we are allowed to send to the peer. pub fn max_off(&self) -> u64 { self.max_data } /// Returns true if all data in the stream has been sent. /// /// This happens when the stream's send final size is knwon, and the /// application has already written data up to that point. pub fn is_fin(&self) -> bool { if self.fin_off == Some(self.off) { return true; } false } /// Returns true if the send-side of the stream is complete. /// /// This happens when the stream's send final size is known, and the peer /// has already acked all stream data up to that point. pub fn is_complete(&self) -> bool { if let Some(fin_off) = self.fin_off { if self.acked == (0..fin_off) { return true; } } false } /// Returns true if there is data to be written. fn ready(&self) -> bool { !self.data.is_empty() } /// Returns the highest contiguously acked offset. fn ack_off(&self) -> u64 { match self.acked.iter().next() { // Only consider the initial range if it contiguously covers the // start of the stream (i.e. from offset 0). Some(std::ops::Range { start: 0, end }) => end, Some(_) | None => 0, } } /// Returns the outgoing flow control capacity. pub fn cap(&self) -> usize { (self.max_data - self.off) as usize } } /// Buffer holding data at a specific offset. #[derive(Clone, Debug, Default, Eq)] pub struct RangeBuf { /// The internal buffer holding the data. data: Vec, /// The starting offset within `data`. This allows partially consuming a /// buffer without duplicating the data. pos: usize, /// The starting offset within a stream. 
off: u64,

    /// Whether this contains the final byte in the stream.
    fin: bool,
}

impl RangeBuf {
    /// Creates a new `RangeBuf` from the given slice.
    ///
    /// The bytes of `buf` are copied into an owned vector. `off` is the
    /// stream offset of the first byte and `fin` marks whether the buffer
    /// ends the stream. The read position starts at zero.
    pub(crate) fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
        RangeBuf {
            data: Vec::from(buf),
            pos: 0,
            off,
            fin,
        }
    }

    /// Returns whether `self` holds the final offset in the stream.
    pub fn fin(&self) -> bool {
        self.fin
    }

    /// Returns the starting offset of `self`.
    ///
    /// This is the base offset advanced by the number of bytes already
    /// consumed through [`consume()`](Self::consume).
    pub fn off(&self) -> u64 {
        self.off + self.pos as u64
    }

    /// Returns the final offset of `self`, i.e. `off() + len()`.
    pub fn max_off(&self) -> u64 {
        self.off() + self.len() as u64
    }

    /// Returns the length of `self`, not counting bytes already consumed.
    pub fn len(&self) -> usize {
        self.data.len() - self.pos
    }

    /// Returns true if `self` has a length of zero bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Consumes the starting `count` bytes of `self`.
    ///
    /// Only the read position is advanced; no data is released. `count` is
    /// not validated here, so callers must keep it within `len()`:
    /// otherwise later `len()` and dereference operations will fail.
    pub fn consume(&mut self, count: usize) {
        self.pos += count;
    }

    /// Splits the buffer into two at the given index.
    ///
    /// `at` is handed unchanged to `Vec::split_off()`, so it indexes the
    /// underlying storage and does not take the read position into account.
    /// The returned buffer starts at base offset `self.off + at` and
    /// inherits the `fin` flag, which is then cleared on `self`.
    pub fn split_off(&mut self, at: usize) -> RangeBuf {
        let buf = RangeBuf {
            data: self.data.split_off(at),
            pos: 0,
            off: self.off + at as u64,
            fin: self.fin,
        };

        // The final byte (if any) now lives in the returned half.
        self.fin = false;

        buf
    }
}

/// Exposes the not-yet-consumed bytes as a slice.
impl std::ops::Deref for RangeBuf {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.data[self.pos..]
    }
}

/// Exposes the not-yet-consumed bytes as a mutable slice.
impl std::ops::DerefMut for RangeBuf {
    fn deref_mut(&mut self) -> &mut [u8] {
        &mut self.data[self.pos..]
    }
}

/// Orders buffers by their base `off` field (the read position is ignored),
/// reversed so that a max-heap such as `std::collections::BinaryHeap` yields
/// the lowest offset first.
impl Ord for RangeBuf {
    fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
        // Invert ordering to implement min-heap.
        self.off.cmp(&other.off).reverse()
    }
}

/// Delegates to the total order defined by `Ord`.
impl PartialOrd for RangeBuf {
    fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Two buffers compare equal when their base `off` fields match; contents,
/// read position and `fin` are not compared.
impl PartialEq for RangeBuf {
    fn eq(&self, other: &RangeBuf) -> bool {
        self.off == other.off
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Popping from a receive buffer nothing was pushed to reports `Done`.
    #[test]
    fn empty_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // Checks how `RecvBuf::push()` treats zero-length buffers, with and
    // without the fin flag.
    #[test]
    fn empty_stream_frame() {
        let mut recv = RecvBuf::new(15);
        assert_eq!(recv.len, 0);

        let buf = RangeBuf::from(b"hello", 0, false);
        assert!(recv.push(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        let mut buf = [0; 32];
        assert_eq!(recv.pop(&mut buf), Ok((5, false)));

        // Don't store non-fin empty buffer.
        let buf = RangeBuf::from(b"", 10, false);
        assert!(recv.push(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 0);

        // Check flow control for empty buffer.
        let buf = RangeBuf::from(b"", 16, false);
        assert_eq!(recv.push(buf), Err(Error::FlowControl));

        // Store fin empty buffer.
        let buf = RangeBuf::from(b"", 5, true);
        assert!(recv.push(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // Don't store additional fin empty buffers.
        let buf = RangeBuf::from(b"", 5, true);
        assert!(recv.push(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // Don't store additional fin non-empty buffers.
        let buf = RangeBuf::from(b"aa", 3, true);
        assert!(recv.push(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // Validate final size with fin empty buffers.
        let buf = RangeBuf::from(b"", 6, true);
        assert_eq!(recv.push(buf), Err(Error::FinalSize));
        let buf = RangeBuf::from(b"", 4, true);
        assert_eq!(recv.push(buf), Err(Error::FinalSize));

        let mut buf = [0; 32];
        assert_eq!(recv.pop(&mut buf), Ok((0, true)));
    }

    // Chunks pushed out of order cannot be read until the one at offset 0
    // arrives; then everything is returned by a single pop.
    #[test]
    fn ordered_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, true);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));

        assert!(recv.push(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 19);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"helloworldsomething");
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // Reads the buffered data through destination slices that do not line
    // up with the pushed chunks; fin is only reported by the last pop.
    #[test]
    fn split_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"helloworld", 9, true);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
        assert_eq!(len, 10);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"somethingh");
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 10);

        let (len, fin) = recv.pop(&mut buf[..5]).unwrap();
        assert_eq!(len, 5);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"ellow");
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 15);

        let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
        assert_eq!(len, 4);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"orld");
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
    }

    // The trailing chunk alone is not readable; once the leading chunk is
    // pushed both are returned together.
    #[test]
    fn incomplete_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"helloworld", 9, true);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 19);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"somethinghelloworld");
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
    }

    // A zero-length fin buffer at the end of the data adds no entry to
    // `recv.data`, yet the following pop reports fin.
    #[test]
    fn zero_len_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"", 9, true);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 9);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"something");
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
    }

    // Pushes that lie entirely within data that was already read are
    // accepted but not stored; a fin ending at offset 8 is rejected with
    // `FinalSize`, while one ending at offset 9 is accepted.
    #[test]
    fn past_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);
        let third = RangeBuf::from(b"ello", 4, true);
        let fourth = RangeBuf::from(b"ello", 5, true);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 9);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"something");
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.push(third), Err(Error::FinalSize));

        assert!(recv.push(fourth).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // A chunk fully covered by already-buffered data is not stored; the
    // bytes pushed first are the ones returned.
    #[test]
    fn fully_overlapping_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 4, false);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 9);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"something");
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // Same chunks as above pushed in the opposite order: the bytes of the
    // chunk pushed first are kept for the overlapping range.
    #[test]
    fn fully_overlapping_read2() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 4, false);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 9);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"somehello");
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // The later chunk extends past both ends of the buffered one, so it
    // contributes a piece before and a piece after it (three entries).
    #[test]
    fn fully_overlapping_read3() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 8);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 9);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"somhellog");
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // One long chunk covering two previously buffered chunks only fills
    // the ranges around them (five entries in total).
    #[test]
    fn fully_overlapping_read_multi() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"somethingsomething", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);
        let third = RangeBuf::from(b"hello", 12, false);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 8);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(third).is_ok());
        assert_eq!(recv.len, 17);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 18);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 18);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"somhellogsomhellog");
        assert_eq!(recv.len, 18);
        assert_eq!(recv.off, 18);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // The start of the second chunk overlaps the end of the first; only
    // its non-overlapping tail is added.
    #[test]
    fn overlapping_start_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 8, true);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 13);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"somethingello");
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 13);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // The end of the later-pushed chunk overlaps the start of the buffered
    // one; only its non-overlapping head is added.
    #[test]
    fn overlapping_end_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"something", 3, true);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 12);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"helsomething");
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 12);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // Three partially overlapping chunks pushed out of order are stitched
    // into one contiguous read.
    #[test]
    fn partially_multi_overlapping_reordered_read() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 8, false);
        let second = RangeBuf::from(b"something", 0, false);
        let third = RangeBuf::from(b"moar", 11, true);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.push(third).is_ok());
        assert_eq!(recv.len, 15);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 15);
        assert_eq!(fin, true);
        assert_eq!(&buf[..len], b"somethinhelloar");
        assert_eq!(recv.len, 15);
        assert_eq!(recv.off, 15);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // Six short chunks, several overlapping their neighbours, pushed in a
    // shuffled order.
    #[test]
    fn partially_multi_overlapping_reordered_read2() {
        let mut recv = RecvBuf::new(std::u64::MAX);
        assert_eq!(recv.len, 0);

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"aaa", 0, false);
        let second = RangeBuf::from(b"bbb", 2, false);
        let third = RangeBuf::from(b"ccc", 4, false);
        let fourth = RangeBuf::from(b"ddd", 6, false);
        let fifth = RangeBuf::from(b"eee", 9, false);
        let sixth = RangeBuf::from(b"fff", 11, false);

        assert!(recv.push(second).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.push(fourth).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.push(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        assert!(recv.push(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 4);

        assert!(recv.push(sixth).is_ok());
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert!(recv.push(fifth).is_ok());
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 6);

        let (len, fin) = recv.pop(&mut buf).unwrap();
        assert_eq!(len, 14);
        assert_eq!(fin, false);
        assert_eq!(&buf[..len], b"aabbbcdddeefff");
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 14);
        assert_eq!(recv.data.len(), 0);

        assert_eq!(recv.pop(&mut buf), Err(Error::Done));
    }

    // Popping from an empty send buffer yields an empty, non-fin chunk.
    #[test]
    fn empty_write() {
        let mut send = SendBuf::new(std::u64::MAX);
        assert_eq!(send.len, 0);

        let write = send.pop(std::usize::MAX).unwrap();
        assert_eq!(write.len(), 0);
        assert_eq!(write.fin(), false);
    }

    // Two consecutive writes come out as a single chunk when the pop size
    // allows it.
    #[test]
    fn multi_write() {
        let mut send = SendBuf::new(std::u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.push_slice(first, false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.push_slice(second, true).is_ok());
        assert_eq!(send.len, 19);

        let write = send.pop(128).unwrap();
        assert_eq!(write.len(), 19);
        assert_eq!(write.fin(), true);
        assert_eq!(&write[..], b"somethinghelloworld");
        assert_eq!(send.len, 0);
    }

    // Pops with sizes that do not line up with the pushed slices; fin is
    // only set on the chunk that carries the last byte.
    #[test]
    fn split_write() {
        let mut send = SendBuf::new(std::u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.push_slice(first, false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.push_slice(second, true).is_ok());
        assert_eq!(send.len, 19);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 0);
        assert_eq!(write.len(), 10);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"somethingh");
        assert_eq!(send.len, 9);

        let write = send.pop(5).unwrap();
        assert_eq!(write.off(), 10);
        assert_eq!(write.len(), 5);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"ellow");
        assert_eq!(send.len, 4);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 15);
        assert_eq!(write.len(), 4);
        assert_eq!(write.fin(), true);
        assert_eq!(&write[..], b"orld");
        assert_eq!(send.len, 0);
    }

    // Chunks that were popped earlier can be pushed back and are popped
    // again before the data that has not been sent yet.
    #[test]
    fn resend() {
        let mut send = SendBuf::new(std::u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.push_slice(first, false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert!(send.push_slice(second, true).is_ok());
        assert_eq!(send.off_front(), 0);

        let write1 = send.pop(4).unwrap();
        assert_eq!(write1.off(), 0);
        assert_eq!(write1.len(), 4);
        assert_eq!(write1.fin(), false);
        assert_eq!(&write1[..], b"some");
        assert_eq!(send.len, 15);
        assert_eq!(send.off_front(), 4);

        let write2 = send.pop(5).unwrap();
        assert_eq!(write2.off(), 4);
        assert_eq!(write2.len(), 5);
        assert_eq!(write2.fin(), false);
        assert_eq!(&write2[..], b"thing");
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 9);

        let write3 = send.pop(5).unwrap();
        assert_eq!(write3.off(), 9);
        assert_eq!(write3.len(), 5);
        assert_eq!(write3.fin(), false);
        assert_eq!(&write3[..], b"hello");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        send.push(write2).unwrap();
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 4);

        send.push(write1).unwrap();
        assert_eq!(send.len, 14);
        assert_eq!(send.off_front(), 0);

        let write4 = send.pop(11).unwrap();
        assert_eq!(write4.off(), 0);
        assert_eq!(write4.len(), 9);
        assert_eq!(write4.fin(), false);
        assert_eq!(&write4[..], b"something");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        let write5 = send.pop(11).unwrap();
        assert_eq!(write5.off(), 14);
        assert_eq!(write5.len(), 5);
        assert_eq!(write5.fin(), true);
        assert_eq!(&write5[..], b"world");
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 19);
    }

    // A default `SendBuf` accepts no bytes until `update_max_data()` is
    // called; every push is truncated to what the current limit allows.
    #[test]
    fn write_blocked_by_off() {
        let mut send = SendBuf::default();
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert_eq!(send.push_slice(first, false), Ok(0));
        assert_eq!(send.len, 0);

        assert_eq!(send.push_slice(second, true), Ok(0));
        assert_eq!(send.len, 0);

        send.update_max_data(5);

        assert_eq!(send.push_slice(first, false), Ok(5));
        assert_eq!(send.len, 5);

        assert_eq!(send.push_slice(second, true), Ok(0));
        assert_eq!(send.len, 5);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 0);
        assert_eq!(write.len(), 5);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"somet");
        assert_eq!(send.len, 0);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 5);
        assert_eq!(write.len(), 0);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"");
        assert_eq!(send.len, 0);

        send.update_max_data(15);

        assert_eq!(send.push_slice(&first[5..], false), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.push_slice(second, true), Ok(6));
        assert_eq!(send.len, 10);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 5);
        assert_eq!(write.len(), 10);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"hinghellow");
        assert_eq!(send.len, 0);

        send.update_max_data(25);

        assert_eq!(send.push_slice(&second[6..], true), Ok(4));
        assert_eq!(send.len, 4);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 15);
        assert_eq!(write.len(), 4);
        assert_eq!(write.fin(), true);
        assert_eq!(&write[..], b"orld");
        assert_eq!(send.len, 0);
    }

    // An empty fin write after some data marks the popped data as final.
    #[test]
    fn zero_len_write() {
        let mut send = SendBuf::new(std::u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";

        assert!(send.push_slice(first, false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.push_slice(&[], true).is_ok());
        assert_eq!(send.len, 9);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 0);
        assert_eq!(write.len(), 9);
        assert_eq!(write.fin(), true);
        assert_eq!(&write[..], b"something");
        assert_eq!(send.len, 0);
    }

    // Data beyond the receive limit (15 here) is rejected with
    // `FlowControl`; after reading and calling `update_max_data()` the same
    // chunk is accepted.
    #[test]
    fn recv_flow_control() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        assert_eq!(stream.recv.push(second), Ok(()));
        assert_eq!(stream.recv.push(first), Ok(()));
        assert!(!stream.recv.almost_full());

        assert_eq!(stream.recv.push(third), Err(Error::FlowControl));

        let (len, fin) = stream.recv.pop(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert_eq!(fin, false);

        assert!(stream.recv.almost_full());

        stream.recv.update_max_data();
        assert_eq!(stream.recv.max_data_next(), 25);
        assert!(!stream.recv.almost_full());

        let third = RangeBuf::from(b"something", 10, false);
        assert_eq!(stream.recv.push(third), Ok(()));
    }

    // Data that extends past an already received fin is rejected.
    #[test]
    fn recv_past_fin() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.push(second), Err(Error::FinalSize));
    }

    // Receiving the same fin chunk twice is accepted.
    #[test]
    fn recv_fin_dup() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.push(second), Ok(()));

        let mut buf = [0; 32];

        let (len, fin) = stream.recv.pop(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert_eq!(fin, true);
    }

    // A second fin at a different final offset is rejected.
    #[test]
    fn recv_fin_change() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, true);

        assert_eq!(stream.recv.push(second), Ok(()));
        assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
    }

    // A fin whose final offset is below data already received is rejected.
    #[test]
    fn recv_fin_lower_than_received() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        assert_eq!(stream.recv.push(second), Ok(()));
        assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
    }

    // After the whole stream (including fin) has been read, the receive
    // side does not report itself as almost full.
    #[test]
    fn recv_fin_flow_control() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, true);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.push(second), Ok(()));

        let (len, fin) = stream.recv.pop(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert_eq!(fin, true);

        assert!(!stream.recv.almost_full());
    }

    // A reset whose final size differs from the received fin is rejected.
    #[test]
    fn recv_fin_reset_mismatch() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
    }

    // Repeating a reset with the same final size is accepted.
    #[test]
    fn recv_reset_dup() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.reset(5), Ok(0));
        assert_eq!(stream.recv.reset(5), Ok(0));
    }

    // A second reset with a different final size is rejected.
    #[test]
    fn recv_reset_change() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.reset(5), Ok(0));
        assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
    }

    // A reset with a final size below data already received is rejected.
    #[test]
    fn recv_reset_lower_than_received() {
        let mut stream = Stream::new(15, 0, true, true);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.push(first), Ok(()));
        assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
    }

    // Only the first 15 bytes of the 19 written are emitted by the send
    // side; those bytes can then be pushed back and popped again.
    #[test]
    fn send_flow_control() {
        let mut stream = Stream::new(0, 15, true, true);

        let first = b"hello";
        let second = b"world";
        let third = b"something";

        assert!(stream.send.push_slice(first, false).is_ok());
        assert!(stream.send.push_slice(second, false).is_ok());
        assert!(stream.send.push_slice(third, false).is_ok());

        let write = stream.send.pop(25).unwrap();
        assert_eq!(write.off(), 0);
        assert_eq!(write.len(), 15);
        assert_eq!(write.fin(), false);
        assert_eq!(write.data, b"helloworldsomet");

        let write = stream.send.pop(25).unwrap();
        assert_eq!(write.off(), 15);
        assert_eq!(write.len(), 0);
        assert_eq!(write.fin(), false);
        assert_eq!(write.data, b"");

        let first = RangeBuf::from(b"helloworldsomet", 0, false);
        assert_eq!(stream.send.push(first), Ok(()));

        let write = stream.send.pop(10).unwrap();
        assert_eq!(write.off(), 0);
        assert_eq!(write.len(), 10);
        assert_eq!(write.fin(), false);
        assert_eq!(write.data, b"helloworld");

        let write = stream.send.pop(10).unwrap();
        assert_eq!(write.off(), 10);
        assert_eq!(write.len(), 5);
        assert_eq!(write.fin(), false);
        assert_eq!(write.data, b"somet");
    }

    // Writing more data after a fin write is rejected.
    #[test]
    fn send_past_fin() {
        let mut stream = Stream::new(0, 15, true, true);

        let first = b"hello";
        let second = b"world";
        let third = b"third";

        assert_eq!(stream.send.push_slice(first, false), Ok(5));

        assert_eq!(stream.send.push_slice(second, true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.push_slice(third, false), Err(Error::FinalSize));
    }

    // Pushing the same fin buffer twice on the send side is accepted.
    #[test]
    fn send_fin_dup() {
        let mut stream = Stream::new(0, 15, true, true);

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.send.push(first), Ok(()));
        assert_eq!(stream.send.push(second), Ok(()));
    }
#[test]
    fn send_undo_fin() {
        // Once a fin write has been accepted, pushing a buffer without the
        // fin flag must fail with `Error::FinalSize`.
        let mut stream = Stream::new(0, 15, true, true);

        let payload = b"hello";
        let non_fin = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.send.push_slice(payload, true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.push(non_fin), Err(Error::FinalSize));
    }

    #[test]
    fn send_fin_max_data_match() {
        // A 15-byte fin write on a stream created with 15 as its second
        // argument is popped in full, fin flag included.
        let mut stream = Stream::new(0, 15, true, true);

        let payload = b"hellohellohello";

        assert!(stream.send.push_slice(payload, true).is_ok());

        let chunk = stream.send.pop(15).unwrap();
        assert_eq!(chunk.off(), 0);
        assert_eq!(chunk.len(), 15);
        assert!(chunk.fin());
        assert_eq!(chunk.data, payload);
    }

    #[test]
    fn send_fin_zero_length() {
        // An empty fin write queued behind unsent data makes that data come
        // out flagged as final.
        let mut stream = Stream::new(0, 15, true, true);

        assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let chunk = stream.send.pop(5).unwrap();
        assert_eq!(chunk.off(), 0);
        assert_eq!(chunk.len(), 5);
        assert!(chunk.fin());
        assert_eq!(chunk.data, b"hello");
    }

    #[test]
    fn send_ack() {
        // A chunk that was acknowledged and then pushed back is not emitted
        // again: the next pop continues at offset 5.
        let mut stream = Stream::new(0, 15, true, true);

        assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let sent = stream.send.pop(5).unwrap();
        assert_eq!(sent.off(), 0);
        assert_eq!(sent.len(), 5);
        assert!(!sent.fin());
        assert_eq!(sent.data, b"hello");

        stream.send.ack(sent.off(), sent.len());

        assert_eq!(stream.send.push(sent), Ok(()));

        let next = stream.send.pop(5).unwrap();
        assert_eq!(next.off(), 5);
        assert_eq!(next.len(), 5);
        assert!(next.fin());
        assert_eq!(next.data, b"world");
    }

    #[test]
    fn send_ack_reordering() {
        // Same as `send_ack`, but with two chunks whose acknowledgements
        // are recorded in the reverse of the order they were sent in.
        let mut stream = Stream::new(0, 15, true, true);

        assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let head = stream.send.pop(5).unwrap();
        assert_eq!(head.off(), 0);
        assert_eq!(head.len(), 5);
        assert!(!head.fin());
        assert_eq!(head.data, b"hello");

        let single = stream.send.pop(1).unwrap();
        assert_eq!(single.off(), 5);
        assert_eq!(single.len(), 1);
        assert!(!single.fin());
        assert_eq!(single.data, b"w");

        stream.send.ack(single.off(), single.len());
        stream.send.ack(head.off(), head.len());

        assert_eq!(stream.send.push(head), Ok(()));
        assert_eq!(stream.send.push(single), Ok(()));

        let rest = stream.send.pop(5).unwrap();
        assert_eq!(rest.off(), 6);
        assert_eq!(rest.len(), 4);
        assert!(rest.fin());
        assert_eq!(rest.data, b"orld");
    }

    #[test]
    fn recv_data_below_off() {
        // A chunk that starts below the current read offset only
        // contributes the bytes that have not been read yet.
        let mut stream = Stream::new(15, 0, true, true);

        let initial = RangeBuf::from(b"hello", 0, false);
        assert_eq!(stream.recv.push(initial), Ok(()));

        let mut out = [0; 10];

        let (read, finished) = stream.recv.pop(&mut out).unwrap();
        assert_eq!(&out[..read], b"hello");
        assert!(!finished);

        let overlapping = RangeBuf::from(b"elloworld", 1, true);
        assert_eq!(stream.recv.push(overlapping), Ok(()));

        let (read, finished) = stream.recv.pop(&mut out).unwrap();
        assert_eq!(&out[..read], b"world");
        assert!(finished);
    }

    #[test]
    fn stream_complete() {
        // The send side completes once every byte has been acknowledged;
        // the stream as a whole also needs the receive side read up to fin.
        let mut stream = Stream::new(30, 30, true, true);

        assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.push_slice(b"", true), Ok(0));

        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        let incoming = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.push(incoming).is_ok());
        assert!(!stream.recv.is_fin());

        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        let mut two = [0; 2];
        assert_eq!(stream.recv.pop(&mut two), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        assert!(!stream.is_complete());

        let mut three = [0; 3];
        assert_eq!(stream.recv.pop(&mut three), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }

    #[test]
    fn send_fin_zero_length_output() {
        // When the fin is written after all data was already popped, it is
        // emitted as a separate zero-length chunk at offset 5.
        let mut stream = Stream::new(0, 15, true, true);

        assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
        assert!(!stream.send.is_fin());

        let data_chunk = stream.send.pop(5).unwrap();
        assert_eq!(data_chunk.off(), 0);
        assert_eq!(data_chunk.len(), 5);
        assert!(!data_chunk.fin());
        assert_eq!(data_chunk.data, b"hello");

        assert_eq!(stream.send.push_slice(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let fin_chunk = stream.send.pop(5).unwrap();
        assert_eq!(fin_chunk.off(), 5);
        assert_eq!(fin_chunk.len(), 0);
        assert!(fin_chunk.fin());
        assert_eq!(fin_chunk.data, b"");
    }
}