summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Wailes <chriswailes@google.com>2023-01-18 22:41:32 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-01-18 22:41:32 +0000
commit5905ccf7a73d0bc487b84adbd62c71d46f21c9a2 (patch)
tree32543bf48b06816c5b45896873d8b18c39047702
parent0655aa9337e2627539de726a74008a688a8ff1eb (diff)
parent4361fcbb3f069ba7f04bc18bb34c6e29f98dfd8a (diff)
downloadtokio-stream-5905ccf7a73d0bc487b84adbd62c71d46f21c9a2.tar.gz
Merge "Upgrade tokio-stream to 0.1.11" am: 3b22d38b59 am: ee83363fd0 am: 4361fcbb3f
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2346810 Change-Id: I71fbc94dae331e755673c21fd47d0c90389f93af Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp2
-rw-r--r--CHANGELOG.md34
-rw-r--r--Cargo.toml40
-rw-r--r--Cargo.toml.orig10
-rw-r--r--LICENSE2
-rw-r--r--METADATA12
-rw-r--r--src/lib.rs4
-rw-r--r--src/stream_ext.rs177
-rw-r--r--src/stream_ext/chunks_timeout.rs86
-rw-r--r--src/stream_ext/collect.rs8
-rw-r--r--src/stream_ext/map_while.rs52
-rw-r--r--src/stream_ext/next.rs7
-rw-r--r--src/stream_ext/then.rs83
-rw-r--r--src/stream_ext/try_next.rs6
-rw-r--r--src/stream_map.rs9
-rw-r--r--src/wrappers/broadcast.rs4
-rw-r--r--src/wrappers/watch.rs2
-rw-r--r--tests/chunks_timeout.rs84
-rw-r--r--tests/stream_panic.rs55
-rw-r--r--tests/stream_stream_map.rs1
-rw-r--r--tests/stream_timeout.rs4
-rw-r--r--tests/time_throttle.rs2
23 files changed, 650 insertions, 41 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f2822a6..94e3fd4 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "d1a400912e82505c18c6c0c1f05cda06f334e201"
- }
-}
+ "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d"
+ },
+ "path_in_vcs": "tokio-stream"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index c619fe9..4b00918 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,7 +23,7 @@ rust_library {
host_supported: true,
crate_name: "tokio_stream",
cargo_env_compat: true,
- cargo_pkg_version: "0.1.8",
+ cargo_pkg_version: "0.1.11",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ef469e..05c2b18 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,37 @@
+# 0.1.11 (October 11, 2022)
+
+- time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036])
+
+[#5036]: https://github.com/tokio-rs/tokio/pull/5036
+
+# 0.1.10 (Sept 18, 2022)
+
+- time: add `StreamExt::chunks_timeout` ([#4695])
+- stream: add track_caller to public APIs ([#4786])
+
+[#4695]: https://github.com/tokio-rs/tokio/pull/4695
+[#4786]: https://github.com/tokio-rs/tokio/pull/4786
+
+# 0.1.9 (June 4, 2022)
+
+- deps: upgrade `tokio-util` dependency to `0.7.x` ([#3762])
+- stream: add `StreamExt::map_while` ([#4351])
+- stream: add `StreamExt::then` ([#4355])
+- stream: add cancel-safety docs to `StreamExt::next` and `try_next` ([#4715])
+- stream: expose `Elapsed` error ([#4502])
+- stream: expose `Timeout` ([#4601])
+- stream: implement `Extend` for `StreamMap` ([#4272])
+- sync: add `Clone` to `RecvError` types ([#4560])
+
+[#3762]: https://github.com/tokio-rs/tokio/pull/3762
+[#4272]: https://github.com/tokio-rs/tokio/pull/4272
+[#4351]: https://github.com/tokio-rs/tokio/pull/4351
+[#4355]: https://github.com/tokio-rs/tokio/pull/4355
+[#4502]: https://github.com/tokio-rs/tokio/pull/4502
+[#4560]: https://github.com/tokio-rs/tokio/pull/4560
+[#4601]: https://github.com/tokio-rs/tokio/pull/4601
+[#4715]: https://github.com/tokio-rs/tokio/pull/4715
+
# 0.1.8 (October 29, 2021)
- stream: add `From<Receiver<T>>` impl for receiver streams ([#4080])
diff --git a/Cargo.toml b/Cargo.toml
index 699d94a..df1dc53 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,19 +11,29 @@
[package]
edition = "2018"
+rust-version = "1.49"
name = "tokio-stream"
-version = "0.1.8"
+version = "0.1.11"
authors = ["Tokio Contributors <team@tokio.rs>"]
-description = "Utilities to work with `Stream` and `tokio`.\n"
+description = """
+Utilities to work with `Stream` and `tokio`.
+"""
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
+
[package.metadata.docs.rs]
all-features = true
-rustc-args = ["--cfg", "docsrs"]
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+rustc-args = [
+ "--cfg",
+ "docsrs",
+]
+
[dependencies.futures-core]
version = "0.3.0"
@@ -35,8 +45,9 @@ version = "1.8.0"
features = ["sync"]
[dependencies.tokio-util]
-version = "0.6.3"
+version = "0.7.0"
optional = true
+
[dev-dependencies.async-stream]
version = "0.3"
@@ -44,12 +55,15 @@ version = "0.3"
version = "0.3"
default-features = false
-[dev-dependencies.proptest]
-version = "1"
+[dev-dependencies.parking_lot]
+version = "0.12.0"
[dev-dependencies.tokio]
version = "1.2.0"
-features = ["full", "test-util"]
+features = [
+ "full",
+ "test-util",
+]
[features]
default = ["time"]
@@ -57,5 +71,11 @@ fs = ["tokio/fs"]
io-util = ["tokio/io-util"]
net = ["tokio/net"]
signal = ["tokio/signal"]
-sync = ["tokio/sync", "tokio-util"]
+sync = [
+ "tokio/sync",
+ "tokio-util",
+]
time = ["tokio/time"]
+
+[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest]
+version = "1"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 83f8551..6dfa978 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -2,17 +2,15 @@
name = "tokio-stream"
# When releasing to crates.io:
# - Remove path dependencies
-# - Update doc url
-# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.8"
+version = "0.1.11"
edition = "2018"
+rust-version = "1.49"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
@@ -31,14 +29,16 @@ signal = ["tokio/signal"]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
-tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
+tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true }
[dev-dependencies]
tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
async-stream = "0.3"
+parking_lot = "0.12.0"
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }
+[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
proptest = "1"
[package.metadata.docs.rs]
diff --git a/LICENSE b/LICENSE
index ffa38bb..8af5baf 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2021 Tokio Contributors
+Copyright (c) 2022 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 68e56e9..1bf6cf9 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "tokio-stream"
description: "Utilities to work with `Stream` and `tokio`."
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.8.crate"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate"
}
- version: "0.1.8"
+ version: "0.1.11"
license_type: NOTICE
last_upgrade_date {
year: 2022
- month: 3
- day: 1
+ month: 12
+ day: 12
}
}
diff --git a/src/lib.rs b/src/lib.rs
index b7f232f..bbd4cef 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,6 @@
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
-#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
@@ -78,6 +77,9 @@ pub mod wrappers;
mod stream_ext;
pub use stream_ext::{collect::FromStream, StreamExt};
+cfg_time! {
+ pub use stream_ext::timeout::{Elapsed, Timeout};
+}
mod empty;
pub use empty::{empty, Empty};
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 1157c9e..6cea7b5 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -1,3 +1,4 @@
+use core::future::Future;
use futures_core::Stream;
mod all;
@@ -27,6 +28,9 @@ use fuse::Fuse;
mod map;
use map::Map;
+mod map_while;
+use map_while::MapWhile;
+
mod merge;
use merge::Merge;
@@ -39,21 +43,26 @@ use skip::Skip;
mod skip_while;
use skip_while::SkipWhile;
-mod try_next;
-use try_next::TryNext;
-
mod take;
use take::Take;
mod take_while;
use take_while::TakeWhile;
+mod then;
+use then::Then;
+
+mod try_next;
+use try_next::TryNext;
+
cfg_time! {
- mod timeout;
+ pub(crate) mod timeout;
use timeout::Timeout;
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
+ mod chunks_timeout;
+ use chunks_timeout::ChunksTimeout;
}
/// An extension trait for the [`Stream`] trait that provides a variety of
@@ -106,6 +115,12 @@ pub trait StreamExt: Stream {
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. The returned future only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
/// # Examples
///
/// ```
@@ -142,6 +157,12 @@ pub trait StreamExt: Stream {
/// an [`Option<Result<T, E>>`](Option), making for easy use
/// with the [`?`](std::ops::Try) operator.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. The returned future only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
/// # Examples
///
/// ```
@@ -197,6 +218,93 @@ pub trait StreamExt: Stream {
Map::new(self, f)
}
+ /// Map this stream's items to a different type for as long as determined by
+ /// the provided closure. A stream of the target type will be returned,
+ /// which will yield elements until the closure returns `None`.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, until it returns `None`. It is executed inline
+ /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
+ /// the underlying stream will not be polled again.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=10);
+ /// let mut stream = stream.map_while(|x| {
+ /// if x < 4 {
+ /// Some(x + 3)
+ /// } else {
+ /// None
+ /// }
+ /// });
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
+ where
+ F: FnMut(Self::Item) -> Option<T>,
+ Self: Sized,
+ {
+ MapWhile::new(self, f)
+ }
+
+ /// Maps this stream's items asynchronously to a different type, returning a
+ /// new stream of the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, and the returned future is executed. Only one
+ /// future is executed at the time.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `then` methods in the
+ /// standard library.
+ ///
+ /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
+ /// returned by this method. To handle this, you can use `tokio::pin!` as in
+ /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// async fn do_async_work(value: i32) -> i32 {
+ /// value + 3
+ /// }
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let stream = stream.then(do_async_work);
+ ///
+ /// tokio::pin!(stream);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
+ where
+ F: FnMut(Self::Item) -> Fut,
+ Fut: Future,
+ Self: Sized,
+ {
+ Then::new(self, f)
+ }
+
/// Combine two streams into one by interleaving the output of both as it
/// is produced.
///
@@ -899,6 +1007,63 @@ pub trait StreamExt: Stream {
{
throttle(duration, self)
}
+
+ /// Batches the items in the given stream using a maximum duration and size for each batch.
+ ///
+ /// This stream returns the next batch of items in the following situations:
+ /// 1. The inner stream has returned at least `max_size` many items since the last batch.
+ /// 2. The time since the first item of a batch is greater than the given duration.
+ /// 3. The end of the stream is reached.
+ ///
+ /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
+ /// will not be emitted if no items are received upstream.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if `max_size` is zero
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use std::time::Duration;
+ /// use tokio::time;
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// use futures::FutureExt;
+ ///
+ /// #[tokio::main]
+ /// # async fn _unused() {}
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// async fn main() {
+ /// let iter = vec![1, 2, 3, 4].into_iter();
+ /// let stream0 = stream::iter(iter);
+ ///
+ /// let iter = vec![5].into_iter();
+ /// let stream1 = stream::iter(iter)
+ /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+ ///
+ /// let chunk_stream = stream0
+ /// .chain(stream1)
+ /// .chunks_timeout(3, Duration::from_secs(2));
+ /// tokio::pin!(chunk_stream);
+ ///
+ /// // a full batch was received
+ /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
+ /// // deadline was reached before max_size was reached
+ /// assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ /// // last element in the stream
+ /// assert_eq!(chunk_stream.next().await, Some(vec![5]));
+ /// }
+ /// ```
+ #[cfg(feature = "time")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ #[track_caller]
+ fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
+ where
+ Self: Sized,
+ {
+ assert!(max_size > 0, "`max_size` must be non-zero.");
+ ChunksTimeout::new(self, max_size, duration)
+ }
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
@@ -906,10 +1071,10 @@ impl<St: ?Sized> StreamExt for St where St: Stream {}
/// Merge the size hints from two streams.
fn merge_size_hints(
(left_low, left_high): (usize, Option<usize>),
- (right_low, right_hign): (usize, Option<usize>),
+ (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
let low = left_low.saturating_add(right_low);
- let high = match (left_high, right_hign) {
+ let high = match (left_high, right_high) {
(Some(h1), Some(h2)) => h1.checked_add(h2),
_ => None,
};
diff --git a/src/stream_ext/chunks_timeout.rs b/src/stream_ext/chunks_timeout.rs
new file mode 100644
index 0000000..48acd93
--- /dev/null
+++ b/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,86 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{sleep, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct ChunksTimeout<S: Stream> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ deadline: Option<Sleep>,
+ duration: Duration,
+ items: Vec<S::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<S: Stream> ChunksTimeout<S> {
+ pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
+ ChunksTimeout {
+ stream: Fuse::new(stream),
+ deadline: None,
+ duration,
+ items: Vec::with_capacity(max_size),
+ cap: max_size,
+ }
+ }
+}
+
+impl<S: Stream> Stream for ChunksTimeout<S> {
+ type Item = Vec<S::Item>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.as_mut().project();
+ loop {
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Pending => break,
+ Poll::Ready(Some(item)) => {
+ if me.items.is_empty() {
+ me.deadline.set(Some(sleep(*me.duration)));
+ me.items.reserve_exact(*me.cap);
+ }
+ me.items.push(item);
+ if me.items.len() >= *me.cap {
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+ }
+ Poll::Ready(None) => {
+ // Returning Some here is only correct because we fuse the inner stream.
+ let last = if me.items.is_empty() {
+ None
+ } else {
+ Some(std::mem::take(me.items))
+ };
+
+ return Poll::Ready(last);
+ }
+ }
+ }
+
+ if !me.items.is_empty() {
+ if let Some(deadline) = me.deadline.as_pin_mut() {
+ ready!(deadline.poll(cx));
+ }
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let (lower, upper) = self.stream.size_hint();
+ let lower = (lower / self.cap).saturating_add(chunk_len);
+ let upper = upper.and_then(|x| x.checked_add(chunk_len));
+ (lower, upper)
+ }
+}
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index a33a6d6..4b157a9 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -66,17 +66,17 @@ where
use Poll::Ready;
loop {
- let mut me = self.as_mut().project();
+ let me = self.as_mut().project();
let item = match ready!(me.stream.poll_next(cx)) {
Some(item) => item,
None => {
- return Ready(U::finalize(sealed::Internal, &mut me.collection));
+ return Ready(U::finalize(sealed::Internal, me.collection));
}
};
- if !U::extend(sealed::Internal, &mut me.collection, item) {
- return Ready(U::finalize(sealed::Internal, &mut me.collection));
+ if !U::extend(sealed::Internal, me.collection, item) {
+ return Ready(U::finalize(sealed::Internal, me.collection));
}
}
}
diff --git a/src/stream_ext/map_while.rs b/src/stream_ext/map_while.rs
new file mode 100644
index 0000000..d4fd825
--- /dev/null
+++ b/src/stream_ext/map_while.rs
@@ -0,0 +1,52 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map_while`](super::StreamExt::map_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct MapWhile<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for MapWhile<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapWhile")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, F> MapWhile<St, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ MapWhile { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for MapWhile<St, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Option<T>,
+{
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let me = self.project();
+ let f = me.f;
+ me.stream.poll_next(cx).map(|opt| opt.and_then(f))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (_, upper) = self.stream.size_hint();
+ (0, upper)
+ }
+}
diff --git a/src/stream_ext/next.rs b/src/stream_ext/next.rs
index 175490c..706069f 100644
--- a/src/stream_ext/next.rs
+++ b/src/stream_ext/next.rs
@@ -8,6 +8,13 @@ use pin_project_lite::pin_project;
pin_project! {
/// Future for the [`next`](super::StreamExt::next) method.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. It only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Next<'a, St: ?Sized> {
diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs
new file mode 100644
index 0000000..7f6b5a2
--- /dev/null
+++ b/src/stream_ext/then.rs
@@ -0,0 +1,83 @@
+use crate::Stream;
+
+use core::fmt;
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`then`](super::StreamExt::then) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Then<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Then")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> Then<St, Fut, F> {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Then {
+ stream,
+ future: None,
+ f,
+ }
+ }
+}
+
+impl<St, F, Fut> Stream for Then<St, Fut, F>
+where
+ St: Stream,
+ Fut: Future,
+ F: FnMut(St::Item) -> Fut,
+{
+ type Item = Fut::Output;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
+ let mut me = self.project();
+
+ loop {
+ if let Some(future) = me.future.as_mut().as_pin_mut() {
+ match future.poll(cx) {
+ Poll::Ready(item) => {
+ me.future.set(None);
+ return Poll::Ready(Some(item));
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item)) => {
+ me.future.set(Some((me.f)(item)));
+ }
+ Poll::Ready(None) => return Poll::Ready(None),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let future_len = if self.future.is_some() { 1 } else { 0 };
+ let (lower, upper) = self.stream.size_hint();
+
+ let lower = lower.saturating_add(future_len);
+ let upper = upper.and_then(|upper| upper.checked_add(future_len));
+
+ (lower, upper)
+ }
+}
diff --git a/src/stream_ext/try_next.rs b/src/stream_ext/try_next.rs
index af27d87..93aa3bc 100644
--- a/src/stream_ext/try_next.rs
+++ b/src/stream_ext/try_next.rs
@@ -9,6 +9,12 @@ use pin_project_lite::pin_project;
pin_project! {
/// Future for the [`try_next`](super::StreamExt::try_next) method.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. It only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNext<'a, St: ?Sized> {
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 80a521e..2159804 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -585,6 +585,15 @@ where
}
}
+impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
+ fn extend<T>(&mut self, iter: T)
+ where
+ T: IntoIterator<Item = (K, V)>,
+ {
+ self.entries.extend(iter);
+ }
+}
+
mod rand {
use std::cell::Cell;
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index c8346a6..10184bf 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -14,11 +14,11 @@ use std::task::{Context, Poll};
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct BroadcastStream<T> {
- inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
+ inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>,
}
/// An error returned from the inner stream of a [`BroadcastStream`].
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
pub enum BroadcastStreamRecvError {
/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index bd3a18a..c682c9c 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -49,7 +49,7 @@ use tokio::sync::watch::error::RecvError;
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
- inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
}
async fn make_future<T: Clone + Send + Sync>(
diff --git a/tests/chunks_timeout.rs b/tests/chunks_timeout.rs
new file mode 100644
index 0000000..ffc7dea
--- /dev/null
+++ b/tests/chunks_timeout.rs
@@ -0,0 +1,84 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time;
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::assert_pending;
+use tokio_test::task;
+
+use futures::FutureExt;
+use std::time::Duration;
+
+#[tokio::test(start_paused = true)]
+async fn usage() {
+ let iter = vec![1, 2, 3].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![4].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(4, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test(start_paused = true)]
+async fn full_chunk_with_timeout() {
+ let iter = vec![1, 2].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![3].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
+
+ let iter = vec![4].into_iter();
+ let stream2 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chain(stream2)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test]
+#[ignore]
+async fn real_time() {
+ let iter = vec![1, 2, 3, 4].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![5].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ assert_eq!(chunk_stream.next().await, Some(vec![5]));
+}
diff --git a/tests/stream_panic.rs b/tests/stream_panic.rs
new file mode 100644
index 0000000..22c1c20
--- /dev/null
+++ b/tests/stream_panic.rs
@@ -0,0 +1,55 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery
+
+use parking_lot::{const_mutex, Mutex};
+use std::error::Error;
+use std::panic;
+use std::sync::Arc;
+use tokio::time::Duration;
+use tokio_stream::{self as stream, StreamExt};
+
+fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
+ static PANIC_MUTEX: Mutex<()> = const_mutex(());
+
+ {
+ let _guard = PANIC_MUTEX.lock();
+ let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
+
+ let prev_hook = panic::take_hook();
+ {
+ let panic_file = panic_file.clone();
+ panic::set_hook(Box::new(move |panic_info| {
+ let panic_location = panic_info.location().unwrap();
+ panic_file
+ .lock()
+ .clone_from(&Some(panic_location.file().to_string()));
+ }));
+ }
+
+ let result = panic::catch_unwind(func);
+ // Return to the previously set panic hook (maybe default) so that we get nice error
+ // messages in the tests.
+ panic::set_hook(prev_hook);
+
+ if result.is_err() {
+ panic_file.lock().clone()
+ } else {
+ None
+ }
+ }
+}
+
+#[test]
+fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> {
+ let panic_location_file = test_panic(|| {
+ let iter = vec![1, 2, 3].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2));
+ });
+
+ // The panic location should be in this file
+ assert_eq!(&panic_location_file.unwrap(), file!());
+
+ Ok(())
+}
diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs
index 53f3d86..ffc489b 100644
--- a/tests/stream_stream_map.rs
+++ b/tests/stream_stream_map.rs
@@ -325,6 +325,7 @@ fn one_ready_many_none() {
}
}
+#[cfg(not(target_os = "wasi"))]
proptest::proptest! {
#[test]
fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs
index 5697ace..2338f83 100644
--- a/tests/stream_timeout.rs
+++ b/tests/stream_timeout.rs
@@ -1,10 +1,10 @@
-#![cfg(feature = "full")]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time::{self, sleep, Duration};
use tokio_stream::{self, StreamExt};
use tokio_test::*;
-use futures::StreamExt as _;
+use futures::stream;
async fn maybe_sleep(idx: i32) -> i32 {
if idx % 2 == 0 {
diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs
index 42a643b..e6c9917 100644
--- a/tests/time_throttle.rs
+++ b/tests/time_throttle.rs
@@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time;
use tokio_stream::StreamExt;