diff options
author | Joel Galenson <jgalenson@google.com> | 2021-09-22 10:59:23 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-09-22 10:59:23 -0700 |
commit | dc20fc8367e78e4b3ac62def28f0bf1082da81d7 (patch) | |
tree | 630ac4f8f0e947984e7ee872e9d0ee74cf0f047e | |
parent | 541ee666ea26ab070f0581fa9e3124b8f384c8e1 (diff) | |
download | futures-util-dc20fc8367e78e4b3ac62def28f0bf1082da81d7.tar.gz |
Upgrade rust/crates/futures-util to 0.3.17
Test: make
Change-Id: I8ab43c55906b0f8e81fd92f62b3814b60202de59
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 64 | ||||
-rw-r--r-- | Cargo.toml | 14 | ||||
-rw-r--r-- | Cargo.toml.orig | 14 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | src/async_await/mod.rs | 7 | ||||
-rw-r--r-- | src/async_await/stream_select_mod.rs | 45 | ||||
-rw-r--r-- | src/future/join_all.rs | 95 | ||||
-rw-r--r-- | src/future/mod.rs | 3 | ||||
-rw-r--r-- | src/future/option.rs | 6 | ||||
-rw-r--r-- | src/future/poll_immediate.rs | 126 | ||||
-rw-r--r-- | src/io/buf_reader.rs | 69 | ||||
-rw-r--r-- | src/io/mod.rs | 2 | ||||
-rw-r--r-- | src/lib.rs | 8 | ||||
-rw-r--r-- | src/stream/mod.rs | 7 | ||||
-rw-r--r-- | src/stream/poll_immediate.rs | 80 | ||||
-rw-r--r-- | src/stream/stream/mod.rs | 2 | ||||
-rw-r--r-- | src/stream/stream/peek.rs | 92 |
18 files changed, 534 insertions, 110 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 99dc8b0..ffd4f55 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee" + "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010" } } @@ -41,6 +41,8 @@ rust_defaults { name: "futures-util_test_defaults", crate_name: "futures_util", srcs: ["src/lib.rs"], + cargo_env_compat: true, + cargo_pkg_version: "0.3.17", test_suites: ["general-tests"], auto_gen_config: true, edition: "2018", @@ -99,6 +101,8 @@ rust_library { name: "libfutures_util", host_supported: true, crate_name: "futures_util", + cargo_env_compat: true, + cargo_pkg_version: "0.3.17", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -143,63 +147,3 @@ rust_library { ], min_sdk_version: "29", } - -// dependent_library ["feature_list"] -// autocfg-1.0.1 -// byteorder-1.4.3 "default,std" -// bytes-0.4.12 -// cfg-if-0.1.10 -// cfg-if-1.0.0 -// crossbeam-deque-0.7.4 -// crossbeam-epoch-0.8.2 "default,lazy_static,std" -// crossbeam-queue-0.2.3 "default,std" -// crossbeam-utils-0.7.2 "default,lazy_static,std" -// fnv-1.0.7 "default,std" -// futures-0.1.31 "default,use_std,with-deprecated" -// futures-channel-0.3.16 "alloc,std" -// futures-core-0.3.16 "alloc,std" -// futures-io-0.3.16 "std" -// futures-macro-0.3.16 -// futures-sink-0.3.16 -// futures-task-0.3.16 "alloc,std" -// iovec-0.1.4 -// lazy_static-1.4.0 -// libc-0.2.98 "default,std" -// lock_api-0.3.4 -// log-0.4.14 -// maybe-uninit-2.0.0 -// memchr-2.4.0 "default,std" -// memoffset-0.5.6 "default" -// mio-0.6.23 "default,with-deprecated" -// mio-uds-0.6.8 -// net2-0.2.37 "default,duration" -// num_cpus-1.13.0 -// parking_lot-0.9.0 "default" -// parking_lot_core-0.6.2 -// pin-project-lite-0.2.7 -// pin-utils-0.1.0 -// proc-macro-hack-0.5.19 -// proc-macro-nested-0.1.7 -// proc-macro2-1.0.28 "default,proc-macro" -// quote-1.0.9 "default,proc-macro" -// rustc_version-0.2.3 -// scopeguard-1.1.0 -// semver-0.9.0 "default" -// semver-parser-0.7.0 -// slab-0.4.4 "default,std" -// smallvec-0.6.14 "default,std" -// syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" -// tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds" -// tokio-codec-0.1.2 -// tokio-current-thread-0.1.7 -// tokio-executor-0.1.10 -// tokio-fs-0.1.7 -// tokio-io-0.1.13 -// tokio-reactor-0.1.12 -// tokio-sync-0.1.8 -// tokio-tcp-0.1.4 -// tokio-threadpool-0.1.18 -// tokio-timer-0.2.13 -// tokio-udp-0.1.6 -// tokio-uds-0.2.7 -// unicode-xid-0.2.2 "default" @@ -12,7 +12,7 @@ [package] edition = "2018" name = "futures-util" -version = "0.3.16" +version = "0.3.17" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "Common utilities and extension traits for the futures-rs library.\n" homepage = "https://rust-lang.github.io/futures-rs" @@ -23,33 +23,33 @@ repository = "https://github.com/rust-lang/futures-rs" all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-channel] -version = "0.3.16" +version = "0.3.17" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.16" +version = "0.3.17" default-features = false [dependencies.futures-io] -version = "0.3.16" +version = "0.3.17" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.16" +version = "=0.3.17" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.16" +version = "0.3.17" optional = true default-features = false [dependencies.futures-task] -version = "0.3.16" +version = "0.3.17" default-features = false [dependencies.futures_01] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 98cace6..a8e9362 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "futures-util" edition = "2018" -version = "0.3.16" +version = "0.3.17" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" @@ -39,12 +39,12 @@ cfg-target-has-atomic = [] autocfg = "1" [dependencies] -futures-core = { path = "../futures-core", version = "0.3.16", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.16", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.16", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.16", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.17", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.17", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.17", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.17", default-features = false, optional = true } proc-macro-hack = { version = "0.5.19", optional = true } proc-macro-nested = { version = "0.1.2", optional = true } slab = { version = "0.4.2", optional = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.16.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.17.crate" } - version: "0.3.16" + version: "0.3.17" license_type: NOTICE last_upgrade_date { year: 2021 - month: 8 - day: 9 + month: 9 + day: 22 } } diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs index 5f5d4ac..7276da2 100644 --- a/src/async_await/mod.rs +++ b/src/async_await/mod.rs @@ -30,6 +30,13 @@ mod select_mod; #[cfg(feature = "async-await-macro")] pub use self::select_mod::*; +// Primary export is a macro +#[cfg(feature = "async-await-macro")] +mod stream_select_mod; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 +#[cfg(feature = "async-await-macro")] +pub use self::stream_select_mod::*; + #[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] mod random; diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs new file mode 100644 index 0000000..7743406 --- /dev/null +++ b/src/async_await/stream_select_mod.rs @@ -0,0 +1,45 @@ +//! The `stream_select` macro. + +#[cfg(feature = "std")] +#[allow(unreachable_pub)] +#[doc(hidden)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] +pub use futures_macro::stream_select_internal; + +/// Combines several streams, all producing the same `Item` type, into one stream. +/// This is similar to `select_all` but does not require the streams to all be the same type. +/// It also keeps the streams inline, and does not require `Box<dyn Stream>`s to be allocated. +/// Streams passed to this macro must be `Unpin`. +/// +/// If multiple streams are ready, one will be pseudo randomly selected at runtime. +/// +/// This macro is gated behind the `async-await` feature of this library, which is activated by default. +/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion +/// limit very high, e.g. `#![recursion_limit="1024"]`. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{stream, StreamExt, stream_select}; +/// let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()).fuse(); +/// +/// let mut endless_numbers = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3)); +/// match endless_numbers.next().await { +/// Some(1) => println!("Got a 1"), +/// Some(2) => println!("Got a 2"), +/// Some(3) => println!("Got a 3"), +/// _ => unreachable!(), +/// } +/// # }); +/// ``` +#[cfg(feature = "std")] +#[macro_export] +macro_rules! stream_select { + ($($tokens:tt)*) => {{ + use $crate::__private as __futures_crate; + $crate::stream_select_internal! { + $( $tokens )* + } + }} +} diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 427e71c..2e52ac1 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -12,6 +12,9 @@ use core::task::{Context, Poll}; use super::{assert_future, MaybeDone}; +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{Collect, FuturesOrdered, StreamExt}; + fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has @@ -19,13 +22,29 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) } -/// Future for the [`join_all`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] +/// Future for the [`join_all`] function. pub struct JoinAll<F> where F: Future, { - elems: Pin<Box<[MaybeDone<F>]>>, + kind: JoinAllKind<F>, +} + +#[cfg(not(futures_no_atomic_cas))] +const SMALL: usize = 30; + +pub(crate) enum JoinAllKind<F> +where + F: Future, +{ + Small { + elems: Pin<Box<[MaybeDone<F>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: Collect<FuturesOrdered<F>, Vec<F::Output>>, + }, } impl<F> fmt::Debug for JoinAll<F> @@ -34,7 +53,13 @@ where F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JoinAll").field("elems", &self.elems).finish() + match self.kind { + JoinAllKind::Small { ref elems } => { + f.debug_struct("JoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -50,10 +75,9 @@ where /// /// # See Also /// -/// This is purposefully a very simple API for basic use-cases. In a lot of -/// cases you will want to use the more powerful -/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does -/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered]. +/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. /// /// Some examples for additional functionality provided by these are: /// @@ -75,13 +99,33 @@ where /// assert_eq!(join_all(futures).await, [1, 2, 3]); /// # }); /// ``` -pub fn join_all<I>(i: I) -> JoinAll<I::Item> +pub fn join_all<I>(iter: I) -> JoinAll<I::Item> where I: IntoIterator, I::Item: Future, { - let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect(); - assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() }) + #[cfg(futures_no_atomic_cas)] + { + let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into(); + let kind = JoinAllKind::Small { elems }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) + } + #[cfg(not(futures_no_atomic_cas))] + { + let iter = iter.into_iter(); + let kind = match iter.size_hint().1 { + None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, + Some(max) => { + if max <= SMALL { + let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(); + JoinAllKind::Small { elems } + } else { + JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() } + } + } + }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) + } } impl<F> Future for JoinAll<F> @@ -91,20 +135,27 @@ where type Output = Vec<F::Output>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut all_done = true; + match &mut self.kind { + JoinAllKind::Small { elems } => { + let mut all_done = true; - for elem in iter_pin_mut(self.elems.as_mut()) { - if elem.poll(cx).is_pending() { - all_done = false; - } - } + for elem in iter_pin_mut(elems.as_mut()) { + if elem.poll(cx).is_pending() { + all_done = false; + } + } - if all_done { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(result) - } else { - Poll::Pending + if all_done { + let mut elems = mem::replace(elems, Box::pin([])); + let result = + iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); + Poll::Ready(result) + } else { + Poll::Pending + } + } + #[cfg(not(futures_no_atomic_cas))] + JoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } diff --git a/src/future/mod.rs b/src/future/mod.rs index cd08264..374e365 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -68,6 +68,9 @@ pub use self::option::OptionFuture; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + mod ready; pub use self::ready::{err, ok, ready, Ready}; diff --git a/src/future/option.rs b/src/future/option.rs index 426fe50..0bc3777 100644 --- a/src/future/option.rs +++ b/src/future/option.rs @@ -31,6 +31,12 @@ pin_project! { } } +impl<F> Default for OptionFuture<F> { + fn default() -> Self { + Self { inner: None } + } +} + impl<F: Future> Future for OptionFuture<F> { type Output = Option<F::Output>; diff --git a/src/future/poll_immediate.rs b/src/future/poll_immediate.rs new file mode 100644 index 0000000..5ae555c --- /dev/null +++ b/src/future/poll_immediate.rs @@ -0,0 +1,126 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::{FusedFuture, Future, Stream}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`poll_immediate`](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate<T> { + #[pin] + future: Option<T> + } +} + +impl<T, F> Future for PollImmediate<F> +where + F: Future<Output = T>, +{ + type Output = Option<T>; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let mut this = self.project(); + let inner = + this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion"); + match inner.poll(cx) { + Poll::Ready(t) => { + this.future.set(None); + Poll::Ready(Some(t)) + } + Poll::Pending => Poll::Ready(None), + } + } +} + +impl<T: Future> FusedFuture for PollImmediate<T> { + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done. +/// The stream will never return [Poll::Pending](core::task::Poll::Pending) +/// so polling it in a tight loop is worse than using a blocking synchronous function. +/// ``` +/// # futures::executor::block_on(async { +/// use futures::task::Poll; +/// use futures::{StreamExt, future, pin_mut}; +/// use future::FusedFuture; +/// +/// let f = async { 1_u32 }; +/// pin_mut!(f); +/// let mut r = future::poll_immediate(f); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// let mut p = future::poll_immediate(f); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert!(!p.is_terminated()); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert!(p.is_terminated()); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +impl<T, F> Stream for PollImmediate<F> +where + F: Future<Output = T>, +{ + type Item = Poll<T>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + match this.future.as_mut().as_pin_mut() { + // inner is gone, so we can signal that the stream is closed. + None => Poll::Ready(None), + Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| { + this.future.set(None); + t + }))), + } + } +} + +/// Creates a future that is immediately ready with an Option of a value. +/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready). +/// +/// # Caution +/// +/// When consuming the future by this function, note the following: +/// +/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value. +/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let r = future::poll_immediate(async { 1_u32 }); +/// assert_eq!(r.await, Some(1)); +/// +/// let p = future::poll_immediate(future::pending::<i32>()); +/// assert_eq!(p.await, None); +/// # }); +/// ``` +/// +/// ### Reusing a future +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{future, pin_mut}; +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// assert_eq!(None, future::poll_immediate(&mut f).await); +/// assert_eq!(42, f.await); +/// # }); +/// ``` +pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> { + assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) }) +} diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 5931edc..2d585a9 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -1,4 +1,5 @@ use super::DEFAULT_BUF_SIZE; +use futures_core::future::Future; use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] @@ -73,6 +74,40 @@ impl<R: AsyncRead> BufReader<R> { } } +impl<R: AsyncRead + AsyncSeek> BufReader<R> { + /// Seeks relative to the current position. If the new position lies within the buffer, + /// the buffer will not be flushed, allowing for more efficient seeks. + /// This method does not return the location of the underlying reader, so the caller + /// must track this information themselves if it is required. + pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> { + SeeKRelative { inner: self, offset, first: true } + } + + /// Attempts to seek relative to the current position. If the new position lies within the buffer, + /// the buffer will not be flushed, allowing for more efficient seeks. + /// This method does not return the location of the underlying reader, so the caller + /// must track this information themselves if it is required. + pub fn poll_seek_relative( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + offset: i64, + ) -> Poll<io::Result<()>> { + let pos = self.pos as u64; + if offset < 0 { + if let Some(new_pos) = pos.checked_sub((-offset) as u64) { + *self.project().pos = new_pos as usize; + return Poll::Ready(Ok(())); + } + } else if let Some(new_pos) = pos.checked_add(offset as u64) { + if new_pos <= self.cap as u64 { + *self.project().pos = new_pos as usize; + return Poll::Ready(Ok(())); + } + } + self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ())) + } +} + impl<R: AsyncRead> AsyncRead for BufReader<R> { fn poll_read( mut self: Pin<&mut Self>, @@ -163,6 +198,10 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { /// `.into_inner()` immediately after a seek yields the underlying reader /// at the same position. /// + /// To seek without discarding the internal buffer, use + /// [`BufReader::seek_relative`](BufReader::seek_relative) or + /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). + /// /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. /// /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` @@ -200,3 +239,33 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { Poll::Ready(Ok(result)) } } + +/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct SeeKRelative<'a, R> { + inner: Pin<&'a mut BufReader<R>>, + offset: i64, + first: bool, +} + +impl<R> Future for SeeKRelative<'_, R> +where + R: AsyncRead + AsyncSeek, +{ + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let offset = self.offset; + if self.first { + self.first = false; + self.inner.as_mut().poll_seek_relative(cx, offset) + } else { + self.inner + .as_mut() + .as_mut() + .poll_seek(cx, SeekFrom::Current(offset)) + .map(|res| res.map(|_| ())) + } + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index b96223d..16cf5a7 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -56,7 +56,7 @@ mod allow_std; pub use self::allow_std::AllowStdIo; mod buf_reader; -pub use self::buf_reader::BufReader; +pub use self::buf_reader::{BufReader, SeeKRelative}; mod buf_writer; pub use self::buf_writer::BufWriter; @@ -301,18 +301,18 @@ macro_rules! delegate_all { } pub mod future; -#[doc(hidden)] +#[doc(no_inline)] pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt}; pub mod stream; -#[doc(hidden)] +#[doc(no_inline)] pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt}; #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub mod sink; #[cfg(feature = "sink")] -#[doc(hidden)] +#[doc(no_inline)] pub use crate::sink::{Sink, SinkExt}; pub mod task; @@ -329,7 +329,7 @@ pub mod compat; pub mod io; #[cfg(feature = "io")] #[cfg(feature = "std")] -#[doc(hidden)] +#[doc(no_inline)] pub use crate::io::{ AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 8cf9f80..ec685b9 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -19,8 +19,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, - SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, + Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, }; #[cfg(feature = "std")] @@ -88,6 +88,9 @@ pub use self::pending::{pending, Pending}; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + mod select; pub use self::select::{select, Select}; diff --git a/src/stream/poll_immediate.rs b/src/stream/poll_immediate.rs new file mode 100644 index 0000000..c7e8a5b --- /dev/null +++ b/src/stream/poll_immediate.rs @@ -0,0 +1,80 @@ +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::Stream; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [poll_immediate](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate<S> { + #[pin] + stream: Option<S> + } +} + +impl<T, S> Stream for PollImmediate<S> +where + S: Stream<Item = T>, +{ + type Item = Poll<T>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + let stream = match this.stream.as_mut().as_pin_mut() { + // inner is gone, so we can continue to signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + + match stream.poll_next(cx) { + Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), + Poll::Ready(None) => { + this.stream.set(None); + Poll::Ready(None) + } + Poll::Pending => Poll::Ready(Some(Poll::Pending)), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint) + } +} + +impl<S: Stream> super::FusedStream for PollImmediate<S> { + fn is_terminated(&self) -> bool { + self.stream.is_none() + } +} + +/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it. +/// +/// This is useful when immediacy is more important than waiting for the next item to be ready. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::stream::{self, StreamExt}; +/// use futures::task::Poll; +/// +/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3))); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// assert_eq!(r.next().await, Some(Poll::Ready(2))); +/// assert_eq!(r.next().await, None); +/// +/// let mut p = stream::poll_immediate(Box::pin(stream::once(async { +/// futures::pending!(); +/// 42_u8 +/// }))); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> { + super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) }) +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b3b0155..86997f4 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -131,7 +131,7 @@ pub use self::select_next_some::SelectNextSome; mod peek; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::peek::{NextIf, NextIfEq, Peek, Peekable}; +pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable}; mod skip; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index 217faba..c72dfc3 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> { delegate_access_inner!(stream, St, (.)); - /// Produces a `Peek` future which retrieves a reference to the next item + /// Produces a future which retrieves a reference to the next item /// in the stream, or `None` if the underlying stream terminates. pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { Peek { inner: Some(self) } @@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> { }) } + /// Produces a future which retrieves a mutable reference to the next item + /// in the stream, or `None` if the underlying stream terminates. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use futures::pin_mut; + /// + /// let stream = stream::iter(vec![1, 2, 3]).peekable(); + /// pin_mut!(stream); + /// + /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1)); + /// assert_eq!(stream.as_mut().next().await, Some(1)); + /// + /// // Peek into the stream and modify the value which will be returned next + /// if let Some(p) = stream.as_mut().peek_mut().await { + /// if *p == 2 { + /// *p = 5; + /// } + /// } + /// + /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]); + /// # }); + /// ``` + pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> { + PeekMut { inner: Some(self) } + } + + /// Peek retrieves a mutable reference to the next item in the stream. + pub fn poll_peek_mut( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<&mut St::Item>> { + let mut this = self.project(); + + Poll::Ready(loop { + if this.peeked.is_some() { + break this.peeked.as_mut(); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + *this.peeked = Some(item); + } else { + break None; + } + }) + } + /// Creates a future which will consume and return the next value of this /// stream if a condition is true. /// @@ -221,6 +269,48 @@ where } pin_project! { + /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method. + #[must_use = "futures do nothing unless polled"] + pub struct PeekMut<'a, St: Stream> { + inner: Option<Pin<&'a mut Peekable<St>>>, + } +} + +impl<St> fmt::Debug for PeekMut<'_, St> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeekMut").field("inner", &self.inner).finish() + } +} + +impl<St: Stream> FusedFuture for PeekMut<'_, St> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<'a, St> Future for PeekMut<'a, St> +where + St: Stream, +{ + type Output = Option<&'a mut St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = self.project().inner; + if let Some(peekable) = inner { + ready!(peekable.as_mut().poll_peek_mut(cx)); + + inner.take().unwrap().poll_peek_mut(cx) + } else { + panic!("PeekMut polled after completion") + } + } +} + +pin_project! { /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method. #[must_use = "futures do nothing unless polled"] pub struct NextIf<'a, St: Stream, F> { |