aboutsummaryrefslogtreecommitdiff
path: root/src/stream/try_stream/try_concat.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/try_stream/try_concat.rs')
-rw-r--r--src/stream/try_stream/try_concat.rs36
1 files changed, 16 insertions, 20 deletions
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs
index 395f166..8c9710b 100644
--- a/src/stream/try_stream/try_concat.rs
+++ b/src/stream/try_stream/try_concat.rs
@@ -2,26 +2,23 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryConcat<St: TryStream> {
+ #[pin]
stream: St,
accum: Option<St::Ok>,
}
-impl<St: TryStream + Unpin> Unpin for TryConcat<St> {}
-
impl<St> TryConcat<St>
where
St: TryStream,
St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(accum: Option<St::Ok>);
-
pub(super) fn new(stream: St) -> TryConcat<St> {
TryConcat {
stream,
@@ -37,21 +34,20 @@ where
{
type Output = Result<St::Ok, St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => {
- let accum = self.as_mut().accum();
- if let Some(a) = accum {
- a.extend(x)
- } else {
- *accum = Some(x)
- }
- },
- None => {
- return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default()))
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryConcat { mut stream, accum } = self.project();
+ Poll::Ready(Ok(loop {
+ if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ if let Some(a) = accum {
+ a.extend(x)
+ } else {
+ *accum = Some(x)
}
+ } else {
+ break accum.take().unwrap_or_default();
}
- }
+ }))
}
}