diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2024-02-08 01:23:29 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2024-02-08 01:23:29 +0000 |
commit | 625ed352181d25277d0bdedba093287038b80b4b (patch) | |
tree | 7d82b51451e1a8a40106414e34385ff76a3f7c3e /src/stream_ext.rs | |
parent | 4d2b8b6630d30e86f085278b201c41f68b28ca21 (diff) | |
parent | ddb59c87394b27e094ff61ba6ec50ea25e2bac61 (diff) | |
download | tokio-stream-625ed352181d25277d0bdedba093287038b80b4b.tar.gz |
Snap for 11421525 from ddb59c87394b27e094ff61ba6ec50ea25e2bac61 to simpleperf-releasesimpleperf-release
Change-Id: Icf50ae5d21672a740bcc1168edbcac352df91be7
Diffstat (limited to 'src/stream_ext.rs')
-rw-r--r-- | src/stream_ext.rs | 116 |
1 files changed, 114 insertions, 2 deletions
diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 52d3202..a4ab8a0 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -57,8 +57,10 @@ use try_next::TryNext; cfg_time! { pub(crate) mod timeout; + pub(crate) mod timeout_repeating; use timeout::Timeout; - use tokio::time::Duration; + use timeout_repeating::TimeoutRepeating; + use tokio::time::{Duration, Interval}; mod throttle; use throttle::{throttle, Throttle}; mod chunks_timeout; @@ -924,7 +926,9 @@ pub trait StreamExt: Stream { /// If the wrapped stream yields a value before the deadline is reached, the /// value is returned. Otherwise, an error is returned. The caller may decide /// to continue consuming the stream and will eventually get the next source - /// stream value once it becomes available. + /// stream value once it becomes available. See + /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative + /// where the timeouts will repeat. /// /// # Notes /// @@ -971,6 +975,25 @@ pub trait StreamExt: Stream { /// assert_eq!(int_stream.try_next().await, Ok(None)); /// # } /// ``` + /// + /// Once a timeout error is received, no further events will be received + /// unless the wrapped stream yields a value (timeouts do not repeat). + /// + /// ``` + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # async fn main() { + /// use tokio_stream::{StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); + /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10)); + /// tokio::pin!(timeout_stream); + /// + /// // Only one timeout will be received between values in the source stream. + /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); + /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts"); + /// # } + /// ``` #[cfg(all(feature = "time"))] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] fn timeout(self, duration: Duration) -> Timeout<Self> @@ -980,6 +1003,95 @@ pub trait StreamExt: Stream { Timeout::new(self, duration) } + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that + /// controls the time each element of the stream has to complete before + /// timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. Unlike `timeout()`, if no value + /// becomes available before the deadline is reached, additional errors are + /// returned at the specified interval. See [`timeout`](StreamExt::timeout) + /// for an alternative where the timeouts do not repeat. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1))); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + /// + /// Timeout errors will be continuously produced at the specified interval + /// until the wrapped stream yields a value. + /// + /// ``` + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # async fn main() { + /// use tokio_stream::{StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); + /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9))); + /// tokio::pin!(timeout_stream); + /// + /// // Multiple timeouts will be received between values in the source stream. + /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); + /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout"); + /// // Will eventually receive another value from the source stream... + /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout"); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> + where + Self: Sized, + { + TimeoutRepeating::new(self, interval) + } + /// Slows down a stream by enforcing a delay between items. /// /// The underlying timer behind this utility has a granularity of one millisecond. |