aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream/chunks.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/stream/chunks.rs')
-rw-r--r--src/stream/stream/chunks.rs60
1 files changed, 15 insertions, 45 deletions
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs
index b42d1d1..d24c31c 100644
--- a/src/stream/stream/chunks.rs
+++ b/src/stream/stream/chunks.rs
@@ -3,26 +3,23 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;
/// Stream for the [`chunks`](super::StreamExt::chunks) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Chunks<St: Stream> {
+ #[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
-impl<St: Unpin + Stream> Unpin for Chunks<St> {}
-
impl<St: Stream> Chunks<St> where St: Stream {
- unsafe_unpinned!(items: Vec<St::Item>);
- unsafe_pinned!(stream: Fuse<St>);
-
pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> {
assert!(capacity > 0);
@@ -33,70 +30,43 @@ impl<St: Stream> Chunks<St> where St: Stream {
}
}
- fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> {
+ fn take(self: Pin<&mut Self>) -> Vec<St::Item> {
let cap = self.cap;
- mem::replace(self.as_mut().items(), Vec::with_capacity(cap))
+ mem::replace(self.project().items, Vec::with_capacity(cap))
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner()
- }
+ delegate_access_inner!(stream, St, (.));
}
impl<St: Stream> Stream for Chunks<St> {
type Item = Vec<St::Item>;
+ #[project]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Chunks { mut stream, items, cap } = self.as_mut().project();
loop {
- match ready!(self.as_mut().stream().poll_next(cx)) {
+ match ready!(stream.as_mut().poll_next(cx)) {
// Push the item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Some(item) => {
- self.as_mut().items().push(item);
- if self.items.len() >= self.cap {
- return Poll::Ready(Some(self.as_mut().take()))
+ items.push(item);
+ if items.len() >= *cap {
+ return Poll::Ready(Some(self.take()))
}
}
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
None => {
- let last = if self.items.is_empty() {
+ let last = if items.is_empty() {
None
} else {
- let full_buf = mem::replace(self.as_mut().items(), Vec::new());
+ let full_buf = mem::replace(items, Vec::new());
Some(full_buf)
};