diff options
Diffstat (limited to 'src/stream/stream/peek.rs')
-rw-r--r-- | src/stream/stream/peek.rs | 92 |
1 files changed, 91 insertions, 1 deletions
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index 217faba..c72dfc3 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> { delegate_access_inner!(stream, St, (.)); - /// Produces a `Peek` future which retrieves a reference to the next item + /// Produces a future which retrieves a reference to the next item /// in the stream, or `None` if the underlying stream terminates. pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { Peek { inner: Some(self) } @@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> { }) } + /// Produces a future which retrieves a mutable reference to the next item + /// in the stream, or `None` if the underlying stream terminates. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use futures::pin_mut; + /// + /// let stream = stream::iter(vec![1, 2, 3]).peekable(); + /// pin_mut!(stream); + /// + /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1)); + /// assert_eq!(stream.as_mut().next().await, Some(1)); + /// + /// // Peek into the stream and modify the value which will be returned next + /// if let Some(p) = stream.as_mut().peek_mut().await { + /// if *p == 2 { + /// *p = 5; + /// } + /// } + /// + /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]); + /// # }); + /// ``` + pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> { + PeekMut { inner: Some(self) } + } + + /// Peek retrieves a mutable reference to the next item in the stream. + pub fn poll_peek_mut( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<&mut St::Item>> { + let mut this = self.project(); + + Poll::Ready(loop { + if this.peeked.is_some() { + break this.peeked.as_mut(); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + *this.peeked = Some(item); + } else { + break None; + } + }) + } + /// Creates a future which will consume and return the next value of this /// stream if a condition is true. /// @@ -221,6 +269,48 @@ where } pin_project! { + /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method. + #[must_use = "futures do nothing unless polled"] + pub struct PeekMut<'a, St: Stream> { + inner: Option<Pin<&'a mut Peekable<St>>>, + } +} + +impl<St> fmt::Debug for PeekMut<'_, St> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeekMut").field("inner", &self.inner).finish() + } +} + +impl<St: Stream> FusedFuture for PeekMut<'_, St> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<'a, St> Future for PeekMut<'a, St> +where + St: Stream, +{ + type Output = Option<&'a mut St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = self.project().inner; + if let Some(peekable) = inner { + ready!(peekable.as_mut().poll_peek_mut(cx)); + + inner.take().unwrap().poll_peek_mut(cx) + } else { + panic!("PeekMut polled after completion") + } + } +} + +pin_project! { /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method. #[must_use = "futures do nothing unless polled"] pub struct NextIf<'a, St: Stream, F> { |