# Asynchronous streams for Rust

Asynchronous stream of elements.

Provides two macros, `stream!` and `try_stream!`, allowing the caller to
define asynchronous streams of elements. These are implemented using `async` &
`await` notation. The `stream!` macro works without unstable features.

The `stream!` macro returns an anonymous type implementing the [`Stream`]
trait. The `Item` associated type is the type of the values yielded from the
stream. The `try_stream!` macro also returns an anonymous type implementing the
[`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
`try_stream!` macro supports using `?` notation as part of the
implementation.

## Usage

A basic stream yielding numbers. Values are yielded using the `yield`
keyword. The stream block must return `()`.

```rust
use async_stream::stream;

use futures_util::pin_mut;
use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() {
    // Each `yield` produces one item of the stream.
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };

    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}
```

Streams may be returned by using `impl Stream<Item = T>`:

```rust
use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}
```

Streams may be implemented in terms of other streams:

```rust
use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
    stream! {
        // Pin the input stream so that `next()` can be called on it.
        pin_mut!(input);
        while let Some(value) = input.next().await {
            yield value * 2;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = double(zero_to_three());
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}
```

Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
of the returned stream is `Result` with `Ok` being the value yielded and `Err`
the error type returned by `?`.

```rust
use tokio::net::{TcpListener, TcpStream};

use async_stream::try_stream;
use futures_core::stream::Stream;

use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr) -> impl Stream<Item = io::Result<TcpStream>> {
    try_stream! {
        // Assumes Tokio 1.x, where `TcpListener::bind` is an async fn.
        let listener = TcpListener::bind(addr).await?;

        loop {
            // An error from `?` is yielded as `Err` and ends the stream.
            let (stream, addr) = listener.accept().await?;
            println!("received on {:?}", addr);
            yield stream;
        }
    }
}
```

## Implementation

The `stream!` and `try_stream!` macros are implemented using proc macros.
Given that proc macros in expression position are not supported on stable
Rust, a hack similar to the one provided by the [`proc-macro-hack`] crate is
used. The macro searches the syntax tree for instances of `yield $expr` and
transforms them into `sender.send($expr).await`.

The stream uses a lightweight sender to send values from the stream
implementation to the caller. When entering the stream, an `Option<T>` is
stored on the stack. A pointer to the cell is stored in a thread local and
`poll` is called on the async block. Inside the block, `sender.send(value)`
stores the value in that cell and yields back to the caller, so the value is
available in the cell when `poll` returns.

## Limitations

`async-stream` suffers from the same limitations as the [`proc-macro-hack`]
crate. Primarily, nesting support must be implemented using a `TT-muncher`.
If large `stream!` blocks are used, the caller will be required to add
`#![recursion_limit = "..."]` to their crate.

A `stream!` macro may only contain up to 64 macro invocations.

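
The sender mechanism described under Implementation can be illustrated with a small, std-only sketch. This is not the crate's actual code: the names `SLOT`, `SendFuture`, `send`, `collect`, and `NoopWaker` are hypothetical, the item type is fixed to `u32`, and a plain thread-local `Cell` stands in for the pointer to a stack slot that the crate is described as using. It only shows the idea of a future that stores a value in a thread-local cell, suspends once, and lets the caller pick the value up after `poll` returns.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Hypothetical slot that the running body writes yielded values into.
    // Assumption: a safe Cell replaces the crate's pointer to a stack slot.
    static SLOT: Cell<Option<u32>> = Cell::new(None);
}

/// Future returned by `send`: stores its value on the first poll and
/// suspends once, then completes on the next poll.
struct SendFuture {
    value: Option<u32>,
}

impl Future for SendFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        match self.value.take() {
            Some(v) => {
                // Hand the value to the caller and yield control back.
                SLOT.with(|slot| slot.set(Some(v)));
                Poll::Pending
            }
            None => Poll::Ready(()),
        }
    }
}

/// Stand-in for `sender.send(value)`.
fn send(value: u32) -> SendFuture {
    SendFuture { value: Some(value) }
}

/// Waker that does nothing; enough here because `collect` polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `body` until it finishes, collecting every value passed to `send`.
/// Only suitable for bodies whose sole suspension points are `send` calls.
fn collect(body: impl Future<Output = ()>) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut body = Box::pin(body);
    let mut out = Vec::new();

    loop {
        let done = body.as_mut().poll(&mut cx).is_ready();
        // After `poll` returns, take whatever the body left in the slot.
        if let Some(v) = SLOT.with(|slot| slot.take()) {
            out.push(v);
        }
        if done {
            return out;
        }
    }
}

fn main() {
    let values = collect(async {
        for i in 0..3 {
            send(i).await;
        }
    });
    println!("{:?}", values);
}
```

A real `Stream` implementation would return each value from `poll_next` instead of collecting into a `Vec`, and would respect the waker rather than polling in a loop; the sketch trades those details for brevity.
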
[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
[`proc-macro-hack`]: https://github.com/dtolnay/proc-macro-hack/

## License

This project is licensed under the [MIT license](LICENSE).

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in `async-stream` by you, shall be licensed as MIT, without any
additional terms or conditions.