aboutsummaryrefslogtreecommitdiff
path: root/src/stream/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/mod.rs')
-rw-r--r--src/stream/mod.rs121
1 files changed, 112 insertions, 9 deletions
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 7b061ef..6bf4232 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -1,8 +1,57 @@
//! Stream utilities for Tokio.
//!
-//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait.
+//! A `Stream` is an asynchronous sequence of values. It can be thought of as
+//! an asynchronous version of the standard library's `Iterator` trait.
//!
-//! This module provides helpers to work with them.
+//! This module provides helpers to work with them. For examples of usage and a more in-depth
+//! description of streams you can also refer to the [streams
+//! tutorial](https://tokio.rs/tokio/tutorial/streams) on the tokio website.
+//!
+//! # Iterating over a Stream
+//!
+//! Due to similarities with the standard library's `Iterator` trait, some new
+//! users may assume that they can use `for in` syntax to iterate over a
+//! `Stream`, but this is unfortunately not possible. Instead, you can use a
+//! `while let` loop as follows:
+//!
+//! ```rust
+//! use tokio::stream::{self, StreamExt};
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let mut stream = stream::iter(vec![0, 1, 2]);
+//!
+//! while let Some(value) = stream.next().await {
+//! println!("Got {}", value);
+//! }
+//! }
+//! ```
+//!
+//! # Returning a Stream from a function
+//!
+//! A common way to stream values from a function is to pass in the sender
+//! half of a channel and use the receiver as the stream. This requires awaiting
+//! both futures to ensure progress is made. Another alternative is the
+//! [async-stream] crate, which contains macros that provide a `yield` keyword
+//! and allow you to return an `impl Stream`.
+//!
+//! [async-stream]: https://docs.rs/async-stream
+//!
+//! # Conversion to and from AsyncRead/AsyncWrite
+//!
+//! It is often desirable to convert a `Stream` into an [`AsyncRead`],
+//! especially when dealing with plaintext formats streamed over the network.
+//! The opposite conversion from an [`AsyncRead`] into a `Stream` is also
+//! another commonly required feature. To enable these conversions,
+//! [`tokio-util`] provides the [`StreamReader`] and [`ReaderStream`]
+//! types when the io feature is enabled.
+//!
+//! [tokio-util]: https://docs.rs/tokio-util/0.3/tokio_util/codec/index.html
+//! [`tokio::io`]: crate::io
+//! [`AsyncRead`]: crate::io::AsyncRead
+//! [`AsyncWrite`]: crate::io::AsyncWrite
+//! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html
+//! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html
mod all;
use all::AllFuture;
@@ -71,9 +120,12 @@ use take_while::TakeWhile;
cfg_time! {
mod timeout;
use timeout::Timeout;
- use std::time::Duration;
+ use crate::time::Duration;
+ mod throttle;
+ use crate::stream::throttle::{throttle, Throttle};
}
+#[doc(no_inline)]
pub use futures_core::Stream;
/// An extension trait for `Stream`s that provides a variety of convenient
@@ -215,11 +267,11 @@ pub trait StreamExt: Stream {
/// # /*
/// #[tokio::main]
/// # */
- /// # #[tokio::main(basic_scheduler)]
+ /// # #[tokio::main(flavor = "current_thread")]
/// async fn main() {
/// # time::pause();
- /// let (mut tx1, rx1) = mpsc::channel(10);
- /// let (mut tx2, rx2) = mpsc::channel(10);
+ /// let (tx1, rx1) = mpsc::channel(10);
+ /// let (tx2, rx2) = mpsc::channel(10);
///
/// let mut rx = rx1.merge(rx2);
///
@@ -229,18 +281,18 @@ pub trait StreamExt: Stream {
/// tx1.send(2).await.unwrap();
///
/// // Let the other task send values
- /// time::delay_for(Duration::from_millis(20)).await;
+ /// time::sleep(Duration::from_millis(20)).await;
///
/// tx1.send(4).await.unwrap();
/// });
///
/// tokio::spawn(async move {
/// // Wait for the first task to send values
- /// time::delay_for(Duration::from_millis(5)).await;
+ /// time::sleep(Duration::from_millis(5)).await;
///
/// tx2.send(3).await.unwrap();
///
- /// time::delay_for(Duration::from_millis(25)).await;
+ /// time::sleep(Duration::from_millis(25)).await;
///
/// // Send the final value
/// tx2.send(5).await.unwrap();
@@ -520,6 +572,12 @@ pub trait StreamExt: Stream {
/// Tests if every element of the stream matches a predicate.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn all<F>(&mut self, f: F) -> bool;
+ /// ```
+ ///
/// `all()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if they all return
/// `true`, then so does `all`. If any of them return `false`, it
@@ -575,6 +633,12 @@ pub trait StreamExt: Stream {
/// Tests if any element of the stream matches a predicate.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn any<F>(&mut self, f: F) -> bool;
+ /// ```
+ ///
/// `any()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if any of them return
/// `true`, then so does `any()`. If they all return `false`, it
@@ -664,6 +728,12 @@ pub trait StreamExt: Stream {
/// A combinator that applies a function to every element in a stream
/// producing a single, final value.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn fold<B, F>(self, init: B, f: F) -> B;
+ /// ```
+ ///
/// # Examples
/// Basic usage:
/// ```
@@ -687,6 +757,12 @@ pub trait StreamExt: Stream {
/// Drain stream pushing all emitted values into a collection.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn collect<T>(self) -> T;
+ /// ```
+ ///
/// `collect` streams all values, awaiting as needed. Values are pushed into
/// a collection. A number of different target collection types are
/// supported, including [`Vec`](std::vec::Vec),
@@ -819,6 +895,33 @@ pub trait StreamExt: Stream {
{
Timeout::new(self, duration)
}
+
+ /// Slows down a stream by enforcing a delay between items.
+ ///
+ /// # Example
+ ///
+ /// Create a throttled stream.
+ /// ```rust,no_run
+ /// use std::time::Duration;
+ /// use tokio::stream::StreamExt;
+ ///
+ /// # async fn dox() {
+ /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
+ ///
+ /// loop {
+ /// // The string will be produced at most every 2 seconds
+ /// println!("{:?}", item_stream.next().await);
+ /// }
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn throttle(self, duration: Duration) -> Throttle<Self>
+ where
+ Self: Sized,
+ {
+ throttle(duration, self)
+ }
}
impl<St: ?Sized> StreamExt for St where St: Stream {}