aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream/peek.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/stream/peek.rs')
-rw-r--r--src/stream/stream/peek.rs92
1 files changed, 91 insertions, 1 deletions
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index 217faba..c72dfc3 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> {
delegate_access_inner!(stream, St, (.));
- /// Produces a `Peek` future which retrieves a reference to the next item
+ /// Produces a future which retrieves a reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
Peek { inner: Some(self) }
@@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> {
})
}
+ /// Produces a future which retrieves a mutable reference to the next item
+ /// in the stream, or `None` if the underlying stream terminates.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ /// use futures::pin_mut;
+ ///
+ /// let stream = stream::iter(vec![1, 2, 3]).peekable();
+ /// pin_mut!(stream);
+ ///
+ /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
+ /// assert_eq!(stream.as_mut().next().await, Some(1));
+ ///
+ /// // Peek into the stream and modify the value which will be returned next
+ /// if let Some(p) = stream.as_mut().peek_mut().await {
+ /// if *p == 2 {
+ /// *p = 5;
+ /// }
+ /// }
+ ///
+ /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
+ /// # });
+ /// ```
+ pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
+ PeekMut { inner: Some(self) }
+ }
+
+ /// Peek retrieves a mutable reference to the next item in the stream.
+ pub fn poll_peek_mut(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<&mut St::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if this.peeked.is_some() {
+ break this.peeked.as_mut();
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ *this.peeked = Some(item);
+ } else {
+ break None;
+ }
+ })
+ }
+
/// Creates a future which will consume and return the next value of this
/// stream if a condition is true.
///
@@ -221,6 +269,48 @@ where
}
pin_project! {
+ /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
+ #[must_use = "futures do nothing unless polled"]
+ pub struct PeekMut<'a, St: Stream> {
+ inner: Option<Pin<&'a mut Peekable<St>>>,
+ }
+}
+
+impl<St> fmt::Debug for PeekMut<'_, St>
+where
+ St: Stream + fmt::Debug,
+ St::Item: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("PeekMut").field("inner", &self.inner).finish()
+ }
+}
+
+impl<St: Stream> FusedFuture for PeekMut<'_, St> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<'a, St> Future for PeekMut<'a, St>
+where
+ St: Stream,
+{
+ type Output = Option<&'a mut St::Item>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ if let Some(peekable) = inner {
+ ready!(peekable.as_mut().poll_peek_mut(cx));
+
+ inner.take().unwrap().poll_peek_mut(cx)
+ } else {
+ panic!("PeekMut polled after completion")
+ }
+ }
+}
+
+pin_project! {
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
#[must_use = "futures do nothing unless polled"]
pub struct NextIf<'a, St: Stream, F> {