diff options
Diffstat (limited to 'src/stream/collect.rs')
-rw-r--r-- | src/stream/collect.rs | 141 |
1 files changed, 64 insertions, 77 deletions
diff --git a/src/stream/collect.rs b/src/stream/collect.rs index 4649428..1aafc30 100644 --- a/src/stream/collect.rs +++ b/src/stream/collect.rs @@ -1,7 +1,7 @@ use crate::stream::Stream; -use bytes::{Buf, BufMut, Bytes, BytesMut}; use core::future::Future; +use core::marker::PhantomPinned; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; @@ -10,7 +10,7 @@ use pin_project_lite::pin_project; // Do not export this struct until `FromStream` can be unsealed. pin_project! { /// Future returned by the [`collect`](super::StreamExt::collect) method. - #[must_use = "streams do nothing unless polled"] + #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Collect<T, U> where @@ -19,7 +19,10 @@ pin_project! { { #[pin] stream: T, - collection: U::Collection, + collection: U::InternalCollection, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -42,9 +45,13 @@ where { pub(super) fn new(stream: T) -> Collect<T, U> { let (lower, upper) = stream.size_hint(); - let collection = U::initialize(lower, upper); + let collection = U::initialize(sealed::Internal, lower, upper); - Collect { stream, collection } + Collect { + stream, + collection, + _pin: PhantomPinned, + } } } @@ -64,12 +71,12 @@ where let item = match ready!(me.stream.poll_next(cx)) { Some(item) => item, None => { - return Ready(U::finalize(&mut me.collection)); + return Ready(U::finalize(sealed::Internal, &mut me.collection)); } }; - if !U::extend(&mut me.collection, item) { - return Ready(U::finalize(&mut me.collection)); + if !U::extend(sealed::Internal, &mut me.collection, item) { + return Ready(U::finalize(sealed::Internal, &mut me.collection)); } } } @@ -80,32 +87,32 @@ where impl FromStream<()> for () {} impl sealed::FromStreamPriv<()> for () { - type Collection = (); + type InternalCollection = (); - fn initialize(_lower: usize, _upper: Option<usize>) {} + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {} - fn extend(_collection: &mut (), _item: ()) -> bool { + fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool { true } - fn finalize(_collection: &mut ()) {} + fn finalize(_: sealed::Internal, _collection: &mut ()) {} } impl<T: AsRef<str>> FromStream<T> for String {} impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { - type Collection = String; + type InternalCollection = String; - fn initialize(_lower: usize, _upper: Option<usize>) -> String { + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String { String::new() } - fn extend(collection: &mut String, item: T) -> bool { + fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool { collection.push_str(item.as_ref()); true } - fn finalize(collection: &mut String) -> String { + fn finalize(_: sealed::Internal, collection: &mut String) -> String { mem::replace(collection, String::new()) } } @@ -113,18 +120,18 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { impl<T> FromStream<T> for Vec<T> {} impl<T> sealed::FromStreamPriv<T> for Vec<T> { - type Collection = Vec<T>; + type InternalCollection = Vec<T>; - fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> { + fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> { Vec::with_capacity(lower) } - fn extend(collection: &mut Vec<T>, item: T) -> bool { + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { collection.push(item); true } - fn finalize(collection: &mut Vec<T>) -> Vec<T> { + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { mem::replace(collection, vec![]) } } @@ -132,18 +139,19 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> { impl<T> FromStream<T> for Box<[T]> {} impl<T> sealed::FromStreamPriv<T> for Box<[T]> { - type Collection = Vec<T>; + type InternalCollection = Vec<T>; - fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> { - <Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper) + fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> { + <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper) } - fn extend(collection: &mut Vec<T>, item: T) -> bool { - <Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item) + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item) } - fn finalize(collection: &mut Vec<T>) -> Box<[T]> { - <Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice() + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> { + <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection) + .into_boxed_slice() } } @@ -153,18 +161,26 @@ impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> where U: FromStream<T>, { - type Collection = Result<U::Collection, E>; + type InternalCollection = Result<U::InternalCollection, E>; - fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> { - Ok(U::initialize(lower, upper)) + fn initialize( + _: sealed::Internal, + lower: usize, + upper: Option<usize>, + ) -> Result<U::InternalCollection, E> { + Ok(U::initialize(sealed::Internal, lower, upper)) } - fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool { + fn extend( + _: sealed::Internal, + collection: &mut Self::InternalCollection, + item: Result<T, E>, + ) -> bool { assert!(collection.is_ok()); match item { Ok(item) => { let collection = collection.as_mut().ok().expect("invalid state"); - U::extend(collection, item) + U::extend(sealed::Internal, collection, item) } Err(err) => { *collection = Err(err); @@ -173,11 +189,11 @@ where } } - fn finalize(collection: &mut Self::Collection) -> Result<U, E> { + fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> { if let Ok(collection) = collection.as_mut() { - Ok(U::finalize(collection)) + Ok(U::finalize(sealed::Internal, collection)) } else { - let res = mem::replace(collection, Ok(U::initialize(0, Some(0)))); + let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); if let Err(err) = res { Err(err) @@ -188,59 +204,30 @@ where } } -impl<T: Buf> FromStream<T> for Bytes {} - -impl<T: Buf> sealed::FromStreamPriv<T> for Bytes { - type Collection = BytesMut; - - fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut { - BytesMut::new() - } - - fn extend(collection: &mut BytesMut, item: T) -> bool { - collection.put(item); - true - } - - fn finalize(collection: &mut BytesMut) -> Bytes { - mem::replace(collection, BytesMut::new()).freeze() - } -} - -impl<T: Buf> FromStream<T> for BytesMut {} - -impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut { - type Collection = BytesMut; - - fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut { - BytesMut::new() - } - - fn extend(collection: &mut BytesMut, item: T) -> bool { - collection.put(item); - true - } - - fn finalize(collection: &mut BytesMut) -> BytesMut { - mem::replace(collection, BytesMut::new()) - } -} - pub(crate) mod sealed { #[doc(hidden)] pub trait FromStreamPriv<T> { /// Intermediate type used during collection process - type Collection; + /// + /// The name of this type is internal and cannot be relied upon. + type InternalCollection; /// Initialize the collection - fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection; + fn initialize( + internal: Internal, + lower: usize, + upper: Option<usize>, + ) -> Self::InternalCollection; /// Extend the collection with the received item /// /// Return `true` to continue streaming, `false` complete collection. - fn extend(collection: &mut Self::Collection, item: T) -> bool; + fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool; /// Finalize collection into target type. - fn finalize(collection: &mut Self::Collection) -> Self; + fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self; } + + #[allow(missing_debug_implementations)] + pub struct Internal; } |