diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 01:05:48 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 01:05:48 +0000 |
commit | 53eee9da2cdd58c31edece1be1a36627be3e45d2 (patch) | |
tree | a1abb0fc9b7e3d95c763e59b97e3a01b6101b5cf | |
parent | e8fe269bd48a7e2bca21ae08024103ef1a746a77 (diff) | |
parent | 6615c451a218ef4be8eb3bac1a9b81ba8c25f6f5 (diff) | |
download | tokio-stream-android14-mainline-tethering-release.tar.gz |
Snap for 10447354 from 6615c451a218ef4be8eb3bac1a9b81ba8c25f6f5 to mainline-tethering-releaseaml_tet_341610020aml_tet_341511010aml_tet_341411060aml_tet_341310230aml_tet_341112070aml_tet_341010040aml_tet_340913030android14-mainline-tethering-release
Change-Id: I4690225259e4af6fa8ebe78e4e183f4aa79b5fd2
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 6 | ||||
-rw-r--r-- | CHANGELOG.md | 42 | ||||
-rw-r--r-- | Cargo.toml | 37 | ||||
-rw-r--r-- | Cargo.toml.orig | 11 | ||||
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | METADATA | 12 | ||||
-rw-r--r-- | TEST_MAPPING | 3 | ||||
-rw-r--r-- | cargo2android.json | 2 | ||||
-rw-r--r-- | src/lib.rs | 4 | ||||
-rw-r--r-- | src/stream_ext.rs | 179 | ||||
-rw-r--r-- | src/stream_ext/chunks_timeout.rs | 86 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 14 | ||||
-rw-r--r-- | src/stream_ext/map_while.rs | 52 | ||||
-rw-r--r-- | src/stream_ext/next.rs | 7 | ||||
-rw-r--r-- | src/stream_ext/then.rs | 83 | ||||
-rw-r--r-- | src/stream_ext/throttle.rs | 4 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/try_next.rs | 6 | ||||
-rw-r--r-- | src/stream_map.rs | 9 | ||||
-rw-r--r-- | src/wrappers/broadcast.rs | 4 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 36 | ||||
-rw-r--r-- | tests/chunks_timeout.rs | 84 | ||||
-rw-r--r-- | tests/stream_panic.rs | 55 | ||||
-rw-r--r-- | tests/stream_stream_map.rs | 56 | ||||
-rw-r--r-- | tests/stream_timeout.rs | 4 | ||||
-rw-r--r-- | tests/time_throttle.rs | 2 | ||||
-rw-r--r-- | tests/watch.rs | 30 |
28 files changed, 726 insertions, 113 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f2822a6..d866201 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "d1a400912e82505c18c6c0c1f05cda06f334e201" - } -} + "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0" + }, + "path_in_vcs": "tokio-stream" +}
\ No newline at end of file @@ -23,7 +23,7 @@ rust_library { host_supported: true, crate_name: "tokio_stream", cargo_env_compat: true, - cargo_pkg_version: "0.1.8", + cargo_pkg_version: "0.1.12", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -39,7 +39,9 @@ rust_library { ], apex_available: [ "//apex_available:platform", - "com.android.bluetooth", + "com.android.btservices", ], + product_available: true, + vendor_available: true, min_sdk_version: "29", } diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ef469e..c475c7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,45 @@ +# 0.1.12 (January 20, 2022) + +- time: remove `Unpin` bound on `Throttle` methods ([#5105]) +- time: document that `throttle` operates on ms granularity ([#5101]) + +[#5105]: https://github.com/tokio-rs/tokio/pull/5105 +[#5101]: https://github.com/tokio-rs/tokio/pull/5101 + +# 0.1.11 (October 11, 2022) + +- time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036]) + +[#5036]: https://github.com/tokio-rs/tokio/pull/5036 + +# 0.1.10 (Sept 18, 2022) + +- time: add `StreamExt::chunks_timeout` ([#4695]) +- stream: add track_caller to public APIs ([#4786]) + +[#4695]: https://github.com/tokio-rs/tokio/pull/4695 +[#4786]: https://github.com/tokio-rs/tokio/pull/4786 + +# 0.1.9 (June 4, 2022) + +- deps: upgrade `tokio-util` dependency to `0.7.x` ([#3762]) +- stream: add `StreamExt::map_while` ([#4351]) +- stream: add `StreamExt::then` ([#4355]) +- stream: add cancel-safety docs to `StreamExt::next` and `try_next` ([#4715]) +- stream: expose `Elapsed` error ([#4502]) +- stream: expose `Timeout` ([#4601]) +- stream: implement `Extend` for `StreamMap` ([#4272]) +- sync: add `Clone` to `RecvError` types ([#4560]) + +[#3762]: https://github.com/tokio-rs/tokio/pull/3762 +[#4272]: https://github.com/tokio-rs/tokio/pull/4272 +[#4351]: https://github.com/tokio-rs/tokio/pull/4351 +[#4355]: https://github.com/tokio-rs/tokio/pull/4355 +[#4502]: https://github.com/tokio-rs/tokio/pull/4502 +[#4560]: https://github.com/tokio-rs/tokio/pull/4560 +[#4601]: https://github.com/tokio-rs/tokio/pull/4601 +[#4715]: https://github.com/tokio-rs/tokio/pull/4715 + # 0.1.8 (October 29, 2021) - stream: add `From<Receiver<T>>` impl for receiver streams ([#4080]) @@ -11,19 +11,29 @@ [package] edition = "2018" +rust-version = "1.49" name = "tokio-stream" -version = "0.1.8" +version = "0.1.12" authors = ["Tokio Contributors <team@tokio.rs>"] -description = "Utilities to work with `Stream` and `tokio`.\n" +description = """ +Utilities to work with `Stream` and `tokio`. +""" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" + [package.metadata.docs.rs] all-features = true -rustc-args = ["--cfg", "docsrs"] -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] +rustc-args = [ + "--cfg", + "docsrs", +] + [dependencies.futures-core] version = "0.3.0" @@ -35,8 +45,9 @@ version = "1.8.0" features = ["sync"] [dependencies.tokio-util] -version = "0.6.3" +version = "0.7.0" optional = true + [dev-dependencies.async-stream] version = "0.3" @@ -44,12 +55,15 @@ version = "0.3" version = "0.3" default-features = false -[dev-dependencies.proptest] -version = "1" +[dev-dependencies.parking_lot] +version = "0.12.0" [dev-dependencies.tokio] version = "1.2.0" -features = ["full", "test-util"] +features = [ + "full", + "test-util", +] [features] default = ["time"] @@ -57,5 +71,8 @@ fs = ["tokio/fs"] io-util = ["tokio/io-util"] net = ["tokio/net"] signal = ["tokio/signal"] -sync = ["tokio/sync", "tokio-util"] +sync = [ + "tokio/sync", + "tokio-util", +] time = ["tokio/time"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 83f8551..f87b59a 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -2,17 +2,15 @@ name = "tokio-stream" # When releasing to crates.io: # - Remove path dependencies -# - Update doc url -# - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.8" +version = "0.1.12" edition = "2018" +rust-version = "1.49" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ @@ -31,16 +29,15 @@ signal = ["tokio/signal"] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] } -tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true } +tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true } [dev-dependencies] tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] } async-stream = "0.3" +parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -proptest = "1" - [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] @@ -1,4 +1,4 @@ -Copyright (c) 2021 Tokio Contributors +Copyright (c) 2023 Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "tokio-stream" description: "Utilities to work with `Stream` and `tokio`." third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.8.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate" } - version: "0.1.8" + version: "0.1.12" license_type: NOTICE last_upgrade_date { - year: 2022 + year: 2023 month: 3 - day: 1 + day: 30 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index dfc3524..bbd2132 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -2,6 +2,9 @@ { "imports": [ { + "path": "external/rust/crates/async-stream" + }, + { "path": "external/rust/crates/tokio" }, { diff --git a/cargo2android.json b/cargo2android.json index e4cd046..aae1f59 100644 --- a/cargo2android.json +++ b/cargo2android.json @@ -1,7 +1,7 @@ { "apex-available": [ "//apex_available:platform", - "com.android.bluetooth" + "com.android.btservices" ], "device": true, "features": "time,net,io-util,fs", @@ -10,7 +10,6 @@ unreachable_pub )] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) @@ -78,6 +77,9 @@ pub mod wrappers; mod stream_ext; pub use stream_ext::{collect::FromStream, StreamExt}; +cfg_time! { + pub use stream_ext::timeout::{Elapsed, Timeout}; +} mod empty; pub use empty::{empty, Empty}; diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 1157c9e..52d3202 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -1,3 +1,4 @@ +use core::future::Future; use futures_core::Stream; mod all; @@ -27,6 +28,9 @@ use fuse::Fuse; mod map; use map::Map; +mod map_while; +use map_while::MapWhile; + mod merge; use merge::Merge; @@ -39,21 +43,26 @@ use skip::Skip; mod skip_while; use skip_while::SkipWhile; -mod try_next; -use try_next::TryNext; - mod take; use take::Take; mod take_while; use take_while::TakeWhile; +mod then; +use then::Then; + +mod try_next; +use try_next::TryNext; + cfg_time! { - mod timeout; + pub(crate) mod timeout; use timeout::Timeout; use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; + mod chunks_timeout; + use chunks_timeout::ChunksTimeout; } /// An extension trait for the [`Stream`] trait that provides a variety of @@ -106,6 +115,12 @@ pub trait StreamExt: Stream { /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` /// crate. /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// /// # Examples /// /// ``` @@ -142,6 +157,12 @@ pub trait StreamExt: Stream { /// an [`Option<Result<T, E>>`](Option), making for easy use /// with the [`?`](std::ops::Try) operator. /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// /// # Examples /// /// ``` @@ -197,6 +218,93 @@ pub trait StreamExt: Stream { Map::new(self, f) } + /// Map this stream's items to a different type for as long as determined by + /// the provided closure. A stream of the target type will be returned, + /// which will yield elements until the closure returns `None`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, until it returns `None`. It is executed inline + /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned, + /// the underlying stream will not be polled again. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the [`Iterator::map_while`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let mut stream = stream.map_while(|x| { + /// if x < 4 { + /// Some(x + 3) + /// } else { + /// None + /// } + /// }); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + MapWhile::new(self, f) + } + + /// Maps this stream's items asynchronously to a different type, returning a + /// new stream of the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the returned future is executed. Only one + /// future is executed at the time. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `then` methods in the + /// standard library. + /// + /// Be aware that if the future is not `Unpin`, then neither is the `Stream` + /// returned by this method. To handle this, you can use `tokio::pin!` as in + /// the example below or put the stream in a `Box` with `Box::pin(stream)`. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// async fn do_async_work(value: i32) -> i32 { + /// value + 3 + /// } + /// + /// let stream = stream::iter(1..=3); + /// let stream = stream.then(do_async_work); + /// + /// tokio::pin!(stream); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + Then::new(self, f) + } + /// Combine two streams into one by interleaving the output of both as it /// is produced. /// @@ -874,6 +982,8 @@ pub trait StreamExt: Stream { /// Slows down a stream by enforcing a delay between items. /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// /// # Example /// /// Create a throttled stream. @@ -899,6 +1009,63 @@ pub trait StreamExt: Stream { { throttle(duration, self) } + + /// Batches the items in the given stream using a maximum duration and size for each batch. + /// + /// This stream returns the next batch of items in the following situations: + /// 1. The inner stream has returned at least `max_size` many items since the last batch. + /// 2. The time since the first item of a batch is greater than the given duration. + /// 3. The end of the stream is reached. + /// + /// The length of the returned vector is never empty or greater than the maximum size. Empty batches + /// will not be emitted if no items are received upstream. + /// + /// # Panics + /// + /// This function panics if `max_size` is zero + /// + /// # Example + /// + /// ```rust + /// use std::time::Duration; + /// use tokio::time; + /// use tokio_stream::{self as stream, StreamExt}; + /// use futures::FutureExt; + /// + /// #[tokio::main] + /// # async fn _unused() {} + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// let iter = vec![1, 2, 3, 4].into_iter(); + /// let stream0 = stream::iter(iter); + /// + /// let iter = vec![5].into_iter(); + /// let stream1 = stream::iter(iter) + /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + /// + /// let chunk_stream = stream0 + /// .chain(stream1) + /// .chunks_timeout(3, Duration::from_secs(2)); + /// tokio::pin!(chunk_stream); + /// + /// // a full batch was received + /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// // deadline was reached before max_size was reached + /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// // last element in the stream + /// assert_eq!(chunk_stream.next().await, Some(vec![5])); + /// } + /// ``` + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + #[track_caller] + fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> + where + Self: Sized, + { + assert!(max_size > 0, "`max_size` must be non-zero."); + ChunksTimeout::new(self, max_size, duration) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} @@ -906,10 +1073,10 @@ impl<St: ?Sized> StreamExt for St where St: Stream {} /// Merge the size hints from two streams. fn merge_size_hints( (left_low, left_high): (usize, Option<usize>), - (right_low, right_hign): (usize, Option<usize>), + (right_low, right_high): (usize, Option<usize>), ) -> (usize, Option<usize>) { let low = left_low.saturating_add(right_low); - let high = match (left_high, right_hign) { + let high = match (left_high, right_high) { (Some(h1), Some(h2)) => h1.checked_add(h2), _ => None, }; diff --git a/src/stream_ext/chunks_timeout.rs b/src/stream_ext/chunks_timeout.rs new file mode 100644 index 0000000..48acd93 --- /dev/null +++ b/src/stream_ext/chunks_timeout.rs @@ -0,0 +1,86 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::{sleep, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct ChunksTimeout<S: Stream> { + #[pin] + stream: Fuse<S>, + #[pin] + deadline: Option<Sleep>, + duration: Duration, + items: Vec<S::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl<S: Stream> ChunksTimeout<S> { + pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { + ChunksTimeout { + stream: Fuse::new(stream), + deadline: None, + duration, + items: Vec::with_capacity(max_size), + cap: max_size, + } + } +} + +impl<S: Stream> Stream for ChunksTimeout<S> { + type Item = Vec<S::Item>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.as_mut().project(); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(Some(item)) => { + if me.items.is_empty() { + me.deadline.set(Some(sleep(*me.duration))); + me.items.reserve_exact(*me.cap); + } + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(std::mem::take(me.items))); + } + } + Poll::Ready(None) => { + // Returning Some here is only correct because we fuse the inner stream. + let last = if me.items.is_empty() { + None + } else { + Some(std::mem::take(me.items)) + }; + + return Poll::Ready(last); + } + } + } + + if !me.items.is_empty() { + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } + return Poll::Ready(Some(std::mem::take(me.items))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = (lower / self.cap).saturating_add(chunk_len); + let upper = upper.and_then(|x| x.checked_add(chunk_len)); + (lower, upper) + } +} diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index a33a6d6..8548b74 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -66,17 +66,17 @@ where use Poll::Ready; loop { - let mut me = self.as_mut().project(); + let me = self.as_mut().project(); let item = match ready!(me.stream.poll_next(cx)) { Some(item) => item, None => { - return Ready(U::finalize(sealed::Internal, &mut me.collection)); + return Ready(U::finalize(sealed::Internal, me.collection)); } }; - if !U::extend(sealed::Internal, &mut me.collection, item) { - return Ready(U::finalize(sealed::Internal, &mut me.collection)); + if !U::extend(sealed::Internal, me.collection, item) { + return Ready(U::finalize(sealed::Internal, me.collection)); } } } @@ -195,11 +195,7 @@ where } else { let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); - if let Err(err) = res { - Err(err) - } else { - unreachable!(); - } + Err(res.map(drop).unwrap_err()) } } } diff --git a/src/stream_ext/map_while.rs b/src/stream_ext/map_while.rs new file mode 100644 index 0000000..d4fd825 --- /dev/null +++ b/src/stream_ext/map_while.rs @@ -0,0 +1,52 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map_while`](super::StreamExt::map_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct MapWhile<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for MapWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> MapWhile<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + MapWhile { stream, f } + } +} + +impl<St, F, T> Stream for MapWhile<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let me = self.project(); + let f = me.f; + me.stream.poll_next(cx).map(|opt| opt.and_then(f)) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (_, upper) = self.stream.size_hint(); + (0, upper) + } +} diff --git a/src/stream_ext/next.rs b/src/stream_ext/next.rs index 175490c..706069f 100644 --- a/src/stream_ext/next.rs +++ b/src/stream_ext/next.rs @@ -8,6 +8,13 @@ use pin_project_lite::pin_project; pin_project! { /// Future for the [`next`](super::StreamExt::next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Next<'a, St: ?Sized> { diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs new file mode 100644 index 0000000..cc7caa7 --- /dev/null +++ b/src/stream_ext/then.rs @@ -0,0 +1,83 @@ +use crate::Stream; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`then`](super::StreamExt::then) method. + #[must_use = "streams do nothing unless polled"] + pub struct Then<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } +} + +impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Then") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, Fut, F> Then<St, Fut, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Then { + stream, + future: None, + f, + } + } +} + +impl<St, F, Fut> Stream for Then<St, Fut, F> +where + St: Stream, + Fut: Future, + F: FnMut(St::Item) -> Fut, +{ + type Item = Fut::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> { + let mut me = self.project(); + + loop { + if let Some(future) = me.future.as_mut().as_pin_mut() { + match future.poll(cx) { + Poll::Ready(item) => { + me.future.set(None); + return Poll::Ready(Some(item)); + } + Poll::Pending => return Poll::Pending, + } + } + + match me.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => { + me.future.set(Some((me.f)(item))); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let future_len = usize::from(self.future.is_some()); + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_add(future_len); + let upper = upper.and_then(|upper| upper.checked_add(future_len)); + + (lower, upper) + } +} diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs index f36c66a..5000139 100644 --- a/src/stream_ext/throttle.rs +++ b/src/stream_ext/throttle.rs @@ -4,7 +4,6 @@ use crate::Stream; use tokio::time::{Duration, Instant, Sleep}; use std::future::Future; -use std::marker::Unpin; use std::pin::Pin; use std::task::{self, Poll}; @@ -41,8 +40,7 @@ pin_project! { } } -// XXX: are these safe if `T: !Unpin`? -impl<T: Unpin> Throttle<T> { +impl<T> Throttle<T> { /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &T { diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index 98d7cd5..a440d20 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -24,7 +24,7 @@ pin_project! { } /// Error returned by `Timeout`. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); impl<S: Stream> Timeout<S> { diff --git a/src/stream_ext/try_next.rs b/src/stream_ext/try_next.rs index af27d87..93aa3bc 100644 --- a/src/stream_ext/try_next.rs +++ b/src/stream_ext/try_next.rs @@ -9,6 +9,12 @@ use pin_project_lite::pin_project; pin_project! { /// Future for the [`try_next`](super::StreamExt::try_next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryNext<'a, St: ?Sized> { diff --git a/src/stream_map.rs b/src/stream_map.rs index 80a521e..2159804 100644 --- a/src/stream_map.rs +++ b/src/stream_map.rs @@ -585,6 +585,15 @@ where } } +impl<K, V> Extend<(K, V)> for StreamMap<K, V> { + fn extend<T>(&mut self, iter: T) + where + T: IntoIterator<Item = (K, V)>, + { + self.entries.extend(iter); + } +} + mod rand { use std::cell::Cell; diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs index c8346a6..7110664 100644 --- a/src/wrappers/broadcast.rs +++ b/src/wrappers/broadcast.rs @@ -14,11 +14,11 @@ use std::task::{Context, Poll}; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct BroadcastStream<T> { - inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>, + inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>, } /// An error returned from the inner stream of a [`BroadcastStream`]. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStreamRecvError { /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index bd3a18a..ec8ead0 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. /// /// # Examples /// @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } @@ -49,7 +72,7 @@ use tokio::sync::watch::error::RecvError; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream<T> { - inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>, } async fn make_future<T: Clone + Send + Sync>( @@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { diff --git a/tests/chunks_timeout.rs b/tests/chunks_timeout.rs new file mode 100644 index 0000000..ffc7dea --- /dev/null +++ b/tests/chunks_timeout.rs @@ -0,0 +1,84 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::{self as stream, StreamExt}; +use tokio_test::assert_pending; +use tokio_test::task; + +use futures::FutureExt; +use std::time::Duration; + +#[tokio::test(start_paused = true)] +async fn usage() { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![4].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(4, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test(start_paused = true)] +async fn full_chunk_with_timeout() { + let iter = vec![1, 2].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![3].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n)); + + let iter = vec![4].into_iter(); + let stream2 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chain(stream2) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test] +#[ignore] +async fn real_time() { + let iter = vec![1, 2, 3, 4].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![5].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + assert_eq!(chunk_stream.next().await, Some(vec![4])); + assert_eq!(chunk_stream.next().await, Some(vec![5])); +} diff --git a/tests/stream_panic.rs b/tests/stream_panic.rs new file mode 100644 index 0000000..22c1c20 --- /dev/null +++ b/tests/stream_panic.rs @@ -0,0 +1,55 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery + +use parking_lot::{const_mutex, Mutex}; +use std::error::Error; +use std::panic; +use std::sync::Arc; +use tokio::time::Duration; +use tokio_stream::{self as stream, StreamExt}; + +fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> { + static PANIC_MUTEX: Mutex<()> = const_mutex(()); + + { + let _guard = PANIC_MUTEX.lock(); + let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); + + let prev_hook = panic::take_hook(); + { + let panic_file = panic_file.clone(); + panic::set_hook(Box::new(move |panic_info| { + let panic_location = panic_info.location().unwrap(); + panic_file + .lock() + .clone_from(&Some(panic_location.file().to_string())); + })); + } + + let result = panic::catch_unwind(func); + // Return to the previously set panic hook (maybe default) so that we get nice error + // messages in the tests. + panic::set_hook(prev_hook); + + if result.is_err() { + panic_file.lock().clone() + } else { + None + } + } +} + +#[test] +fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> { + let panic_location_file = test_panic(|| { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2)); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs index 53f3d86..b6b87e9 100644 --- a/tests/stream_stream_map.rs +++ b/tests/stream_stream_map.rs @@ -325,62 +325,6 @@ fn one_ready_many_none() { } } -proptest::proptest! { - #[test] - fn fuzz_pending_complete_mix(kinds: Vec<bool>) { - use std::task::{Context, Poll}; - - struct DidPoll<T> { - did_poll: bool, - inner: T, - } - - impl<T: Stream + Unpin> Stream for DidPoll<T> { - type Item = T::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Option<T::Item>> - { - self.did_poll = true; - Pin::new(&mut self.inner).poll_next(cx) - } - } - - for _ in 0..10 { - let mut map = task::spawn(StreamMap::new()); - let mut expect = 0; - - for (i, &is_empty) in kinds.iter().enumerate() { - let inner = if is_empty { - pin_box(stream::empty::<()>()) - } else { - expect += 1; - pin_box(stream::pending::<()>()) - }; - - let stream = DidPoll { - did_poll: false, - inner, - }; - - map.insert(i, stream); - } - - if expect == 0 { - assert_ready_none!(map.poll_next()); - } else { - assert_pending!(map.poll_next()); - - assert_eq!(expect, map.values().count()); - - for stream in map.values() { - assert!(stream.did_poll); - } - } - } - } -} - fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> { Box::pin(s) } diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs index 5697ace..2338f83 100644 --- a/tests/stream_timeout.rs +++ b/tests/stream_timeout.rs @@ -1,10 +1,10 @@ -#![cfg(feature = "full")] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] use tokio::time::{self, sleep, Duration}; use tokio_stream::{self, StreamExt}; use tokio_test::*; -use futures::StreamExt as _; +use futures::stream; async fn maybe_sleep(idx: i32) -> i32 { if idx % 2 == 0 { diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs index 42a643b..e6c9917 100644 --- a/tests/time_throttle.rs +++ b/tests/time_throttle.rs @@ -1,5 +1,5 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] use tokio::time; use tokio_stream::StreamExt; diff --git a/tests/watch.rs b/tests/watch.rs index a56254e..3a39aaf 100644 --- a/tests/watch.rs +++ b/tests/watch.rs @@ -3,9 +3,11 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; +use tokio_test::assert_pending; +use tokio_test::task::spawn; #[tokio::test] -async fn message_not_twice() { +async fn watch_stream_message_not_twice() { let (tx, rx) = watch::channel("hello"); let mut counter = 0; @@ -27,3 +29,29 @@ async fn message_not_twice() { drop(tx); task.await.unwrap(); } + +#[tokio::test] +async fn watch_stream_from_rx() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from(rx); + + assert_eq!(stream.next().await.unwrap(), "hello"); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} + +#[tokio::test] +async fn watch_stream_from_changes() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from_changes(rx); + + assert_pending!(spawn(&mut stream).poll_next()); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} |