diff options
author | Jeffrey Vander Stoep <jeffv@google.com> | 2022-12-12 16:05:43 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2022-12-12 16:05:43 +0000 |
commit | caf7e3a063bea848e425e7bd76a4b5187697e3d2 (patch) | |
tree | 7ba0ba051758d1fc44bf179dda3fda51b245ca65 | |
parent | 72602f9d88ababd4d05ea7a97bbb66cdc271d6ba (diff) | |
parent | 68f408d3461bea2e77bf115ab7b061236451e531 (diff) | |
download | futures-util-caf7e3a063bea848e425e7bd76a4b5187697e3d2.tar.gz |
Merge "Upgrade futures-util to 0.3.25"main-16k-with-phones
59 files changed, 751 insertions, 265 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index b52386f..bd04ae0 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + "sha1": "77d82198c5afd04af3e760a6aa50b7e875289fc3" }, "path_in_vcs": "futures-util" }
\ No newline at end of file @@ -42,7 +42,7 @@ rust_test { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.21", + cargo_pkg_version: "0.3.25", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -86,7 +86,7 @@ rust_library { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.21", + cargo_pkg_version: "0.3.25", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -13,11 +13,12 @@ edition = "2018" rust-version = "1.45" name = "futures-util" -version = "0.3.21" +version = "0.3.25" description = """ Common utilities and extension traits for the futures-rs library. """ homepage = "https://rust-lang.github.io/futures-rs" +readme = "README.md" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" @@ -29,33 +30,33 @@ rustdoc-args = [ ] [dependencies.futures-channel] -version = "0.3.21" +version = "0.3.25" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.21" +version = "0.3.25" default-features = false [dependencies.futures-io] -version = "0.3.21" +version = "0.3.25" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.21" +version = "=0.3.25" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.21" +version = "0.3.25" optional = true default-features = false [dependencies.futures-task] -version = "0.3.21" +version = "0.3.25" default-features = false [dependencies.futures_01] @@ -68,7 +69,7 @@ version = "2.2" optional = true [dependencies.pin-project-lite] -version = "0.2.4" +version = "0.2.6" [dependencies.pin-utils] version = "0.1.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 46ec854..aeecf0f 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "futures-util" -version = "0.3.21" +version = "0.3.25" edition = "2018" rust-version = "1.45" license = "MIT OR Apache-2.0" @@ -34,18 +34,18 @@ write-all-vectored = ["io"] cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.21", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.21", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.21", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.25", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.25", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.25", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.25", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.25", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.25", default-features = false, optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" -pin-project-lite = "0.2.4" +pin-project-lite = "0.2.6" [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/futures-util +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "futures-util" description: "Common utilities and extension traits for the futures-rs library." third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.21.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.25.crate" } - version: "0.3.21" + version: "0.3.25" license_type: NOTICE last_upgrade_date { year: 2022 - month: 3 - day: 1 + month: 12 + day: 12 } } diff --git a/benches/select.rs b/benches/select.rs new file mode 100644 index 0000000..5410a95 --- /dev/null +++ b/benches/select.rs @@ -0,0 +1,35 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::stream::{repeat, select, StreamExt}; + +#[bench] +fn select_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + + b.iter(|| { + let stream1 = repeat(1).take(STREAM_COUNT); + let stream2 = repeat(2).take(STREAM_COUNT); + let stream3 = repeat(3).take(STREAM_COUNT); + let stream4 = repeat(4).take(STREAM_COUNT); + let stream5 = repeat(5).take(STREAM_COUNT); + let stream6 = repeat(6).take(STREAM_COUNT); + let stream7 = repeat(7).take(STREAM_COUNT); + let count = block_on(async { + let count = select( + stream1, + select( + stream2, + select(stream3, select(stream4, select(stream5, select(stream6, stream7)))), + ), + ) + .count() + .await; + count + }); + assert_eq!(count, STREAM_COUNT * 7); + }); +} diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs index 9b05d4b..16ec628 100644 --- a/no_atomic_cas.rs +++ b/no_atomic_cas.rs @@ -2,12 +2,16 @@ // It is not intended for manual editing. const NO_ATOMIC_CAS: &[&str] = &[ + "armv4t-none-eabi", + "armv5te-none-eabi", "avr-unknown-gnu-atmega328", "bpfeb-unknown-none", "bpfel-unknown-none", "msp430-none-elf", "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", "riscv32imc-unknown-none-elf", "thumbv4t-none-eabi", + "thumbv5te-none-eabi", "thumbv6m-none-eabi", ]; diff --git a/src/abortable.rs b/src/abortable.rs index bb82dd0..e0afd47 100644 --- a/src/abortable.rs +++ b/src/abortable.rs @@ -75,7 +75,7 @@ impl<T> Abortable<T> { /// in calls to `Abortable::new`. #[derive(Debug)] pub struct AbortRegistration { - inner: Arc<AbortInner>, + pub(crate) inner: Arc<AbortInner>, } /// A handle to an `Abortable` task. @@ -100,9 +100,9 @@ impl AbortHandle { // Inner type storing the waker to awaken and a bool indicating that it // should be aborted. #[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - aborted: AtomicBool, +pub(crate) struct AbortInner { + pub(crate) waker: AtomicWaker, + pub(crate) aborted: AtomicBool, } /// Indicator that the `Abortable` task was aborted. diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index 9b31932..9859315 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -262,19 +262,20 @@ where let waker = waker_ref(&inner.notifier); let mut cx = Context::from_waker(&waker); - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, + } impl Drop for Reset<'_> { fn drop(&mut self) { - use std::thread; - - if thread::panicking() { - self.0.store(POISONED, SeqCst); + if !self.did_not_panic { + self.state.store(POISONED, SeqCst); } } } - let _reset = Reset(&inner.notifier.state); + let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; let output = { let future = unsafe { @@ -284,12 +285,15 @@ where } }; - match future.poll(&mut cx) { + let poll_result = future.poll(&mut cx); + reset.did_not_panic = true; + + match poll_result { Poll::Pending => { if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success - drop(_reset); + drop(reset); this.inner = Some(inner); return Poll::Pending; } else { @@ -313,7 +317,7 @@ where waker.wake(); } - drop(_reset); // Make borrow checker happy + drop(reset); // Make borrow checker happy drop(wakers_guard); // Safety: We're in the COMPLETE state diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 2e52ac1..7dc159b 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { +pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind<F> +enum JoinAllKind<F> where F: Future, { @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) } } diff --git a/src/future/pending.rs b/src/future/pending.rs index 92c78d5..b8e2868 100644 --- a/src/future/pending.rs +++ b/src/future/pending.rs @@ -33,6 +33,7 @@ impl<T> FusedFuture for Pending<T> { /// unreachable!(); /// # }); /// ``` +#[cfg_attr(docsrs, doc(alias = "never"))] pub fn pending<T>() -> Pending<T> { assert_future::<T, _>(Pending { _data: marker::PhantomData }) } diff --git a/src/future/select.rs b/src/future/select.rs index bd44f20..e693a30 100644 --- a/src/future/select.rs +++ b/src/future/select.rs @@ -100,16 +100,17 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), - Poll::Pending => match b.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending - } - }, + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, b))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, a))); } + + self.inner = Some((a, b)); + Poll::Pending } } diff --git a/src/future/select_all.rs b/src/future/select_all.rs index 106e508..07d65ca 100644 --- a/src/future/select_all.rs +++ b/src/future/select_all.rs @@ -59,7 +59,7 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> { match item { Some((idx, res)) => { let _ = self.inner.swap_remove(idx); - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); Poll::Ready((res, idx, rest)) } None => Poll::Pending, diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs index 0ad83c6..5d55799 100644 --- a/src/future/select_ok.rs +++ b/src/future/select_ok.rs @@ -59,7 +59,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { drop(self.inner.remove(idx)); match res { Ok(e) => { - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs index fb3bdd8..e5bc700 100644 --- a/src/future/try_future/mod.rs +++ b/src/future/try_future/mod.rs @@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture { /// assert_eq!(future.await, Ok(1)); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn map_err<E, F>(self, f: F) -> MapErr<Self, F> where F: FnOnce(Self::Error) -> E, @@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture { /// let future_err_i32 = future_err_u8.err_into::<i32>(); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 29244af..25fcfcb 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs @@ -10,14 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState<E = ()> { Pending, @@ -31,7 +28,20 @@ pub struct TryJoinAll<F> where F: TryFuture, { - elems: Pin<Box<[TryMaybeDone<F>]>>, + kind: TryJoinAllKind<F>, +} + +enum TryJoinAllKind<F> +where + F: TryFuture, +{ + Small { + elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>, + }, } impl<F> fmt::Debug for TryJoinAll<F> @@ -39,9 +49,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -83,15 +100,37 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item> +pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item> where I: IntoIterator, I::Item: TryFuture, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() }, + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } } impl<F> Future for TryJoinAll<F> @@ -101,36 +140,46 @@ where type Output = Result<Vec<F::Ok>, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut state = FinalState::AllDone; - - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> { +impl<F> FromIterator<F> for TryJoinAll<F> +where + F: TryFuture, +{ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { try_join_all(iter) } diff --git a/src/io/copy_buf_abortable.rs b/src/io/copy_buf_abortable.rs new file mode 100644 index 0000000..fdbc4a5 --- /dev/null +++ b/src/io/copy_buf_abortable.rs @@ -0,0 +1,124 @@ +use crate::abortable::{AbortHandle, AbortInner, Aborted}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`. +/// +/// The returned future will copy all the bytes read from this `AsyncBufRead` into the +/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// use futures::future::Aborted; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer); +/// let bytes = fut.await; +/// abort_handle.abort(); +/// writer.close().await.unwrap(); +/// match bytes { +/// Ok(Ok(n)) => { +/// assert_eq!(n, 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// Ok(n) +/// }, +/// Ok(Err(a)) => { +/// Err::<u64, Aborted>(a) +/// } +/// Err(e) => panic!("{}", e) +/// } +/// # }).unwrap(); +/// ``` +pub fn copy_buf_abortable<R, W>( + reader: R, + writer: &mut W, +) -> (CopyBufAbortable<'_, R, W>, AbortHandle) +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + let (handle, reg) = AbortHandle::new_pair(); + (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle) +} + +pin_project! { + /// Future for the [`copy_buf()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBufAbortable<'a, R, W: ?Sized> { + #[pin] + reader: R, + writer: &'a mut W, + amt: u64, + inner: Arc<AbortInner> + } +} + +macro_rules! ready_or_break { + ($e:expr $(,)?) => { + match $e { + $crate::task::Poll::Ready(t) => t, + $crate::task::Poll::Pending => break, + } + }; +} + +impl<R, W> Future for CopyBufAbortable<'_, R, W> +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + Sized, +{ + type Output = Result<Result<u64, Aborted>, io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + loop { + // Check if the task has been aborted + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + + // Read some bytes from the reader, and if we have reached EOF, return total bytes read + let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?; + return Poll::Ready(Ok(Ok(*this.amt))); + } + + // Pass the buffer to the writer, and update the amount written + let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *this.amt += i as u64; + this.reader.as_mut().consume(i); + } + // Schedule the task to be woken up again. + // Never called unless Poll::Pending is returned from io objects. + this.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + Poll::Pending + } +} diff --git a/src/io/lines.rs b/src/io/lines.rs index 13e70df..b5561bf 100644 --- a/src/io/lines.rs +++ b/src/io/lines.rs @@ -42,6 +42,6 @@ impl<R: AsyncBufRead> Stream for Lines<R> { this.buf.pop(); } } - Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) + Poll::Ready(Some(Ok(mem::take(this.buf)))) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index 4dd2e02..8ce3ad6 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -66,6 +66,9 @@ pub use self::copy::{copy, Copy}; mod copy_buf; pub use self::copy_buf::{copy_buf, CopyBuf}; +mod copy_buf_abortable; +pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable}; + mod cursor; pub use self::cursor::Cursor; diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs index 02e38c3..cd0b20e 100644 --- a/src/io/read_exact.rs +++ b/src/io/read_exact.rs @@ -30,7 +30,7 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n); + let (_, rest) = mem::take(&mut this.buf).split_at_mut(n); this.buf = rest; } if n == 0 { diff --git a/src/io/read_line.rs b/src/io/read_line.rs index c75af94..e1b8fc9 100644 --- a/src/io/read_line.rs +++ b/src/io/read_line.rs @@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 } } } diff --git a/src/io/read_to_string.rs b/src/io/read_to_string.rs index 457af59..c175396 100644 --- a/src/io/read_to_string.rs +++ b/src/io/read_to_string.rs @@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { let start_len = buf.len(); - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len } } } diff --git a/src/io/write_all.rs b/src/io/write_all.rs index b134bf1..08c025f 100644 --- a/src/io/write_all.rs +++ b/src/io/write_all.rs @@ -30,7 +30,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n); + let (_, rest) = mem::take(&mut this.buf).split_at(n); this.buf = rest; } if n == 0 { diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs index 2f51ae7..2174079 100644 --- a/src/lock/bilock.rs +++ b/src/lock/bilock.rs @@ -224,6 +224,9 @@ pub struct BiLockGuard<'a, T> { bilock: &'a BiLock<T>, } +// We allow parallel access to T via Deref, so Sync bound is also needed here. +unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {} + impl<T> Deref for BiLockGuard<'_, T> { type Target = T; fn deref(&self) -> &T { diff --git a/src/lock/mod.rs b/src/lock/mod.rs index cf374c0..0be7271 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -4,11 +4,18 @@ //! library is activated, and it is activated by default. #[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "std")] -mod mutex; +#[cfg(any(feature = "sink", feature = "io"))] +#[cfg(not(feature = "bilock"))] +pub(crate) use self::bilock::BiLock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] +pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "std")] -pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; +pub use self::mutex::{ + MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] @@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; #[cfg(not(futures_no_atomic_cas))] -#[cfg(any(feature = "sink", feature = "io"))] -#[cfg(not(feature = "bilock"))] -pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "bilock")] -#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] -pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; +#[cfg(feature = "std")] +mod mutex; diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs index 85dcb15..335ad14 100644 --- a/src/lock/mutex.rs +++ b/src/lock/mutex.rs @@ -1,14 +1,16 @@ -use futures_core::future::{FusedFuture, Future}; -use futures_core::task::{Context, Poll, Waker}; -use slab::Slab; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex as StdMutex; +use std::sync::{Arc, Mutex as StdMutex}; use std::{fmt, mem}; +use slab::Slab; + +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; + /// A futures-aware mutex. /// /// # Fairness @@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> { } } + /// Attempt to acquire the lock immediately. + /// + /// If the lock is currently held, this will return `None`. + pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> { + let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire); + if (old_state & IS_LOCKED) == 0 { + Some(OwnedMutexGuard { mutex: self.clone() }) + } else { + None + } + } + /// Acquire the lock asynchronously. /// /// This method returns a future that will resolve once the lock has been @@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> { MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } + /// Acquire the lock asynchronously. + /// + /// This method returns a future that will resolve once the lock has been + /// successfully acquired. + pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> { + OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } + } + /// Returns a mutable reference to the underlying data. /// /// Since this call borrows the `Mutex` mutably, no actual locking needs to @@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> { } // Sentinel for when no slot in the `Slab` has been dedicated to this object. -const WAIT_KEY_NONE: usize = usize::max_value(); +const WAIT_KEY_NONE: usize = usize::MAX; + +/// A future which resolves when the target mutex has been successfully acquired, owned version. +pub struct OwnedMutexLockFuture<T: ?Sized> { + // `None` indicates that the mutex was successfully acquired. + mutex: Option<Arc<Mutex<T>>>, + wait_key: usize, +} + +impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexLockFuture") + .field("was_acquired", &self.mutex.is_none()) + .field("mutex", &self.mutex) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) + .finish() + } +} + +impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> { + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl<T: ?Sized> Future for OwnedMutexLockFuture<T> { + type Output = OwnedMutexGuard<T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.get_mut(); + + let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion"); + + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + { + let mut waiters = mutex.waiters.lock().unwrap(); + if this.wait_key == WAIT_KEY_NONE { + this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone())); + if waiters.len() == 1 { + mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock + } + } else { + waiters[this.wait_key].register(cx.waker()); + } + } + + // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by + // attempting to acquire the lock again. + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + Poll::Pending + } +} + +impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> { + fn drop(&mut self) { + if let Some(mutex) = self.mutex.as_ref() { + // This future was dropped before it acquired the mutex. + // + // Remove ourselves from the map, waking up another waiter if we + // had been awoken to acquire the lock. + mutex.remove_waker(self.wait_key, true); + } + } +} + +/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods. +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +pub struct OwnedMutexGuard<T: ?Sized> { + mutex: Arc<Mutex<T>>, +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexGuard") + .field("value", &&**self) + .field("mutex", &self.mutex) + .finish() + } +} + +impl<T: ?Sized> Drop for OwnedMutexGuard<T> { + fn drop(&mut self) { + self.mutex.unlock() + } +} + +impl<T: ?Sized> Deref for OwnedMutexGuard<T> { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.mutex.value.get() } + } +} /// A future which resolves when the target mutex has been successfully acquired. pub struct MutexLockFuture<'a, T: ?Sized> { @@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {} // It's safe to switch which thread the acquire is being attempted on so long as // `T` can be accessed on that thread. unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {} + // doesn't have any interesting `&self` methods (only Debug) unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {} +// It's safe to switch which thread the acquire is being attempted on so long as +// `T` can be accessed on that thread. +unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {} + +// doesn't have any interesting `&self` methods (only Debug) +unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {} + // Safe to send since we don't track any thread-specific details-- the inner // lock is essentially spinlock-equivalent (attempt to flip an atomic bool) unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {} unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {} + +unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {} +unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {} + unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {} unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {} diff --git a/src/sink/drain.rs b/src/sink/drain.rs index 5295115..1a5480c 100644 --- a/src/sink/drain.rs +++ b/src/sink/drain.rs @@ -32,6 +32,12 @@ pub fn drain<T>() -> Drain<T> { impl<T> Unpin for Drain<T> {} +impl<T> Clone for Drain<T> { + fn clone(&self) -> Self { + drain() + } +} + impl<T> Sink<T> for Drain<T> { type Error = Never; diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index f596b3b..f1c93fd 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -135,11 +135,39 @@ impl<Fut: Future> FuturesOrdered<Fut> { /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. + #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { + self.push_back(future); + } + + /// Pushes a future to the back of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } + + /// Pushes a future to the front of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. This future will be the next future to be returned + /// complete. + pub fn push_front(&mut self, future: Fut) { + if self.next_outgoing_index == 0 { + self.push_back(future) + } else { + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); + } + } } impl<Fut: Future> Default for FuturesOrdered<Fut> { @@ -196,7 +224,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { - acc.push(item); + acc.push_back(item); acc }) } @@ -214,7 +242,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { I: IntoIterator<Item = Fut>, { for item in iter { - self.push(item); + self.push_back(item); } } } diff --git a/src/stream/futures_unordered/iter.rs b/src/stream/futures_unordered/iter.rs index 04db5ee..20248c7 100644 --- a/src/stream/futures_unordered/iter.rs +++ b/src/stream/futures_unordered/iter.rs @@ -2,6 +2,7 @@ use super::task::Task; use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; +use core::ptr; use core::sync::atomic::Ordering::Relaxed; /// Mutable iterator over all futures in the unordered set. @@ -58,6 +59,9 @@ impl<Fut: Unpin> Iterator for IntoIter<Fut> { // valid `next_all` checks can be skipped. let next = (**task).next_all.load(Relaxed); *task = next; + if !task.is_null() { + *(**task).prev_all.get() = ptr::null_mut(); + } self.len -= 1; Some(future) } diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index fdbd53d..5e995fd 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -22,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352 pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; diff --git a/src/stream/select_with_strategy.rs b/src/stream/select_with_strategy.rs index bd86990..224d5f8 100644 --- a/src/stream/select_with_strategy.rs +++ b/src/stream/select_with_strategy.rs @@ -1,5 +1,4 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -18,13 +17,15 @@ impl PollNext { /// Toggle the value and return the old one. pub fn toggle(&mut self) -> Self { let old = *self; + *self = self.other(); + old + } + fn other(&self) -> PollNext { match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, } - - old } } @@ -34,14 +35,41 @@ impl Default for PollNext { } } +enum InternalState { + Start, + LeftFinished, + RightFinished, + BothFinished, +} + +impl InternalState { + fn finish(&mut self, ps: PollNext) { + match (&self, ps) { + (InternalState::Start, PollNext::Left) => { + *self = InternalState::LeftFinished; + } + (InternalState::Start, PollNext::Right) => { + *self = InternalState::RightFinished; + } + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { + *self = InternalState::BothFinished; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy<St1, St2, Clos, State> { #[pin] - stream1: Fuse<St1>, + stream1: St1, #[pin] - stream2: Fuse<St2>, + stream2: St2, + internal_state: InternalState, state: State, clos: Clos, } @@ -120,9 +148,10 @@ where State: Default, { assert_stream::<St1::Item, _>(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), + internal_state: InternalState::Start, clos: which, }) } @@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -150,7 +179,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -158,7 +187,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } @@ -169,47 +198,93 @@ where Clos: FnMut(&mut State) -> PollNext, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + match self.internal_state { + InternalState::BothFinished => true, + _ => false, + } } } -impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +#[inline] +fn poll_side<St1, St2, Clos, State>( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, + cx: &mut Context<'_>, +) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>, - Clos: FnMut(&mut State) -> PollNext, { - type Item = St1::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { - let this = self.project(); - - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), - } + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), } } -fn poll_inner<St1, St2>( - a: Pin<&mut St1>, - b: Pin<&mut St2>, +#[inline] +fn poll_inner<St1, St2, Clos, State>( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>, { - let a_done = match a.poll_next(cx) { + let first_done = match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, + Poll::Ready(None) => { + select.internal_state.finish(side); + true + } Poll::Pending => false, }; + let other = side.other(); + match poll_side(select, other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(other); + if first_done { + Poll::Ready(None) + } else { + Poll::Pending + } + } + a => a, + } +} + +impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +where + St1: Stream, + St2: Stream<Item = St1::Item>, + Clos: FnMut(&mut State) -> PollNext, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { + let mut this = self.project(); - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + match this.internal_state { + InternalState::Start => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } + InternalState::LeftFinished => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::RightFinished => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::BothFinished => Poll::Ready(None), + } } } diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs index d64c142..91b0f6b 100644 --- a/src/stream/stream/buffer_unordered.rs +++ b/src/stream/stream/buffer_unordered.rs @@ -41,11 +41,7 @@ where St: Stream, St::Item: Future, { - pub(super) fn new(stream: St, n: usize) -> Self - where - St: Stream, - St::Item: Future, - { + pub(super) fn new(stream: St, n: usize) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs index 6052a73..8ca0391 100644 --- a/src/stream/stream/buffered.rs +++ b/src/stream/stream/buffered.rs @@ -64,7 +64,7 @@ where // our queue of futures. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs index 8457869..2a71ebc 100644 --- a/src/stream/stream/chunks.rs +++ b/src/stream/stream/chunks.rs @@ -21,10 +21,7 @@ pin_project! { } } -impl<St: Stream> Chunks<St> -where - St: Stream, -{ +impl<St: Stream> Chunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); @@ -66,7 +63,7 @@ impl<St: Stream> Stream for Chunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -77,9 +74,9 @@ impl<St: Stream> Stream for Chunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs index b0e81b9..970ac26 100644 --- a/src/stream/stream/collect.rs +++ b/src/stream/stream/collect.rs @@ -19,7 +19,7 @@ pin_project! { impl<St: Stream, C: Default> Collect<St, C> { fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.project().collection, Default::default()) + mem::take(self.project().collection) } pub(super) fn new(stream: St) -> Self { diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index ccf1a51..997fe99 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -93,7 +93,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 02a0a43..6b7d007 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -87,7 +87,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 642b91e..a823fab 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1674,6 +1674,8 @@ pub trait StreamExt: Stream { /// assert_eq!(total, 6); /// # }); /// ``` + /// + /// [`select!`]: crate::select fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream, diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index c72dfc3..ea3d624 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> { } fn size_hint(&self) -> (usize, Option<usize>) { - let peek_len = if self.peeked.is_some() { 1 } else { 0 }; + let peek_len = usize::from(self.peeked.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(peek_len); let upper = match upper { diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs index 5ebc958..49116d4 100644 --- a/src/stream/stream/ready_chunks.rs +++ b/src/stream/stream/ready_chunks.rs @@ -20,10 +20,7 @@ pin_project! { } } -impl<St: Stream> ReadyChunks<St> -where - St: Stream, -{ +impl<St: Stream> ReadyChunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); @@ -74,7 +71,7 @@ impl<St: Stream> Stream for ReadyChunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -85,9 +82,9 @@ impl<St: Stream> Stream for ReadyChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 50a21a2..dabd5ee 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -99,7 +99,7 @@ where if self.done_skipping { self.stream.size_hint() } else { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index 3a72fee..e2034e0 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs @@ -35,7 +35,7 @@ impl<S: Stream> Stream for SplitStream<S> { } } -#[allow(bad_style)] +#[allow(non_snake_case)] fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> { SplitSink { lock, slot: None } } diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index b1c728e..29d6c39 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -54,11 +54,11 @@ where let (lower, upper) = self.stream.size_hint(); - let lower = cmp::min(lower, self.remaining as usize); + let lower = cmp::min(lower, self.remaining); let upper = match upper { - Some(x) if x < self.remaining as usize => Some(x), - _ => Some(self.remaining as usize), + Some(x) if x < self.remaining => Some(x), + _ => Some(self.remaining), }; (lower, upper) diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 01b2765..9256943 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -91,7 +91,7 @@ where return (0, Some(0)); } - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs index d4531d4..9192c0b 100644 --- a/src/stream/stream/then.rs +++ b/src/stream/stream/then.rs @@ -78,7 +78,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index 15f22e8..a88cf03 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -21,7 +21,7 @@ pin_project! { impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> { fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { let this = self.project(); - (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) + (mem::take(this.left), mem::take(this.right)) } pub(super) fn new(stream: St) -> Self { diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 360a8b6..25a47e9 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -102,8 +102,8 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let queued1_len = if self.queued1.is_some() { 1 } else { 0 }; - let queued2_len = if self.queued2.is_some() { 1 } else { 0 }; + let queued1_len = usize::from(self.queued1.is_some()); + let queued2_len = usize::from(self.queued2.is_some()); let (stream1_lower, stream1_upper) = self.stream1.size_hint(); let (stream2_lower, stream2_upper) = self.stream2.size_hint(); diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index a7b50db..2f8b6f2 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -71,7 +71,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs index 914b277..ffbfc7e 100644 --- a/src/stream/try_stream/into_async_read.rs +++ b/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead<St> -where - St: TryStream<Error = Error> + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState<St::Ok>, -} - -impl<St> Unpin for IntoAsyncRead<St> -where - St: TryStream<Error = Error> + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead<St> + where + St: TryStream<Error = Error>, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState<St::Ok>, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> { impl<St> IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl<St> AsyncRead for IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl<St> AsyncWrite for IntoAsyncRead<St> where - St: TryStream<Error = Error> + AsyncWrite + Unpin, + St: TryStream<Error = Error> + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize>> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + this.stream.poll_close(cx) } } impl<St> AsyncBufRead for IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 6bf2cb7..bc4c6e4 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -918,7 +918,7 @@ pub trait TryStreamExt: TryStream { /// that matches the stream's `Error` type. /// /// This adaptor will buffer up to `n` futures and then return their - /// outputs in the order. If the underlying stream returns an error, it will + /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will /// be immediately propagated. /// /// The returned stream will be a stream of results, each containing either @@ -1031,12 +1031,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1048,12 +1043,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1061,7 +1056,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead<Self> where - Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, + Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self)) diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs index cb69e81..53aceb8 100644 --- a/src/stream/try_stream/or_else.rs +++ b/src/stream/try_stream/or_else.rs @@ -75,7 +75,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/src/stream/try_stream/try_buffered.rs b/src/stream/try_stream/try_buffered.rs index 45bd3f8..9f48e5c 100644 --- a/src/stream/try_stream/try_buffered.rs +++ b/src/stream/try_stream/try_buffered.rs @@ -54,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs index 07d4425..3bb253a 100644 --- a/src/stream/try_stream/try_chunks.rs +++ b/src/stream/try_stream/try_chunks.rs @@ -70,7 +70,7 @@ impl<St: TryStream> Stream for TryChunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -81,9 +81,9 @@ impl<St: TryStream> Stream for TryChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs index 5d3b3d7..3e5963f 100644 --- a/src/stream/try_stream/try_collect.rs +++ b/src/stream/try_stream/try_collect.rs @@ -45,7 +45,7 @@ where Poll::Ready(Ok(loop { match ready!(this.stream.as_mut().try_poll_next(cx)?) { Some(x) => this.items.extend(Some(x)), - None => break mem::replace(this.items, Default::default()), + None => break mem::take(this.items), } })) } diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index 61e6105..11d5824 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -90,7 +90,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_fut.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_fut.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index bb1b5b9..ed12017 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -84,7 +84,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs index a424b6c..52aa2d4 100644 --- a/src/stream/try_stream/try_skip_while.rs +++ b/src/stream/try_stream/try_skip_while.rs @@ -87,7 +87,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs index 3375960..4b5ff1a 100644 --- a/src/stream/try_stream/try_take_while.rs +++ b/src/stream/try_stream/try_take_while.rs @@ -96,7 +96,7 @@ where return (0, Some(0)); } - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/src/task/spawn.rs b/src/task/spawn.rs index 87ca360..d9e9985 100644 --- a/src/task/spawn.rs +++ b/src/task/spawn.rs @@ -34,7 +34,7 @@ pub trait SpawnExt: Spawn { /// today. Feel free to use this method in the meantime. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 + /// # { /// use futures::executor::ThreadPool; /// use futures::task::SpawnExt; /// @@ -42,6 +42,8 @@ pub trait SpawnExt: Spawn { /// /// let future = async { /* ... */ }; /// executor.spawn(future).unwrap(); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "alloc")] fn spawn<Fut>(&self, future: Fut) -> Result<(), SpawnError> @@ -59,7 +61,7 @@ pub trait SpawnExt: Spawn { /// resolves to the output of the spawned future. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 + /// # { /// use futures::executor::{block_on, ThreadPool}; /// use futures::future; /// use futures::task::SpawnExt; @@ -69,6 +71,8 @@ pub trait SpawnExt: Spawn { /// let future = future::ready(1); /// let join_handle_fut = executor.spawn_with_handle(future).unwrap(); /// assert_eq!(block_on(join_handle_fut), 1); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "channel")] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] @@ -138,7 +142,6 @@ pub trait LocalSpawnExt: LocalSpawn { /// resolves to the output of the spawned future. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 /// use futures::executor::LocalPool; /// use futures::task::LocalSpawnExt; /// |