aboutsummaryrefslogtreecommitdiff
path: root/src/stream/try_stream/try_flatten.rs
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2020-05-08 19:26:17 -0700
committerChih-Hung Hsieh <chh@google.com>2020-05-11 21:06:51 -0700
commit52627c866ba9ce070950c81a3a98f844a55305cf (patch)
treeb261d9acf1a15a6d9d39e311930effa8fdaccd43 /src/stream/try_stream/try_flatten.rs
parent032d3071c35e3fc8fd539889f96691b67d489bbb (diff)
downloadfutures-util-52627c866ba9ce070950c81a3a98f844a55305cf.tar.gz
Upgrade rust/crates/futures-util to 0.3.5
* Update Android.bp with new features and dependent packages for futures-* 0.3.5. * New dependencies of pin-project and pin-project-internal. Test: mm in external/rust/crates Change-Id: I705a08e44c8598d28b4b465170df8ed206df1494
Diffstat (limited to 'src/stream/try_stream/try_flatten.rs')
-rw-r--r--src/stream/try_stream/try_flatten.rs84
1 files changed, 19 insertions, 65 deletions
diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs
index 5f81b22..a528639 100644
--- a/src/stream/try_stream/try_flatten.rs
+++ b/src/stream/try_stream/try_flatten.rs
@@ -3,34 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryFlatten<St>
where
St: TryStream,
{
+ #[pin]
stream: St,
+ #[pin]
next: Option<St::Ok>,
}
-impl<St> Unpin for TryFlatten<St>
-where
- St: TryStream + Unpin,
- St::Ok: Unpin,
-{
-}
-
-impl<St> TryFlatten<St>
-where
- St: TryStream,
-{
- unsafe_pinned!(stream: St);
- unsafe_pinned!(next: Option<St::Ok>);
-}
-
impl<St> TryFlatten<St>
where
St: TryStream,
@@ -41,37 +29,7 @@ where
Self { stream, next: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St> FusedStream for TryFlatten<St>
@@ -93,27 +51,23 @@ where
{
type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- loop {
- if self.next.is_none() {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => self.as_mut().next().set(Some(e)),
- None => return Poll::Ready(None),
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let TryFlatten { mut stream, mut next } = self.project();
+ Poll::Ready(loop {
+ if let Some(s) = next.as_mut().as_pin_mut() {
+ if let Some(item) = ready!(s.try_poll_next(cx)?) {
+ break Some(Ok(item));
+ } else {
+ next.set(None);
}
- }
-
- if let Some(item) = ready!(self
- .as_mut()
- .next()
- .as_pin_mut()
- .unwrap()
- .try_poll_next(cx)?)
- {
- return Poll::Ready(Some(Ok(item)));
+ } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ next.set(Some(s));
} else {
- self.as_mut().next().set(None);
+ break None;
}
- }
+ })
}
}