summaryrefslogtreecommitdiff
path: root/src/stream_ext.rs
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2024-02-08 01:23:29 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2024-02-08 01:23:29 +0000
commit625ed352181d25277d0bdedba093287038b80b4b (patch)
tree7d82b51451e1a8a40106414e34385ff76a3f7c3e /src/stream_ext.rs
parent4d2b8b6630d30e86f085278b201c41f68b28ca21 (diff)
parentddb59c87394b27e094ff61ba6ec50ea25e2bac61 (diff)
downloadtokio-stream-625ed352181d25277d0bdedba093287038b80b4b.tar.gz
Snap for 11421525 from ddb59c87394b27e094ff61ba6ec50ea25e2bac61 to simpleperf-releasesimpleperf-release
Change-Id: Icf50ae5d21672a740bcc1168edbcac352df91be7
Diffstat (limited to 'src/stream_ext.rs')
-rw-r--r--src/stream_ext.rs116
1 files changed, 114 insertions, 2 deletions
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 52d3202..a4ab8a0 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -57,8 +57,10 @@ use try_next::TryNext;
cfg_time! {
pub(crate) mod timeout;
+ pub(crate) mod timeout_repeating;
use timeout::Timeout;
- use tokio::time::Duration;
+ use timeout_repeating::TimeoutRepeating;
+ use tokio::time::{Duration, Interval};
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
@@ -924,7 +926,9 @@ pub trait StreamExt: Stream {
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
- /// stream value once it becomes available.
+ /// stream value once it becomes available. See
+ /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
+ /// where the timeouts will repeat.
///
/// # Notes
///
@@ -971,6 +975,25 @@ pub trait StreamExt: Stream {
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
+ ///
+ /// Once a timeout error is received, no further events will be received
+ /// unless the wrapped stream yields a value (timeouts do not repeat).
+ ///
+ /// ```
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
+ /// use std::time::Duration;
+ /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
+ /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
+ /// tokio::pin!(timeout_stream);
+ ///
+ /// // Only one timeout will be received between values in the source stream.
+ /// assert!(timeout_stream.try_next().await.is_ok());
+ /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
+ /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
+ /// # }
+ /// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
@@ -980,6 +1003,95 @@ pub trait StreamExt: Stream {
Timeout::new(self, duration)
}
+ /// Applies a per-item timeout to the passed stream.
+ ///
+ /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
+ /// controls the time each element of the stream has to complete before
+ /// timing out.
+ ///
+ /// If the wrapped stream yields a value before the deadline is reached, the
+ /// value is returned. Otherwise, an error is returned. The caller may decide
+ /// to continue consuming the stream and will eventually get the next source
+ /// stream value once it becomes available. Unlike `timeout()`, if no value
+ /// becomes available before the deadline is reached, additional errors are
+ /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
+ /// for an alternative where the timeouts do not repeat.
+ ///
+ /// # Notes
+ ///
+ /// This function consumes the stream passed into it and returns a
+ /// wrapped version of it.
+ ///
+ /// Polling the returned stream will continue to poll the inner stream even
+ /// if one or more items time out.
+ ///
+ /// # Examples
+ ///
+ /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// use std::time::Duration;
+ /// # let int_stream = stream::iter(1..=3);
+ ///
+ /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
+ /// tokio::pin!(int_stream);
+ ///
+ /// // When no items time out, we get the 3 elements in succession:
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If the second item times out, we get an error and continue polling the stream:
+ /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert!(int_stream.try_next().await.is_err());
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If we want to stop consuming the source stream the first time an
+ /// // element times out, we can use the `take_while` operator:
+ /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// let mut int_stream = int_stream.take_while(Result::is_ok);
+ ///
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ /// # }
+ /// ```
+ ///
+ /// Timeout errors will be continuously produced at the specified interval
+ /// until the wrapped stream yields a value.
+ ///
+ /// ```
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
+ /// use std::time::Duration;
+ /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
+ /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
+ /// tokio::pin!(timeout_stream);
+ ///
+ /// // Multiple timeouts will be received between values in the source stream.
+ /// assert!(timeout_stream.try_next().await.is_ok());
+ /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
+ /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
+ /// // Will eventually receive another value from the source stream...
+ /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
+ where
+ Self: Sized,
+ {
+ TimeoutRepeating::new(self, interval)
+ }
+
/// Slows down a stream by enforcing a delay between items.
///
/// The underlying timer behind this utility has a granularity of one millisecond.