summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2024-02-05 14:07:57 +0100
committerJeff Vander Stoep <jeffv@google.com>2024-02-05 14:07:57 +0100
commitddb59c87394b27e094ff61ba6ec50ea25e2bac61 (patch)
tree7d82b51451e1a8a40106414e34385ff76a3f7c3e
parent3855d14388ef8a197bd07b1092bece0925fad3ae (diff)
downloadtokio-stream-emu-34-3-release.tar.gz
Upgrade tokio-stream to 0.1.14emu-34-3-release
This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update external/rust/crates/tokio-stream For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md Test: TreeHugger Change-Id: I3b4ee2f213083dd174d39257fd6dad9fd811fa2d
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp4
-rw-r--r--CHANGELOG.md23
-rw-r--r--Cargo.toml21
-rw-r--r--Cargo.toml.orig18
-rw-r--r--METADATA25
-rw-r--r--src/lib.rs3
-rw-r--r--src/stream_close.rs93
-rw-r--r--src/stream_ext.rs116
-rw-r--r--src/stream_ext/timeout.rs2
-rw-r--r--src/stream_ext/timeout_repeating.rs56
-rw-r--r--src/stream_map.rs32
-rw-r--r--tests/stream_close.rs11
13 files changed, 374 insertions, 32 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index d866201..a31b22b 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0"
+ "sha1": "398dfda56d3ee4b0d4d9e86abe15039e86979d83"
},
"path_in_vcs": "tokio-stream"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 357bd1c..8083c73 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,9 +23,9 @@ rust_library {
host_supported: true,
crate_name: "tokio_stream",
cargo_env_compat: true,
- cargo_pkg_version: "0.1.12",
+ cargo_pkg_version: "0.1.14",
srcs: ["src/lib.rs"],
- edition: "2018",
+ edition: "2021",
features: [
"fs",
"io-util",
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c475c7c..c14ad07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,31 @@
-# 0.1.12 (January 20, 2022)
+# 0.1.14 (April 26th, 2023)
+
+This bugfix release bumps the minimum version of Tokio to 1.15, which is
+necessary for `timeout_repeating` to compile. ([#5657])
+
+[#5657]: https://github.com/tokio-rs/tokio/pull/5657
+
+# 0.1.13 (April 25th, 2023)
+
+This release bumps the MSRV of tokio-stream to 1.56.
+
+- stream: add "full" feature flag ([#5639])
+- stream: add `StreamExt::timeout_repeating` ([#5577])
+- stream: add `StreamNotifyClose` ([#4851])
+
+[#4851]: https://github.com/tokio-rs/tokio/pull/4851
+[#5577]: https://github.com/tokio-rs/tokio/pull/5577
+[#5639]: https://github.com/tokio-rs/tokio/pull/5639
+
+# 0.1.12 (January 20, 2023)
- time: remove `Unpin` bound on `Throttle` methods ([#5105])
- time: document that `throttle` operates on ms granularity ([#5101])
+- sync: add `WatchStream::from_changes` ([#5432])
[#5105]: https://github.com/tokio-rs/tokio/pull/5105
[#5101]: https://github.com/tokio-rs/tokio/pull/5101
+[#5432]: https://github.com/tokio-rs/tokio/pull/5432
# 0.1.11 (October 11, 2022)
diff --git a/Cargo.toml b/Cargo.toml
index 5a3542e..897ba84 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,10 +10,10 @@
# See Cargo.toml.orig for the original contents.
[package]
-edition = "2018"
-rust-version = "1.49"
+edition = "2021"
+rust-version = "1.56"
name = "tokio-stream"
-version = "0.1.12"
+version = "0.1.14"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = """
Utilities to work with `Stream` and `tokio`.
@@ -22,14 +22,15 @@ homepage = "https://tokio.rs"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
+resolver = "1"
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = [
+rustc-args = [
"--cfg",
"docsrs",
]
-rustc-args = [
+rustdoc-args = [
"--cfg",
"docsrs",
]
@@ -41,7 +42,7 @@ version = "0.3.0"
version = "0.2.0"
[dependencies.tokio]
-version = "1.8.0"
+version = "1.15.0"
features = ["sync"]
[dependencies.tokio-util]
@@ -68,6 +69,14 @@ features = [
[features]
default = ["time"]
fs = ["tokio/fs"]
+full = [
+ "time",
+ "net",
+ "io-util",
+ "fs",
+ "sync",
+ "signal",
+]
io-util = ["tokio/io-util"]
net = ["tokio/net"]
signal = ["tokio/signal"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index f87b59a..9a90cd3 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,9 +4,9 @@ name = "tokio-stream"
# - Remove path dependencies
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.12"
-edition = "2018"
-rust-version = "1.49"
+version = "0.1.14"
+edition = "2021"
+rust-version = "1.56"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
@@ -18,6 +18,16 @@ categories = ["asynchronous"]
[features]
default = ["time"]
+
+full = [
+ "time",
+ "net",
+ "io-util",
+ "fs",
+ "sync",
+ "signal"
+]
+
time = ["tokio/time"]
net = ["tokio/net"]
io-util = ["tokio/io-util"]
@@ -28,7 +38,7 @@ signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
-tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
+tokio = { version = "1.15.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true }
[dev-dependencies]
diff --git a/METADATA b/METADATA
index 390ea84..5a3e237 100644
--- a/METADATA
+++ b/METADATA
@@ -1,23 +1,20 @@
# This project was upgraded with external_updater.
-# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream
-# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+# Usage: tools/external_updater/updater.sh update external/rust/crates/tokio-stream
+# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md
name: "tokio-stream"
description: "Utilities to work with `Stream` and `tokio`."
third_party {
- url {
- type: HOMEPAGE
- value: "https://crates.io/crates/tokio-stream"
- }
- url {
- type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate"
- }
- version: "0.1.12"
license_type: NOTICE
last_upgrade_date {
- year: 2023
- month: 3
- day: 30
+ year: 2024
+ month: 2
+ day: 5
+ }
+ homepage: "https://crates.io/crates/tokio-stream"
+ identifier {
+ type: "Archive"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.14.crate"
+ version: "0.1.14"
}
}
diff --git a/src/lib.rs b/src/lib.rs
index bbd4cef..351c77e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -96,5 +96,8 @@ pub use pending::{pending, Pending};
mod stream_map;
pub use stream_map::StreamMap;
+mod stream_close;
+pub use stream_close::StreamNotifyClose;
+
#[doc(no_inline)]
pub use futures_core::Stream;
diff --git a/src/stream_close.rs b/src/stream_close.rs
new file mode 100644
index 0000000..735acf0
--- /dev/null
+++ b/src/stream_close.rs
@@ -0,0 +1,93 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A `Stream` that wraps the values in an `Option`.
+ ///
+ /// Whenever the wrapped stream yields an item, this stream yields that item
+ /// wrapped in `Some`. When the inner stream ends, then this stream first
+ /// yields a `None` item, and then this stream will also end.
+ ///
+ /// # Example
+ ///
+ /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
+ ///
+ /// ```
+ /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut map = StreamMap::new();
+ /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+ /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+ /// map.insert(0, stream);
+ /// map.insert(1, stream2);
+ /// while let Some((key, val)) = map.next().await {
+ /// match val {
+ /// Some(val) => println!("got {val:?} from stream {key:?}"),
+ /// None => println!("stream {key:?} closed"),
+ /// }
+ /// }
+ /// }
+ /// ```
+ #[must_use = "streams do nothing unless polled"]
+ pub struct StreamNotifyClose<S> {
+ #[pin]
+ inner: Option<S>,
+ }
+}
+
+impl<S> StreamNotifyClose<S> {
+ /// Create a new `StreamNotifyClose`.
+ pub fn new(stream: S) -> Self {
+ Self {
+ inner: Some(stream),
+ }
+ }
+
+ /// Get back the inner `Stream`.
+ ///
+ /// Returns `None` if the stream has reached its end.
+ pub fn into_inner(self) -> Option<S> {
+ self.inner
+ }
+}
+
+impl<S> Stream for StreamNotifyClose<S>
+where
+ S: Stream,
+{
+ type Item = Option<S::Item>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // We can't invoke poll_next after it ended, so we unset the inner stream as a marker.
+ match self
+ .as_mut()
+ .project()
+ .inner
+ .as_pin_mut()
+ .map(|stream| S::poll_next(stream, cx))
+ {
+ Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))),
+ Some(Poll::Ready(None)) => {
+ self.project().inner.set(None);
+ Poll::Ready(Some(None))
+ }
+ Some(Poll::Pending) => Poll::Pending,
+ None => Poll::Ready(None),
+ }
+ }
+
+ #[inline]
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if let Some(inner) = &self.inner {
+ // We always return +1 because when there's stream there's atleast one more item.
+ let (l, u) = inner.size_hint();
+ (l.saturating_add(1), u.and_then(|u| u.checked_add(1)))
+ } else {
+ (0, Some(0))
+ }
+ }
+}
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 52d3202..a4ab8a0 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -57,8 +57,10 @@ use try_next::TryNext;
cfg_time! {
pub(crate) mod timeout;
+ pub(crate) mod timeout_repeating;
use timeout::Timeout;
- use tokio::time::Duration;
+ use timeout_repeating::TimeoutRepeating;
+ use tokio::time::{Duration, Interval};
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
@@ -924,7 +926,9 @@ pub trait StreamExt: Stream {
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
- /// stream value once it becomes available.
+ /// stream value once it becomes available. See
+ /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
+ /// where the timeouts will repeat.
///
/// # Notes
///
@@ -971,6 +975,25 @@ pub trait StreamExt: Stream {
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
+ ///
+ /// Once a timeout error is received, no further events will be received
+ /// unless the wrapped stream yields a value (timeouts do not repeat).
+ ///
+ /// ```
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
+ /// use std::time::Duration;
+ /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
+ /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
+ /// tokio::pin!(timeout_stream);
+ ///
+ /// // Only one timeout will be received between values in the source stream.
+ /// assert!(timeout_stream.try_next().await.is_ok());
+ /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
+ /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
+ /// # }
+ /// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
@@ -980,6 +1003,95 @@ pub trait StreamExt: Stream {
Timeout::new(self, duration)
}
+ /// Applies a per-item timeout to the passed stream.
+ ///
+ /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
+ /// controls the time each element of the stream has to complete before
+ /// timing out.
+ ///
+ /// If the wrapped stream yields a value before the deadline is reached, the
+ /// value is returned. Otherwise, an error is returned. The caller may decide
+ /// to continue consuming the stream and will eventually get the next source
+ /// stream value once it becomes available. Unlike `timeout()`, if no value
+ /// becomes available before the deadline is reached, additional errors are
+ /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
+ /// for an alternative where the timeouts do not repeat.
+ ///
+ /// # Notes
+ ///
+ /// This function consumes the stream passed into it and returns a
+ /// wrapped version of it.
+ ///
+ /// Polling the returned stream will continue to poll the inner stream even
+ /// if one or more items time out.
+ ///
+ /// # Examples
+ ///
+ /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// use std::time::Duration;
+ /// # let int_stream = stream::iter(1..=3);
+ ///
+ /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
+ /// tokio::pin!(int_stream);
+ ///
+ /// // When no items time out, we get the 3 elements in succession:
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If the second item times out, we get an error and continue polling the stream:
+ /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert!(int_stream.try_next().await.is_err());
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If we want to stop consuming the source stream the first time an
+ /// // element times out, we can use the `take_while` operator:
+ /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// let mut int_stream = int_stream.take_while(Result::is_ok);
+ ///
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ /// # }
+ /// ```
+ ///
+ /// Timeout errors will be continuously produced at the specified interval
+ /// until the wrapped stream yields a value.
+ ///
+ /// ```
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
+ /// use std::time::Duration;
+ /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
+ /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
+ /// tokio::pin!(timeout_stream);
+ ///
+ /// // Multiple timeouts will be received between values in the source stream.
+ /// assert!(timeout_stream.try_next().await.is_ok());
+ /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
+ /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
+ /// // Will eventually receive another value from the source stream...
+ /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
+ where
+ Self: Sized,
+ {
+ TimeoutRepeating::new(self, interval)
+ }
+
/// Slows down a stream by enforcing a delay between items.
///
/// The underlying timer behind this utility has a granularity of one millisecond.
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index a440d20..17d1349 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -23,7 +23,7 @@ pin_project! {
}
}
-/// Error returned by `Timeout`.
+/// Error returned by `Timeout` and `TimeoutRepeating`.
#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());
diff --git a/src/stream_ext/timeout_repeating.rs b/src/stream_ext/timeout_repeating.rs
new file mode 100644
index 0000000..253d2fd
--- /dev/null
+++ b/src/stream_ext/timeout_repeating.rs
@@ -0,0 +1,56 @@
+use crate::stream_ext::Fuse;
+use crate::{Elapsed, Stream};
+use tokio::time::Interval;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct TimeoutRepeating<S> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ interval: Interval,
+ }
+}
+
+impl<S: Stream> TimeoutRepeating<S> {
+ pub(super) fn new(stream: S, interval: Interval) -> Self {
+ TimeoutRepeating {
+ stream: Fuse::new(stream),
+ interval,
+ }
+ }
+}
+
+impl<S: Stream> Stream for TimeoutRepeating<S> {
+ type Item = Result<S::Item, Elapsed>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.project();
+
+ match me.stream.poll_next(cx) {
+ Poll::Ready(v) => {
+ if v.is_some() {
+ me.interval.reset();
+ }
+ return Poll::Ready(v.map(Ok));
+ }
+ Poll::Pending => {}
+ };
+
+ ready!(me.interval.poll_tick(cx));
+ Poll::Ready(Some(Err(Elapsed::new())))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, _) = self.stream.size_hint();
+
+ // The timeout stream may insert an error an infinite number of times.
+ (lower, None)
+ }
+}
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 2159804..0c11bf1 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -42,10 +42,18 @@ use std::task::{Context, Poll};
/// to be merged, it may be advisable to use tasks sending values on a shared
/// [`mpsc`] channel.
///
+/// # Notes
+///
+/// `StreamMap` removes finished streams automatically, without alerting the user.
+/// In some scenarios, the caller would want to know on closed streams.
+/// To do this, use [`StreamNotifyClose`] as a wrapper to your stream.
+/// It will return None when the stream is closed.
+///
/// [`StreamExt::merge`]: crate::StreamExt::merge
/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
/// [`Box::pin`]: std::boxed::Box::pin
+/// [`StreamNotifyClose`]: crate::StreamNotifyClose
///
/// # Examples
///
@@ -170,6 +178,28 @@ use std::task::{Context, Poll};
/// }
/// }
/// ```
+///
+/// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
+///
+/// ```
+/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut map = StreamMap::new();
+/// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+/// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+/// map.insert(0, stream);
+/// map.insert(1, stream2);
+/// while let Some((key, val)) = map.next().await {
+/// match val {
+/// Some(val) => println!("got {val:?} from stream {key:?}"),
+/// None => println!("stream {key:?} closed"),
+/// }
+/// }
+/// }
+/// ```
+
#[derive(Debug)]
pub struct StreamMap<K, V> {
/// Streams stored in the map
@@ -568,7 +598,7 @@ where
}
}
-impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
+impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
where
K: Hash + Eq,
{
diff --git a/tests/stream_close.rs b/tests/stream_close.rs
new file mode 100644
index 0000000..9ddb565
--- /dev/null
+++ b/tests/stream_close.rs
@@ -0,0 +1,11 @@
+use tokio_stream::{StreamExt, StreamNotifyClose};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+
+ assert_eq!(stream.next().await, Some(Some(0)));
+ assert_eq!(stream.next().await, Some(Some(1)));
+ assert_eq!(stream.next().await, Some(None));
+ assert_eq!(stream.next().await, None);
+}