aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2020-05-08 19:26:17 -0700
committerChih-Hung Hsieh <chh@google.com>2020-05-11 21:06:51 -0700
commit52627c866ba9ce070950c81a3a98f844a55305cf (patch)
treeb261d9acf1a15a6d9d39e311930effa8fdaccd43 /src/stream
parent032d3071c35e3fc8fd539889f96691b67d489bbb (diff)
downloadfutures-util-52627c866ba9ce070950c81a3a98f844a55305cf.tar.gz
Upgrade rust/crates/futures-util to 0.3.5
* Update Android.bp with new features and dependent packages for futures-* 0.3.5. * New dependencies of pin-project and pin-project-internal. Test: mm in external/rust/crates Change-Id: I705a08e44c8598d28b4b465170df8ed206df1494
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/futures_ordered.rs26
-rw-r--r--src/stream/futures_unordered/mod.rs29
-rw-r--r--src/stream/mod.rs9
-rw-r--r--src/stream/once.rs21
-rw-r--r--src/stream/select.rs21
-rw-r--r--src/stream/select_all.rs8
-rw-r--r--src/stream/stream/buffer_unordered.rs60
-rw-r--r--src/stream/stream/buffered.rs65
-rw-r--r--src/stream/stream/catch_unwind.rs21
-rw-r--r--src/stream/stream/chain.rs19
-rw-r--r--src/stream/stream/chunks.rs60
-rw-r--r--src/stream/stream/collect.rs22
-rw-r--r--src/stream/stream/concat.rs20
-rw-r--r--src/stream/stream/enumerate.rs55
-rw-r--r--src/stream/stream/filter.rs90
-rw-r--r--src/stream/stream/filter_map.rs89
-rw-r--r--src/stream/stream/flatten.rs97
-rw-r--r--src/stream/stream/fold.rs50
-rw-r--r--src/stream/stream/for_each.rs41
-rw-r--r--src/stream/stream/for_each_concurrent.rs34
-rw-r--r--src/stream/stream/forward.rs84
-rw-r--r--src/stream/stream/fuse.rs53
-rw-r--r--src/stream/stream/inspect.rs119
-rw-r--r--src/stream/stream/into_future.rs7
-rw-r--r--src/stream/stream/map.rs81
-rw-r--r--src/stream/stream/mod.rs171
-rw-r--r--src/stream/stream/peek.rs110
-rw-r--r--src/stream/stream/ready_chunks.rs112
-rw-r--r--src/stream/stream/scan.rs84
-rw-r--r--src/stream/stream/skip.rs57
-rw-r--r--src/stream/stream/skip_while.rs107
-rw-r--r--src/stream/stream/split.rs4
-rw-r--r--src/stream/stream/take.rs55
-rw-r--r--src/stream/stream/take_until.rs178
-rw-r--r--src/stream/stream/take_while.rs93
-rw-r--r--src/stream/stream/then.rs74
-rw-r--r--src/stream/stream/zip.rs44
-rw-r--r--src/stream/try_stream/and_then.rs74
-rw-r--r--src/stream/try_stream/err_into.rs98
-rw-r--r--src/stream/try_stream/inspect_err.rs118
-rw-r--r--src/stream/try_stream/inspect_ok.rs118
-rw-r--r--src/stream/try_stream/into_stream.rs40
-rw-r--r--src/stream/try_stream/map_err.rs112
-rw-r--r--src/stream/try_stream/map_ok.rs112
-rw-r--r--src/stream/try_stream/mod.rs58
-rw-r--r--src/stream/try_stream/or_else.rs79
-rw-r--r--src/stream/try_stream/try_buffer_unordered.rs59
-rw-r--r--src/stream/try_stream/try_collect.rs28
-rw-r--r--src/stream/try_stream/try_concat.rs36
-rw-r--r--src/stream/try_stream/try_filter.rs83
-rw-r--r--src/stream/try_stream/try_filter_map.rs80
-rw-r--r--src/stream/try_stream/try_flatten.rs84
-rw-r--r--src/stream/try_stream/try_fold.rs70
-rw-r--r--src/stream/try_stream/try_for_each.rs34
-rw-r--r--src/stream/try_stream/try_for_each_concurrent.rs42
-rw-r--r--src/stream/try_stream/try_skip_while.rs94
-rw-r--r--src/stream/try_stream/try_unfold.rs33
-rw-r--r--src/stream/unfold.rs29
58 files changed, 1330 insertions, 2421 deletions
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index a30cbaa..6dc07ad 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -2,16 +2,18 @@ use crate::stream::{FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
+use pin_project::pin_project;
use core::cmp::Ordering;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::pin::Pin;
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
struct OrderWrapper<T> {
+ #[pin]
data: T, // A future or a future's output
index: usize,
}
@@ -37,21 +39,18 @@ impl<T> Ord for OrderWrapper<T> {
}
}
-impl<T> OrderWrapper<T> {
- unsafe_pinned!(data: T);
-}
-
impl<T> Future for OrderWrapper<T>
where T: Future
{
type Output = OrderWrapper<T::Output>;
fn poll(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- self.as_mut().data().as_mut().poll(cx)
- .map(|output| OrderWrapper { data: output, index: self.index })
+ let index = self.index;
+ self.project().data.poll(cx)
+ .map(|output| OrderWrapper { data: output, index })
}
}
@@ -203,3 +202,14 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
}
}
+
+impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
+ fn extend<I>(&mut self, iter: I)
+ where
+ I: IntoIterator<Item = Fut>,
+ {
+ for item in iter.into_iter() {
+ self.push(item);
+ }
+ }
+}
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 5d93a64..2b7d704 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -121,7 +121,13 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
// notifiaction is received, the task will only be inserted into the ready to
// run queue if it isn't inserted already.
-impl<Fut: Future> FuturesUnordered<Fut> {
+impl<Fut> Default for FuturesUnordered<Fut> {
+ fn default() -> FuturesUnordered<Fut> {
+ FuturesUnordered::new()
+ }
+}
+
+impl<Fut> FuturesUnordered<Fut> {
/// Constructs a new, empty [`FuturesUnordered`].
///
/// The returned [`FuturesUnordered`] does not contain any futures.
@@ -151,15 +157,7 @@ impl<Fut: Future> FuturesUnordered<Fut> {
is_terminated: AtomicBool::new(false),
}
}
-}
-
-impl<Fut: Future> Default for FuturesUnordered<Fut> {
- fn default() -> FuturesUnordered<Fut> {
- FuturesUnordered::new()
- }
-}
-impl<Fut> FuturesUnordered<Fut> {
/// Returns the number of futures contained in the set.
///
/// This represents the total number of in-flight futures.
@@ -607,7 +605,7 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
}
}
-impl<Fut: Future> FromIterator<Fut> for FuturesUnordered<Fut> {
+impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = Fut>,
@@ -622,3 +620,14 @@ impl<Fut: Future> FusedStream for FuturesUnordered<Fut> {
self.is_terminated.load(Relaxed)
}
}
+
+impl<Fut> Extend<Fut> for FuturesUnordered<Fut> {
+ fn extend<I>(&mut self, iter: I)
+ where
+ I: IntoIterator<Item = Fut>,
+ {
+ for item in iter.into_iter() {
+ self.push(item);
+ }
+ }
+}
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index ba3575b..2a5ecf9 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -13,9 +13,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
- Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, ForEach, Fuse, Inspect,
- Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take,
- TakeWhile, Then, Zip,
+ Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse,
+ Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
+ StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip,
};
#[cfg(feature = "std")]
@@ -24,6 +24,9 @@ pub use self::stream::CatchUnwind;
#[cfg(feature = "alloc")]
pub use self::stream::Chunks;
+#[cfg(feature = "alloc")]
+pub use self::stream::ReadyChunks;
+
#[cfg(feature = "sink")]
pub use self::stream::Forward;
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 4f68b0c..21cd14b 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -2,7 +2,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Creates a stream of a single element.
///
@@ -16,34 +16,39 @@ use pin_utils::unsafe_pinned;
/// # });
/// ```
pub fn once<Fut: Future>(future: Fut) -> Once<Fut> {
- Once { future: Some(future) }
+ Once::new(future)
}
/// A stream which emits single element and then EOF.
///
/// This stream will never block and is always ready.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Once<Fut> {
+ #[pin]
future: Option<Fut>
}
-impl<Fut: Unpin> Unpin for Once<Fut> {}
-
impl<Fut> Once<Fut> {
- unsafe_pinned!(future: Option<Fut>);
+ pub(crate) fn new(future: Fut) -> Self {
+ Self { future: Some(future) }
+ }
}
impl<Fut: Future> Stream for Once<Fut> {
type Item = Fut::Output;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let v = match self.as_mut().future().as_pin_mut() {
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Once { mut future } = self.project();
+ let v = match future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};
- self.as_mut().future().set(None);
+ future.set(None);
Poll::Ready(Some(v))
}
diff --git a/src/stream/select.rs b/src/stream/select.rs
index b5fb813..36503e4 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -2,18 +2,20 @@ use crate::stream::{StreamExt, Fuse};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
+use pin_project::{pin_project, project};
/// Stream for the [`select()`] function.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Select<St1, St2> {
+ #[pin]
stream1: Fuse<St1>,
+ #[pin]
stream2: Fuse<St2>,
flag: bool,
}
-impl<St1: Unpin, St2: Unpin> Unpin for Select<St1, St2> {}
-
/// This function will attempt to pull items from both streams. Each
/// stream will be polled in a round-robin fashion, and whenever a stream is
/// ready to yield an item that item is yielded.
@@ -56,11 +58,11 @@ impl<St1, St2> Select<St1, St2> {
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
+ #[project]
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
- unsafe {
- let Self { stream1, stream2, .. } = self.get_unchecked_mut();
- (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut())
- }
+ #[project]
+ let Select { stream1, stream2, .. } = self.project();
+ (stream1.get_pin_mut(), stream2.get_pin_mut())
}
/// Consumes this combinator, returning the underlying streams.
@@ -87,14 +89,13 @@ impl<St1, St2> Stream for Select<St1, St2>
{
type Item = St1::Item;
+ #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>> {
- let Select { flag, stream1, stream2 } =
- unsafe { self.get_unchecked_mut() };
- let stream1 = unsafe { Pin::new_unchecked(stream1) };
- let stream2 = unsafe { Pin::new_unchecked(stream2) };
+ #[project]
+ let Select { flag, stream1, stream2 } = self.project();
if !*flag {
poll_inner(flag, stream1, stream2, cx)
diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs
index d257689..2547993 100644
--- a/src/stream/select_all.rs
+++ b/src/stream/select_all.rs
@@ -131,3 +131,11 @@ impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> {
select_all(iter)
}
}
+
+impl<St: Stream + Unpin> Extend<St> for SelectAll<St> {
+ fn extend<T: IntoIterator<Item = St>>(&mut self, iter: T) {
+ for st in iter {
+ self.push(st)
+ }
+ }
+}
diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs
index bea6e5b..a822576 100644
--- a/src/stream/stream/buffer_unordered.rs
+++ b/src/stream/stream/buffer_unordered.rs
@@ -4,27 +4,24 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use core::fmt;
use core::pin::Pin;
/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnordered<St>
where
St: Stream,
{
+ #[pin]
stream: Fuse<St>,
in_progress_queue: FuturesUnordered<St::Item>,
max: usize,
}
-impl<St> Unpin for BufferUnordered<St>
-where
- St: Stream + Unpin,
-{}
-
impl<St> fmt::Debug for BufferUnordered<St>
where
St: Stream + fmt::Debug,
@@ -43,9 +40,6 @@ where
St: Stream,
St::Item: Future,
{
- unsafe_pinned!(stream: Fuse<St>);
- unsafe_unpinned!(in_progress_queue: FuturesUnordered<St::Item>);
-
pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St>
where
St: Stream,
@@ -58,37 +52,7 @@ where
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner()
- }
+ delegate_access_inner!(stream, St, (.));
}
impl<St> Stream for BufferUnordered<St>
@@ -98,27 +62,31 @@ where
{
type Item = <St::Item as Future>::Output;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ #[project]
+ let BufferUnordered { mut stream, in_progress_queue, max } = self.project();
+
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
- while self.in_progress_queue.len() < self.max {
- match self.as_mut().stream().poll_next(cx) {
- Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut),
+ while in_progress_queue.len() < *max {
+ match stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- match self.as_mut().in_progress_queue().poll_next_unpin(cx) {
+ match in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}
// If more values are still coming from the stream, we're not done yet
- if self.stream.is_done() {
+ if stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs
index 2445a85..9dff01f 100644
--- a/src/stream/stream/buffered.rs
+++ b/src/stream/stream/buffered.rs
@@ -4,28 +4,24 @@ use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use core::fmt;
use core::pin::Pin;
/// Stream for the [`buffered`](super::StreamExt::buffered) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Buffered<St>
where
St: Stream,
St::Item: Future,
{
+ #[pin]
stream: Fuse<St>,
in_progress_queue: FuturesOrdered<St::Item>,
max: usize,
}
-impl<St> Unpin for Buffered<St>
-where
- St: Stream + Unpin,
- St::Item: Future,
-{}
-
impl<St> fmt::Debug for Buffered<St>
where
St: Stream + fmt::Debug,
@@ -45,9 +41,6 @@ where
St: Stream,
St::Item: Future,
{
- unsafe_pinned!(stream: Fuse<St>);
- unsafe_unpinned!(in_progress_queue: FuturesOrdered<St::Item>);
-
pub(super) fn new(stream: St, n: usize) -> Buffered<St> {
Buffered {
stream: super::Fuse::new(stream),
@@ -56,37 +49,7 @@ where
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner()
- }
+ delegate_access_inner!(stream, St, (.));
}
impl<St> Stream for Buffered<St>
@@ -96,27 +59,31 @@ where
{
type Item = <St::Item as Future>::Output;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- // Try to spawn off as many futures as possible by filling up
- // our in_progress_queue of futures.
- while self.in_progress_queue.len() < self.max {
- match self.as_mut().stream().poll_next(cx) {
- Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut),
+ #[project]
+ let Buffered { mut stream, in_progress_queue, max } = self.project();
+
+ // First up, try to spawn off as many futures as possible by filling up
+ // our queue of futures.
+ while in_progress_queue.len() < *max {
+ match stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- let res = self.as_mut().in_progress_queue().poll_next_unpin(cx);
+ let res = in_progress_queue.poll_next_unpin(cx);
if let Some(val) = ready!(res) {
return Poll::Ready(Some(val))
}
// If more values are still coming from the stream, we're not done yet
- if self.stream.is_done() {
+ if stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs
index 8d2dcf7..1bb43b2 100644
--- a/src/stream/stream/catch_unwind.rs
+++ b/src/stream/stream/catch_unwind.rs
@@ -1,45 +1,50 @@
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use std::any::Any;
use std::pin::Pin;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
/// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct CatchUnwind<St> {
+ #[pin]
stream: St,
caught_unwind: bool,
}
impl<St: Stream + UnwindSafe> CatchUnwind<St> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(caught_unwind: bool);
-
pub(super) fn new(stream: St) -> CatchUnwind<St> {
CatchUnwind { stream, caught_unwind: false }
}
+
+ delegate_access_inner!(stream, St, ());
}
impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
type Item = Result<St::Item, Box<dyn Any + Send>>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.caught_unwind {
+ #[project]
+ let CatchUnwind { stream, caught_unwind } = self.project();
+
+ if *caught_unwind {
Poll::Ready(None)
} else {
let res = catch_unwind(AssertUnwindSafe(|| {
- self.as_mut().stream().poll_next(cx)
+ stream.poll_next(cx)
}));
match res {
Ok(poll) => poll.map(|opt| opt.map(Ok)),
Err(e) => {
- *self.as_mut().caught_unwind() = true;
+ *caught_unwind = true;
Poll::Ready(Some(Err(e)))
},
}
diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs
index b2ada69..720903c 100644
--- a/src/stream/stream/chain.rs
+++ b/src/stream/stream/chain.rs
@@ -1,13 +1,16 @@
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Stream for the [`chain`](super::StreamExt::chain) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Chain<St1, St2> {
+ #[pin]
first: Option<St1>,
+ #[pin]
second: St2,
}
@@ -16,9 +19,6 @@ impl<St1, St2> Chain<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>,
{
- unsafe_pinned!(first: Option<St1>);
- unsafe_pinned!(second: St2);
-
pub(super) fn new(stream1: St1, stream2: St2) -> Chain<St1, St2> {
Chain {
first: Some(stream1),
@@ -42,17 +42,20 @@ where St1: Stream,
{
type Item = St1::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if let Some(first) = self.as_mut().first().as_pin_mut() {
+ #[project]
+ let Chain { mut first, second } = self.project();
+ if let Some(first) = first.as_mut().as_pin_mut() {
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item))
}
}
- self.as_mut().first().set(None);
- self.as_mut().second().poll_next(cx)
+ first.set(None);
+ second.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs
index b42d1d1..d24c31c 100644
--- a/src/stream/stream/chunks.rs
+++ b/src/stream/stream/chunks.rs
@@ -3,26 +3,23 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;
/// Stream for the [`chunks`](super::StreamExt::chunks) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Chunks<St: Stream> {
+ #[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
-impl<St: Unpin + Stream> Unpin for Chunks<St> {}
-
impl<St: Stream> Chunks<St> where St: Stream {
- unsafe_unpinned!(items: Vec<St::Item>);
- unsafe_pinned!(stream: Fuse<St>);
-
pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> {
assert!(capacity > 0);
@@ -33,70 +30,43 @@ impl<St: Stream> Chunks<St> where St: Stream {
}
}
- fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> {
+ fn take(self: Pin<&mut Self>) -> Vec<St::Item> {
let cap = self.cap;
- mem::replace(self.as_mut().items(), Vec::with_capacity(cap))
+ mem::replace(self.project().items, Vec::with_capacity(cap))
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner()
- }
+ delegate_access_inner!(stream, St, (.));
}
impl<St: Stream> Stream for Chunks<St> {
type Item = Vec<St::Item>;
+ #[project]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Chunks { mut stream, items, cap } = self.as_mut().project();
loop {
- match ready!(self.as_mut().stream().poll_next(cx)) {
+ match ready!(stream.as_mut().poll_next(cx)) {
// Push the item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Some(item) => {
- self.as_mut().items().push(item);
- if self.items.len() >= self.cap {
- return Poll::Ready(Some(self.as_mut().take()))
+ items.push(item);
+ if items.len() >= *cap {
+ return Poll::Ready(Some(self.take()))
}
}
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
None => {
- let last = if self.items.is_empty() {
+ let last = if items.is_empty() {
None
} else {
- let full_buf = mem::replace(self.as_mut().items(), Vec::new());
+ let full_buf = mem::replace(items, Vec::new());
Some(full_buf)
};
diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs
index 127a3f7..349e42d 100644
--- a/src/stream/stream/collect.rs
+++ b/src/stream/stream/collect.rs
@@ -3,24 +3,21 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`collect`](super::StreamExt::collect) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Collect<St, C> {
+ #[pin]
stream: St,
collection: C,
}
-impl<St: Unpin, C> Unpin for Collect<St, C> {}
-
impl<St: Stream, C: Default> Collect<St, C> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(collection: C);
-
- fn finish(mut self: Pin<&mut Self>) -> C {
- mem::replace(self.as_mut().collection(), Default::default())
+ fn finish(self: Pin<&mut Self>) -> C {
+ mem::replace(self.project().collection, Default::default())
}
pub(super) fn new(stream: St) -> Collect<St, C> {
@@ -46,11 +43,14 @@ where St: Stream,
{
type Output = C;
+ #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
+ #[project]
+ let Collect { mut stream, collection } = self.as_mut().project();
loop {
- match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => self.as_mut().collection().extend(Some(e)),
- None => return Poll::Ready(self.as_mut().finish()),
+ match ready!(stream.as_mut().poll_next(cx)) {
+ Some(e) => collection.extend(Some(e)),
+ None => return Poll::Ready(self.finish()),
}
}
}
diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs
index 704efc7..647632b 100644
--- a/src/stream/stream/concat.rs
+++ b/src/stream/stream/concat.rs
@@ -2,26 +2,23 @@ use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`concat`](super::StreamExt::concat) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Concat<St: Stream> {
+ #[pin]
stream: St,
accum: Option<St::Item>,
}
-impl<St: Stream + Unpin> Unpin for Concat<St> {}
-
impl<St> Concat<St>
where St: Stream,
St::Item: Extend<<St::Item as IntoIterator>::Item> +
IntoIterator + Default,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(accum: Option<St::Item>);
-
pub(super) fn new(stream: St) -> Concat<St> {
Concat {
stream,
@@ -37,16 +34,19 @@ where St: Stream,
{
type Output = St::Item;
+ #[project]
fn poll(
- mut self: Pin<&mut Self>, cx: &mut Context<'_>
+ self: Pin<&mut Self>, cx: &mut Context<'_>
) -> Poll<Self::Output> {
+ #[project]
+ let Concat { mut stream, accum } = self.project();
+
loop {
- match ready!(self.as_mut().stream().poll_next(cx)) {
+ match ready!(stream.as_mut().poll_next(cx)) {
None => {
- return Poll::Ready(self.as_mut().accum().take().unwrap_or_default())
+ return Poll::Ready(accum.take().unwrap_or_default())
}
Some(e) => {
- let accum = self.as_mut().accum();
if let Some(a) = accum {
a.extend(e)
} else {
diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs
index 6366c8b..477a052 100644
--- a/src/stream/stream/enumerate.rs
+++ b/src/stream/stream/enumerate.rs
@@ -3,22 +3,19 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Enumerate<St> {
+ #[pin]
stream: St,
count: usize,
}
-impl<St: Unpin> Unpin for Enumerate<St> {}
-
impl<St: Stream> Enumerate<St> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(count: usize);
-
pub(super) fn new(stream: St) -> Enumerate<St> {
Enumerate {
stream,
@@ -26,37 +23,7 @@ impl<St: Stream> Enumerate<St> {
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
@@ -68,15 +35,19 @@ impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
impl<St: Stream> Stream for Enumerate<St> {
type Item = (usize, St::Item);
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- match ready!(self.as_mut().stream().poll_next(cx)) {
+ #[project]
+ let Enumerate { stream, count } = self.project();
+
+ match ready!(stream.poll_next(cx)) {
Some(item) => {
- let count = self.count;
- *self.as_mut().count() += 1;
- Poll::Ready(Some((count, item)))
+ let prev_count = *count;
+ *count += 1;
+ Poll::Ready(Some((prev_count, item)))
}
None => Poll::Ready(None),
}
diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs
index 06335f1..9d848ad 100644
--- a/src/stream/stream/filter.rs
+++ b/src/stream/stream/filter.rs
@@ -5,25 +5,23 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
+use crate::fns::FnMut1;
/// Stream for the [`filter`](super::StreamExt::filter) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Filter<St, Fut, F>
where St: Stream,
{
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Item>,
}
-impl<St, Fut, F> Unpin for Filter<St, Fut, F>
-where
- St: Stream + Unpin,
- Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for Filter<St, Fut, F>
where
St: Stream + fmt::Debug,
@@ -41,14 +39,9 @@ where
impl<St, Fut, F> Filter<St, Fut, F>
where St: Stream,
- F: FnMut(&St::Item) -> Fut,
+ F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
Fut: Future<Output = bool>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Item>);
-
pub(super) fn new(stream: St, f: F) -> Filter<St, Fut, F> {
Filter {
stream,
@@ -58,37 +51,7 @@ where St: Stream,
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> FusedStream for Filter<St, Fut, F>
@@ -103,34 +66,33 @@ impl<St, Fut, F> FusedStream for Filter<St, Fut, F>
impl<St, Fut, F> Stream for Filter<St, Fut, F>
where St: Stream,
- F: FnMut(&St::Item) -> Fut,
+ F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
Fut: Future<Output = bool>,
{
type Item = St::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
- loop {
- if self.pending_fut.is_none() {
- let item = match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
+ #[project]
+ let Filter { mut stream, f, mut pending_fut, pending_item } = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let res = ready!(fut.poll(cx));
+ pending_fut.set(None);
+ if res {
+ break pending_item.take();
+ }
+ *pending_item = None;
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ pending_fut.set(Some(f.call_mut(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
-
- let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx));
- self.as_mut().pending_fut().set(None);
- let item = self.as_mut().pending_item().take().unwrap();
-
- if yield_item {
- return Poll::Ready(Some(item));
- }
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs
index 532e6ca..2d098ee 100644
--- a/src/stream/stream/filter_map.rs
+++ b/src/stream/stream/filter_map.rs
@@ -5,22 +5,20 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
+use crate::fns::FnMut1;
/// Stream for the [`filter_map`](super::StreamExt::filter_map) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct FilterMap<St, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending: Option<Fut>,
}
-impl<St, Fut, F> Unpin for FilterMap<St, Fut, F>
-where
- St: Unpin,
- Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for FilterMap<St, Fut, F>
where
St: fmt::Debug,
@@ -39,50 +37,16 @@ impl<St, Fut, F> FilterMap<St, Fut, F>
F: FnMut(St::Item) -> Fut,
Fut: Future,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending: Option<Fut>);
-
pub(super) fn new(stream: St, f: F) -> FilterMap<St, Fut, F> {
FilterMap { stream, f, pending: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F>
where St: Stream + FusedStream,
- F: FnMut(St::Item) -> Fut,
+ F: FnMut1<St::Item, Output=Fut>,
Fut: Future<Output = Option<T>>,
{
fn is_terminated(&self) -> bool {
@@ -92,31 +56,34 @@ impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F>
impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F>
where St: Stream,
- F: FnMut(St::Item) -> Fut,
+ F: FnMut1<St::Item, Output=Fut>,
Fut: Future<Output = Option<T>>,
{
type Item = T;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
- loop {
- if self.pending.is_none() {
- let item = match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().pending().set(Some(fut));
- }
-
- let item = ready!(self.as_mut().pending().as_pin_mut().unwrap().poll(cx));
- self.as_mut().pending().set(None);
- if item.is_some() {
- return Poll::Ready(item);
+ #[project]
+ let FilterMap { mut stream, f, mut pending } = self.project();
+ Poll::Ready(loop {
+ if let Some(p) = pending.as_mut().as_pin_mut() {
+ // We have an item in progress, poll that until it's done
+ let item = ready!(p.poll(cx));
+ pending.set(None);
+ if item.is_some() {
+ break item;
+ }
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ // No item in progress, but the stream is still going
+ pending.set(Some(f.call_mut(item)));
+ } else {
+ // The stream is done
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
@@ -134,7 +101,7 @@ impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F>
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for FilterMap<S, Fut, F>
where S: Stream + Sink<Item>,
- F: FnMut(S::Item) -> Fut,
+ F: FnMut1<S::Item, Output=Fut>,
Fut: Future,
{
type Error = S::Error;
diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs
index b19ffc0..4db77e1 100644
--- a/src/stream/stream/flatten.rs
+++ b/src/stream/stream/flatten.rs
@@ -3,77 +3,28 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Stream for the [`flatten`](super::StreamExt::flatten) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
-pub struct Flatten<St>
-where
- St: Stream,
-{
+pub struct Flatten<St, U> {
+ #[pin]
stream: St,
- next: Option<St::Item>,
-}
-
-impl<St> Unpin for Flatten<St>
-where
- St: Stream + Unpin,
- St::Item: Unpin,
-{
+ #[pin]
+ next: Option<U>,
}
-impl<St> Flatten<St>
-where
- St: Stream,
-{
- unsafe_pinned!(stream: St);
- unsafe_pinned!(next: Option<St::Item>);
-}
-
-impl<St> Flatten<St>
-where
- St: Stream,
- St::Item: Stream,
-{
+impl<St, U> Flatten<St, U> {
pub(super) fn new(stream: St) -> Self {
Self { stream, next: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
-impl<St> FusedStream for Flatten<St>
+impl<St> FusedStream for Flatten<St, St::Item>
where
St: FusedStream,
St::Item: Stream,
@@ -83,34 +34,36 @@ where
}
}
-impl<St> Stream for Flatten<St>
+impl<St> Stream for Flatten<St, St::Item>
where
St: Stream,
St::Item: Stream,
{
type Item = <St::Item as Stream>::Item;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- loop {
- if self.next.is_none() {
- match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => self.as_mut().next().set(Some(e)),
- None => return Poll::Ready(None),
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Flatten { mut stream, mut next } = self.project();
+ Poll::Ready(loop {
+ if let Some(s) = next.as_mut().as_pin_mut() {
+ if let Some(item) = ready!(s.poll_next(cx)) {
+ break Some(item);
+ } else {
+ next.set(None);
}
- }
-
- if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) {
- return Poll::Ready(Some(item));
+ } else if let Some(s) = ready!(stream.as_mut().poll_next(cx)) {
+ next.set(Some(s));
} else {
- self.as_mut().next().set(None);
+ break None;
}
- }
+ })
}
}
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
-impl<S, Item> Sink<Item> for Flatten<S>
+impl<S, Item> Sink<Item> for Flatten<S, S::Item>
where
S: Stream + Sink<Item>,
{
diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs
index e92a72e..d4bec25 100644
--- a/src/stream/stream/fold.rs
+++ b/src/stream/stream/fold.rs
@@ -3,19 +3,20 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`fold`](super::StreamExt::fold) method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Fold<St, Fut, T, F> {
+ #[pin]
stream: St,
f: F,
accum: Option<T>,
+ #[pin]
future: Option<Fut>,
}
-impl<St: Unpin, Fut: Unpin, T, F> Unpin for Fold<St, Fut, T, F> {}
-
impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F>
where
St: fmt::Debug,
@@ -36,11 +37,6 @@ where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: Future<Output = T>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(accum: Option<T>);
- unsafe_pinned!(future: Option<Fut>);
-
pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> {
Fold {
stream,
@@ -68,25 +64,27 @@ impl<St, Fut, T, F> Future for Fold<St, Fut, T, F>
{
type Output = T;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
- loop {
- // we're currently processing a future to produce a new accum value
- if self.accum.is_none() {
- let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
- *self.as_mut().accum() = Some(accum);
- self.as_mut().future().set(None);
- }
-
- let item = ready!(self.as_mut().stream().poll_next(cx));
- let accum = self.as_mut().accum().take()
- .expect("Fold polled after completion");
-
- if let Some(e) = item {
- let future = (self.as_mut().f())(accum, e);
- self.as_mut().future().set(Some(future));
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ #[project]
+ let Fold { mut stream, f, accum, mut future } = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new accum value
+ *accum = Some(ready!(fut.poll(cx)));
+ future.set(None);
+ } else if accum.is_some() {
+ // we're waiting on a new item from the stream
+ let res = ready!(stream.as_mut().poll_next(cx));
+ let a = accum.take().unwrap();
+ if let Some(item) = res {
+ future.set(Some(f(a, item)));
+ } else {
+ break a;
+ }
} else {
- return Poll::Ready(accum)
+ panic!("Fold polled after completion")
}
- }
+ })
}
}
diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs
index f8adcb2..fb3f40f 100644
--- a/src/stream/stream/for_each.rs
+++ b/src/stream/stream/for_each.rs
@@ -3,22 +3,19 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`for_each`](super::StreamExt::for_each) method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ForEach<St, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
future: Option<Fut>,
}
-impl<St, Fut, F> Unpin for ForEach<St, Fut, F>
-where
- St: Unpin,
- Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for ForEach<St, Fut, F>
where
St: fmt::Debug,
@@ -37,10 +34,6 @@ where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(future: Option<Fut>);
-
pub(super) fn new(stream: St, f: F) -> ForEach<St, Fut, F> {
ForEach {
stream,
@@ -67,22 +60,20 @@ impl<St, Fut, F> Future for ForEach<St, Fut, F>
{
type Output = ();
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ #[project]
+ let ForEach { mut stream, f, mut future } = self.project();
loop {
- if let Some(future) = self.as_mut().future().as_pin_mut() {
- ready!(future.poll(cx));
- }
- self.as_mut().future().set(None);
-
- match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => {
- let future = (self.as_mut().f())(e);
- self.as_mut().future().set(Some(future));
- }
- None => {
- return Poll::Ready(());
- }
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ ready!(fut.poll(cx));
+ future.set(None);
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ future.set(Some(f(item)));
+ } else {
+ break;
}
}
+ Poll::Ready(())
}
}
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs
index 18ca4bd..88ff2d3 100644
--- a/src/stream/stream/for_each_concurrent.rs
+++ b/src/stream/stream/for_each_concurrent.rs
@@ -5,23 +5,20 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
/// method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ForEachConcurrent<St, Fut, F> {
+ #[pin]
stream: Option<St>,
f: F,
futures: FuturesUnordered<Fut>,
limit: Option<NonZeroUsize>,
}
-impl<St, Fut, F> Unpin for ForEachConcurrent<St, Fut, F>
-where St: Unpin,
- Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F>
where
St: fmt::Debug,
@@ -41,11 +38,6 @@ where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
- unsafe_pinned!(stream: Option<St>);
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(futures: FuturesUnordered<Fut>);
- unsafe_unpinned!(limit: Option<NonZeroUsize>);
-
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> {
ForEachConcurrent {
stream: Some(stream),
@@ -74,16 +66,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
{
type Output = ();
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ #[project]
+ let ForEachConcurrent { mut stream, f, futures, limit } = self.project();
loop {
let mut made_progress_this_iter = false;
- // Try and pull an item from the stream
- let current_len = self.futures.len();
// Check if we've already created a number of futures greater than `limit`
- if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) {
+ if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
let mut stream_completed = false;
- let elem = if let Some(stream) = self.as_mut().stream().as_pin_mut() {
+ let elem = if let Some(stream) = stream.as_mut().as_pin_mut() {
match stream.poll_next(cx) {
Poll::Ready(Some(elem)) => {
made_progress_this_iter = true;
@@ -99,18 +92,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
None
};
if stream_completed {
- self.as_mut().stream().set(None);
+ stream.set(None);
}
if let Some(elem) = elem {
- let next_future = (self.as_mut().f())(elem);
- self.as_mut().futures().push(next_future);
+ futures.push(f(elem));
}
}
- match self.as_mut().futures().poll_next_unpin(cx) {
+ match futures.poll_next_unpin(cx) {
Poll::Ready(Some(())) => made_progress_this_iter = true,
Poll::Ready(None) => {
- if self.stream.is_none() {
+ if stream.is_none() {
return Poll::Ready(())
}
},
diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs
index fd89625..9776056 100644
--- a/src/stream/stream/forward.rs
+++ b/src/stream/stream/forward.rs
@@ -1,59 +1,34 @@
-use crate::stream::{StreamExt, Fuse};
+use crate::stream::Fuse;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
-use futures_core::stream::{Stream, TryStream};
+use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-const INVALID_POLL: &str = "polled `Forward` after completion";
+use pin_project::{pin_project, project};
/// Future for the [`forward`](super::StreamExt::forward) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Forward<St: TryStream, Si> {
+pub struct Forward<St, Si, Item> {
+ #[pin]
sink: Option<Si>,
+ #[pin]
stream: Fuse<St>,
- buffered_item: Option<St::Ok>,
+ buffered_item: Option<Item>,
}
-impl<St: TryStream + Unpin, Si: Unpin> Unpin for Forward<St, Si> {}
-
-impl<St, Si, E> Forward<St, Si>
-where
- Si: Sink<St::Ok, Error = E>,
- St: TryStream<Error = E> + Stream,
-{
- unsafe_pinned!(sink: Option<Si>);
- unsafe_pinned!(stream: Fuse<St>);
- unsafe_unpinned!(buffered_item: Option<St::Ok>);
-
- pub(super) fn new(stream: St, sink: Si) -> Self {
+impl<St, Si, Item> Forward<St, Si, Item> {
+ pub(crate) fn new(stream: St, sink: Si) -> Self {
Forward {
sink: Some(sink),
- stream: stream.fuse(),
- buffered_item: None,
+ stream: Fuse::new(stream),
+ buffered_item: None,
}
}
-
- fn try_start_send(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- item: St::Ok,
- ) -> Poll<Result<(), E>> {
- debug_assert!(self.buffered_item.is_none());
- {
- let mut sink = self.as_mut().sink().as_pin_mut().unwrap();
- if sink.as_mut().poll_ready(cx)?.is_ready() {
- return Poll::Ready(sink.start_send(item));
- }
- }
- *self.as_mut().buffered_item() = Some(item);
- Poll::Pending
- }
}
-impl<St, Si, Item, E> FusedFuture for Forward<St, Si>
+impl<St, Si, Item, E> FusedFuture for Forward<St, Si, Item>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
@@ -63,34 +38,41 @@ where
}
}
-impl<St, Si, Item, E> Future for Forward<St, Si>
+impl<St, Si, Item, E> Future for Forward<St, Si, Item>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
{
type Output = Result<(), E>;
+ #[project]
fn poll(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- // If we've got an item buffered already, we need to write it to the
- // sink before we can do anything else
- if let Some(item) = self.as_mut().buffered_item().take() {
- ready!(self.as_mut().try_start_send(cx, item))?;
- }
+ #[project]
+ let Forward { mut sink, mut stream, buffered_item } = self.project();
+ let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion");
loop {
- match self.as_mut().stream().poll_next(cx)? {
- Poll::Ready(Some(item)) =>
- ready!(self.as_mut().try_start_send(cx, item))?,
+ // If we've got an item buffered already, we need to write it to the
+ // sink before we can do anything else
+ if buffered_item.is_some() {
+ ready!(si.as_mut().poll_ready(cx))?;
+ si.as_mut().start_send(buffered_item.take().unwrap())?;
+ }
+
+ match stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(item)) => {
+ *buffered_item = Some(item);
+ }
Poll::Ready(None) => {
- ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL).poll_close(cx))?;
- self.as_mut().sink().set(None);
+ ready!(si.poll_close(cx))?;
+ sink.set(None);
return Poll::Ready(Ok(()))
}
Poll::Pending => {
- ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL).poll_flush(cx))?;
+ ready!(si.poll_flush(cx))?;
return Poll::Pending
}
}
diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs
index 9085dc5..971fe60 100644
--- a/src/stream/stream/fuse.rs
+++ b/src/stream/stream/fuse.rs
@@ -3,22 +3,19 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`fuse`](super::StreamExt::fuse) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Fuse<St> {
+ #[pin]
stream: St,
done: bool,
}
-impl<St: Unpin> Unpin for Fuse<St> {}
-
impl<St> Fuse<St> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(done: bool);
-
pub(super) fn new(stream: St) -> Fuse<St> {
Fuse { stream, done: false }
}
@@ -32,37 +29,7 @@ impl<St> Fuse<St> {
self.done
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<S: Stream> FusedStream for Fuse<S> {
@@ -74,17 +41,21 @@ impl<S: Stream> FusedStream for Fuse<S> {
impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<S::Item>> {
- if self.done {
+ #[project]
+ let Fuse { stream, done } = self.project();
+
+ if *done {
return Poll::Ready(None);
}
- let item = ready!(self.as_mut().stream().poll_next(cx));
+ let item = ready!(stream.poll_next(cx));
if item.is_none() {
- *self.as_mut().done() = true;
+ *done = true;
}
Poll::Ready(item)
}
diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs
deleted file mode 100644
index e34970a..0000000
--- a/src/stream/stream/inspect.rs
+++ /dev/null
@@ -1,119 +0,0 @@
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`inspect`](super::StreamExt::inspect) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct Inspect<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for Inspect<St, F> {}
-
-impl<St, F> fmt::Debug for Inspect<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Inspect")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> Inspect<St, F>
- where St: Stream,
- F: FnMut(&St::Item),
-{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-
- pub(super) fn new(stream: St, f: F) -> Inspect<St, F> {
- Inspect { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F> FusedStream for Inspect<St, F>
- where St: FusedStream,
- F: FnMut(&St::Item),
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-// used by `TryStreamExt::{inspect_ok, inspect_err}`
-#[inline]
-pub(crate) fn inspect<T, F: FnMut(&T)>(x: T, mut f: F) -> T {
- f(&x);
- x
-}
-
-impl<St, F> Stream for Inspect<St, F>
- where St: Stream,
- F: FnMut(&St::Item),
-{
- type Item = St::Item;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<St::Item>> {
- self.as_mut()
- .stream()
- .poll_next(cx)
- .map(|opt| opt.map(|e| inspect(e, self.as_mut().f())))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for Inspect<S, F>
- where S: Stream + Sink<Item>,
- F: FnMut(&S::Item),
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs
index abae98c..0d49384 100644
--- a/src/stream/stream/into_future.rs
+++ b/src/stream/stream/into_future.rs
@@ -3,7 +3,6 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
/// Future for the [`into_future`](super::StreamExt::into_future) method.
#[derive(Debug)]
@@ -12,11 +11,7 @@ pub struct StreamFuture<St> {
stream: Option<St>,
}
-impl<St: Unpin> Unpin for StreamFuture<St> {}
-
impl<St: Stream + Unpin> StreamFuture<St> {
- unsafe_pinned!(stream: Option<St>);
-
pub(super) fn new(stream: St) -> StreamFuture<St> {
StreamFuture { stream: Some(stream) }
}
@@ -57,7 +52,7 @@ impl<St: Stream + Unpin> StreamFuture<St> {
/// in order to return it to the caller of `Future::poll` if the stream yielded
/// an element.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
- self.stream().as_pin_mut()
+ Pin::get_mut(self).stream.as_mut().map(Pin::new)
}
/// Consumes this combinator, returning the underlying stream.
diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs
index 8119434..755f53a 100644
--- a/src/stream/stream/map.rs
+++ b/src/stream/stream/map.rs
@@ -4,17 +4,19 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
+
+use crate::fns::FnMut1;
/// Stream for the [`map`](super::StreamExt::map) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Map<St, F> {
+ #[pin]
stream: St,
f: F,
}
-impl<St: Unpin, F> Unpin for Map<St, F> {}
-
impl<St, F> fmt::Debug for Map<St, F>
where
St: fmt::Debug,
@@ -26,73 +28,38 @@ where
}
}
-impl<St, T, F> Map<St, F>
- where St: Stream,
- F: FnMut(St::Item) -> T,
-{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-
- pub(super) fn new(stream: St, f: F) -> Map<St, F> {
+impl<St, F> Map<St, F> {
+ pub(crate) fn new(stream: St, f: F) -> Map<St, F> {
Map { stream, f }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
-impl<St, F, T> FusedStream for Map<St, F>
+impl<St, F> FusedStream for Map<St, F>
where St: FusedStream,
- F: FnMut(St::Item) -> T,
+ F: FnMut1<St::Item>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
-impl<St, F, T> Stream for Map<St, F>
+impl<St, F> Stream for Map<St, F>
where St: Stream,
- F: FnMut(St::Item) -> T,
+ F: FnMut1<St::Item>,
{
- type Item = T;
+ type Item = F::Output;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<Option<T>> {
- self.as_mut()
- .stream()
- .poll_next(cx)
- .map(|opt| opt.map(|x| self.as_mut().f()(x)))
+ ) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Map { stream, f } = self.project();
+ let res = ready!(stream.poll_next(cx));
+ Poll::Ready(res.map(|x| f.call_mut(x)))
}
fn size_hint(&self) -> (usize, Option<usize>) {
@@ -102,11 +69,11 @@ impl<St, F, T> Stream for Map<St, F>
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
-impl<S, F, T, Item> Sink<Item> for Map<S, F>
- where S: Stream + Sink<Item>,
- F: FnMut(S::Item) -> T,
+impl<St, F, Item> Sink<Item> for Map<St, F>
+ where St: Stream + Sink<Item>,
+ F: FnMut1<St::Item>,
{
- type Error = S::Error;
+ type Error = St::Error;
delegate_sink!(stream, Item);
}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index da5ade8..359bb2f 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -19,6 +19,8 @@ use futures_core::{
#[cfg(feature = "sink")]
use futures_sink::Sink;
+use crate::fns::{InspectFn, inspect_fn};
+
mod chain;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chain::Chain;
@@ -44,8 +46,14 @@ mod filter_map;
pub use self::filter_map::FilterMap;
mod flatten;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::flatten::Flatten;
+
+delegate_all!(
+ /// Stream for the [`inspect`](StreamExt::inspect) method.
+ Flatten<St>(
+ flatten::Flatten<St, St::Item>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
+ where St: Stream
+);
mod fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -53,9 +61,15 @@ pub use self::fold::Fold;
#[cfg(feature = "sink")]
mod forward;
+
#[cfg(feature = "sink")]
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::forward::Forward;
+delegate_all!(
+ /// Future for the [`forward`](super::StreamExt::forward) method.
+ Forward<St, Si>(
+ forward::Forward<St, Si, St::Ok>
+ ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
+ where St: TryStream
+);
mod for_each;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -69,15 +83,24 @@ mod into_future;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_future::StreamFuture;
-mod inspect;
-pub(crate) use self::inspect::inspect; // used by `TryStreamExt::{inspect_ok, inspect_err}`
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::inspect::Inspect;
+delegate_all!(
+ /// Stream for the [`inspect`](StreamExt::inspect) method.
+ Inspect<St, F>(
+ map::Map<St, InspectFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
+);
mod map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::map::Map;
+delegate_all!(
+ /// Stream for the [`flat_map`](StreamExt::flat_map) method.
+ FlatMap<St, U, F>(
+ flatten::Flatten<Map<St, F>, U>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
+);
+
mod next;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::next::Next;
@@ -106,6 +129,10 @@ mod take_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take_while::TakeWhile;
+mod take_until;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::take_until::TakeUntil;
+
mod then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::then::Then;
@@ -120,6 +147,12 @@ mod chunks;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chunks::Chunks;
+#[cfg(feature = "alloc")]
+mod ready_chunks;
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::ready_chunks::ReadyChunks;
+
mod scan;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::scan::Scan;
@@ -544,10 +577,46 @@ pub trait StreamExt: Stream {
Flatten::new(self)
}
- /// Combinator similar to [`StreamExt::fold`] that holds internal state and produces a new stream.
+ /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
///
- /// Accepts initial state and closure which will be applied to each element of the stream until provided closure
- /// returns `None`. Once `None` is returned, stream will be terminated.
+ /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
+ /// you would have to chain combinators like `.map(f).flatten()` while this
+ /// combinator provides ability to write `.flat_map(f)` instead of chaining.
+ ///
+ /// The provided closure which produce inner streams is executed over all elements
+ /// of stream as last inner stream is terminated and next stream item is available.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `flat_map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
+ ///
+ /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
+ /// # });
+ /// ```
+ fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
+ where
+ F: FnMut(Self::Item) -> U,
+ U: Stream,
+ Self: Sized,
+ {
+ FlatMap::new(self, f)
+ }
+
+ /// Combinator similar to [`StreamExt::fold`] that holds internal state
+ /// and produces a new stream.
+ ///
+ /// Accepts initial state and closure which will be applied to each element
+ /// of the stream until provided closure returns `None`. Once `None` is
+ /// returned, stream will be terminated.
///
/// # Examples
///
@@ -580,7 +649,7 @@ pub trait StreamExt: Stream {
///
/// This function, like `Iterator::skip_while`, will skip elements on the
/// stream until the predicate `f` resolves to `false`. Once one element
- /// returns false all future elements will be returned from the underlying
+ /// returns `false`, all future elements will be returned from the underlying
/// stream.
///
/// # Examples
@@ -611,7 +680,7 @@ pub trait StreamExt: Stream {
///
/// This function, like `Iterator::take_while`, will take elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
- /// returns false it will always return that the stream is done.
+ /// returns `false`, it will always return that the stream is done.
///
/// # Examples
///
@@ -636,6 +705,50 @@ pub trait StreamExt: Stream {
TakeWhile::new(self, f)
}
+ /// Take elements from this stream until the provided future resolves.
+ ///
+ /// This function will take elements from the stream until the provided
+ /// stopping future `fut` resolves. Once the `fut` future becomes ready,
+ /// this stream combinator will always return that the stream is done.
+ ///
+ /// The stopping future may return any type. Once the stream is stopped
+ /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`.
+ /// The stream may also be resumed with `TakeUntil::take_future()`.
+ /// See the documentation of [`TakeUntil`] for more information.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, StreamExt};
+ /// use futures::task::Poll;
+ ///
+ /// let stream = stream::iter(1..=10);
+ ///
+ /// let mut i = 0;
+ /// let stop_fut = future::poll_fn(|_cx| {
+ /// i += 1;
+ /// if i <= 5 {
+ /// Poll::Pending
+ /// } else {
+ /// Poll::Ready(())
+ /// }
+ /// });
+ ///
+ /// let stream = stream.take_until(stop_fut);
+ ///
+ /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
+ /// # });
+ /// ```
+ fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
+ where
+ Fut: Future,
+ Self: Sized,
+ {
+ TakeUntil::new(self, fut)
+ }
+
/// Runs this stream to completion, executing the provided asynchronous
/// closure for each element on the stream.
///
@@ -1098,6 +1211,32 @@ pub trait StreamExt: Stream {
Chunks::new(self, capacity)
}
+ /// An adaptor for chunking up ready items of the stream inside a vector.
+ ///
+ /// This combinator will attempt to pull ready items from this stream and
+ /// buffer them into a local vector. At most `capacity` items will get
+ /// buffered before they're yielded from the returned stream. If underlying
+ /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
+ /// be immediately returned.
+ ///
+ /// If the underlying stream ended and only a partial vector was created,
+ /// it'll be returned. Additionally if an error happens from the underlying
+ /// stream then the currently buffered items will be yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `capacity` is zero.
+ #[cfg(feature = "alloc")]
+ fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
+ where
+ Self: Sized,
+ {
+ ReadyChunks::new(self, capacity)
+ }
+
/// A future that completes after the given stream has been fully processed
/// into the sink and the sink has been flushed and closed.
///
@@ -1111,13 +1250,13 @@ pub trait StreamExt: Stream {
#[cfg(feature = "sink")]
fn forward<S>(self, sink: S) -> Forward<Self, S>
where
- S: Sink<<Self as TryStream>::Ok>,
- Self: TryStream<Error = S::Error> + Sized,
+ S: Sink<Self::Ok, Error = Self::Error>,
+ Self: TryStream + Sized,
{
Forward::new(self, sink)
}
- /// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
+ /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
/// objects.
///
/// This can be useful when you want to split ownership between tasks, or
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index 9272baf..fb0f874 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -1,4 +1,3 @@
-use crate::future::Either;
use crate::stream::{Fuse, StreamExt};
use core::fmt;
use core::pin::Pin;
@@ -7,26 +6,23 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// A `Stream` that implements a `peek` method.
///
/// The `peek` method can be used to retrieve a reference
/// to the next `Stream::Item` if available. A subsequent
/// call to `poll` will return the owned item.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Peekable<St: Stream> {
+ #[pin]
stream: Fuse<St>,
peeked: Option<St::Item>,
}
-impl<St: Stream + Unpin> Unpin for Peekable<St> {}
-
impl<St: Stream> Peekable<St> {
- unsafe_pinned!(stream: Fuse<St>);
- unsafe_unpinned!(peeked: Option<St::Item>);
-
pub(super) fn new(stream: St) -> Peekable<St> {
Peekable {
stream: stream.fuse(),
@@ -34,37 +30,7 @@ impl<St: Stream> Peekable<St> {
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner()
- }
+ delegate_access_inner!(stream, St, (.));
/// Produces a `Peek` future which retrieves a reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
@@ -72,42 +38,27 @@ impl<St: Stream> Peekable<St> {
Peek { inner: Some(self) }
}
- /// Attempt to poll the underlying stream, and return the mutable borrow
- /// in case that is desirable to try for another time.
- /// In case a peeking poll is successful, the reference to the next item
- /// will be in the `Either::Right` variant; otherwise, the mutable borrow
- /// will be in the `Either::Left` variant.
- fn do_poll_peek(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Either<Pin<&mut Self>, Option<&St::Item>> {
- if self.peeked.is_some() {
- let this: &Self = self.into_ref().get_ref();
- return Either::Right(this.peeked.as_ref());
- }
- match self.as_mut().stream().poll_next(cx) {
- Poll::Ready(None) => Either::Right(None),
- Poll::Ready(Some(item)) => {
- *self.as_mut().peeked() = Some(item);
- let this: &Self = self.into_ref().get_ref();
- Either::Right(this.peeked.as_ref())
- }
- _ => Either::Left(self),
- }
- }
-
/// Peek retrieves a reference to the next item in the stream.
///
/// This method polls the underlying stream and return either a reference
/// to the next item if the stream is ready or passes through any errors.
+ #[project]
pub fn poll_peek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&St::Item>> {
- match self.do_poll_peek(cx) {
- Either::Left(_) => Poll::Pending,
- Either::Right(poll) => Poll::Ready(poll),
- }
+ #[project]
+ let Peekable { mut stream, peeked } = self.project();
+
+ Poll::Ready(loop {
+ if peeked.is_some() {
+ break peeked.as_ref();
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ *peeked = Some(item);
+ } else {
+ break None;
+ }
+ })
}
}
@@ -120,11 +71,14 @@ impl<St: Stream> FusedStream for Peekable<St> {
impl<S: Stream> Stream for Peekable<S> {
type Item = S::Item;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- if let Some(item) = self.as_mut().peeked().take() {
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Peekable { stream, peeked } = self.project();
+ if let Some(item) = peeked.take() {
return Poll::Ready(Some(item));
}
- self.as_mut().stream().poll_next(cx)
+ stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
@@ -151,13 +105,12 @@ where
}
/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
+#[pin_project]
#[must_use = "futures do nothing unless polled"]
pub struct Peek<'a, St: Stream> {
inner: Option<Pin<&'a mut Peekable<St>>>,
}
-impl<St: Stream> Unpin for Peek<'_, St> {}
-
impl<St> fmt::Debug for Peek<'_, St>
where
St: Stream + fmt::Debug,
@@ -181,15 +134,12 @@ where
St: Stream,
{
type Output = Option<&'a St::Item>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- if let Some(peekable) = self.inner.take() {
- match peekable.do_poll_peek(cx) {
- Either::Left(peekable) => {
- self.inner = Some(peekable);
- Poll::Pending
- }
- Either::Right(peek) => Poll::Ready(peek),
- }
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ if let Some(peekable) = inner {
+ ready!(peekable.as_mut().poll_peek(cx));
+
+ inner.take().unwrap().poll_peek(cx)
} else {
panic!("Peek polled after completion")
}
diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs
new file mode 100644
index 0000000..2152cb7
--- /dev/null
+++ b/src/stream/stream/ready_chunks.rs
@@ -0,0 +1,112 @@
+use crate::stream::Fuse;
+use futures_core::stream::{Stream, FusedStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project::{pin_project, project};
+use core::mem;
+use core::pin::Pin;
+use alloc::vec::Vec;
+
+/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
+#[pin_project]
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct ReadyChunks<St: Stream> {
+ #[pin]
+ stream: Fuse<St>,
+ items: Vec<St::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+}
+
+impl<St: Stream> ReadyChunks<St> where St: Stream {
+ pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> {
+ assert!(capacity > 0);
+
+ ReadyChunks {
+ stream: super::Fuse::new(stream),
+ items: Vec::with_capacity(capacity),
+ cap: capacity,
+ }
+ }
+
+ delegate_access_inner!(stream, St, (.));
+}
+
+impl<St: Stream> Stream for ReadyChunks<St> {
+ type Item = Vec<St::Item>;
+
+ #[project]
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ #[project]
+ let ReadyChunks { items, cap, mut stream } = self.project();
+
+ loop {
+ match stream.as_mut().poll_next(cx) {
+ // Flush all collected data if underlying stream doesn't contain
+ // more ready values
+ Poll::Pending => {
+ return if items.is_empty() {
+ Poll::Pending
+ } else {
+ Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap))))
+ }
+ }
+
+ // Push the ready item into the buffer and check whether it is full.
+ // If so, replace our buffer with a new and empty one and return
+ // the full one.
+ Poll::Ready(Some(item)) => {
+ items.push(item);
+ if items.len() >= *cap {
+ return Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap))))
+ }
+ }
+
+ // Since the underlying stream ran out of values, return what we
+ // have buffered, if we have anything.
+ Poll::Ready(None) => {
+ let last = if items.is_empty() {
+ None
+ } else {
+ let full_buf = mem::replace(items, Vec::new());
+ Some(full_buf)
+ };
+
+ return Poll::Ready(last);
+ }
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let (lower, upper) = self.stream.size_hint();
+ let lower = lower.saturating_add(chunk_len);
+ let upper = match upper {
+ Some(x) => x.checked_add(chunk_len),
+ None => None,
+ };
+ (lower, upper)
+ }
+}
+
+impl<St: FusedStream> FusedStream for ReadyChunks<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.items.is_empty()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item> Sink<Item> for ReadyChunks<S>
+where
+ S: Stream + Sink<Item>,
+{
+ type Error = S::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index 4f937f4..0cdfcbc 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
struct StateFn<S, F> {
state: S,
@@ -13,15 +13,16 @@ struct StateFn<S, F> {
}
/// Stream for the [`scan`](super::StreamExt::scan) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Scan<St: Stream, S, Fut, F> {
+ #[pin]
stream: St,
state_f: Option<StateFn<S, F>>,
+ #[pin]
future: Option<Fut>,
}
-impl<St: Unpin + Stream, S, Fut: Unpin, F> Unpin for Scan<St, S, Fut, F> {}
-
impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F>
where
St: Stream + fmt::Debug,
@@ -40,10 +41,6 @@ where
}
impl<St: Stream, S, Fut, F> Scan<St, S, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(state_f: Option<StateFn<S, F>>);
- unsafe_pinned!(future: Option<Fut>);
-
/// Checks if internal state is `None`.
fn is_done_taking(&self) -> bool {
self.state_f.is_none()
@@ -67,37 +64,7 @@ where
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<B, St, S, Fut, F> Stream for Scan<St, S, Fut, F>
@@ -108,29 +75,32 @@ where
{
type Item = B;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
if self.is_done_taking() {
return Poll::Ready(None);
}
- if self.future.is_none() {
- let item = match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let state_f = self.as_mut().state_f().as_mut().unwrap();
- let fut = (state_f.f)(&mut state_f.state, item);
- self.as_mut().future().set(Some(fut));
- }
-
- let item = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
- self.as_mut().future().set(None);
-
- if item.is_none() {
- self.as_mut().state_f().take();
- }
-
- Poll::Ready(item)
+ #[project]
+ let Scan { mut stream, state_f, mut future } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ let item = ready!(fut.poll(cx));
+ future.set(None);
+
+ if item.is_none() {
+ *state_f = None;
+ }
+
+ break item;
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ let state_f = state_f.as_mut().unwrap();
+ future.set(Some((state_f.f)(&mut state_f.state, item)))
+ } else {
+ break None;
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs
index 0b7c632..c0f6611 100644
--- a/src/stream/stream/skip.rs
+++ b/src/stream/stream/skip.rs
@@ -3,22 +3,19 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`skip`](super::StreamExt::skip) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Skip<St> {
+ #[pin]
stream: St,
remaining: usize,
}
-impl<St: Unpin> Unpin for Skip<St> {}
-
impl<St: Stream> Skip<St> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(remaining: usize);
-
pub(super) fn new(stream: St, n: usize) -> Skip<St> {
Skip {
stream,
@@ -26,37 +23,7 @@ impl<St: Stream> Skip<St> {
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St: FusedStream> FusedStream for Skip<St> {
@@ -68,18 +35,22 @@ impl<St: FusedStream> FusedStream for Skip<St> {
impl<St: Stream> Stream for Skip<St> {
type Item = St::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
- while self.remaining > 0 {
- match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(_) => *self.as_mut().remaining() -= 1,
- None => return Poll::Ready(None),
+ #[project]
+ let Skip { mut stream, remaining } = self.project();
+ while *remaining > 0 {
+ if ready!(stream.as_mut().poll_next(cx)).is_some() {
+ *remaining -= 1;
+ } else {
+ return Poll::Ready(None);
}
}
- self.as_mut().stream().poll_next(cx)
+ stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs
index 666d9de..3d664f2 100644
--- a/src/stream/stream/skip_while.rs
+++ b/src/stream/stream/skip_while.rs
@@ -5,20 +5,21 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct SkipWhile<St, Fut, F> where St: Stream {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Item>,
done_skipping: bool,
}
-impl<St: Unpin + Stream, Fut: Unpin, F> Unpin for SkipWhile<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F>
where
St: Stream + fmt::Debug,
@@ -40,12 +41,6 @@ impl<St, Fut, F> SkipWhile<St, Fut, F>
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Item>);
- unsafe_unpinned!(done_skipping: bool);
-
pub(super) fn new(stream: St, f: F) -> SkipWhile<St, Fut, F> {
SkipWhile {
stream,
@@ -56,37 +51,7 @@ impl<St, Fut, F> SkipWhile<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
@@ -106,44 +71,48 @@ impl<St, Fut, F> Stream for SkipWhile<St, Fut, F>
{
type Item = St::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
- if self.done_skipping {
- return self.as_mut().stream().poll_next(cx);
- }
-
- loop {
- if self.pending_item.is_none() {
- let item = match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
+ #[project]
+ let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
- let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx));
- let item = self.as_mut().pending_item().take().unwrap();
- self.as_mut().pending_fut().set(None);
+ if *done_skipping {
+ return stream.poll_next(cx);
+ }
- if !skipped {
- *self.as_mut().done_skipping() = true;
- return Poll::Ready(Some(item))
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let skipped = ready!(fut.poll(cx));
+ let item = pending_item.take();
+ pending_fut.set(None);
+ if !skipped {
+ *done_skipping = true;
+ break item;
+ }
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
- let (_, upper) = self.stream.size_hint();
- let upper = match upper {
- Some(x) => x.checked_add(pending_len),
- None => None,
- };
- (0, upper) // can't know a lower bound, due to the predicate
+ if self.done_skipping {
+ self.stream.size_hint()
+ } else {
+ let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let (_, upper) = self.stream.size_hint();
+ let upper = match upper {
+ Some(x) => x.checked_add(pending_len),
+ None => None,
+ };
+ (0, upper) // can't know a lower bound, due to the predicate
+ }
}
}
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index 4118b33..c7237a2 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -16,7 +16,7 @@ impl<S> Unpin for SplitStream<S> {}
impl<S: Unpin> SplitStream<S> {
/// Attempts to put the two "halves" of a split `Stream + Sink` back
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
- /// a matching pair originating from the same call to `Stream::split`.
+ /// a matching pair originating from the same call to `StreamExt::split`.
pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
where S: Sink<Item>,
{
@@ -53,7 +53,7 @@ impl<S, Item> Unpin for SplitSink<S, Item> {}
impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
/// Attempts to put the two "halves" of a split `Stream + Sink` back
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
- /// a matching pair originating from the same call to `Stream::split`.
+ /// a matching pair originating from the same call to `StreamExt::split`.
pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
self.lock.reunite(other.0).map_err(|err| {
ReuniteError(SplitSink(err.0), SplitStream(err.1))
diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs
index 1109a4a..4a68920 100644
--- a/src/stream/stream/take.rs
+++ b/src/stream/stream/take.rs
@@ -4,22 +4,19 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`take`](super::StreamExt::take) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Take<St> {
+ #[pin]
stream: St,
remaining: usize,
}
-impl<St: Unpin> Unpin for Take<St> {}
-
impl<St: Stream> Take<St> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(remaining: usize);
-
pub(super) fn new(stream: St, n: usize) -> Take<St> {
Take {
stream,
@@ -27,37 +24,7 @@ impl<St: Stream> Take<St> {
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St> Stream for Take<St>
@@ -65,17 +32,21 @@ impl<St> Stream for Take<St>
{
type Item = St::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
if self.remaining == 0 {
Poll::Ready(None)
} else {
- let next = ready!(self.as_mut().stream().poll_next(cx));
- match next {
- Some(_) => *self.as_mut().remaining() -= 1,
- None => *self.as_mut().remaining() = 0,
+ #[project]
+ let Take { stream, remaining } = self.project();
+ let next = ready!(stream.poll_next(cx));
+ if next.is_some() {
+ *remaining -= 1;
+ } else {
+ *remaining = 0;
}
Poll::Ready(next)
}
diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs
new file mode 100644
index 0000000..3662620
--- /dev/null
+++ b/src/stream/stream/take_until.rs
@@ -0,0 +1,178 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project::{pin_project, project};
+
+// FIXME: docs, tests
+
+/// Stream for the [`take_until`](super::StreamExt::take_until) method.
+#[pin_project]
+#[must_use = "streams do nothing unless polled"]
+pub struct TakeUntil<St: Stream, Fut: Future> {
+ #[pin]
+ stream: St,
+ /// Contains the inner Future on start and None once the inner Future is resolved
+ /// or taken out by the user.
+ #[pin]
+ fut: Option<Fut>,
+ /// Contains fut's return value once fut is resolved
+ fut_result: Option<Fut::Output>,
+ /// Whether the future was taken out by the user.
+ free: bool,
+}
+
+impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
+where
+ St: Stream + fmt::Debug,
+ St::Item: fmt::Debug,
+ Fut: Future + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TakeUntil")
+ .field("stream", &self.stream)
+ .field("fut", &self.fut)
+ .finish()
+ }
+}
+
+impl<St, Fut> TakeUntil<St, Fut>
+where
+ St: Stream,
+ Fut: Future,
+{
+ pub(super) fn new(stream: St, fut: Fut) -> TakeUntil<St, Fut> {
+ TakeUntil {
+ stream,
+ fut: Some(fut),
+ fut_result: None,
+ free: false,
+ }
+ }
+
+ delegate_access_inner!(stream, St, ());
+
+ /// Extract the stopping future out of the combinator.
+ /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
+ /// Taking out the future means the combinator will be yielding
+ /// elements from the wrapped stream without ever stopping it.
+ pub fn take_future(&mut self) -> Option<Fut> {
+ if self.fut.is_some() {
+ self.free = true;
+ }
+
+ self.fut.take()
+ }
+
+ /// Once the stopping future is resolved, this method can be used
+ /// to extract the value returned by the stopping future.
+ ///
+ /// This may be used to retrieve arbitrary data from the stopping
+ /// future, for example a reason why the stream was stopped.
+ ///
+ /// This method will return `None` if the future isn't resovled yet,
+ /// or if the result was already taken out.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, StreamExt};
+ /// use futures::task::Poll;
+ ///
+ /// let stream = stream::iter(1..=10);
+ ///
+ /// let mut i = 0;
+ /// let stop_fut = future::poll_fn(|_cx| {
+ /// i += 1;
+ /// if i <= 5 {
+ /// Poll::Pending
+ /// } else {
+ /// Poll::Ready("reason")
+ /// }
+ /// });
+ ///
+ /// let mut stream = stream.take_until(stop_fut);
+ /// let _ = stream.by_ref().collect::<Vec<_>>().await;
+ ///
+ /// let result = stream.take_result().unwrap();
+ /// assert_eq!(result, "reason");
+ /// # });
+ /// ```
+ pub fn take_result(&mut self) -> Option<Fut::Output> {
+ self.fut_result.take()
+ }
+
+ /// Whether the stream was stopped yet by the stopping future
+ /// being resolved.
+ pub fn is_stopped(&self) -> bool {
+ !self.free && self.fut.is_none()
+ }
+}
+
+impl<St, Fut> Stream for TakeUntil<St, Fut>
+where
+ St: Stream,
+ Fut: Future,
+{
+ type Item = St::Item;
+
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
+ #[project]
+ let TakeUntil { stream, mut fut, fut_result, free } = self.project();
+
+ if let Some(f) = fut.as_mut().as_pin_mut() {
+ if let Poll::Ready(result) = f.poll(cx) {
+ fut.set(None);
+ *fut_result = Some(result);
+ }
+ }
+
+ if !*free && fut.is_none() {
+ // Future resolved, inner stream stopped
+ Poll::Ready(None)
+ } else {
+ // Future either not resolved yet or taken out by the user
+ let item = ready!(stream.poll_next(cx));
+ if item.is_none() {
+ fut.set(None);
+ }
+ Poll::Ready(item)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.is_stopped() {
+ return (0, Some(0));
+ }
+
+ self.stream.size_hint()
+ }
+}
+
+impl<St, Fut> FusedStream for TakeUntil<St, Fut>
+where
+ St: Stream,
+ Fut: Future,
+{
+ fn is_terminated(&self) -> bool {
+ self.is_stopped()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
+where
+ S: Stream + Sink<Item>,
+ Fut: Future,
+{
+ type Error = S::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs
index 68606ec..d90061e 100644
--- a/src/stream/stream/take_while.rs
+++ b/src/stream/stream/take_while.rs
@@ -5,20 +5,21 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`take_while`](super::StreamExt::take_while) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
-pub struct TakeWhile<St: Stream , Fut, F> {
+pub struct TakeWhile<St: Stream, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Item>,
done_taking: bool,
}
-impl<St: Unpin + Stream, Fut: Unpin, F> Unpin for TakeWhile<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for TakeWhile<St, Fut, F>
where
St: Stream + fmt::Debug,
@@ -35,14 +36,6 @@ where
}
}
-impl<St: Stream, Fut, F> TakeWhile<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Item>);
- unsafe_unpinned!(done_taking: bool);
-}
-
impl<St, Fut, F> TakeWhile<St, Fut, F>
where St: Stream,
F: FnMut(&St::Item) -> Fut,
@@ -58,37 +51,7 @@ impl<St, Fut, F> TakeWhile<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
@@ -98,34 +61,36 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
{
type Item = St::Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
if self.done_taking {
return Poll::Ready(None);
}
- if self.pending_item.is_none() {
- let item = match ready!(self.as_mut().stream().poll_next(cx)) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
-
- let take = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx));
- self.as_mut().pending_fut().set(None);
- let item = self.as_mut().pending_item().take().unwrap();
-
- if take {
- Poll::Ready(Some(item))
- } else {
- *self.as_mut().done_taking() = true;
- Poll::Ready(None)
- }
+ #[project]
+ let TakeWhile { mut stream, f, mut pending_fut, pending_item, done_taking } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let take = ready!(fut.poll(cx));
+ let item = pending_item.take();
+ pending_fut.set(None);
+ if take {
+ break item;
+ } else {
+ *done_taking = true;
+ break None;
+ }
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs
index 39843b2..d54512e 100644
--- a/src/stream/stream/then.rs
+++ b/src/stream/stream/then.rs
@@ -5,18 +5,19 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`then`](super::StreamExt::then) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Then<St, Fut, F> {
+ #[pin]
stream: St,
+ #[pin]
future: Option<Fut>,
f: F,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for Then<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
where
St: fmt::Debug,
@@ -30,12 +31,6 @@ where
}
}
-impl<St, Fut, F> Then<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_pinned!(future: Option<Fut>);
- unsafe_unpinned!(f: F);
-}
-
impl<St, Fut, F> Then<St, Fut, F>
where St: Stream,
F: FnMut(St::Item) -> Fut,
@@ -48,37 +43,7 @@ impl<St, Fut, F> Then<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> FusedStream for Then<St, Fut, F>
@@ -98,22 +63,25 @@ impl<St, Fut, F> Stream for Then<St, Fut, F>
{
type Item = Fut::Output;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Fut::Output>> {
- if self.future.is_none() {
- let item = match ready!(self.as_mut().stream().poll_next(cx)) {
- None => return Poll::Ready(None),
- Some(e) => e,
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().future().set(Some(fut));
- }
-
- let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
- self.as_mut().future().set(None);
- Poll::Ready(Some(e))
+ #[project]
+ let Then { mut stream, f, mut future } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ let item = ready!(fut.poll(cx));
+ future.set(None);
+ break Some(item);
+ } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ future.set(Some(f(item)));
+ } else {
+ break None;
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs
index f97ac17..6c148fb 100644
--- a/src/stream/stream/zip.rs
+++ b/src/stream/stream/zip.rs
@@ -3,33 +3,22 @@ use core::cmp;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`zip`](super::StreamExt::zip) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Zip<St1: Stream, St2: Stream> {
+ #[pin]
stream1: Fuse<St1>,
+ #[pin]
stream2: Fuse<St2>,
queued1: Option<St1::Item>,
queued2: Option<St2::Item>,
}
-#[allow(clippy::type_repetition_in_bounds)] // https://github.com/rust-lang/rust-clippy/issues/4323
-impl<St1, St2> Unpin for Zip<St1, St2>
-where
- St1: Stream,
- Fuse<St1>: Unpin,
- St2: Stream,
- Fuse<St2>: Unpin,
-{}
-
impl<St1: Stream, St2: Stream> Zip<St1, St2> {
- unsafe_pinned!(stream1: Fuse<St1>);
- unsafe_pinned!(stream2: Fuse<St2>);
- unsafe_unpinned!(queued1: Option<St1::Item>);
- unsafe_unpinned!(queued2: Option<St2::Item>);
-
pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> {
Zip {
stream1: stream1.fuse(),
@@ -88,28 +77,31 @@ impl<St1, St2> Stream for Zip<St1, St2>
{
type Item = (St1::Item, St2::Item);
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.queued1.is_none() {
- match self.as_mut().stream1().poll_next(cx) {
- Poll::Ready(Some(item1)) => *self.as_mut().queued1() = Some(item1),
+ #[project]
+ let Zip { mut stream1, mut stream2, queued1, queued2 } = self.project();
+
+ if queued1.is_none() {
+ match stream1.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item1)) => *queued1 = Some(item1),
Poll::Ready(None) | Poll::Pending => {}
}
}
- if self.queued2.is_none() {
- match self.as_mut().stream2().poll_next(cx) {
- Poll::Ready(Some(item2)) => *self.as_mut().queued2() = Some(item2),
+ if queued2.is_none() {
+ match stream2.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item2)) => *queued2 = Some(item2),
Poll::Ready(None) | Poll::Pending => {}
}
}
- if self.queued1.is_some() && self.queued2.is_some() {
- let pair = (self.as_mut().queued1().take().unwrap(),
- self.as_mut().queued2().take().unwrap());
+ if queued1.is_some() && queued2.is_some() {
+ let pair = (queued1.take().unwrap(), queued2.take().unwrap());
Poll::Ready(Some(pair))
- } else if self.stream1.is_done() || self.stream2.is_done() {
+ } else if stream1.is_done() || stream2.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs
index 809c32a..563ed34 100644
--- a/src/stream/try_stream/and_then.rs
+++ b/src/stream/try_stream/and_then.rs
@@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct AndThen<St, Fut, F> {
+ #[pin]
stream: St,
+ #[pin]
future: Option<Fut>,
f: F,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for AndThen<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
where
St: fmt::Debug,
@@ -30,12 +31,6 @@ where
}
}
-impl<St, Fut, F> AndThen<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_pinned!(future: Option<Fut>);
- unsafe_unpinned!(f: F);
-}
-
impl<St, Fut, F> AndThen<St, Fut, F>
where St: TryStream,
F: FnMut(St::Ok) -> Fut,
@@ -45,37 +40,7 @@ impl<St, Fut, F> AndThen<St, Fut, F>
Self { stream, future: None, f }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for AndThen<St, Fut, F>
@@ -85,22 +50,25 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
{
type Item = Result<Fut::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.future.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- None => return Poll::Ready(None),
- Some(e) => e,
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().future().set(Some(fut));
- }
-
- let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
- self.as_mut().future().set(None);
- Poll::Ready(Some(e))
+ #[project]
+ let AndThen { mut stream, mut future, f } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ let item = ready!(fut.try_poll(cx));
+ future.set(None);
+ break Some(item);
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ future.set(Some(f(item)));
+ } else {
+ break None;
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/err_into.rs b/src/stream/try_stream/err_into.rs
deleted file mode 100644
index f5d9294..0000000
--- a/src/stream/try_stream/err_into.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-use core::marker::PhantomData;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
-
-/// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct ErrInto<St, E> {
- stream: St,
- _marker: PhantomData<E>,
-}
-
-impl<St: Unpin, E> Unpin for ErrInto<St, E> {}
-
-impl<St, E> ErrInto<St, E> {
- unsafe_pinned!(stream: St);
-
- pub(super) fn new(stream: St) -> Self {
- ErrInto { stream, _marker: PhantomData }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, E> FusedStream for ErrInto<St, E>
-where
- St: TryStream + FusedStream,
- St::Error: Into<E>,
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, E> Stream for ErrInto<St, E>
-where
- St: TryStream,
- St::Error: Into<E>,
-{
- type Item = Result<St::Ok, E>;
-
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.stream().try_poll_next(cx)
- .map(|res| res.map(|some| some.map_err(Into::into)))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, E, Item> Sink<Item> for ErrInto<S, E>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/inspect_err.rs b/src/stream/try_stream/inspect_err.rs
deleted file mode 100644
index 3c23ae0..0000000
--- a/src/stream/try_stream/inspect_err.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-use crate::stream::stream::inspect;
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct InspectErr<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for InspectErr<St, F> {}
-
-impl<St, F> fmt::Debug for InspectErr<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("InspectErr")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> InspectErr<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-}
-
-impl<St, F> InspectErr<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Error),
-{
- pub(super) fn new(stream: St, f: F) -> Self {
- Self { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F> FusedStream for InspectErr<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(&St::Error),
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F> Stream for InspectErr<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Error),
-{
- type Item = Result<St::Ok, St::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f()))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for InspectErr<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/inspect_ok.rs b/src/stream/try_stream/inspect_ok.rs
deleted file mode 100644
index 89fb459..0000000
--- a/src/stream/try_stream/inspect_ok.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-use crate::stream::stream::inspect;
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct InspectOk<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for InspectOk<St, F> {}
-
-impl<St, F> fmt::Debug for InspectOk<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("InspectOk")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> InspectOk<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-}
-
-impl<St, F> InspectOk<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Ok),
-{
- pub(super) fn new(stream: St, f: F) -> Self {
- Self { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F> FusedStream for InspectOk<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(&St::Ok),
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F> Stream for InspectOk<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Ok),
-{
- type Item = Result<St::Ok, St::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f()))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for InspectOk<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs
index b0fa07a..370a327 100644
--- a/src/stream/try_stream/into_stream.rs
+++ b/src/stream/try_stream/into_stream.rs
@@ -3,54 +3,24 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
+use pin_project::pin_project;
/// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct IntoStream<St> {
+ #[pin]
stream: St,
}
impl<St> IntoStream<St> {
- unsafe_pinned!(stream: St);
-
#[inline]
pub(super) fn new(stream: St) -> Self {
IntoStream { stream }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> {
@@ -67,7 +37,7 @@ impl<St: TryStream> Stream for IntoStream<St> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- self.stream().try_poll_next(cx)
+ self.project().stream.try_poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/map_err.rs b/src/stream/try_stream/map_err.rs
deleted file mode 100644
index 1b98d6b..0000000
--- a/src/stream/try_stream/map_err.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct MapErr<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for MapErr<St, F> {}
-
-impl<St, F> fmt::Debug for MapErr<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("MapErr")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> MapErr<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-
- /// Creates a new MapErr.
- pub(super) fn new(stream: St, f: F) -> Self {
- MapErr { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F, E> FusedStream for MapErr<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(St::Error) -> E,
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F, E> Stream for MapErr<St, F>
-where
- St: TryStream,
- F: FnMut(St::Error) -> E,
-{
- type Item = Result<St::Ok, E>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for MapErr<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/map_ok.rs b/src/stream/try_stream/map_ok.rs
deleted file mode 100644
index 19d01be..0000000
--- a/src/stream/try_stream/map_ok.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct MapOk<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for MapOk<St, F> {}
-
-impl<St, F> fmt::Debug for MapOk<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("MapOk")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> MapOk<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-
- /// Creates a new MapOk.
- pub(super) fn new(stream: St, f: F) -> Self {
- MapOk { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F, T> FusedStream for MapOk<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(St::Ok) -> T,
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F, T> Stream for MapOk<St, F>
-where
- St: TryStream,
- F: FnMut(St::Ok) -> T,
-{
- type Item = Result<T, St::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for MapOk<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 6a7ced4..99d5a6d 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -11,34 +11,53 @@ use futures_core::{
stream::TryStream,
task::{Context, Poll},
};
+use crate::fns::{
+ InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn,
+};
+use crate::stream::{Map, Inspect};
mod and_then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::and_then::AndThen;
-mod err_into;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::err_into::ErrInto;
-
-mod inspect_ok;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::inspect_ok::InspectOk;
-
-mod inspect_err;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::inspect_err::InspectErr;
+delegate_all!(
+ /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
+ ErrInto<St, E>(
+ MapErr<St, IntoFn<E>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
+);
+
+delegate_all!(
+ /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
+ InspectOk<St, F>(
+ Inspect<IntoStream<St>, InspectOkFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
+);
+
+delegate_all!(
+ /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
+ InspectErr<St, F>(
+ Inspect<IntoStream<St>, InspectErrFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
+);
mod into_stream;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_stream::IntoStream;
-mod map_ok;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::map_ok::MapOk;
+delegate_all!(
+ /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
+ MapOk<St, F>(
+ Map<IntoStream<St>, MapOkFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
+);
-mod map_err;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::map_err::MapErr;
+delegate_all!(
+ /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
+ MapErr<St, F>(
+ Map<IntoStream<St>, MapErrFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
+);
mod or_else;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -385,8 +404,9 @@ pub trait TryStreamExt: TryStream {
/// Skip elements on this stream while the provided asynchronous predicate
/// resolves to `true`.
///
- /// This function is similar to [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while)
- /// but exits early if an error occurs.
+ /// This function is similar to
+ /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
+ /// early if an error occurs.
///
/// # Examples
///
diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs
index 33310d1..0bba0d0 100644
--- a/src/stream/try_stream/or_else.rs
+++ b/src/stream/try_stream/or_else.rs
@@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct OrElse<St, Fut, F> {
+ #[pin]
stream: St,
+ #[pin]
future: Option<Fut>,
f: F,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for OrElse<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F>
where
St: fmt::Debug,
@@ -30,12 +31,6 @@ where
}
}
-impl<St, Fut, F> OrElse<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_pinned!(future: Option<Fut>);
- unsafe_unpinned!(f: F);
-}
-
impl<St, Fut, F> OrElse<St, Fut, F>
where St: TryStream,
F: FnMut(St::Error) -> Fut,
@@ -45,37 +40,7 @@ impl<St, Fut, F> OrElse<St, Fut, F>
Self { stream, future: None, f }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for OrElse<St, Fut, F>
@@ -85,23 +50,29 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
{
type Item = Result<St::Ok, Fut::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.future.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
- None => return Poll::Ready(None),
- Some(Ok(e)) => return Poll::Ready(Some(Ok(e))),
- Some(Err(e)) => e,
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().future().set(Some(fut));
- }
-
- let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
- self.as_mut().future().set(None);
- Poll::Ready(Some(e))
+ #[project]
+ let OrElse { mut stream, mut future, f } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ let item = ready!(fut.try_poll(cx));
+ future.set(None);
+ break Some(item);
+ } else {
+ match ready!(stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(item)) => break Some(Ok(item)),
+ Some(Err(e)) => {
+ future.set(Some(f(e)));
+ },
+ None => break None,
+ }
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs
index d11e1b4..566868b 100644
--- a/src/stream/try_stream/try_buffer_unordered.rs
+++ b/src/stream/try_stream/try_buffer_unordered.rs
@@ -5,32 +5,27 @@ use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use core::pin::Pin;
/// Stream for the
/// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryBufferUnordered<St>
where St: TryStream
{
+ #[pin]
stream: Fuse<IntoStream<St>>,
in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>,
max: usize,
}
-impl<St> Unpin for TryBufferUnordered<St>
- where St: TryStream + Unpin
-{}
-
impl<St> TryBufferUnordered<St>
where St: TryStream,
St::Ok: TryFuture,
{
- unsafe_pinned!(stream: Fuse<IntoStream<St>>);
- unsafe_unpinned!(in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>);
-
pub(super) fn new(stream: St, n: usize) -> Self {
TryBufferUnordered {
stream: IntoStream::new(stream).fuse(),
@@ -39,37 +34,7 @@ impl<St> TryBufferUnordered<St>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref().get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut().get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner().into_inner()
- }
+ delegate_access_inner!(stream, St, (. .));
}
impl<St> Stream for TryBufferUnordered<St>
@@ -78,27 +43,31 @@ impl<St> Stream for TryBufferUnordered<St>
{
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ #[project]
+ let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project();
+
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures. Propagate errors from the stream immediately.
- while self.in_progress_queue.len() < self.max {
- match self.as_mut().stream().poll_next(cx)? {
- Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut.into_future()),
+ while in_progress_queue.len() < *max {
+ match stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- match self.as_mut().in_progress_queue().poll_next_unpin(cx) {
+ match in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}
// If more values are still coming from the stream, we're not done yet
- if self.stream.is_done() {
+ if stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs
index d22e8e8..3c9aee2 100644
--- a/src/stream/try_stream/try_collect.rs
+++ b/src/stream/try_stream/try_collect.rs
@@ -3,34 +3,27 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, TryStream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryCollect<St, C> {
+ #[pin]
stream: St,
items: C,
}
impl<St: TryStream, C: Default> TryCollect<St, C> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(items: C);
-
pub(super) fn new(s: St) -> TryCollect<St, C> {
TryCollect {
stream: s,
items: Default::default(),
}
}
-
- fn finish(self: Pin<&mut Self>) -> C {
- mem::replace(self.items(), Default::default())
- }
}
-impl<St: Unpin + TryStream, C> Unpin for TryCollect<St, C> {}
-
impl<St, C> FusedFuture for TryCollect<St, C>
where
St: TryStream + FusedStream,
@@ -48,15 +41,18 @@ where
{
type Output = Result<C, St::Error>;
+ #[project]
fn poll(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- loop {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => self.as_mut().items().extend(Some(x)),
- None => return Poll::Ready(Ok(self.as_mut().finish())),
+ #[project]
+ let TryCollect { mut stream, items } = self.project();
+ Poll::Ready(Ok(loop {
+ match ready!(stream.as_mut().try_poll_next(cx)?) {
+ Some(x) => items.extend(Some(x)),
+ None => break mem::replace(items, Default::default()),
}
- }
+ }))
}
}
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs
index 395f166..8c9710b 100644
--- a/src/stream/try_stream/try_concat.rs
+++ b/src/stream/try_stream/try_concat.rs
@@ -2,26 +2,23 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryConcat<St: TryStream> {
+ #[pin]
stream: St,
accum: Option<St::Ok>,
}
-impl<St: TryStream + Unpin> Unpin for TryConcat<St> {}
-
impl<St> TryConcat<St>
where
St: TryStream,
St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(accum: Option<St::Ok>);
-
pub(super) fn new(stream: St) -> TryConcat<St> {
TryConcat {
stream,
@@ -37,21 +34,20 @@ where
{
type Output = Result<St::Ok, St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => {
- let accum = self.as_mut().accum();
- if let Some(a) = accum {
- a.extend(x)
- } else {
- *accum = Some(x)
- }
- },
- None => {
- return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default()))
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryConcat { mut stream, accum } = self.project();
+ Poll::Ready(Ok(loop {
+ if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ if let Some(a) = accum {
+ a.extend(x)
+ } else {
+ *accum = Some(x)
}
+ } else {
+ break accum.take().unwrap_or_default();
}
- }
+ }))
}
}
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs
index 24a9c32..310f991 100644
--- a/src/stream/try_stream/try_filter.rs
+++ b/src/stream/try_stream/try_filter.rs
@@ -5,24 +5,23 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryFilter<St, Fut, F>
where St: TryStream
{
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Ok>,
}
-impl<St, Fut, F> Unpin for TryFilter<St, Fut, F>
- where St: TryStream + Unpin, Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F>
where
St: TryStream + fmt::Debug,
@@ -41,11 +40,6 @@ where
impl<St, Fut, F> TryFilter<St, Fut, F>
where St: TryStream
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Ok>);
-
pub(super) fn new(stream: St, f: F) -> Self {
TryFilter {
stream,
@@ -55,37 +49,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F>
@@ -105,29 +69,28 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<St::Ok, St::Error>>> {
- loop {
- if self.pending_fut.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => x,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
-
- let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx));
- self.as_mut().pending_fut().set(None);
- let item = self.as_mut().pending_item().take().unwrap();
-
- if yield_item {
- return Poll::Ready(Some(Ok(item)));
+ #[project]
+ let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let res = ready!(fut.poll(cx));
+ pending_fut.set(None);
+ if res {
+ break pending_item.take().map(Ok);
+ }
+ *pending_item = None;
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs
index ed7eeb2..ba8e43a 100644
--- a/src/stream/try_stream/try_filter_map.rs
+++ b/src/stream/try_stream/try_filter_map.rs
@@ -5,21 +5,20 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryFilterMap<St, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending: Option<Fut>,
}
-impl<St, Fut, F> Unpin for TryFilterMap<St, Fut, F>
- where St: Unpin, Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F>
where
St: fmt::Debug,
@@ -34,45 +33,11 @@ where
}
impl<St, Fut, F> TryFilterMap<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending: Option<Fut>);
-
pub(super) fn new(stream: St, f: F) -> Self {
TryFilterMap { stream, f, pending: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F>
@@ -92,26 +57,29 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
{
type Item = Result<T, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, St::Error>>> {
- loop {
- if self.pending.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => x,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().pending().set(Some(fut));
- }
-
- let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx));
- self.as_mut().pending().set(None);
- if let Some(x) = result? {
- return Poll::Ready(Some(Ok(x)));
+ #[project]
+ let TryFilterMap { mut stream, f, mut pending } = self.project();
+ Poll::Ready(loop {
+ if let Some(p) = pending.as_mut().as_pin_mut() {
+ // We have an item in progress, poll that until it's done
+ let item = ready!(p.try_poll(cx)?);
+ pending.set(None);
+ if item.is_some() {
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ // No item in progress, but the stream is still going
+ pending.set(Some(f(item)));
+ } else {
+ // The stream is done
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs
index 5f81b22..a528639 100644
--- a/src/stream/try_stream/try_flatten.rs
+++ b/src/stream/try_stream/try_flatten.rs
@@ -3,34 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryFlatten<St>
where
St: TryStream,
{
+ #[pin]
stream: St,
+ #[pin]
next: Option<St::Ok>,
}
-impl<St> Unpin for TryFlatten<St>
-where
- St: TryStream + Unpin,
- St::Ok: Unpin,
-{
-}
-
-impl<St> TryFlatten<St>
-where
- St: TryStream,
-{
- unsafe_pinned!(stream: St);
- unsafe_pinned!(next: Option<St::Ok>);
-}
-
impl<St> TryFlatten<St>
where
St: TryStream,
@@ -41,37 +29,7 @@ where
Self { stream, next: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St> FusedStream for TryFlatten<St>
@@ -93,27 +51,23 @@ where
{
type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- loop {
- if self.next.is_none() {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => self.as_mut().next().set(Some(e)),
- None => return Poll::Ready(None),
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let TryFlatten { mut stream, mut next } = self.project();
+ Poll::Ready(loop {
+ if let Some(s) = next.as_mut().as_pin_mut() {
+ if let Some(item) = ready!(s.try_poll_next(cx)?) {
+ break Some(Ok(item));
+ } else {
+ next.set(None);
}
- }
-
- if let Some(item) = ready!(self
- .as_mut()
- .next()
- .as_pin_mut()
- .unwrap()
- .try_poll_next(cx)?)
- {
- return Poll::Ready(Some(Ok(item)));
+ } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ next.set(Some(s));
} else {
- self.as_mut().next().set(None);
+ break None;
}
- }
+ })
}
}
diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs
index b8b8dc2..d85c1fe 100644
--- a/src/stream/try_stream/try_fold.rs
+++ b/src/stream/try_stream/try_fold.rs
@@ -3,19 +3,20 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFold<St, Fut, T, F> {
+ #[pin]
stream: St,
f: F,
accum: Option<T>,
+ #[pin]
future: Option<Fut>,
}
-impl<St: Unpin, Fut: Unpin, T, F> Unpin for TryFold<St, Fut, T, F> {}
-
impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F>
where
St: fmt::Debug,
@@ -36,11 +37,6 @@ where St: TryStream,
F: FnMut(T, St::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = St::Error>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(accum: Option<T>);
- unsafe_pinned!(future: Option<Fut>);
-
pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> {
TryFold {
stream,
@@ -68,43 +64,31 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
{
type Output = Result<T, St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- // we're currently processing a future to produce a new accum value
- if self.accum.is_none() {
- let accum = match ready!(
- self.as_mut().future().as_pin_mut()
- .expect("TryFold polled after completion")
- .try_poll(cx)
- ) {
- Ok(accum) => accum,
- Err(e) => {
- // Indicate that the future can no longer be polled.
- self.as_mut().future().set(None);
- return Poll::Ready(Err(e));
- }
- };
- *self.as_mut().accum() = Some(accum);
- self.as_mut().future().set(None);
- }
-
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
- Some(Ok(item)) => Some(item),
- Some(Err(e)) => {
- // Indicate that the future can no longer be polled.
- *self.as_mut().accum() = None;
- return Poll::Ready(Err(e));
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryFold { mut stream, f, accum, mut future } = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new accum value
+ let res = ready!(fut.try_poll(cx));
+ future.set(None);
+ match res {
+ Ok(a) => *accum = Some(a),
+ Err(e) => break Err(e),
+ }
+ } else if accum.is_some() {
+ // we're waiting on a new item from the stream
+ let res = ready!(stream.as_mut().try_poll_next(cx));
+ let a = accum.take().unwrap();
+ match res {
+ Some(Ok(item)) => future.set(Some(f(a, item))),
+ Some(Err(e)) => break Err(e),
+ None => break Ok(a),
}
- None => None,
- };
- let accum = self.as_mut().accum().take().unwrap();
-
- if let Some(e) = item {
- let future = (self.as_mut().f())(accum, e);
- self.as_mut().future().set(Some(future));
} else {
- return Poll::Ready(Ok(accum))
+ panic!("Fold polled after completion")
}
- }
+ })
}
}
diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs
index 2c71107..5fc91df 100644
--- a/src/stream/try_stream/try_for_each.rs
+++ b/src/stream/try_stream/try_for_each.rs
@@ -3,18 +3,19 @@ use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEach<St, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
future: Option<Fut>,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for TryForEach<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F>
where
St: fmt::Debug,
@@ -33,10 +34,6 @@ where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(future: Option<Fut>);
-
pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> {
TryForEach {
stream,
@@ -53,20 +50,21 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
{
type Output = Result<(), St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryForEach { mut stream, f, mut future } = self.project();
loop {
- if let Some(future) = self.as_mut().future().as_pin_mut() {
- ready!(future.try_poll(cx))?;
- }
- self.as_mut().future().set(None);
-
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => {
- let future = (self.as_mut().f())(e);
- self.as_mut().future().set(Some(future));
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ ready!(fut.try_poll(cx))?;
+ future.set(None);
+ } else {
+ match ready!(stream.as_mut().try_poll_next(cx)?) {
+ Some(e) => future.set(Some(f(e))),
+ None => break,
}
- None => return Poll::Ready(Ok(())),
}
}
+ Poll::Ready(Ok(()))
}
}
diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs
index 19c3e5b..87fd465 100644
--- a/src/stream/try_stream/try_for_each_concurrent.rs
+++ b/src/stream/try_stream/try_for_each_concurrent.rs
@@ -6,24 +6,21 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the
/// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
/// method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEachConcurrent<St, Fut, F> {
+ #[pin]
stream: Option<St>,
f: F,
futures: FuturesUnordered<Fut>,
limit: Option<NonZeroUsize>,
}
-impl<St, Fut, F> Unpin for TryForEachConcurrent<St, Fut, F>
-where St: Unpin,
- Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
where
St: fmt::Debug,
@@ -53,11 +50,6 @@ where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: Future<Output = Result<(), St::Error>>,
{
- unsafe_pinned!(stream: Option<St>);
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(futures: FuturesUnordered<Fut>);
- unsafe_unpinned!(limit: Option<NonZeroUsize>);
-
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> {
TryForEachConcurrent {
stream: Some(stream),
@@ -76,15 +68,16 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
{
type Output = Result<(), St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryForEachConcurrent { mut stream, f, futures, limit } = self.project();
loop {
let mut made_progress_this_iter = false;
- // Try and pull an item from the stream
- let current_len = self.futures.len();
// Check if we've already created a number of futures greater than `limit`
- if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) {
- let poll_res = match self.as_mut().stream().as_pin_mut() {
+ if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
+ let poll_res = match stream.as_mut().as_pin_mut() {
Some(stream) => stream.try_poll_next(cx),
None => Poll::Ready(None),
};
@@ -95,29 +88,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
Some(elem)
},
Poll::Ready(None) => {
- self.as_mut().stream().set(None);
+ stream.set(None);
None
}
Poll::Pending => None,
Poll::Ready(Some(Err(e))) => {
// Empty the stream and futures so that we know
// the future has completed.
- self.as_mut().stream().set(None);
- drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new()));
+ stream.set(None);
+ drop(mem::replace(futures, FuturesUnordered::new()));
return Poll::Ready(Err(e));
}
};
if let Some(elem) = elem {
- let next_future = (self.as_mut().f())(elem);
- self.as_mut().futures().push(next_future);
+ futures.push(f(elem));
}
}
- match self.as_mut().futures().poll_next_unpin(cx) {
+ match futures.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
Poll::Ready(None) => {
- if self.stream.is_none() {
+ if stream.is_none() {
return Poll::Ready(Ok(()))
}
},
@@ -125,8 +117,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
Poll::Ready(Some(Err(e))) => {
// Empty the stream and futures so that we know
// the future has completed.
- self.as_mut().stream().set(None);
- drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new()));
+ stream.set(None);
+ drop(mem::replace(futures, FuturesUnordered::new()));
return Poll::Ready(Err(e));
}
}
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index a3d6803..624380f 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TrySkipWhile<St, Fut, F> where St: TryStream {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Ok>,
done_skipping: bool,
}
-impl<St: Unpin + TryStream, Fut: Unpin, F> Unpin for TrySkipWhile<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
where
St: TryStream + fmt::Debug,
@@ -38,20 +39,9 @@ where
impl<St, Fut, F> TrySkipWhile<St, Fut, F>
where St: TryStream,
-{
- unsafe_pinned!(stream: St);
-}
-
-impl<St, Fut, F> TrySkipWhile<St, Fut, F>
- where St: TryStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Ok>);
- unsafe_unpinned!(done_skipping: bool);
-
pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> {
TrySkipWhile {
stream,
@@ -62,37 +52,7 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
@@ -102,34 +62,34 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.done_skipping {
- return self.as_mut().stream().try_poll_next(cx);
- }
+ #[project]
+ let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
- loop {
- if self.pending_item.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
-
- let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().try_poll(cx)?);
- let item = self.as_mut().pending_item().take().unwrap();
- self.as_mut().pending_fut().set(None);
+ if *done_skipping {
+ return stream.try_poll_next(cx);
+ }
- if !skipped {
- *self.as_mut().done_skipping() = true;
- return Poll::Ready(Some(Ok(item)))
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let skipped = ready!(fut.try_poll(cx)?);
+ let item = pending_item.take();
+ pending_fut.set(None);
+ if !skipped {
+ *done_skipping = true;
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index 6266274..8da1248 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
///
@@ -67,15 +67,15 @@ where
}
/// Stream for the [`try_unfold`] function.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryUnfold<T, F, Fut> {
f: F,
state: Option<T>,
+ #[pin]
fut: Option<Fut>,
}
-impl<T, F, Fut: Unpin> Unpin for TryUnfold<T, F, Fut> {}
-
impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
where
T: fmt::Debug,
@@ -89,12 +89,6 @@ where
}
}
-impl<T, F, Fut> TryUnfold<T, F, Fut> {
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(state: Option<T>);
- unsafe_pinned!(fut: Option<Fut>);
-}
-
impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
@@ -102,27 +96,30 @@ where
{
type Item = Result<Item, Fut::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Fut::Error>>> {
- if let Some(state) = self.as_mut().state().take() {
- let fut = (self.as_mut().f())(state);
- self.as_mut().fut().set(Some(fut));
+ #[project]
+ let TryUnfold {f, state, mut fut } = self.project();
+
+ if let Some(state) = state.take() {
+ fut.set(Some(f(state)));
}
- match self.as_mut().fut().as_pin_mut() {
+ match fut.as_mut().as_pin_mut() {
None => {
// The future previously errored
Poll::Ready(None)
}
- Some(fut) => {
- let step = ready!(fut.try_poll(cx));
- self.as_mut().fut().set(None);
+ Some(future) => {
+ let step = ready!(future.try_poll(cx));
+ fut.set(None);
match step {
Ok(Some((item, next_state))) => {
- *self.as_mut().state() = Some(next_state);
+ *state = Some(next_state);
Poll::Ready(Some(Ok(item)))
}
Ok(None) => Poll::Ready(None),
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index 3153f83..0279571 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Creates a `Stream` from a seed and a closure returning a `Future`.
///
@@ -56,15 +56,15 @@ pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
}
/// Stream for the [`unfold`] function.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Unfold<T, F, Fut> {
f: F,
state: Option<T>,
+ #[pin]
fut: Option<Fut>,
}
-impl<T, F, Fut: Unpin> Unpin for Unfold<T, F, Fut> {}
-
impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
where
T: fmt::Debug,
@@ -78,12 +78,6 @@ where
}
}
-impl<T, F, Fut> Unfold<T, F, Fut> {
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(state: Option<T>);
- unsafe_pinned!(fut: Option<Fut>);
-}
-
impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
@@ -99,21 +93,24 @@ impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
{
type Item = Item;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if let Some(state) = self.as_mut().state().take() {
- let fut = (self.as_mut().f())(state);
- self.as_mut().fut().set(Some(fut));
+ #[project]
+ let Unfold { state, f, mut fut } = self.project();
+
+ if let Some(state) = state.take() {
+ fut.set(Some(f(state)));
}
- let step = ready!(self.as_mut().fut().as_pin_mut()
+ let step = ready!(fut.as_mut().as_pin_mut()
.expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx));
- self.as_mut().fut().set(None);
+ fut.set(None);
if let Some((item, next_state)) = step {
- *self.as_mut().state() = Some(next_state);
+ *state = Some(next_state);
Poll::Ready(Some(item))
} else {
Poll::Ready(None)