diff options
author | Chris Wailes <chriswailes@google.com> | 2023-01-18 22:41:32 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2023-01-18 22:41:32 +0000 |
commit | 5905ccf7a73d0bc487b84adbd62c71d46f21c9a2 (patch) | |
tree | 32543bf48b06816c5b45896873d8b18c39047702 | |
parent | 0655aa9337e2627539de726a74008a688a8ff1eb (diff) | |
parent | 4361fcbb3f069ba7f04bc18bb34c6e29f98dfd8a (diff) | |
download | tokio-stream-5905ccf7a73d0bc487b84adbd62c71d46f21c9a2.tar.gz |
Merge "Upgrade tokio-stream to 0.1.11" am: 3b22d38b59 am: ee83363fd0 am: 4361fcbb3f
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2346810
Change-Id: I71fbc94dae331e755673c21fd47d0c90389f93af
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 2 | ||||
-rw-r--r-- | CHANGELOG.md | 34 | ||||
-rw-r--r-- | Cargo.toml | 40 | ||||
-rw-r--r-- | Cargo.toml.orig | 10 | ||||
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | METADATA | 12 | ||||
-rw-r--r-- | src/lib.rs | 4 | ||||
-rw-r--r-- | src/stream_ext.rs | 177 | ||||
-rw-r--r-- | src/stream_ext/chunks_timeout.rs | 86 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 8 | ||||
-rw-r--r-- | src/stream_ext/map_while.rs | 52 | ||||
-rw-r--r-- | src/stream_ext/next.rs | 7 | ||||
-rw-r--r-- | src/stream_ext/then.rs | 83 | ||||
-rw-r--r-- | src/stream_ext/try_next.rs | 6 | ||||
-rw-r--r-- | src/stream_map.rs | 9 | ||||
-rw-r--r-- | src/wrappers/broadcast.rs | 4 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 2 | ||||
-rw-r--r-- | tests/chunks_timeout.rs | 84 | ||||
-rw-r--r-- | tests/stream_panic.rs | 55 | ||||
-rw-r--r-- | tests/stream_stream_map.rs | 1 | ||||
-rw-r--r-- | tests/stream_timeout.rs | 4 | ||||
-rw-r--r-- | tests/time_throttle.rs | 2 |
23 files changed, 650 insertions, 41 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f2822a6..94e3fd4 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "d1a400912e82505c18c6c0c1f05cda06f334e201" - } -} + "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d" + }, + "path_in_vcs": "tokio-stream" +}
\ No newline at end of file @@ -23,7 +23,7 @@ rust_library { host_supported: true, crate_name: "tokio_stream", cargo_env_compat: true, - cargo_pkg_version: "0.1.8", + cargo_pkg_version: "0.1.11", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ef469e..05c2b18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,37 @@ +# 0.1.11 (October 11, 2022) + +- time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036]) + +[#5036]: https://github.com/tokio-rs/tokio/pull/5036 + +# 0.1.10 (Sept 18, 2022) + +- time: add `StreamExt::chunks_timeout` ([#4695]) +- stream: add track_caller to public APIs ([#4786]) + +[#4695]: https://github.com/tokio-rs/tokio/pull/4695 +[#4786]: https://github.com/tokio-rs/tokio/pull/4786 + +# 0.1.9 (June 4, 2022) + +- deps: upgrade `tokio-util` dependency to `0.7.x` ([#3762]) +- stream: add `StreamExt::map_while` ([#4351]) +- stream: add `StreamExt::then` ([#4355]) +- stream: add cancel-safety docs to `StreamExt::next` and `try_next` ([#4715]) +- stream: expose `Elapsed` error ([#4502]) +- stream: expose `Timeout` ([#4601]) +- stream: implement `Extend` for `StreamMap` ([#4272]) +- sync: add `Clone` to `RecvError` types ([#4560]) + +[#3762]: https://github.com/tokio-rs/tokio/pull/3762 +[#4272]: https://github.com/tokio-rs/tokio/pull/4272 +[#4351]: https://github.com/tokio-rs/tokio/pull/4351 +[#4355]: https://github.com/tokio-rs/tokio/pull/4355 +[#4502]: https://github.com/tokio-rs/tokio/pull/4502 +[#4560]: https://github.com/tokio-rs/tokio/pull/4560 +[#4601]: https://github.com/tokio-rs/tokio/pull/4601 +[#4715]: https://github.com/tokio-rs/tokio/pull/4715 + # 0.1.8 (October 29, 2021) - stream: add `From<Receiver<T>>` impl for receiver streams ([#4080]) @@ -11,19 +11,29 @@ [package] edition = "2018" +rust-version = "1.49" name = "tokio-stream" -version = "0.1.8" +version = "0.1.11" authors = ["Tokio Contributors <team@tokio.rs>"] -description = "Utilities to work with `Stream` and `tokio`.\n" +description = """ +Utilities to work with `Stream` and `tokio`. +""" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" + [package.metadata.docs.rs] all-features = true -rustc-args = ["--cfg", "docsrs"] -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] +rustc-args = [ + "--cfg", + "docsrs", +] + [dependencies.futures-core] version = "0.3.0" @@ -35,8 +45,9 @@ version = "1.8.0" features = ["sync"] [dependencies.tokio-util] -version = "0.6.3" +version = "0.7.0" optional = true + [dev-dependencies.async-stream] version = "0.3" @@ -44,12 +55,15 @@ version = "0.3" version = "0.3" default-features = false -[dev-dependencies.proptest] -version = "1" +[dev-dependencies.parking_lot] +version = "0.12.0" [dev-dependencies.tokio] version = "1.2.0" -features = ["full", "test-util"] +features = [ + "full", + "test-util", +] [features] default = ["time"] @@ -57,5 +71,11 @@ fs = ["tokio/fs"] io-util = ["tokio/io-util"] net = ["tokio/net"] signal = ["tokio/signal"] -sync = ["tokio/sync", "tokio-util"] +sync = [ + "tokio/sync", + "tokio-util", +] time = ["tokio/time"] + +[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest] +version = "1" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 83f8551..6dfa978 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -2,17 +2,15 @@ name = "tokio-stream" # When releasing to crates.io: # - Remove path dependencies -# - Update doc url -# - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.8" +version = "0.1.11" edition = "2018" +rust-version = "1.49" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ @@ -31,14 +29,16 @@ signal = ["tokio/signal"] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] } -tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true } +tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true } [dev-dependencies] tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] } async-stream = "0.3" +parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] proptest = "1" [package.metadata.docs.rs] @@ -1,4 +1,4 @@ -Copyright (c) 2021 Tokio Contributors +Copyright (c) 2022 Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "tokio-stream" description: "Utilities to work with `Stream` and `tokio`." third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.8.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate" } - version: "0.1.8" + version: "0.1.11" license_type: NOTICE last_upgrade_date { year: 2022 - month: 3 - day: 1 + month: 12 + day: 12 } } @@ -10,7 +10,6 @@ unreachable_pub )] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) @@ -78,6 +77,9 @@ pub mod wrappers; mod stream_ext; pub use stream_ext::{collect::FromStream, StreamExt}; +cfg_time! { + pub use stream_ext::timeout::{Elapsed, Timeout}; +} mod empty; pub use empty::{empty, Empty}; diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 1157c9e..6cea7b5 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -1,3 +1,4 @@ +use core::future::Future; use futures_core::Stream; mod all; @@ -27,6 +28,9 @@ use fuse::Fuse; mod map; use map::Map; +mod map_while; +use map_while::MapWhile; + mod merge; use merge::Merge; @@ -39,21 +43,26 @@ use skip::Skip; mod skip_while; use skip_while::SkipWhile; -mod try_next; -use try_next::TryNext; - mod take; use take::Take; mod take_while; use take_while::TakeWhile; +mod then; +use then::Then; + +mod try_next; +use try_next::TryNext; + cfg_time! { - mod timeout; + pub(crate) mod timeout; use timeout::Timeout; use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; + mod chunks_timeout; + use chunks_timeout::ChunksTimeout; } /// An extension trait for the [`Stream`] trait that provides a variety of @@ -106,6 +115,12 @@ pub trait StreamExt: Stream { /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` /// crate. /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// /// # Examples /// /// ``` @@ -142,6 +157,12 @@ pub trait StreamExt: Stream { /// an [`Option<Result<T, E>>`](Option), making for easy use /// with the [`?`](std::ops::Try) operator. /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// /// # Examples /// /// ``` @@ -197,6 +218,93 @@ pub trait StreamExt: Stream { Map::new(self, f) } + /// Map this stream's items to a different type for as long as determined by + /// the provided closure. A stream of the target type will be returned, + /// which will yield elements until the closure returns `None`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, until it returns `None`. It is executed inline + /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned, + /// the underlying stream will not be polled again. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the [`Iterator::map_while`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let mut stream = stream.map_while(|x| { + /// if x < 4 { + /// Some(x + 3) + /// } else { + /// None + /// } + /// }); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + MapWhile::new(self, f) + } + + /// Maps this stream's items asynchronously to a different type, returning a + /// new stream of the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the returned future is executed. Only one + /// future is executed at the time. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `then` methods in the + /// standard library. + /// + /// Be aware that if the future is not `Unpin`, then neither is the `Stream` + /// returned by this method. To handle this, you can use `tokio::pin!` as in + /// the example below or put the stream in a `Box` with `Box::pin(stream)`. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// async fn do_async_work(value: i32) -> i32 { + /// value + 3 + /// } + /// + /// let stream = stream::iter(1..=3); + /// let stream = stream.then(do_async_work); + /// + /// tokio::pin!(stream); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + Then::new(self, f) + } + /// Combine two streams into one by interleaving the output of both as it /// is produced. /// @@ -899,6 +1007,63 @@ pub trait StreamExt: Stream { { throttle(duration, self) } + + /// Batches the items in the given stream using a maximum duration and size for each batch. + /// + /// This stream returns the next batch of items in the following situations: + /// 1. The inner stream has returned at least `max_size` many items since the last batch. + /// 2. The time since the first item of a batch is greater than the given duration. + /// 3. The end of the stream is reached. + /// + /// The length of the returned vector is never empty or greater than the maximum size. Empty batches + /// will not be emitted if no items are received upstream. + /// + /// # Panics + /// + /// This function panics if `max_size` is zero + /// + /// # Example + /// + /// ```rust + /// use std::time::Duration; + /// use tokio::time; + /// use tokio_stream::{self as stream, StreamExt}; + /// use futures::FutureExt; + /// + /// #[tokio::main] + /// # async fn _unused() {} + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// let iter = vec![1, 2, 3, 4].into_iter(); + /// let stream0 = stream::iter(iter); + /// + /// let iter = vec![5].into_iter(); + /// let stream1 = stream::iter(iter) + /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + /// + /// let chunk_stream = stream0 + /// .chain(stream1) + /// .chunks_timeout(3, Duration::from_secs(2)); + /// tokio::pin!(chunk_stream); + /// + /// // a full batch was received + /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// // deadline was reached before max_size was reached + /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// // last element in the stream + /// assert_eq!(chunk_stream.next().await, Some(vec![5])); + /// } + /// ``` + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + #[track_caller] + fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> + where + Self: Sized, + { + assert!(max_size > 0, "`max_size` must be non-zero."); + ChunksTimeout::new(self, max_size, duration) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} @@ -906,10 +1071,10 @@ impl<St: ?Sized> StreamExt for St where St: Stream {} /// Merge the size hints from two streams. fn merge_size_hints( (left_low, left_high): (usize, Option<usize>), - (right_low, right_hign): (usize, Option<usize>), + (right_low, right_high): (usize, Option<usize>), ) -> (usize, Option<usize>) { let low = left_low.saturating_add(right_low); - let high = match (left_high, right_hign) { + let high = match (left_high, right_high) { (Some(h1), Some(h2)) => h1.checked_add(h2), _ => None, }; diff --git a/src/stream_ext/chunks_timeout.rs b/src/stream_ext/chunks_timeout.rs new file mode 100644 index 0000000..48acd93 --- /dev/null +++ b/src/stream_ext/chunks_timeout.rs @@ -0,0 +1,86 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::{sleep, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct ChunksTimeout<S: Stream> { + #[pin] + stream: Fuse<S>, + #[pin] + deadline: Option<Sleep>, + duration: Duration, + items: Vec<S::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl<S: Stream> ChunksTimeout<S> { + pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { + ChunksTimeout { + stream: Fuse::new(stream), + deadline: None, + duration, + items: Vec::with_capacity(max_size), + cap: max_size, + } + } +} + +impl<S: Stream> Stream for ChunksTimeout<S> { + type Item = Vec<S::Item>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.as_mut().project(); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(Some(item)) => { + if me.items.is_empty() { + me.deadline.set(Some(sleep(*me.duration))); + me.items.reserve_exact(*me.cap); + } + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(std::mem::take(me.items))); + } + } + Poll::Ready(None) => { + // Returning Some here is only correct because we fuse the inner stream. + let last = if me.items.is_empty() { + None + } else { + Some(std::mem::take(me.items)) + }; + + return Poll::Ready(last); + } + } + } + + if !me.items.is_empty() { + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } + return Poll::Ready(Some(std::mem::take(me.items))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = (lower / self.cap).saturating_add(chunk_len); + let upper = upper.and_then(|x| x.checked_add(chunk_len)); + (lower, upper) + } +} diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index a33a6d6..4b157a9 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -66,17 +66,17 @@ where use Poll::Ready; loop { - let mut me = self.as_mut().project(); + let me = self.as_mut().project(); let item = match ready!(me.stream.poll_next(cx)) { Some(item) => item, None => { - return Ready(U::finalize(sealed::Internal, &mut me.collection)); + return Ready(U::finalize(sealed::Internal, me.collection)); } }; - if !U::extend(sealed::Internal, &mut me.collection, item) { - return Ready(U::finalize(sealed::Internal, &mut me.collection)); + if !U::extend(sealed::Internal, me.collection, item) { + return Ready(U::finalize(sealed::Internal, me.collection)); } } } diff --git a/src/stream_ext/map_while.rs b/src/stream_ext/map_while.rs new file mode 100644 index 0000000..d4fd825 --- /dev/null +++ b/src/stream_ext/map_while.rs @@ -0,0 +1,52 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map_while`](super::StreamExt::map_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct MapWhile<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for MapWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> MapWhile<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + MapWhile { stream, f } + } +} + +impl<St, F, T> Stream for MapWhile<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let me = self.project(); + let f = me.f; + me.stream.poll_next(cx).map(|opt| opt.and_then(f)) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (_, upper) = self.stream.size_hint(); + (0, upper) + } +} diff --git a/src/stream_ext/next.rs b/src/stream_ext/next.rs index 175490c..706069f 100644 --- a/src/stream_ext/next.rs +++ b/src/stream_ext/next.rs @@ -8,6 +8,13 @@ use pin_project_lite::pin_project; pin_project! { /// Future for the [`next`](super::StreamExt::next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Next<'a, St: ?Sized> { diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs new file mode 100644 index 0000000..7f6b5a2 --- /dev/null +++ b/src/stream_ext/then.rs @@ -0,0 +1,83 @@ +use crate::Stream; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`then`](super::StreamExt::then) method. + #[must_use = "streams do nothing unless polled"] + pub struct Then<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } +} + +impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Then") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, Fut, F> Then<St, Fut, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Then { + stream, + future: None, + f, + } + } +} + +impl<St, F, Fut> Stream for Then<St, Fut, F> +where + St: Stream, + Fut: Future, + F: FnMut(St::Item) -> Fut, +{ + type Item = Fut::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> { + let mut me = self.project(); + + loop { + if let Some(future) = me.future.as_mut().as_pin_mut() { + match future.poll(cx) { + Poll::Ready(item) => { + me.future.set(None); + return Poll::Ready(Some(item)); + } + Poll::Pending => return Poll::Pending, + } + } + + match me.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => { + me.future.set(Some((me.f)(item))); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let future_len = if self.future.is_some() { 1 } else { 0 }; + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_add(future_len); + let upper = upper.and_then(|upper| upper.checked_add(future_len)); + + (lower, upper) + } +} diff --git a/src/stream_ext/try_next.rs b/src/stream_ext/try_next.rs index af27d87..93aa3bc 100644 --- a/src/stream_ext/try_next.rs +++ b/src/stream_ext/try_next.rs @@ -9,6 +9,12 @@ use pin_project_lite::pin_project; pin_project! { /// Future for the [`try_next`](super::StreamExt::try_next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryNext<'a, St: ?Sized> { diff --git a/src/stream_map.rs b/src/stream_map.rs index 80a521e..2159804 100644 --- a/src/stream_map.rs +++ b/src/stream_map.rs @@ -585,6 +585,15 @@ where } } +impl<K, V> Extend<(K, V)> for StreamMap<K, V> { + fn extend<T>(&mut self, iter: T) + where + T: IntoIterator<Item = (K, V)>, + { + self.entries.extend(iter); + } +} + mod rand { use std::cell::Cell; diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs index c8346a6..10184bf 100644 --- a/src/wrappers/broadcast.rs +++ b/src/wrappers/broadcast.rs @@ -14,11 +14,11 @@ use std::task::{Context, Poll}; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct BroadcastStream<T> { - inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>, + inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>, } /// An error returned from the inner stream of a [`BroadcastStream`]. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum BroadcastStreamRecvError { /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index bd3a18a..c682c9c 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -49,7 +49,7 @@ use tokio::sync::watch::error::RecvError; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream<T> { - inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>, } async fn make_future<T: Clone + Send + Sync>( diff --git a/tests/chunks_timeout.rs b/tests/chunks_timeout.rs new file mode 100644 index 0000000..ffc7dea --- /dev/null +++ b/tests/chunks_timeout.rs @@ -0,0 +1,84 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::{self as stream, StreamExt}; +use tokio_test::assert_pending; +use tokio_test::task; + +use futures::FutureExt; +use std::time::Duration; + +#[tokio::test(start_paused = true)] +async fn usage() { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![4].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(4, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test(start_paused = true)] +async fn full_chunk_with_timeout() { + let iter = vec![1, 2].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![3].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n)); + + let iter = vec![4].into_iter(); + let stream2 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chain(stream2) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test] +#[ignore] +async fn real_time() { + let iter = vec![1, 2, 3, 4].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![5].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + assert_eq!(chunk_stream.next().await, Some(vec![4])); + assert_eq!(chunk_stream.next().await, Some(vec![5])); +} diff --git a/tests/stream_panic.rs b/tests/stream_panic.rs new file mode 100644 index 0000000..22c1c20 --- /dev/null +++ b/tests/stream_panic.rs @@ -0,0 +1,55 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery + +use parking_lot::{const_mutex, Mutex}; +use std::error::Error; +use std::panic; +use std::sync::Arc; +use tokio::time::Duration; +use tokio_stream::{self as stream, StreamExt}; + +fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> { + static PANIC_MUTEX: Mutex<()> = const_mutex(()); + + { + let _guard = PANIC_MUTEX.lock(); + let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); + + let prev_hook = panic::take_hook(); + { + let panic_file = panic_file.clone(); + panic::set_hook(Box::new(move |panic_info| { + let panic_location = panic_info.location().unwrap(); + panic_file + .lock() + .clone_from(&Some(panic_location.file().to_string())); + })); + } + + let result = panic::catch_unwind(func); + // Return to the previously set panic hook (maybe default) so that we get nice error + // messages in the tests. + panic::set_hook(prev_hook); + + if result.is_err() { + panic_file.lock().clone() + } else { + None + } + } +} + +#[test] +fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> { + let panic_location_file = test_panic(|| { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2)); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs index 53f3d86..ffc489b 100644 --- a/tests/stream_stream_map.rs +++ b/tests/stream_stream_map.rs @@ -325,6 +325,7 @@ fn one_ready_many_none() { } } +#[cfg(not(target_os = "wasi"))] proptest::proptest! { #[test] fn fuzz_pending_complete_mix(kinds: Vec<bool>) { diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs index 5697ace..2338f83 100644 --- a/tests/stream_timeout.rs +++ b/tests/stream_timeout.rs @@ -1,10 +1,10 @@ -#![cfg(feature = "full")] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] use tokio::time::{self, sleep, Duration}; use tokio_stream::{self, StreamExt}; use tokio_test::*; -use futures::StreamExt as _; +use futures::stream; async fn maybe_sleep(idx: i32) -> i32 { if idx % 2 == 0 { diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs index 42a643b..e6c9917 100644 --- a/tests/time_throttle.rs +++ b/tests/time_throttle.rs @@ -1,5 +1,5 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] use tokio::time; use tokio_stream::StreamExt; |