diff options
Diffstat (limited to 'src/stream/mod.rs')
-rw-r--r-- | src/stream/mod.rs | 121 |
1 files changed, 112 insertions, 9 deletions
diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 7b061ef..6bf4232 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -1,8 +1,57 @@ //! Stream utilities for Tokio. //! -//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait. +//! A `Stream` is an asynchronous sequence of values. It can be thought of as +//! an asynchronous version of the standard library's `Iterator` trait. //! -//! This module provides helpers to work with them. +//! This module provides helpers to work with them. For examples of usage and a more in-depth +//! description of streams you can also refer to the [streams +//! tutorial](https://tokio.rs/tokio/tutorial/streams) on the tokio website. +//! +//! # Iterating over a Stream +//! +//! Due to similarities with the standard library's `Iterator` trait, some new +//! users may assume that they can use `for in` syntax to iterate over a +//! `Stream`, but this is unfortunately not possible. Instead, you can use a +//! `while let` loop as follows: +//! +//! ```rust +//! use tokio::stream::{self, StreamExt}; +//! +//! #[tokio::main] +//! async fn main() { +//! let mut stream = stream::iter(vec![0, 1, 2]); +//! +//! while let Some(value) = stream.next().await { +//! println!("Got {}", value); +//! } +//! } +//! ``` +//! +//! # Returning a Stream from a function +//! +//! A common way to stream values from a function is to pass in the sender +//! half of a channel and use the receiver as the stream. This requires awaiting +//! both futures to ensure progress is made. Another alternative is the +//! [async-stream] crate, which contains macros that provide a `yield` keyword +//! and allow you to return an `impl Stream`. +//! +//! [async-stream]: https://docs.rs/async-stream +//! +//! # Conversion to and from AsyncRead/AsyncWrite +//! +//! It is often desirable to convert a `Stream` into an [`AsyncRead`], +//! especially when dealing with plaintext formats streamed over the network. +//! The opposite conversion from an [`AsyncRead`] into a `Stream` is also +//! another commonly required feature. To enable these conversions, +//! [`tokio-util`] provides the [`StreamReader`] and [`ReaderStream`] +//! types when the io feature is enabled. +//! +//! [tokio-util]: https://docs.rs/tokio-util/0.3/tokio_util/codec/index.html +//! [`tokio::io`]: crate::io +//! [`AsyncRead`]: crate::io::AsyncRead +//! [`AsyncWrite`]: crate::io::AsyncWrite +//! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html +//! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html mod all; use all::AllFuture; @@ -71,9 +120,12 @@ use take_while::TakeWhile; cfg_time! { mod timeout; use timeout::Timeout; - use std::time::Duration; + use crate::time::Duration; + mod throttle; + use crate::stream::throttle::{throttle, Throttle}; } +#[doc(no_inline)] pub use futures_core::Stream; /// An extension trait for `Stream`s that provides a variety of convenient @@ -215,11 +267,11 @@ pub trait StreamExt: Stream { /// # /* /// #[tokio::main] /// # */ - /// # #[tokio::main(basic_scheduler)] + /// # #[tokio::main(flavor = "current_thread")] /// async fn main() { /// # time::pause(); - /// let (mut tx1, rx1) = mpsc::channel(10); - /// let (mut tx2, rx2) = mpsc::channel(10); + /// let (tx1, rx1) = mpsc::channel(10); + /// let (tx2, rx2) = mpsc::channel(10); /// /// let mut rx = rx1.merge(rx2); /// @@ -229,18 +281,18 @@ pub trait StreamExt: Stream { /// tx1.send(2).await.unwrap(); /// /// // Let the other task send values - /// time::delay_for(Duration::from_millis(20)).await; + /// time::sleep(Duration::from_millis(20)).await; /// /// tx1.send(4).await.unwrap(); /// }); /// /// tokio::spawn(async move { /// // Wait for the first task to send values - /// time::delay_for(Duration::from_millis(5)).await; + /// time::sleep(Duration::from_millis(5)).await; /// /// tx2.send(3).await.unwrap(); /// - /// time::delay_for(Duration::from_millis(25)).await; + /// time::sleep(Duration::from_millis(25)).await; /// /// // Send the final value /// tx2.send(5).await.unwrap(); @@ -520,6 +572,12 @@ pub trait StreamExt: Stream { /// Tests if every element of the stream matches a predicate. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn all<F>(&mut self, f: F) -> bool; + /// ``` + /// /// `all()` takes a closure that returns `true` or `false`. It applies /// this closure to each element of the stream, and if they all return /// `true`, then so does `all`. If any of them return `false`, it @@ -575,6 +633,12 @@ pub trait StreamExt: Stream { /// Tests if any element of the stream matches a predicate. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn any<F>(&mut self, f: F) -> bool; + /// ``` + /// /// `any()` takes a closure that returns `true` or `false`. It applies /// this closure to each element of the stream, and if any of them return /// `true`, then so does `any()`. If they all return `false`, it @@ -664,6 +728,12 @@ pub trait StreamExt: Stream { /// A combinator that applies a function to every element in a stream /// producing a single, final value. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fold<B, F>(self, init: B, f: F) -> B; + /// ``` + /// /// # Examples /// Basic usage: /// ``` @@ -687,6 +757,12 @@ pub trait StreamExt: Stream { /// Drain stream pushing all emitted values into a collection. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn collect<T>(self) -> T; + /// ``` + /// /// `collect` streams all values, awaiting as needed. Values are pushed into /// a collection. A number of different target collection types are /// supported, including [`Vec`](std::vec::Vec), @@ -819,6 +895,33 @@ pub trait StreamExt: Stream { { Timeout::new(self, duration) } + + /// Slows down a stream by enforcing a delay between items. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio::stream::StreamExt; + /// + /// # async fn dox() { + /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + throttle(duration, self) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} |