diff options
author | Haibo Huang <hhb@google.com> | 2020-05-08 19:26:17 -0700 |
---|---|---|
committer | Chih-Hung Hsieh <chh@google.com> | 2020-05-11 21:06:51 -0700 |
commit | 52627c866ba9ce070950c81a3a98f844a55305cf (patch) | |
tree | b261d9acf1a15a6d9d39e311930effa8fdaccd43 /src/stream | |
parent | 032d3071c35e3fc8fd539889f96691b67d489bbb (diff) | |
download | futures-util-52627c866ba9ce070950c81a3a98f844a55305cf.tar.gz |
Upgrade rust/crates/futures-util to 0.3.5
* Update Android.bp with new features and dependent
packages for futures-* 0.3.5.
* New dependencies of pin-project and pin-project-internal.
Test: mm in external/rust/crates
Change-Id: I705a08e44c8598d28b4b465170df8ed206df1494
Diffstat (limited to 'src/stream')
58 files changed, 1330 insertions, 2421 deletions
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index a30cbaa..6dc07ad 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -2,16 +2,18 @@ use crate::stream::{FuturesUnordered, StreamExt}; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::pin_project; use core::cmp::Ordering; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::pin::Pin; use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] struct OrderWrapper<T> { + #[pin] data: T, // A future or a future's output index: usize, } @@ -37,21 +39,18 @@ impl<T> Ord for OrderWrapper<T> { } } -impl<T> OrderWrapper<T> { - unsafe_pinned!(data: T); -} - impl<T> Future for OrderWrapper<T> where T: Future { type Output = OrderWrapper<T::Output>; fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - self.as_mut().data().as_mut().poll(cx) - .map(|output| OrderWrapper { data: output, index: self.index }) + let index = self.index; + self.project().data.poll(cx) + .map(|output| OrderWrapper { data: output, index }) } } @@ -203,3 +202,14 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) } } + +impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { + fn extend<I>(&mut self, iter: I) + where + I: IntoIterator<Item = Fut>, + { + for item in iter.into_iter() { + self.push(item); + } + } +} diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 5d93a64..2b7d704 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -121,7 +121,13 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> { // notifiaction is received, the task will only be inserted into the ready to // run queue if it isn't inserted already. -impl<Fut: Future> FuturesUnordered<Fut> { +impl<Fut> Default for FuturesUnordered<Fut> { + fn default() -> FuturesUnordered<Fut> { + FuturesUnordered::new() + } +} + +impl<Fut> FuturesUnordered<Fut> { /// Constructs a new, empty [`FuturesUnordered`]. /// /// The returned [`FuturesUnordered`] does not contain any futures. @@ -151,15 +157,7 @@ impl<Fut: Future> FuturesUnordered<Fut> { is_terminated: AtomicBool::new(false), } } -} - -impl<Fut: Future> Default for FuturesUnordered<Fut> { - fn default() -> FuturesUnordered<Fut> { - FuturesUnordered::new() - } -} -impl<Fut> FuturesUnordered<Fut> { /// Returns the number of futures contained in the set. /// /// This represents the total number of in-flight futures. @@ -607,7 +605,7 @@ impl<Fut> Drop for FuturesUnordered<Fut> { } } -impl<Fut: Future> FromIterator<Fut> for FuturesUnordered<Fut> { +impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> { fn from_iter<I>(iter: I) -> Self where I: IntoIterator<Item = Fut>, @@ -622,3 +620,14 @@ impl<Fut: Future> FusedStream for FuturesUnordered<Fut> { self.is_terminated.load(Relaxed) } } + +impl<Fut> Extend<Fut> for FuturesUnordered<Fut> { + fn extend<I>(&mut self, iter: I) + where + I: IntoIterator<Item = Fut>, + { + for item in iter.into_iter() { + self.push(item); + } + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index ba3575b..2a5ecf9 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -13,9 +13,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, ForEach, Fuse, Inspect, - Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, - TakeWhile, Then, Zip, + Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse, + Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, + StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip, }; #[cfg(feature = "std")] @@ -24,6 +24,9 @@ pub use self::stream::CatchUnwind; #[cfg(feature = "alloc")] pub use self::stream::Chunks; +#[cfg(feature = "alloc")] +pub use self::stream::ReadyChunks; + #[cfg(feature = "sink")] pub use self::stream::Forward; diff --git a/src/stream/once.rs b/src/stream/once.rs index 4f68b0c..21cd14b 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, project}; /// Creates a stream of a single element. /// @@ -16,34 +16,39 @@ use pin_utils::unsafe_pinned; /// # }); /// ``` pub fn once<Fut: Future>(future: Fut) -> Once<Fut> { - Once { future: Some(future) } + Once::new(future) } /// A stream which emits single element and then EOF. /// /// This stream will never block and is always ready. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Once<Fut> { + #[pin] future: Option<Fut> } -impl<Fut: Unpin> Unpin for Once<Fut> {} - impl<Fut> Once<Fut> { - unsafe_pinned!(future: Option<Fut>); + pub(crate) fn new(future: Fut) -> Self { + Self { future: Some(future) } + } } impl<Fut: Future> Stream for Once<Fut> { type Item = Fut::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let v = match self.as_mut().future().as_pin_mut() { + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[project] + let Once { mut future } = self.project(); + let v = match future.as_mut().as_pin_mut() { Some(fut) => ready!(fut.poll(cx)), None => return Poll::Ready(None), }; - self.as_mut().future().set(None); + future.set(None); Poll::Ready(Some(v)) } diff --git a/src/stream/select.rs b/src/stream/select.rs index b5fb813..36503e4 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -2,18 +2,20 @@ use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; +use pin_project::{pin_project, project}; /// Stream for the [`select()`] function. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Select<St1, St2> { + #[pin] stream1: Fuse<St1>, + #[pin] stream2: Fuse<St2>, flag: bool, } -impl<St1: Unpin, St2: Unpin> Unpin for Select<St1, St2> {} - /// This function will attempt to pull items from both streams. Each /// stream will be polled in a round-robin fashion, and whenever a stream is /// ready to yield an item that item is yielded. @@ -56,11 +58,11 @@ impl<St1, St2> Select<St1, St2> { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[project] pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { - unsafe { - let Self { stream1, stream2, .. } = self.get_unchecked_mut(); - (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut()) - } + #[project] + let Select { stream1, stream2, .. } = self.project(); + (stream1.get_pin_mut(), stream2.get_pin_mut()) } /// Consumes this combinator, returning the underlying streams. @@ -87,14 +89,13 @@ impl<St1, St2> Stream for Select<St1, St2> { type Item = St1::Item; + #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> { - let Select { flag, stream1, stream2 } = - unsafe { self.get_unchecked_mut() }; - let stream1 = unsafe { Pin::new_unchecked(stream1) }; - let stream2 = unsafe { Pin::new_unchecked(stream2) }; + #[project] + let Select { flag, stream1, stream2 } = self.project(); if !*flag { poll_inner(flag, stream1, stream2, cx) diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index d257689..2547993 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs @@ -131,3 +131,11 @@ impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { select_all(iter) } } + +impl<St: Stream + Unpin> Extend<St> for SelectAll<St> { + fn extend<T: IntoIterator<Item = St>>(&mut self, iter: T) { + for st in iter { + self.push(st) + } + } +} diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs index bea6e5b..a822576 100644 --- a/src/stream/stream/buffer_unordered.rs +++ b/src/stream/stream/buffer_unordered.rs @@ -4,27 +4,24 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use core::fmt; use core::pin::Pin; /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct BufferUnordered<St> where St: Stream, { + #[pin] stream: Fuse<St>, in_progress_queue: FuturesUnordered<St::Item>, max: usize, } -impl<St> Unpin for BufferUnordered<St> -where - St: Stream + Unpin, -{} - impl<St> fmt::Debug for BufferUnordered<St> where St: Stream + fmt::Debug, @@ -43,9 +40,6 @@ where St: Stream, St::Item: Future, { - unsafe_pinned!(stream: Fuse<St>); - unsafe_unpinned!(in_progress_queue: FuturesUnordered<St::Item>); - pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St> where St: Stream, @@ -58,37 +52,7 @@ where } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner() - } + delegate_access_inner!(stream, St, (.)); } impl<St> Stream for BufferUnordered<St> @@ -98,27 +62,31 @@ where { type Item = <St::Item as Future>::Output; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + #[project] + let BufferUnordered { mut stream, in_progress_queue, max } = self.project(); + // First up, try to spawn off as many futures as possible by filling up // our queue of futures. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut), + while in_progress_queue.len() < *max { + match stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match self.as_mut().in_progress_queue().poll_next_unpin(cx) { + match in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } // If more values are still coming from the stream, we're not done yet - if self.stream.is_done() { + if stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs index 2445a85..9dff01f 100644 --- a/src/stream/stream/buffered.rs +++ b/src/stream/stream/buffered.rs @@ -4,28 +4,24 @@ use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use core::fmt; use core::pin::Pin; /// Stream for the [`buffered`](super::StreamExt::buffered) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Buffered<St> where St: Stream, St::Item: Future, { + #[pin] stream: Fuse<St>, in_progress_queue: FuturesOrdered<St::Item>, max: usize, } -impl<St> Unpin for Buffered<St> -where - St: Stream + Unpin, - St::Item: Future, -{} - impl<St> fmt::Debug for Buffered<St> where St: Stream + fmt::Debug, @@ -45,9 +41,6 @@ where St: Stream, St::Item: Future, { - unsafe_pinned!(stream: Fuse<St>); - unsafe_unpinned!(in_progress_queue: FuturesOrdered<St::Item>); - pub(super) fn new(stream: St, n: usize) -> Buffered<St> { Buffered { stream: super::Fuse::new(stream), @@ -56,37 +49,7 @@ where } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner() - } + delegate_access_inner!(stream, St, (.)); } impl<St> Stream for Buffered<St> @@ -96,27 +59,31 @@ where { type Item = <St::Item as Future>::Output; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - // Try to spawn off as many futures as possible by filling up - // our in_progress_queue of futures. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut), + #[project] + let Buffered { mut stream, in_progress_queue, max } = self.project(); + + // First up, try to spawn off as many futures as possible by filling up + // our queue of futures. + while in_progress_queue.len() < *max { + match stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - let res = self.as_mut().in_progress_queue().poll_next_unpin(cx); + let res = in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { return Poll::Ready(Some(val)) } // If more values are still coming from the stream, we're not done yet - if self.stream.is_done() { + if stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs index 8d2dcf7..1bb43b2 100644 --- a/src/stream/stream/catch_unwind.rs +++ b/src/stream/stream/catch_unwind.rs @@ -1,45 +1,50 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use std::any::Any; use std::pin::Pin; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; /// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct CatchUnwind<St> { + #[pin] stream: St, caught_unwind: bool, } impl<St: Stream + UnwindSafe> CatchUnwind<St> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(caught_unwind: bool); - pub(super) fn new(stream: St) -> CatchUnwind<St> { CatchUnwind { stream, caught_unwind: false } } + + delegate_access_inner!(stream, St, ()); } impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> { type Item = Result<St::Item, Box<dyn Any + Send>>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.caught_unwind { + #[project] + let CatchUnwind { stream, caught_unwind } = self.project(); + + if *caught_unwind { Poll::Ready(None) } else { let res = catch_unwind(AssertUnwindSafe(|| { - self.as_mut().stream().poll_next(cx) + stream.poll_next(cx) })); match res { Ok(poll) => poll.map(|opt| opt.map(Ok)), Err(e) => { - *self.as_mut().caught_unwind() = true; + *caught_unwind = true; Poll::Ready(Some(Err(e))) }, } diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index b2ada69..720903c 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -1,13 +1,16 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, project}; /// Stream for the [`chain`](super::StreamExt::chain) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Chain<St1, St2> { + #[pin] first: Option<St1>, + #[pin] second: St2, } @@ -16,9 +19,6 @@ impl<St1, St2> Chain<St1, St2> where St1: Stream, St2: Stream<Item = St1::Item>, { - unsafe_pinned!(first: Option<St1>); - unsafe_pinned!(second: St2); - pub(super) fn new(stream1: St1, stream2: St2) -> Chain<St1, St2> { Chain { first: Some(stream1), @@ -42,17 +42,20 @@ where St1: Stream, { type Item = St1::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if let Some(first) = self.as_mut().first().as_pin_mut() { + #[project] + let Chain { mut first, second } = self.project(); + if let Some(first) = first.as_mut().as_pin_mut() { if let Some(item) = ready!(first.poll_next(cx)) { return Poll::Ready(Some(item)) } } - self.as_mut().first().set(None); - self.as_mut().second().poll_next(cx) + first.set(None); + second.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs index b42d1d1..d24c31c 100644 --- a/src/stream/stream/chunks.rs +++ b/src/stream/stream/chunks.rs @@ -3,26 +3,23 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use core::mem; use core::pin::Pin; use alloc::vec::Vec; /// Stream for the [`chunks`](super::StreamExt::chunks) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Chunks<St: Stream> { + #[pin] stream: Fuse<St>, items: Vec<St::Item>, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } -impl<St: Unpin + Stream> Unpin for Chunks<St> {} - impl<St: Stream> Chunks<St> where St: Stream { - unsafe_unpinned!(items: Vec<St::Item>); - unsafe_pinned!(stream: Fuse<St>); - pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> { assert!(capacity > 0); @@ -33,70 +30,43 @@ impl<St: Stream> Chunks<St> where St: Stream { } } - fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> { + fn take(self: Pin<&mut Self>) -> Vec<St::Item> { let cap = self.cap; - mem::replace(self.as_mut().items(), Vec::with_capacity(cap)) + mem::replace(self.project().items, Vec::with_capacity(cap)) } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner() - } + delegate_access_inner!(stream, St, (.)); } impl<St: Stream> Stream for Chunks<St> { type Item = Vec<St::Item>; + #[project] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + #[project] + let Chunks { mut stream, items, cap } = self.as_mut().project(); loop { - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(stream.as_mut().poll_next(cx)) { // Push the item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Some(item) => { - self.as_mut().items().push(item); - if self.items.len() >= self.cap { - return Poll::Ready(Some(self.as_mut().take())) + items.push(item); + if items.len() >= *cap { + return Poll::Ready(Some(self.take())) } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. None => { - let last = if self.items.is_empty() { + let last = if items.is_empty() { None } else { - let full_buf = mem::replace(self.as_mut().items(), Vec::new()); + let full_buf = mem::replace(items, Vec::new()); Some(full_buf) }; diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs index 127a3f7..349e42d 100644 --- a/src/stream/stream/collect.rs +++ b/src/stream/stream/collect.rs @@ -3,24 +3,21 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`collect`](super::StreamExt::collect) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Collect<St, C> { + #[pin] stream: St, collection: C, } -impl<St: Unpin, C> Unpin for Collect<St, C> {} - impl<St: Stream, C: Default> Collect<St, C> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(collection: C); - - fn finish(mut self: Pin<&mut Self>) -> C { - mem::replace(self.as_mut().collection(), Default::default()) + fn finish(self: Pin<&mut Self>) -> C { + mem::replace(self.project().collection, Default::default()) } pub(super) fn new(stream: St) -> Collect<St, C> { @@ -46,11 +43,14 @@ where St: Stream, { type Output = C; + #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { + #[project] + let Collect { mut stream, collection } = self.as_mut().project(); loop { - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => self.as_mut().collection().extend(Some(e)), - None => return Poll::Ready(self.as_mut().finish()), + match ready!(stream.as_mut().poll_next(cx)) { + Some(e) => collection.extend(Some(e)), + None => return Poll::Ready(self.finish()), } } } diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs index 704efc7..647632b 100644 --- a/src/stream/stream/concat.rs +++ b/src/stream/stream/concat.rs @@ -2,26 +2,23 @@ use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`concat`](super::StreamExt::concat) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Concat<St: Stream> { + #[pin] stream: St, accum: Option<St::Item>, } -impl<St: Stream + Unpin> Unpin for Concat<St> {} - impl<St> Concat<St> where St: Stream, St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(accum: Option<St::Item>); - pub(super) fn new(stream: St) -> Concat<St> { Concat { stream, @@ -37,16 +34,19 @@ where St: Stream, { type Output = St::Item; + #[project] fn poll( - mut self: Pin<&mut Self>, cx: &mut Context<'_> + self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output> { + #[project] + let Concat { mut stream, accum } = self.project(); + loop { - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(stream.as_mut().poll_next(cx)) { None => { - return Poll::Ready(self.as_mut().accum().take().unwrap_or_default()) + return Poll::Ready(accum.take().unwrap_or_default()) } Some(e) => { - let accum = self.as_mut().accum(); if let Some(a) = accum { a.extend(e) } else { diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index 6366c8b..477a052 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -3,22 +3,19 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`enumerate`](super::StreamExt::enumerate) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Enumerate<St> { + #[pin] stream: St, count: usize, } -impl<St: Unpin> Unpin for Enumerate<St> {} - impl<St: Stream> Enumerate<St> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(count: usize); - pub(super) fn new(stream: St) -> Enumerate<St> { Enumerate { stream, @@ -26,37 +23,7 @@ impl<St: Stream> Enumerate<St> { } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St: Stream + FusedStream> FusedStream for Enumerate<St> { @@ -68,15 +35,19 @@ impl<St: Stream + FusedStream> FusedStream for Enumerate<St> { impl<St: Stream> Stream for Enumerate<St> { type Item = (usize, St::Item); + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - match ready!(self.as_mut().stream().poll_next(cx)) { + #[project] + let Enumerate { stream, count } = self.project(); + + match ready!(stream.poll_next(cx)) { Some(item) => { - let count = self.count; - *self.as_mut().count() += 1; - Poll::Ready(Some((count, item))) + let prev_count = *count; + *count += 1; + Poll::Ready(Some((prev_count, item))) } None => Poll::Ready(None), } diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 06335f1..9d848ad 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -5,25 +5,23 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; +use crate::fns::FnMut1; /// Stream for the [`filter`](super::StreamExt::filter) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Filter<St, Fut, F> where St: Stream, { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Item>, } -impl<St, Fut, F> Unpin for Filter<St, Fut, F> -where - St: Stream + Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for Filter<St, Fut, F> where St: Stream + fmt::Debug, @@ -41,14 +39,9 @@ where impl<St, Fut, F> Filter<St, Fut, F> where St: Stream, - F: FnMut(&St::Item) -> Fut, + F: for<'a> FnMut1<&'a St::Item, Output=Fut>, Fut: Future<Output = bool>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Item>); - pub(super) fn new(stream: St, f: F) -> Filter<St, Fut, F> { Filter { stream, @@ -58,37 +51,7 @@ where St: Stream, } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for Filter<St, Fut, F> @@ -103,34 +66,33 @@ impl<St, Fut, F> FusedStream for Filter<St, Fut, F> impl<St, Fut, F> Stream for Filter<St, Fut, F> where St: Stream, - F: FnMut(&St::Item) -> Fut, + F: for<'a> FnMut1<&'a St::Item, Output=Fut>, Fut: Future<Output = bool>, { type Item = St::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { - loop { - if self.pending_fut.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); + #[project] + let Filter { mut stream, f, mut pending_fut, pending_item } = self.project(); + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let res = ready!(fut.poll(cx)); + pending_fut.set(None); + if res { + break pending_item.take(); + } + *pending_item = None; + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + pending_fut.set(Some(f.call_mut(&item))); + *pending_item = Some(item); + } else { + break None; } - - let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); - - if yield_item { - return Poll::Ready(Some(item)); - } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 532e6ca..2d098ee 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -5,22 +5,20 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; +use crate::fns::FnMut1; /// Stream for the [`filter_map`](super::StreamExt::filter_map) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct FilterMap<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] pending: Option<Fut>, } -impl<St, Fut, F> Unpin for FilterMap<St, Fut, F> -where - St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for FilterMap<St, Fut, F> where St: fmt::Debug, @@ -39,50 +37,16 @@ impl<St, Fut, F> FilterMap<St, Fut, F> F: FnMut(St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> FilterMap<St, Fut, F> { FilterMap { stream, f, pending: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F> where St: Stream + FusedStream, - F: FnMut(St::Item) -> Fut, + F: FnMut1<St::Item, Output=Fut>, Fut: Future<Output = Option<T>>, { fn is_terminated(&self) -> bool { @@ -92,31 +56,34 @@ impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F> impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F> where St: Stream, - F: FnMut(St::Item) -> Fut, + F: FnMut1<St::Item, Output=Fut>, Fut: Future<Output = Option<T>>, { type Item = T; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>> { - loop { - if self.pending.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(item); - self.as_mut().pending().set(Some(fut)); - } - - let item = ready!(self.as_mut().pending().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending().set(None); - if item.is_some() { - return Poll::Ready(item); + #[project] + let FilterMap { mut stream, f, mut pending } = self.project(); + Poll::Ready(loop { + if let Some(p) = pending.as_mut().as_pin_mut() { + // We have an item in progress, poll that until it's done + let item = ready!(p.poll(cx)); + pending.set(None); + if item.is_some() { + break item; + } + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + // No item in progress, but the stream is still going + pending.set(Some(f.call_mut(item))); + } else { + // The stream is done + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { @@ -134,7 +101,7 @@ impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F> #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for FilterMap<S, Fut, F> where S: Stream + Sink<Item>, - F: FnMut(S::Item) -> Fut, + F: FnMut1<S::Item, Output=Fut>, Fut: Future, { type Error = S::Error; diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index b19ffc0..4db77e1 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -3,77 +3,28 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, project}; /// Stream for the [`flatten`](super::StreamExt::flatten) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] -pub struct Flatten<St> -where - St: Stream, -{ +pub struct Flatten<St, U> { + #[pin] stream: St, - next: Option<St::Item>, -} - -impl<St> Unpin for Flatten<St> -where - St: Stream + Unpin, - St::Item: Unpin, -{ + #[pin] + next: Option<U>, } -impl<St> Flatten<St> -where - St: Stream, -{ - unsafe_pinned!(stream: St); - unsafe_pinned!(next: Option<St::Item>); -} - -impl<St> Flatten<St> -where - St: Stream, - St::Item: Stream, -{ +impl<St, U> Flatten<St, U> { pub(super) fn new(stream: St) -> Self { Self { stream, next: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } -impl<St> FusedStream for Flatten<St> +impl<St> FusedStream for Flatten<St, St::Item> where St: FusedStream, St::Item: Stream, @@ -83,34 +34,36 @@ where } } -impl<St> Stream for Flatten<St> +impl<St> Stream for Flatten<St, St::Item> where St: Stream, St::Item: Stream, { type Item = <St::Item as Stream>::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - loop { - if self.next.is_none() { - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => self.as_mut().next().set(Some(e)), - None => return Poll::Ready(None), + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[project] + let Flatten { mut stream, mut next } = self.project(); + Poll::Ready(loop { + if let Some(s) = next.as_mut().as_pin_mut() { + if let Some(item) = ready!(s.poll_next(cx)) { + break Some(item); + } else { + next.set(None); } - } - - if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) { - return Poll::Ready(Some(item)); + } else if let Some(s) = ready!(stream.as_mut().poll_next(cx)) { + next.set(Some(s)); } else { - self.as_mut().next().set(None); + break None; } - } + }) } } // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl<S, Item> Sink<Item> for Flatten<S> +impl<S, Item> Sink<Item> for Flatten<S, S::Item> where S: Stream + Sink<Item>, { diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index e92a72e..d4bec25 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -3,19 +3,20 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`fold`](super::StreamExt::fold) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fold<St, Fut, T, F> { + #[pin] stream: St, f: F, accum: Option<T>, + #[pin] future: Option<Fut>, } -impl<St: Unpin, Fut: Unpin, T, F> Unpin for Fold<St, Fut, T, F> {} - impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F> where St: fmt::Debug, @@ -36,11 +37,6 @@ where St: Stream, F: FnMut(T, St::Item) -> Fut, Fut: Future<Output = T>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_unpinned!(accum: Option<T>); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> { Fold { stream, @@ -68,25 +64,27 @@ impl<St, Fut, T, F> Future for Fold<St, Fut, T, F> { type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { - loop { - // we're currently processing a future to produce a new accum value - if self.accum.is_none() { - let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - *self.as_mut().accum() = Some(accum); - self.as_mut().future().set(None); - } - - let item = ready!(self.as_mut().stream().poll_next(cx)); - let accum = self.as_mut().accum().take() - .expect("Fold polled after completion"); - - if let Some(e) = item { - let future = (self.as_mut().f())(accum, e); - self.as_mut().future().set(Some(future)); + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + #[project] + let Fold { mut stream, f, accum, mut future } = self.project(); + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + *accum = Some(ready!(fut.poll(cx))); + future.set(None); + } else if accum.is_some() { + // we're waiting on a new item from the stream + let res = ready!(stream.as_mut().poll_next(cx)); + let a = accum.take().unwrap(); + if let Some(item) = res { + future.set(Some(f(a, item))); + } else { + break a; + } } else { - return Poll::Ready(accum) + panic!("Fold polled after completion") } - } + }) } } diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index f8adcb2..fb3f40f 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -3,22 +3,19 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`for_each`](super::StreamExt::for_each) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ForEach<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] future: Option<Fut>, } -impl<St, Fut, F> Unpin for ForEach<St, Fut, F> -where - St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for ForEach<St, Fut, F> where St: fmt::Debug, @@ -37,10 +34,6 @@ where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future<Output = ()>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> ForEach<St, Fut, F> { ForEach { stream, @@ -67,22 +60,20 @@ impl<St, Fut, F> Future for ForEach<St, Fut, F> { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + #[project] + let ForEach { mut stream, f, mut future } = self.project(); loop { - if let Some(future) = self.as_mut().future().as_pin_mut() { - ready!(future.poll(cx)); - } - self.as_mut().future().set(None); - - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => { - let future = (self.as_mut().f())(e); - self.as_mut().future().set(Some(future)); - } - None => { - return Poll::Ready(()); - } + if let Some(fut) = future.as_mut().as_pin_mut() { + ready!(fut.poll(cx)); + future.set(None); + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + future.set(Some(f(item))); + } else { + break; } } + Poll::Ready(()) } } diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index 18ca4bd..88ff2d3 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -5,23 +5,20 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) /// method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ForEachConcurrent<St, Fut, F> { + #[pin] stream: Option<St>, f: F, futures: FuturesUnordered<Fut>, limit: Option<NonZeroUsize>, } -impl<St, Fut, F> Unpin for ForEachConcurrent<St, Fut, F> -where St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F> where St: fmt::Debug, @@ -41,11 +38,6 @@ where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future<Output = ()>, { - unsafe_pinned!(stream: Option<St>); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered<Fut>); - unsafe_unpinned!(limit: Option<NonZeroUsize>); - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> { ForEachConcurrent { stream: Some(stream), @@ -74,16 +66,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + #[project] + let ForEachConcurrent { mut stream, f, futures, limit } = self.project(); loop { let mut made_progress_this_iter = false; - // Try and pull an item from the stream - let current_len = self.futures.len(); // Check if we've already created a number of futures greater than `limit` - if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { + if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { let mut stream_completed = false; - let elem = if let Some(stream) = self.as_mut().stream().as_pin_mut() { + let elem = if let Some(stream) = stream.as_mut().as_pin_mut() { match stream.poll_next(cx) { Poll::Ready(Some(elem)) => { made_progress_this_iter = true; @@ -99,18 +92,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> None }; if stream_completed { - self.as_mut().stream().set(None); + stream.set(None); } if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + futures.push(f(elem)); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match futures.poll_next_unpin(cx) { Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { - if self.stream.is_none() { + if stream.is_none() { return Poll::Ready(()) } }, diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs index fd89625..9776056 100644 --- a/src/stream/stream/forward.rs +++ b/src/stream/stream/forward.rs @@ -1,59 +1,34 @@ -use crate::stream::{StreamExt, Fuse}; +use crate::stream::Fuse; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; -use futures_core::stream::{Stream, TryStream}; +use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -const INVALID_POLL: &str = "polled `Forward` after completion"; +use pin_project::{pin_project, project}; /// Future for the [`forward`](super::StreamExt::forward) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Forward<St: TryStream, Si> { +pub struct Forward<St, Si, Item> { + #[pin] sink: Option<Si>, + #[pin] stream: Fuse<St>, - buffered_item: Option<St::Ok>, + buffered_item: Option<Item>, } -impl<St: TryStream + Unpin, Si: Unpin> Unpin for Forward<St, Si> {} - -impl<St, Si, E> Forward<St, Si> -where - Si: Sink<St::Ok, Error = E>, - St: TryStream<Error = E> + Stream, -{ - unsafe_pinned!(sink: Option<Si>); - unsafe_pinned!(stream: Fuse<St>); - unsafe_unpinned!(buffered_item: Option<St::Ok>); - - pub(super) fn new(stream: St, sink: Si) -> Self { +impl<St, Si, Item> Forward<St, Si, Item> { + pub(crate) fn new(stream: St, sink: Si) -> Self { Forward { sink: Some(sink), - stream: stream.fuse(), - buffered_item: None, + stream: Fuse::new(stream), + buffered_item: None, } } - - fn try_start_send( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - item: St::Ok, - ) -> Poll<Result<(), E>> { - debug_assert!(self.buffered_item.is_none()); - { - let mut sink = self.as_mut().sink().as_pin_mut().unwrap(); - if sink.as_mut().poll_ready(cx)?.is_ready() { - return Poll::Ready(sink.start_send(item)); - } - } - *self.as_mut().buffered_item() = Some(item); - Poll::Pending - } } -impl<St, Si, Item, E> FusedFuture for Forward<St, Si> +impl<St, Si, Item, E> FusedFuture for Forward<St, Si, Item> where Si: Sink<Item, Error = E>, St: Stream<Item = Result<Item, E>>, @@ -63,34 +38,41 @@ where } } -impl<St, Si, Item, E> Future for Forward<St, Si> +impl<St, Si, Item, E> Future for Forward<St, Si, Item> where Si: Sink<Item, Error = E>, St: Stream<Item = Result<Item, E>>, { type Output = Result<(), E>; + #[project] fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - // If we've got an item buffered already, we need to write it to the - // sink before we can do anything else - if let Some(item) = self.as_mut().buffered_item().take() { - ready!(self.as_mut().try_start_send(cx, item))?; - } + #[project] + let Forward { mut sink, mut stream, buffered_item } = self.project(); + let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion"); loop { - match self.as_mut().stream().poll_next(cx)? { - Poll::Ready(Some(item)) => - ready!(self.as_mut().try_start_send(cx, item))?, + // If we've got an item buffered already, we need to write it to the + // sink before we can do anything else + if buffered_item.is_some() { + ready!(si.as_mut().poll_ready(cx))?; + si.as_mut().start_send(buffered_item.take().unwrap())?; + } + + match stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(item)) => { + *buffered_item = Some(item); + } Poll::Ready(None) => { - ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL).poll_close(cx))?; - self.as_mut().sink().set(None); + ready!(si.poll_close(cx))?; + sink.set(None); return Poll::Ready(Ok(())) } Poll::Pending => { - ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL).poll_flush(cx))?; + ready!(si.poll_flush(cx))?; return Poll::Pending } } diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 9085dc5..971fe60 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -3,22 +3,19 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`fuse`](super::StreamExt::fuse) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Fuse<St> { + #[pin] stream: St, done: bool, } -impl<St: Unpin> Unpin for Fuse<St> {} - impl<St> Fuse<St> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(done: bool); - pub(super) fn new(stream: St) -> Fuse<St> { Fuse { stream, done: false } } @@ -32,37 +29,7 @@ impl<St> Fuse<St> { self.done } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<S: Stream> FusedStream for Fuse<S> { @@ -74,17 +41,21 @@ impl<S: Stream> FusedStream for Fuse<S> { impl<S: Stream> Stream for Fuse<S> { type Item = S::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<S::Item>> { - if self.done { + #[project] + let Fuse { stream, done } = self.project(); + + if *done { return Poll::Ready(None); } - let item = ready!(self.as_mut().stream().poll_next(cx)); + let item = ready!(stream.poll_next(cx)); if item.is_none() { - *self.as_mut().done() = true; + *done = true; } Poll::Ready(item) } diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs deleted file mode 100644 index e34970a..0000000 --- a/src/stream/stream/inspect.rs +++ /dev/null @@ -1,119 +0,0 @@ -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`inspect`](super::StreamExt::inspect) method. -#[must_use = "streams do nothing unless polled"] -pub struct Inspect<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for Inspect<St, F> {} - -impl<St, F> fmt::Debug for Inspect<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Inspect") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> Inspect<St, F> - where St: Stream, - F: FnMut(&St::Item), -{ - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - - pub(super) fn new(stream: St, f: F) -> Inspect<St, F> { - Inspect { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F> FusedStream for Inspect<St, F> - where St: FusedStream, - F: FnMut(&St::Item), -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -// used by `TryStreamExt::{inspect_ok, inspect_err}` -#[inline] -pub(crate) fn inspect<T, F: FnMut(&T)>(x: T, mut f: F) -> T { - f(&x); - x -} - -impl<St, F> Stream for Inspect<St, F> - where St: Stream, - F: FnMut(&St::Item), -{ - type Item = St::Item; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St::Item>> { - self.as_mut() - .stream() - .poll_next(cx) - .map(|opt| opt.map(|e| inspect(e, self.as_mut().f()))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for Inspect<S, F> - where S: Stream + Sink<Item>, - F: FnMut(&S::Item), -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs index abae98c..0d49384 100644 --- a/src/stream/stream/into_future.rs +++ b/src/stream/stream/into_future.rs @@ -3,7 +3,6 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; /// Future for the [`into_future`](super::StreamExt::into_future) method. #[derive(Debug)] @@ -12,11 +11,7 @@ pub struct StreamFuture<St> { stream: Option<St>, } -impl<St: Unpin> Unpin for StreamFuture<St> {} - impl<St: Stream + Unpin> StreamFuture<St> { - unsafe_pinned!(stream: Option<St>); - pub(super) fn new(stream: St) -> StreamFuture<St> { StreamFuture { stream: Some(stream) } } @@ -57,7 +52,7 @@ impl<St: Stream + Unpin> StreamFuture<St> { /// in order to return it to the caller of `Future::poll` if the stream yielded /// an element. pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> { - self.stream().as_pin_mut() + Pin::get_mut(self).stream.as_mut().map(Pin::new) } /// Consumes this combinator, returning the underlying stream. diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 8119434..755f53a 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -4,17 +4,19 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; + +use crate::fns::FnMut1; /// Stream for the [`map`](super::StreamExt::map) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Map<St, F> { + #[pin] stream: St, f: F, } -impl<St: Unpin, F> Unpin for Map<St, F> {} - impl<St, F> fmt::Debug for Map<St, F> where St: fmt::Debug, @@ -26,73 +28,38 @@ where } } -impl<St, T, F> Map<St, F> - where St: Stream, - F: FnMut(St::Item) -> T, -{ - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - - pub(super) fn new(stream: St, f: F) -> Map<St, F> { +impl<St, F> Map<St, F> { + pub(crate) fn new(stream: St, f: F) -> Map<St, F> { Map { stream, f } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } -impl<St, F, T> FusedStream for Map<St, F> +impl<St, F> FusedStream for Map<St, F> where St: FusedStream, - F: FnMut(St::Item) -> T, + F: FnMut1<St::Item>, { fn is_terminated(&self) -> bool { self.stream.is_terminated() } } -impl<St, F, T> Stream for Map<St, F> +impl<St, F> Stream for Map<St, F> where St: Stream, - F: FnMut(St::Item) -> T, + F: FnMut1<St::Item>, { - type Item = T; + type Item = F::Output; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll<Option<T>> { - self.as_mut() - .stream() - .poll_next(cx) - .map(|opt| opt.map(|x| self.as_mut().f()(x))) + ) -> Poll<Option<Self::Item>> { + #[project] + let Map { stream, f } = self.project(); + let res = ready!(stream.poll_next(cx)); + Poll::Ready(res.map(|x| f.call_mut(x))) } fn size_hint(&self) -> (usize, Option<usize>) { @@ -102,11 +69,11 @@ impl<St, F, T> Stream for Map<St, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl<S, F, T, Item> Sink<Item> for Map<S, F> - where S: Stream + Sink<Item>, - F: FnMut(S::Item) -> T, +impl<St, F, Item> Sink<Item> for Map<St, F> + where St: Stream + Sink<Item>, + F: FnMut1<St::Item>, { - type Error = S::Error; + type Error = St::Error; delegate_sink!(stream, Item); } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index da5ade8..359bb2f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -19,6 +19,8 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; +use crate::fns::{InspectFn, inspect_fn}; + mod chain; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::chain::Chain; @@ -44,8 +46,14 @@ mod filter_map; pub use self::filter_map::FilterMap; mod flatten; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::flatten::Flatten; + +delegate_all!( + /// Stream for the [`inspect`](StreamExt::inspect) method. + Flatten<St>( + flatten::Flatten<St, St::Item> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)] + where St: Stream +); mod fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -53,9 +61,15 @@ pub use self::fold::Fold; #[cfg(feature = "sink")] mod forward; + #[cfg(feature = "sink")] -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::forward::Forward; +delegate_all!( + /// Future for the [`forward`](super::StreamExt::forward) method. + Forward<St, Si>( + forward::Forward<St, Si, St::Ok> + ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)] + where St: TryStream +); mod for_each; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -69,15 +83,24 @@ mod into_future; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_future::StreamFuture; -mod inspect; -pub(crate) use self::inspect::inspect; // used by `TryStreamExt::{inspect_ok, inspect_err}` -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::inspect::Inspect; +delegate_all!( + /// Stream for the [`inspect`](StreamExt::inspect) method. + Inspect<St, F>( + map::Map<St, InspectFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))] +); mod map; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::map::Map; +delegate_all!( + /// Stream for the [`flat_map`](StreamExt::flat_map) method. + FlatMap<St, U, F>( + flatten::Flatten<Map<St, F>, U> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))] +); + mod next; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::next::Next; @@ -106,6 +129,10 @@ mod take_while; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::take_while::TakeWhile; +mod take_until; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::take_until::TakeUntil; + mod then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::then::Then; @@ -120,6 +147,12 @@ mod chunks; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::chunks::Chunks; +#[cfg(feature = "alloc")] +mod ready_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::ready_chunks::ReadyChunks; + mod scan; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::scan::Scan; @@ -544,10 +577,46 @@ pub trait StreamExt: Stream { Flatten::new(self) } - /// Combinator similar to [`StreamExt::fold`] that holds internal state and produces a new stream. + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. /// - /// Accepts initial state and closure which will be applied to each element of the stream until provided closure - /// returns `None`. Once `None` is returned, stream will be terminated. + /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, + /// you would have to chain combinators like `.map(f).flatten()` while this + /// combinator provides ability to write `.flat_map(f)` instead of chaining. + /// + /// The provided closure which produce inner streams is executed over all elements + /// of stream as last inner stream is terminated and next stream item is available. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `flat_map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x])); + /// + /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await); + /// # }); + /// ``` + fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> + where + F: FnMut(Self::Item) -> U, + U: Stream, + Self: Sized, + { + FlatMap::new(self, f) + } + + /// Combinator similar to [`StreamExt::fold`] that holds internal state + /// and produces a new stream. + /// + /// Accepts initial state and closure which will be applied to each element + /// of the stream until provided closure returns `None`. Once `None` is + /// returned, stream will be terminated. /// /// # Examples /// @@ -580,7 +649,7 @@ pub trait StreamExt: Stream { /// /// This function, like `Iterator::skip_while`, will skip elements on the /// stream until the predicate `f` resolves to `false`. Once one element - /// returns false all future elements will be returned from the underlying + /// returns `false`, all future elements will be returned from the underlying /// stream. /// /// # Examples @@ -611,7 +680,7 @@ pub trait StreamExt: Stream { /// /// This function, like `Iterator::take_while`, will take elements from the /// stream until the predicate `f` resolves to `false`. Once one element - /// returns false it will always return that the stream is done. + /// returns `false`, it will always return that the stream is done. /// /// # Examples /// @@ -636,6 +705,50 @@ pub trait StreamExt: Stream { TakeWhile::new(self, f) } + /// Take elements from this stream until the provided future resolves. + /// + /// This function will take elements from the stream until the provided + /// stopping future `fut` resolves. Once the `fut` future becomes ready, + /// this stream combinator will always return that the stream is done. + /// + /// The stopping future may return any type. Once the stream is stopped + /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`. + /// The stream may also be resumed with `TakeUntil::take_future()`. + /// See the documentation of [`TakeUntil`] for more information. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, StreamExt}; + /// use futures::task::Poll; + /// + /// let stream = stream::iter(1..=10); + /// + /// let mut i = 0; + /// let stop_fut = future::poll_fn(|_cx| { + /// i += 1; + /// if i <= 5 { + /// Poll::Pending + /// } else { + /// Poll::Ready(()) + /// } + /// }); + /// + /// let stream = stream.take_until(stop_fut); + /// + /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); + /// # }); + /// ``` + fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> + where + Fut: Future, + Self: Sized, + { + TakeUntil::new(self, fut) + } + /// Runs this stream to completion, executing the provided asynchronous /// closure for each element on the stream. /// @@ -1098,6 +1211,32 @@ pub trait StreamExt: Stream { Chunks::new(self, capacity) } + /// An adaptor for chunking up ready items of the stream inside a vector. + /// + /// This combinator will attempt to pull ready items from this stream and + /// buffer them into a local vector. At most `capacity` items will get + /// buffered before they're yielded from the returned stream. If underlying + /// stream returns `Poll::Pending`, and collected chunk is not empty, it will + /// be immediately returned. + /// + /// If the underlying stream ended and only a partial vector was created, + /// it'll be returned. Additionally if an error happens from the underlying + /// stream then the currently buffered items will be yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> + where + Self: Sized, + { + ReadyChunks::new(self, capacity) + } + /// A future that completes after the given stream has been fully processed /// into the sink and the sink has been flushed and closed. /// @@ -1111,13 +1250,13 @@ pub trait StreamExt: Stream { #[cfg(feature = "sink")] fn forward<S>(self, sink: S) -> Forward<Self, S> where - S: Sink<<Self as TryStream>::Ok>, - Self: TryStream<Error = S::Error> + Sized, + S: Sink<Self::Ok, Error = Self::Error>, + Self: TryStream + Sized, { Forward::new(self, sink) } - /// Splits this `Stream + Sink` object into separate `Stream` and `Sink` + /// Splits this `Stream + Sink` object into separate `Sink` and `Stream` /// objects. /// /// This can be useful when you want to split ownership between tasks, or diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index 9272baf..fb0f874 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -1,4 +1,3 @@ -use crate::future::Either; use crate::stream::{Fuse, StreamExt}; use core::fmt; use core::pin::Pin; @@ -7,26 +6,23 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// A `Stream` that implements a `peek` method. /// /// The `peek` method can be used to retrieve a reference /// to the next `Stream::Item` if available. A subsequent /// call to `poll` will return the owned item. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Peekable<St: Stream> { + #[pin] stream: Fuse<St>, peeked: Option<St::Item>, } -impl<St: Stream + Unpin> Unpin for Peekable<St> {} - impl<St: Stream> Peekable<St> { - unsafe_pinned!(stream: Fuse<St>); - unsafe_unpinned!(peeked: Option<St::Item>); - pub(super) fn new(stream: St) -> Peekable<St> { Peekable { stream: stream.fuse(), @@ -34,37 +30,7 @@ impl<St: Stream> Peekable<St> { } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner() - } + delegate_access_inner!(stream, St, (.)); /// Produces a `Peek` future which retrieves a reference to the next item /// in the stream, or `None` if the underlying stream terminates. @@ -72,42 +38,27 @@ impl<St: Stream> Peekable<St> { Peek { inner: Some(self) } } - /// Attempt to poll the underlying stream, and return the mutable borrow - /// in case that is desirable to try for another time. - /// In case a peeking poll is successful, the reference to the next item - /// will be in the `Either::Right` variant; otherwise, the mutable borrow - /// will be in the `Either::Left` variant. - fn do_poll_peek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Either<Pin<&mut Self>, Option<&St::Item>> { - if self.peeked.is_some() { - let this: &Self = self.into_ref().get_ref(); - return Either::Right(this.peeked.as_ref()); - } - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(None) => Either::Right(None), - Poll::Ready(Some(item)) => { - *self.as_mut().peeked() = Some(item); - let this: &Self = self.into_ref().get_ref(); - Either::Right(this.peeked.as_ref()) - } - _ => Either::Left(self), - } - } - /// Peek retrieves a reference to the next item in the stream. /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. + #[project] pub fn poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>> { - match self.do_poll_peek(cx) { - Either::Left(_) => Poll::Pending, - Either::Right(poll) => Poll::Ready(poll), - } + #[project] + let Peekable { mut stream, peeked } = self.project(); + + Poll::Ready(loop { + if peeked.is_some() { + break peeked.as_ref(); + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + *peeked = Some(item); + } else { + break None; + } + }) } } @@ -120,11 +71,14 @@ impl<St: Stream> FusedStream for Peekable<St> { impl<S: Stream> Stream for Peekable<S> { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - if let Some(item) = self.as_mut().peeked().take() { + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[project] + let Peekable { stream, peeked } = self.project(); + if let Some(item) = peeked.take() { return Poll::Ready(Some(item)); } - self.as_mut().stream().poll_next(cx) + stream.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { @@ -151,13 +105,12 @@ where } /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] +#[pin_project] #[must_use = "futures do nothing unless polled"] pub struct Peek<'a, St: Stream> { inner: Option<Pin<&'a mut Peekable<St>>>, } -impl<St: Stream> Unpin for Peek<'_, St> {} - impl<St> fmt::Debug for Peek<'_, St> where St: Stream + fmt::Debug, @@ -181,15 +134,12 @@ where St: Stream, { type Output = Option<&'a St::Item>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - if let Some(peekable) = self.inner.take() { - match peekable.do_poll_peek(cx) { - Either::Left(peekable) => { - self.inner = Some(peekable); - Poll::Pending - } - Either::Right(peek) => Poll::Ready(peek), - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = self.project().inner; + if let Some(peekable) = inner { + ready!(peekable.as_mut().poll_peek(cx)); + + inner.take().unwrap().poll_peek(cx) } else { panic!("Peek polled after completion") } diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs new file mode 100644 index 0000000..2152cb7 --- /dev/null +++ b/src/stream/stream/ready_chunks.rs @@ -0,0 +1,112 @@ +use crate::stream::Fuse; +use futures_core::stream::{Stream, FusedStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project::{pin_project, project}; +use core::mem; +use core::pin::Pin; +use alloc::vec::Vec; + +/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. +#[pin_project] +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct ReadyChunks<St: Stream> { + #[pin] + stream: Fuse<St>, + items: Vec<St::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 +} + +impl<St: Stream> ReadyChunks<St> where St: Stream { + pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> { + assert!(capacity > 0); + + ReadyChunks { + stream: super::Fuse::new(stream), + items: Vec::with_capacity(capacity), + cap: capacity, + } + } + + delegate_access_inner!(stream, St, (.)); +} + +impl<St: Stream> Stream for ReadyChunks<St> { + type Item = Vec<St::Item>; + + #[project] + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + #[project] + let ReadyChunks { items, cap, mut stream } = self.project(); + + loop { + match stream.as_mut().poll_next(cx) { + // Flush all collected data if underlying stream doesn't contain + // more ready values + Poll::Pending => { + return if items.is_empty() { + Poll::Pending + } else { + Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap)))) + } + } + + // Push the ready item into the buffer and check whether it is full. + // If so, replace our buffer with a new and empty one and return + // the full one. + Poll::Ready(Some(item)) => { + items.push(item); + if items.len() >= *cap { + return Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap)))) + } + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Poll::Ready(None) => { + let last = if items.is_empty() { + None + } else { + let full_buf = mem::replace(items, Vec::new()); + Some(full_buf) + }; + + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(chunk_len); + let upper = match upper { + Some(x) => x.checked_add(chunk_len), + None => None, + }; + (lower, upper) + } +} + +impl<St: FusedStream> FusedStream for ReadyChunks<St> { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.items.is_empty() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Item> Sink<Item> for ReadyChunks<S> +where + S: Stream + Sink<Item>, +{ + type Error = S::Error; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 4f937f4..0cdfcbc 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; struct StateFn<S, F> { state: S, @@ -13,15 +13,16 @@ struct StateFn<S, F> { } /// Stream for the [`scan`](super::StreamExt::scan) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Scan<St: Stream, S, Fut, F> { + #[pin] stream: St, state_f: Option<StateFn<S, F>>, + #[pin] future: Option<Fut>, } -impl<St: Unpin + Stream, S, Fut: Unpin, F> Unpin for Scan<St, S, Fut, F> {} - impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F> where St: Stream + fmt::Debug, @@ -40,10 +41,6 @@ where } impl<St: Stream, S, Fut, F> Scan<St, S, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(state_f: Option<StateFn<S, F>>); - unsafe_pinned!(future: Option<Fut>); - /// Checks if internal state is `None`. fn is_done_taking(&self) -> bool { self.state_f.is_none() @@ -67,37 +64,7 @@ where } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<B, St, S, Fut, F> Stream for Scan<St, S, Fut, F> @@ -108,29 +75,32 @@ where { type Item = B; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { if self.is_done_taking() { return Poll::Ready(None); } - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let state_f = self.as_mut().state_f().as_mut().unwrap(); - let fut = (state_f.f)(&mut state_f.state, item); - self.as_mut().future().set(Some(fut)); - } - - let item = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - self.as_mut().future().set(None); - - if item.is_none() { - self.as_mut().state_f().take(); - } - - Poll::Ready(item) + #[project] + let Scan { mut stream, state_f, mut future } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.poll(cx)); + future.set(None); + + if item.is_none() { + *state_f = None; + } + + break item; + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + let state_f = state_f.as_mut().unwrap(); + future.set(Some((state_f.f)(&mut state_f.state, item))) + } else { + break None; + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index 0b7c632..c0f6611 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -3,22 +3,19 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`skip`](super::StreamExt::skip) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Skip<St> { + #[pin] stream: St, remaining: usize, } -impl<St: Unpin> Unpin for Skip<St> {} - impl<St: Stream> Skip<St> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(remaining: usize); - pub(super) fn new(stream: St, n: usize) -> Skip<St> { Skip { stream, @@ -26,37 +23,7 @@ impl<St: Stream> Skip<St> { } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St: FusedStream> FusedStream for Skip<St> { @@ -68,18 +35,22 @@ impl<St: FusedStream> FusedStream for Skip<St> { impl<St: Stream> Stream for Skip<St> { type Item = St::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { - while self.remaining > 0 { - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(_) => *self.as_mut().remaining() -= 1, - None => return Poll::Ready(None), + #[project] + let Skip { mut stream, remaining } = self.project(); + while *remaining > 0 { + if ready!(stream.as_mut().poll_next(cx)).is_some() { + *remaining -= 1; + } else { + return Poll::Ready(None); } } - self.as_mut().stream().poll_next(cx) + stream.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 666d9de..3d664f2 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -5,20 +5,21 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct SkipWhile<St, Fut, F> where St: Stream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Item>, done_skipping: bool, } -impl<St: Unpin + Stream, Fut: Unpin, F> Unpin for SkipWhile<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F> where St: Stream + fmt::Debug, @@ -40,12 +41,6 @@ impl<St, Fut, F> SkipWhile<St, Fut, F> F: FnMut(&St::Item) -> Fut, Fut: Future<Output = bool>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Item>); - unsafe_unpinned!(done_skipping: bool); - pub(super) fn new(stream: St, f: F) -> SkipWhile<St, Fut, F> { SkipWhile { stream, @@ -56,37 +51,7 @@ impl<St, Fut, F> SkipWhile<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F> @@ -106,44 +71,48 @@ impl<St, Fut, F> Stream for SkipWhile<St, Fut, F> { type Item = St::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { - if self.done_skipping { - return self.as_mut().stream().poll_next(cx); - } - - loop { - if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } + #[project] + let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project(); - let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - let item = self.as_mut().pending_item().take().unwrap(); - self.as_mut().pending_fut().set(None); + if *done_skipping { + return stream.poll_next(cx); + } - if !skipped { - *self.as_mut().done_skipping() = true; - return Poll::Ready(Some(item)) + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let skipped = ready!(fut.poll(cx)); + let item = pending_item.take(); + pending_fut.set(None); + if !skipped { + *done_skipping = true; + break item; + } + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; - let (_, upper) = self.stream.size_hint(); - let upper = match upper { - Some(x) => x.checked_add(pending_len), - None => None, - }; - (0, upper) // can't know a lower bound, due to the predicate + if self.done_skipping { + self.stream.size_hint() + } else { + let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } } diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index 4118b33..c7237a2 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs @@ -16,7 +16,7 @@ impl<S> Unpin for SplitStream<S> {} impl<S: Unpin> SplitStream<S> { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are - /// a matching pair originating from the same call to `Stream::split`. + /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> where S: Sink<Item>, { @@ -53,7 +53,7 @@ impl<S, Item> Unpin for SplitSink<S, Item> {} impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are - /// a matching pair originating from the same call to `Stream::split`. + /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> { self.lock.reunite(other.0).map_err(|err| { ReuniteError(SplitSink(err.0), SplitStream(err.1)) diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 1109a4a..4a68920 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -4,22 +4,19 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`take`](super::StreamExt::take) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Take<St> { + #[pin] stream: St, remaining: usize, } -impl<St: Unpin> Unpin for Take<St> {} - impl<St: Stream> Take<St> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(remaining: usize); - pub(super) fn new(stream: St, n: usize) -> Take<St> { Take { stream, @@ -27,37 +24,7 @@ impl<St: Stream> Take<St> { } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St> Stream for Take<St> @@ -65,17 +32,21 @@ impl<St> Stream for Take<St> { type Item = St::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { if self.remaining == 0 { Poll::Ready(None) } else { - let next = ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(_) => *self.as_mut().remaining() -= 1, - None => *self.as_mut().remaining() = 0, + #[project] + let Take { stream, remaining } = self.project(); + let next = ready!(stream.poll_next(cx)); + if next.is_some() { + *remaining -= 1; + } else { + *remaining = 0; } Poll::Ready(next) } diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs new file mode 100644 index 0000000..3662620 --- /dev/null +++ b/src/stream/stream/take_until.rs @@ -0,0 +1,178 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project::{pin_project, project}; + +// FIXME: docs, tests + +/// Stream for the [`take_until`](super::StreamExt::take_until) method. +#[pin_project] +#[must_use = "streams do nothing unless polled"] +pub struct TakeUntil<St: Stream, Fut: Future> { + #[pin] + stream: St, + /// Contains the inner Future on start and None once the inner Future is resolved + /// or taken out by the user. + #[pin] + fut: Option<Fut>, + /// Contains fut's return value once fut is resolved + fut_result: Option<Fut::Output>, + /// Whether the future was taken out by the user. + free: bool, +} + +impl<St, Fut> fmt::Debug for TakeUntil<St, Fut> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, + Fut: Future + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TakeUntil") + .field("stream", &self.stream) + .field("fut", &self.fut) + .finish() + } +} + +impl<St, Fut> TakeUntil<St, Fut> +where + St: Stream, + Fut: Future, +{ + pub(super) fn new(stream: St, fut: Fut) -> TakeUntil<St, Fut> { + TakeUntil { + stream, + fut: Some(fut), + fut_result: None, + free: false, + } + } + + delegate_access_inner!(stream, St, ()); + + /// Extract the stopping future out of the combinator. + /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet. + /// Taking out the future means the combinator will be yielding + /// elements from the wrapped stream without ever stopping it. + pub fn take_future(&mut self) -> Option<Fut> { + if self.fut.is_some() { + self.free = true; + } + + self.fut.take() + } + + /// Once the stopping future is resolved, this method can be used + /// to extract the value returned by the stopping future. + /// + /// This may be used to retrieve arbitrary data from the stopping + /// future, for example a reason why the stream was stopped. + /// + /// This method will return `None` if the future isn't resovled yet, + /// or if the result was already taken out. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, StreamExt}; + /// use futures::task::Poll; + /// + /// let stream = stream::iter(1..=10); + /// + /// let mut i = 0; + /// let stop_fut = future::poll_fn(|_cx| { + /// i += 1; + /// if i <= 5 { + /// Poll::Pending + /// } else { + /// Poll::Ready("reason") + /// } + /// }); + /// + /// let mut stream = stream.take_until(stop_fut); + /// let _ = stream.by_ref().collect::<Vec<_>>().await; + /// + /// let result = stream.take_result().unwrap(); + /// assert_eq!(result, "reason"); + /// # }); + /// ``` + pub fn take_result(&mut self) -> Option<Fut::Output> { + self.fut_result.take() + } + + /// Whether the stream was stopped yet by the stopping future + /// being resolved. + pub fn is_stopped(&self) -> bool { + !self.free && self.fut.is_none() + } +} + +impl<St, Fut> Stream for TakeUntil<St, Fut> +where + St: Stream, + Fut: Future, +{ + type Item = St::Item; + + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { + #[project] + let TakeUntil { stream, mut fut, fut_result, free } = self.project(); + + if let Some(f) = fut.as_mut().as_pin_mut() { + if let Poll::Ready(result) = f.poll(cx) { + fut.set(None); + *fut_result = Some(result); + } + } + + if !*free && fut.is_none() { + // Future resolved, inner stream stopped + Poll::Ready(None) + } else { + // Future either not resolved yet or taken out by the user + let item = ready!(stream.poll_next(cx)); + if item.is_none() { + fut.set(None); + } + Poll::Ready(item) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.is_stopped() { + return (0, Some(0)); + } + + self.stream.size_hint() + } +} + +impl<St, Fut> FusedStream for TakeUntil<St, Fut> +where + St: Stream, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.is_stopped() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut> +where + S: Stream + Sink<Item>, + Fut: Future, +{ + type Error = S::Error; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 68606ec..d90061e 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -5,20 +5,21 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`take_while`](super::StreamExt::take_while) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] -pub struct TakeWhile<St: Stream , Fut, F> { +pub struct TakeWhile<St: Stream, Fut, F> { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Item>, done_taking: bool, } -impl<St: Unpin + Stream, Fut: Unpin, F> Unpin for TakeWhile<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for TakeWhile<St, Fut, F> where St: Stream + fmt::Debug, @@ -35,14 +36,6 @@ where } } -impl<St: Stream, Fut, F> TakeWhile<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Item>); - unsafe_unpinned!(done_taking: bool); -} - impl<St, Fut, F> TakeWhile<St, Fut, F> where St: Stream, F: FnMut(&St::Item) -> Fut, @@ -58,37 +51,7 @@ impl<St, Fut, F> TakeWhile<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for TakeWhile<St, Fut, F> @@ -98,34 +61,36 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F> { type Item = St::Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { if self.done_taking { return Poll::Ready(None); } - if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } - - let take = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); - - if take { - Poll::Ready(Some(item)) - } else { - *self.as_mut().done_taking() = true; - Poll::Ready(None) - } + #[project] + let TakeWhile { mut stream, f, mut pending_fut, pending_item, done_taking } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let take = ready!(fut.poll(cx)); + let item = pending_item.take(); + pending_fut.set(None); + if take { + break item; + } else { + *done_taking = true; + break None; + } + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs index 39843b2..d54512e 100644 --- a/src/stream/stream/then.rs +++ b/src/stream/stream/then.rs @@ -5,18 +5,19 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`then`](super::StreamExt::then) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Then<St, Fut, F> { + #[pin] stream: St, + #[pin] future: Option<Fut>, f: F, } -impl<St: Unpin, Fut: Unpin, F> Unpin for Then<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> where St: fmt::Debug, @@ -30,12 +31,6 @@ where } } -impl<St, Fut, F> Then<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option<Fut>); - unsafe_unpinned!(f: F); -} - impl<St, Fut, F> Then<St, Fut, F> where St: Stream, F: FnMut(St::Item) -> Fut, @@ -48,37 +43,7 @@ impl<St, Fut, F> Then<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for Then<St, Fut, F> @@ -98,22 +63,25 @@ impl<St, Fut, F> Stream for Then<St, Fut, F> { type Item = Fut::Output; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Fut::Output>> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { - None => return Poll::Ready(None), - Some(e) => e, - }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); - } - - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - self.as_mut().future().set(None); - Poll::Ready(Some(e)) + #[project] + let Then { mut stream, f, mut future } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.poll(cx)); + future.set(None); + break Some(item); + } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + future.set(Some(f(item))); + } else { + break None; + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index f97ac17..6c148fb 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -3,33 +3,22 @@ use core::cmp; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`zip`](super::StreamExt::zip) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Zip<St1: Stream, St2: Stream> { + #[pin] stream1: Fuse<St1>, + #[pin] stream2: Fuse<St2>, queued1: Option<St1::Item>, queued2: Option<St2::Item>, } -#[allow(clippy::type_repetition_in_bounds)] // https://github.com/rust-lang/rust-clippy/issues/4323 -impl<St1, St2> Unpin for Zip<St1, St2> -where - St1: Stream, - Fuse<St1>: Unpin, - St2: Stream, - Fuse<St2>: Unpin, -{} - impl<St1: Stream, St2: Stream> Zip<St1, St2> { - unsafe_pinned!(stream1: Fuse<St1>); - unsafe_pinned!(stream2: Fuse<St2>); - unsafe_unpinned!(queued1: Option<St1::Item>); - unsafe_unpinned!(queued2: Option<St2::Item>); - pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> { Zip { stream1: stream1.fuse(), @@ -88,28 +77,31 @@ impl<St1, St2> Stream for Zip<St1, St2> { type Item = (St1::Item, St2::Item); + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.queued1.is_none() { - match self.as_mut().stream1().poll_next(cx) { - Poll::Ready(Some(item1)) => *self.as_mut().queued1() = Some(item1), + #[project] + let Zip { mut stream1, mut stream2, queued1, queued2 } = self.project(); + + if queued1.is_none() { + match stream1.as_mut().poll_next(cx) { + Poll::Ready(Some(item1)) => *queued1 = Some(item1), Poll::Ready(None) | Poll::Pending => {} } } - if self.queued2.is_none() { - match self.as_mut().stream2().poll_next(cx) { - Poll::Ready(Some(item2)) => *self.as_mut().queued2() = Some(item2), + if queued2.is_none() { + match stream2.as_mut().poll_next(cx) { + Poll::Ready(Some(item2)) => *queued2 = Some(item2), Poll::Ready(None) | Poll::Pending => {} } } - if self.queued1.is_some() && self.queued2.is_some() { - let pair = (self.as_mut().queued1().take().unwrap(), - self.as_mut().queued2().take().unwrap()); + if queued1.is_some() && queued2.is_some() { + let pair = (queued1.take().unwrap(), queued2.take().unwrap()); Poll::Ready(Some(pair)) - } else if self.stream1.is_done() || self.stream2.is_done() { + } else if stream1.is_done() || stream2.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index 809c32a..563ed34 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct AndThen<St, Fut, F> { + #[pin] stream: St, + #[pin] future: Option<Fut>, f: F, } -impl<St: Unpin, Fut: Unpin, F> Unpin for AndThen<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F> where St: fmt::Debug, @@ -30,12 +31,6 @@ where } } -impl<St, Fut, F> AndThen<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option<Fut>); - unsafe_unpinned!(f: F); -} - impl<St, Fut, F> AndThen<St, Fut, F> where St: TryStream, F: FnMut(St::Ok) -> Fut, @@ -45,37 +40,7 @@ impl<St, Fut, F> AndThen<St, Fut, F> Self { stream, future: None, f } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for AndThen<St, Fut, F> @@ -85,22 +50,25 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F> { type Item = Result<Fut::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - None => return Poll::Ready(None), - Some(e) => e, - }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); - } - - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); - Poll::Ready(Some(e)) + #[project] + let AndThen { mut stream, mut future, f } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.try_poll(cx)); + future.set(None); + break Some(item); + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + future.set(Some(f(item))); + } else { + break None; + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/err_into.rs b/src/stream/try_stream/err_into.rs deleted file mode 100644 index f5d9294..0000000 --- a/src/stream/try_stream/err_into.rs +++ /dev/null @@ -1,98 +0,0 @@ -use core::marker::PhantomData; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::unsafe_pinned; - -/// Stream for the [`err_into`](super::TryStreamExt::err_into) method. -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct ErrInto<St, E> { - stream: St, - _marker: PhantomData<E>, -} - -impl<St: Unpin, E> Unpin for ErrInto<St, E> {} - -impl<St, E> ErrInto<St, E> { - unsafe_pinned!(stream: St); - - pub(super) fn new(stream: St) -> Self { - ErrInto { stream, _marker: PhantomData } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, E> FusedStream for ErrInto<St, E> -where - St: TryStream + FusedStream, - St::Error: Into<E>, -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, E> Stream for ErrInto<St, E> -where - St: TryStream, - St::Error: Into<E>, -{ - type Item = Result<St::Ok, E>; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.stream().try_poll_next(cx) - .map(|res| res.map(|some| some.map_err(Into::into))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, E, Item> Sink<Item> for ErrInto<S, E> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/inspect_err.rs b/src/stream/try_stream/inspect_err.rs deleted file mode 100644 index 3c23ae0..0000000 --- a/src/stream/try_stream/inspect_err.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::stream::stream::inspect; -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. -#[must_use = "streams do nothing unless polled"] -pub struct InspectErr<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for InspectErr<St, F> {} - -impl<St, F> fmt::Debug for InspectErr<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("InspectErr") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> InspectErr<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); -} - -impl<St, F> InspectErr<St, F> -where - St: TryStream, - F: FnMut(&St::Error), -{ - pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F> FusedStream for InspectErr<St, F> -where - St: TryStream + FusedStream, - F: FnMut(&St::Error), -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F> Stream for InspectErr<St, F> -where - St: TryStream, - F: FnMut(&St::Error), -{ - type Item = Result<St::Ok, St::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f())))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for InspectErr<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/inspect_ok.rs b/src/stream/try_stream/inspect_ok.rs deleted file mode 100644 index 89fb459..0000000 --- a/src/stream/try_stream/inspect_ok.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::stream::stream::inspect; -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. -#[must_use = "streams do nothing unless polled"] -pub struct InspectOk<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for InspectOk<St, F> {} - -impl<St, F> fmt::Debug for InspectOk<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("InspectOk") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> InspectOk<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); -} - -impl<St, F> InspectOk<St, F> -where - St: TryStream, - F: FnMut(&St::Ok), -{ - pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F> FusedStream for InspectOk<St, F> -where - St: TryStream + FusedStream, - F: FnMut(&St::Ok), -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F> Stream for InspectOk<St, F> -where - St: TryStream, - F: FnMut(&St::Ok), -{ - type Item = Result<St::Ok, St::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f())))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for InspectOk<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs index b0fa07a..370a327 100644 --- a/src/stream/try_stream/into_stream.rs +++ b/src/stream/try_stream/into_stream.rs @@ -3,54 +3,24 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::pin_project; /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct IntoStream<St> { + #[pin] stream: St, } impl<St> IntoStream<St> { - unsafe_pinned!(stream: St); - #[inline] pub(super) fn new(stream: St) -> Self { IntoStream { stream } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> { @@ -67,7 +37,7 @@ impl<St: TryStream> Stream for IntoStream<St> { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - self.stream().try_poll_next(cx) + self.project().stream.try_poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/map_err.rs b/src/stream/try_stream/map_err.rs deleted file mode 100644 index 1b98d6b..0000000 --- a/src/stream/try_stream/map_err.rs +++ /dev/null @@ -1,112 +0,0 @@ -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`map_err`](super::TryStreamExt::map_err) method. -#[must_use = "streams do nothing unless polled"] -pub struct MapErr<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for MapErr<St, F> {} - -impl<St, F> fmt::Debug for MapErr<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MapErr") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> MapErr<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - - /// Creates a new MapErr. - pub(super) fn new(stream: St, f: F) -> Self { - MapErr { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F, E> FusedStream for MapErr<St, F> -where - St: TryStream + FusedStream, - F: FnMut(St::Error) -> E, -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F, E> Stream for MapErr<St, F> -where - St: TryStream, - F: FnMut(St::Error) -> E, -{ - type Item = Result<St::Ok, E>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e)))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for MapErr<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/map_ok.rs b/src/stream/try_stream/map_ok.rs deleted file mode 100644 index 19d01be..0000000 --- a/src/stream/try_stream/map_ok.rs +++ /dev/null @@ -1,112 +0,0 @@ -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. -#[must_use = "streams do nothing unless polled"] -pub struct MapOk<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for MapOk<St, F> {} - -impl<St, F> fmt::Debug for MapOk<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MapOk") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> MapOk<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - - /// Creates a new MapOk. - pub(super) fn new(stream: St, f: F) -> Self { - MapOk { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F, T> FusedStream for MapOk<St, F> -where - St: TryStream + FusedStream, - F: FnMut(St::Ok) -> T, -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F, T> Stream for MapOk<St, F> -where - St: TryStream, - F: FnMut(St::Ok) -> T, -{ - type Item = Result<T, St::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x)))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for MapOk<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 6a7ced4..99d5a6d 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -11,34 +11,53 @@ use futures_core::{ stream::TryStream, task::{Context, Poll}, }; +use crate::fns::{ + InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, +}; +use crate::stream::{Map, Inspect}; mod and_then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::and_then::AndThen; -mod err_into; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::err_into::ErrInto; - -mod inspect_ok; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::inspect_ok::InspectOk; - -mod inspect_err; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::inspect_err::InspectErr; +delegate_all!( + /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. + ErrInto<St, E>( + MapErr<St, IntoFn<E>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())] +); + +delegate_all!( + /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. + InspectOk<St, F>( + Inspect<IntoStream<St>, InspectOkFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] +); + +delegate_all!( + /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. + InspectErr<St, F>( + Inspect<IntoStream<St>, InspectErrFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] +); mod into_stream; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_stream::IntoStream; -mod map_ok; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::map_ok::MapOk; +delegate_all!( + /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. + MapOk<St, F>( + Map<IntoStream<St>, MapOkFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] +); -mod map_err; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::map_err::MapErr; +delegate_all!( + /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. + MapErr<St, F>( + Map<IntoStream<St>, MapErrFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] +); mod or_else; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -385,8 +404,9 @@ pub trait TryStreamExt: TryStream { /// Skip elements on this stream while the provided asynchronous predicate /// resolves to `true`. /// - /// This function is similar to [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) - /// but exits early if an error occurs. + /// This function is similar to + /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits + /// early if an error occurs. /// /// # Examples /// diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs index 33310d1..0bba0d0 100644 --- a/src/stream/try_stream/or_else.rs +++ b/src/stream/try_stream/or_else.rs @@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`or_else`](super::TryStreamExt::or_else) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct OrElse<St, Fut, F> { + #[pin] stream: St, + #[pin] future: Option<Fut>, f: F, } -impl<St: Unpin, Fut: Unpin, F> Unpin for OrElse<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F> where St: fmt::Debug, @@ -30,12 +31,6 @@ where } } -impl<St, Fut, F> OrElse<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option<Fut>); - unsafe_unpinned!(f: F); -} - impl<St, Fut, F> OrElse<St, Fut, F> where St: TryStream, F: FnMut(St::Error) -> Fut, @@ -45,37 +40,7 @@ impl<St, Fut, F> OrElse<St, Fut, F> Self { stream, future: None, f } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for OrElse<St, Fut, F> @@ -85,23 +50,29 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F> { type Item = Result<St::Ok, Fut::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { - None => return Poll::Ready(None), - Some(Ok(e)) => return Poll::Ready(Some(Ok(e))), - Some(Err(e)) => e, - }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); - } - - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); - Poll::Ready(Some(e)) + #[project] + let OrElse { mut stream, mut future, f } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.try_poll(cx)); + future.set(None); + break Some(item); + } else { + match ready!(stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => break Some(Ok(item)), + Some(Err(e)) => { + future.set(Some(f(e))); + }, + None => break None, + } + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs index d11e1b4..566868b 100644 --- a/src/stream/try_stream/try_buffer_unordered.rs +++ b/src/stream/try_stream/try_buffer_unordered.rs @@ -5,32 +5,27 @@ use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use core::pin::Pin; /// Stream for the /// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryBufferUnordered<St> where St: TryStream { + #[pin] stream: Fuse<IntoStream<St>>, in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>, max: usize, } -impl<St> Unpin for TryBufferUnordered<St> - where St: TryStream + Unpin -{} - impl<St> TryBufferUnordered<St> where St: TryStream, St::Ok: TryFuture, { - unsafe_pinned!(stream: Fuse<IntoStream<St>>); - unsafe_unpinned!(in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>); - pub(super) fn new(stream: St, n: usize) -> Self { TryBufferUnordered { stream: IntoStream::new(stream).fuse(), @@ -39,37 +34,7 @@ impl<St> TryBufferUnordered<St> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref().get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut().get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner().into_inner() - } + delegate_access_inner!(stream, St, (. .)); } impl<St> Stream for TryBufferUnordered<St> @@ -78,27 +43,31 @@ impl<St> Stream for TryBufferUnordered<St> { type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + #[project] + let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project(); + // First up, try to spawn off as many futures as possible by filling up // our queue of futures. Propagate errors from the stream immediately. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx)? { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut.into_future()), + while in_progress_queue.len() < *max { + match stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match self.as_mut().in_progress_queue().poll_next_unpin(cx) { + match in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } // If more values are still coming from the stream, we're not done yet - if self.stream.is_done() { + if stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs index d22e8e8..3c9aee2 100644 --- a/src/stream/try_stream/try_collect.rs +++ b/src/stream/try_stream/try_collect.rs @@ -3,34 +3,27 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, TryStream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryCollect<St, C> { + #[pin] stream: St, items: C, } impl<St: TryStream, C: Default> TryCollect<St, C> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(items: C); - pub(super) fn new(s: St) -> TryCollect<St, C> { TryCollect { stream: s, items: Default::default(), } } - - fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.items(), Default::default()) - } } -impl<St: Unpin + TryStream, C> Unpin for TryCollect<St, C> {} - impl<St, C> FusedFuture for TryCollect<St, C> where St: TryStream + FusedStream, @@ -48,15 +41,18 @@ where { type Output = Result<C, St::Error>; + #[project] fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => self.as_mut().items().extend(Some(x)), - None => return Poll::Ready(Ok(self.as_mut().finish())), + #[project] + let TryCollect { mut stream, items } = self.project(); + Poll::Ready(Ok(loop { + match ready!(stream.as_mut().try_poll_next(cx)?) { + Some(x) => items.extend(Some(x)), + None => break mem::replace(items, Default::default()), } - } + })) } } diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs index 395f166..8c9710b 100644 --- a/src/stream/try_stream/try_concat.rs +++ b/src/stream/try_stream/try_concat.rs @@ -2,26 +2,23 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryConcat<St: TryStream> { + #[pin] stream: St, accum: Option<St::Ok>, } -impl<St: TryStream + Unpin> Unpin for TryConcat<St> {} - impl<St> TryConcat<St> where St: TryStream, St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(accum: Option<St::Ok>); - pub(super) fn new(stream: St) -> TryConcat<St> { TryConcat { stream, @@ -37,21 +34,20 @@ where { type Output = Result<St::Ok, St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => { - let accum = self.as_mut().accum(); - if let Some(a) = accum { - a.extend(x) - } else { - *accum = Some(x) - } - }, - None => { - return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default())) + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryConcat { mut stream, accum } = self.project(); + Poll::Ready(Ok(loop { + if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) { + if let Some(a) = accum { + a.extend(x) + } else { + *accum = Some(x) } + } else { + break accum.take().unwrap_or_default(); } - } + })) } } diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index 24a9c32..310f991 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -5,24 +5,23 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_filter`](super::TryStreamExt::try_filter) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryFilter<St, Fut, F> where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Ok>, } -impl<St, Fut, F> Unpin for TryFilter<St, Fut, F> - where St: TryStream + Unpin, Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F> where St: TryStream + fmt::Debug, @@ -41,11 +40,6 @@ where impl<St, Fut, F> TryFilter<St, Fut, F> where St: TryStream { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Ok>); - pub(super) fn new(stream: St, f: F) -> Self { TryFilter { stream, @@ -55,37 +49,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F> @@ -105,29 +69,28 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F> { type Item = Result<St::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<St::Ok, St::Error>>> { - loop { - if self.pending_fut.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => x, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } - - let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); - - if yield_item { - return Poll::Ready(Some(Ok(item))); + #[project] + let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project(); + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let res = ready!(fut.poll(cx)); + pending_fut.set(None); + if res { + break pending_item.take().map(Ok); + } + *pending_item = None; + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index ed7eeb2..ba8e43a 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -5,21 +5,20 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryFilterMap<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] pending: Option<Fut>, } -impl<St, Fut, F> Unpin for TryFilterMap<St, Fut, F> - where St: Unpin, Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F> where St: fmt::Debug, @@ -34,45 +33,11 @@ where } impl<St, Fut, F> TryFilterMap<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> Self { TryFilterMap { stream, f, pending: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F> @@ -92,26 +57,29 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> { type Item = Result<T, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<T, St::Error>>> { - loop { - if self.pending.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => x, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(item); - self.as_mut().pending().set(Some(fut)); - } - - let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().pending().set(None); - if let Some(x) = result? { - return Poll::Ready(Some(Ok(x))); + #[project] + let TryFilterMap { mut stream, f, mut pending } = self.project(); + Poll::Ready(loop { + if let Some(p) = pending.as_mut().as_pin_mut() { + // We have an item in progress, poll that until it's done + let item = ready!(p.try_poll(cx)?); + pending.set(None); + if item.is_some() { + break item.map(Ok); + } + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + // No item in progress, but the stream is still going + pending.set(Some(f(item))); + } else { + // The stream is done + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs index 5f81b22..a528639 100644 --- a/src/stream/try_stream/try_flatten.rs +++ b/src/stream/try_stream/try_flatten.rs @@ -3,34 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, project}; /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryFlatten<St> where St: TryStream, { + #[pin] stream: St, + #[pin] next: Option<St::Ok>, } -impl<St> Unpin for TryFlatten<St> -where - St: TryStream + Unpin, - St::Ok: Unpin, -{ -} - -impl<St> TryFlatten<St> -where - St: TryStream, -{ - unsafe_pinned!(stream: St); - unsafe_pinned!(next: Option<St::Ok>); -} - impl<St> TryFlatten<St> where St: TryStream, @@ -41,37 +29,7 @@ where Self { stream, next: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St> FusedStream for TryFlatten<St> @@ -93,27 +51,23 @@ where { type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - loop { - if self.next.is_none() { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(e) => self.as_mut().next().set(Some(e)), - None => return Poll::Ready(None), + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[project] + let TryFlatten { mut stream, mut next } = self.project(); + Poll::Ready(loop { + if let Some(s) = next.as_mut().as_pin_mut() { + if let Some(item) = ready!(s.try_poll_next(cx)?) { + break Some(Ok(item)); + } else { + next.set(None); } - } - - if let Some(item) = ready!(self - .as_mut() - .next() - .as_pin_mut() - .unwrap() - .try_poll_next(cx)?) - { - return Poll::Ready(Some(Ok(item))); + } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) { + next.set(Some(s)); } else { - self.as_mut().next().set(None); + break None; } - } + }) } } diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs index b8b8dc2..d85c1fe 100644 --- a/src/stream/try_stream/try_fold.rs +++ b/src/stream/try_stream/try_fold.rs @@ -3,19 +3,20 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryFold<St, Fut, T, F> { + #[pin] stream: St, f: F, accum: Option<T>, + #[pin] future: Option<Fut>, } -impl<St: Unpin, Fut: Unpin, T, F> Unpin for TryFold<St, Fut, T, F> {} - impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> where St: fmt::Debug, @@ -36,11 +37,6 @@ where St: TryStream, F: FnMut(T, St::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = St::Error>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_unpinned!(accum: Option<T>); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> { TryFold { stream, @@ -68,43 +64,31 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> { type Output = Result<T, St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - // we're currently processing a future to produce a new accum value - if self.accum.is_none() { - let accum = match ready!( - self.as_mut().future().as_pin_mut() - .expect("TryFold polled after completion") - .try_poll(cx) - ) { - Ok(accum) => accum, - Err(e) => { - // Indicate that the future can no longer be polled. - self.as_mut().future().set(None); - return Poll::Ready(Err(e)); - } - }; - *self.as_mut().accum() = Some(accum); - self.as_mut().future().set(None); - } - - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { - Some(Ok(item)) => Some(item), - Some(Err(e)) => { - // Indicate that the future can no longer be polled. - *self.as_mut().accum() = None; - return Poll::Ready(Err(e)); + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryFold { mut stream, f, accum, mut future } = self.project(); + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let res = ready!(fut.try_poll(cx)); + future.set(None); + match res { + Ok(a) => *accum = Some(a), + Err(e) => break Err(e), + } + } else if accum.is_some() { + // we're waiting on a new item from the stream + let res = ready!(stream.as_mut().try_poll_next(cx)); + let a = accum.take().unwrap(); + match res { + Some(Ok(item)) => future.set(Some(f(a, item))), + Some(Err(e)) => break Err(e), + None => break Ok(a), } - None => None, - }; - let accum = self.as_mut().accum().take().unwrap(); - - if let Some(e) = item { - let future = (self.as_mut().f())(accum, e); - self.as_mut().future().set(Some(future)); } else { - return Poll::Ready(Ok(accum)) + panic!("Fold polled after completion") } - } + }) } } diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs index 2c71107..5fc91df 100644 --- a/src/stream/try_stream/try_for_each.rs +++ b/src/stream/try_stream/try_for_each.rs @@ -3,18 +3,19 @@ use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEach<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] future: Option<Fut>, } -impl<St: Unpin, Fut: Unpin, F> Unpin for TryForEach<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F> where St: fmt::Debug, @@ -33,10 +34,6 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = St::Error>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> { TryForEach { stream, @@ -53,20 +50,21 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F> { type Output = Result<(), St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryForEach { mut stream, f, mut future } = self.project(); loop { - if let Some(future) = self.as_mut().future().as_pin_mut() { - ready!(future.try_poll(cx))?; - } - self.as_mut().future().set(None); - - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(e) => { - let future = (self.as_mut().f())(e); - self.as_mut().future().set(Some(future)); + if let Some(fut) = future.as_mut().as_pin_mut() { + ready!(fut.try_poll(cx))?; + future.set(None); + } else { + match ready!(stream.as_mut().try_poll_next(cx)?) { + Some(e) => future.set(Some(f(e))), + None => break, } - None => return Poll::Ready(Ok(())), } } + Poll::Ready(Ok(())) } } diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs index 19c3e5b..87fd465 100644 --- a/src/stream/try_stream/try_for_each_concurrent.rs +++ b/src/stream/try_stream/try_for_each_concurrent.rs @@ -6,24 +6,21 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) /// method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEachConcurrent<St, Fut, F> { + #[pin] stream: Option<St>, f: F, futures: FuturesUnordered<Fut>, limit: Option<NonZeroUsize>, } -impl<St, Fut, F> Unpin for TryForEachConcurrent<St, Fut, F> -where St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> where St: fmt::Debug, @@ -53,11 +50,6 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: Future<Output = Result<(), St::Error>>, { - unsafe_pinned!(stream: Option<St>); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered<Fut>); - unsafe_unpinned!(limit: Option<NonZeroUsize>); - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> { TryForEachConcurrent { stream: Some(stream), @@ -76,15 +68,16 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> { type Output = Result<(), St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryForEachConcurrent { mut stream, f, futures, limit } = self.project(); loop { let mut made_progress_this_iter = false; - // Try and pull an item from the stream - let current_len = self.futures.len(); // Check if we've already created a number of futures greater than `limit` - if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { - let poll_res = match self.as_mut().stream().as_pin_mut() { + if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { + let poll_res = match stream.as_mut().as_pin_mut() { Some(stream) => stream.try_poll_next(cx), None => Poll::Ready(None), }; @@ -95,29 +88,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Some(elem) }, Poll::Ready(None) => { - self.as_mut().stream().set(None); + stream.set(None); None } Poll::Pending => None, Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + stream.set(None); + drop(mem::replace(futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } }; if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + futures.push(f(elem)); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match futures.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { - if self.stream.is_none() { + if stream.is_none() { return Poll::Ready(Ok(())) } }, @@ -125,8 +117,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + stream.set(None); + drop(mem::replace(futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } } diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs index a3d6803..624380f 100644 --- a/src/stream/try_stream/try_skip_while.rs +++ b/src/stream/try_stream/try_skip_while.rs @@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TrySkipWhile<St, Fut, F> where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Ok>, done_skipping: bool, } -impl<St: Unpin + TryStream, Fut: Unpin, F> Unpin for TrySkipWhile<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F> where St: TryStream + fmt::Debug, @@ -38,20 +39,9 @@ where impl<St, Fut, F> TrySkipWhile<St, Fut, F> where St: TryStream, -{ - unsafe_pinned!(stream: St); -} - -impl<St, Fut, F> TrySkipWhile<St, Fut, F> - where St: TryStream, F: FnMut(&St::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = St::Error>, { - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Ok>); - unsafe_unpinned!(done_skipping: bool); - pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> { TrySkipWhile { stream, @@ -62,37 +52,7 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> @@ -102,34 +62,34 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> { type Item = Result<St::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.done_skipping { - return self.as_mut().stream().try_poll_next(cx); - } + #[project] + let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project(); - loop { - if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } - - let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().try_poll(cx)?); - let item = self.as_mut().pending_item().take().unwrap(); - self.as_mut().pending_fut().set(None); + if *done_skipping { + return stream.try_poll_next(cx); + } - if !skipped { - *self.as_mut().done_skipping() = true; - return Poll::Ready(Some(Ok(item))) + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let skipped = ready!(fut.try_poll(cx)?); + let item = pending_item.take(); + pending_fut.set(None); + if !skipped { + *done_skipping = true; + break item.map(Ok); + } + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index 6266274..8da1248 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. /// @@ -67,15 +67,15 @@ where } /// Stream for the [`try_unfold`] function. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryUnfold<T, F, Fut> { f: F, state: Option<T>, + #[pin] fut: Option<Fut>, } -impl<T, F, Fut: Unpin> Unpin for TryUnfold<T, F, Fut> {} - impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> where T: fmt::Debug, @@ -89,12 +89,6 @@ where } } -impl<T, F, Fut> TryUnfold<T, F, Fut> { - unsafe_unpinned!(f: F); - unsafe_unpinned!(state: Option<T>); - unsafe_pinned!(fut: Option<Fut>); -} - impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut> where F: FnMut(T) -> Fut, @@ -102,27 +96,30 @@ where { type Item = Result<Item, Fut::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Item, Fut::Error>>> { - if let Some(state) = self.as_mut().state().take() { - let fut = (self.as_mut().f())(state); - self.as_mut().fut().set(Some(fut)); + #[project] + let TryUnfold {f, state, mut fut } = self.project(); + + if let Some(state) = state.take() { + fut.set(Some(f(state))); } - match self.as_mut().fut().as_pin_mut() { + match fut.as_mut().as_pin_mut() { None => { // The future previously errored Poll::Ready(None) } - Some(fut) => { - let step = ready!(fut.try_poll(cx)); - self.as_mut().fut().set(None); + Some(future) => { + let step = ready!(future.try_poll(cx)); + fut.set(None); match step { Ok(Some((item, next_state))) => { - *self.as_mut().state() = Some(next_state); + *state = Some(next_state); Poll::Ready(Some(Ok(item))) } Ok(None) => Poll::Ready(None), diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index 3153f83..0279571 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// @@ -56,15 +56,15 @@ pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut> } /// Stream for the [`unfold`] function. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Unfold<T, F, Fut> { f: F, state: Option<T>, + #[pin] fut: Option<Fut>, } -impl<T, F, Fut: Unpin> Unpin for Unfold<T, F, Fut> {} - impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut> where T: fmt::Debug, @@ -78,12 +78,6 @@ where } } -impl<T, F, Fut> Unfold<T, F, Fut> { - unsafe_unpinned!(f: F); - unsafe_unpinned!(state: Option<T>); - unsafe_pinned!(fut: Option<Fut>); -} - impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut> where F: FnMut(T) -> Fut, Fut: Future<Output = Option<(Item, T)>>, @@ -99,21 +93,24 @@ impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> { type Item = Item; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if let Some(state) = self.as_mut().state().take() { - let fut = (self.as_mut().f())(state); - self.as_mut().fut().set(Some(fut)); + #[project] + let Unfold { state, f, mut fut } = self.project(); + + if let Some(state) = state.take() { + fut.set(Some(f(state))); } - let step = ready!(self.as_mut().fut().as_pin_mut() + let step = ready!(fut.as_mut().as_pin_mut() .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx)); - self.as_mut().fut().set(None); + fut.set(None); if let Some((item, next_state)) = step { - *self.as_mut().state() = Some(next_state); + *state = Some(next_state); Poll::Ready(Some(item)) } else { Poll::Ready(None) |