diff options
Diffstat (limited to 'src/stream/stream/chunks.rs')
-rw-r--r-- | src/stream/stream/chunks.rs | 60 |
1 files changed, 15 insertions, 45 deletions
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs index b42d1d1..d24c31c 100644 --- a/src/stream/stream/chunks.rs +++ b/src/stream/stream/chunks.rs @@ -3,26 +3,23 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use core::mem; use core::pin::Pin; use alloc::vec::Vec; /// Stream for the [`chunks`](super::StreamExt::chunks) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Chunks<St: Stream> { + #[pin] stream: Fuse<St>, items: Vec<St::Item>, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } -impl<St: Unpin + Stream> Unpin for Chunks<St> {} - impl<St: Stream> Chunks<St> where St: Stream { - unsafe_unpinned!(items: Vec<St::Item>); - unsafe_pinned!(stream: Fuse<St>); - pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> { assert!(capacity > 0); @@ -33,70 +30,43 @@ impl<St: Stream> Chunks<St> where St: Stream { } } - fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> { + fn take(self: Pin<&mut Self>) -> Vec<St::Item> { let cap = self.cap; - mem::replace(self.as_mut().items(), Vec::with_capacity(cap)) + mem::replace(self.project().items, Vec::with_capacity(cap)) } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner() - } + delegate_access_inner!(stream, St, (.)); } impl<St: Stream> Stream for Chunks<St> { type Item = Vec<St::Item>; + #[project] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + #[project] + let Chunks { mut stream, items, cap } = self.as_mut().project(); loop { - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(stream.as_mut().poll_next(cx)) { // Push the item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Some(item) => { - self.as_mut().items().push(item); - if self.items.len() >= self.cap { - return Poll::Ready(Some(self.as_mut().take())) + items.push(item); + if items.len() >= *cap { + return Poll::Ready(Some(self.take())) } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. None => { - let last = if self.items.is_empty() { + let last = if items.is_empty() { None } else { - let full_buf = mem::replace(self.as_mut().items(), Vec::new()); + let full_buf = mem::replace(items, Vec::new()); Some(full_buf) }; |