aboutsummaryrefslogtreecommitdiff
path: root/src/stream/try_stream/try_skip_while.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/try_stream/try_skip_while.rs')
-rw-r--r--src/stream/try_stream/try_skip_while.rs94
1 files changed, 27 insertions, 67 deletions
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index a3d6803..624380f 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TrySkipWhile<St, Fut, F> where St: TryStream {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Ok>,
done_skipping: bool,
}
-impl<St: Unpin + TryStream, Fut: Unpin, F> Unpin for TrySkipWhile<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
where
St: TryStream + fmt::Debug,
@@ -38,20 +39,9 @@ where
impl<St, Fut, F> TrySkipWhile<St, Fut, F>
where St: TryStream,
-{
- unsafe_pinned!(stream: St);
-}
-
-impl<St, Fut, F> TrySkipWhile<St, Fut, F>
- where St: TryStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Ok>);
- unsafe_unpinned!(done_skipping: bool);
-
pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> {
TrySkipWhile {
stream,
@@ -62,37 +52,7 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
@@ -102,34 +62,34 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.done_skipping {
- return self.as_mut().stream().try_poll_next(cx);
- }
+ #[project]
+ let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
- loop {
- if self.pending_item.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
-
- let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().try_poll(cx)?);
- let item = self.as_mut().pending_item().take().unwrap();
- self.as_mut().pending_fut().set(None);
+ if *done_skipping {
+ return stream.try_poll_next(cx);
+ }
- if !skipped {
- *self.as_mut().done_skipping() = true;
- return Poll::Ready(Some(Ok(item)))
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let skipped = ready!(fut.try_poll(cx)?);
+ let item = pending_item.take();
+ pending_fut.set(None);
+ if !skipped {
+ *done_skipping = true;
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {