| #![warn( |
| missing_debug_implementations, |
| missing_docs, |
| rust_2018_idioms, |
| unreachable_pub |
| )] |
| #![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))] |
| |
| //! Asynchronous stream of elements. |
| //! |
| //! Provides two macros, `stream!` and `try_stream!`, allowing the caller to |
| //! define asynchronous streams of elements. These are implemented using `async` |
| //! & `await` notation. This crate works without unstable features. |
| //! |
| //! The `stream!` macro returns an anonymous type implementing the [`Stream`] |
| //! trait. The `Item` associated type is the type of the values yielded from the |
| //! stream. The `try_stream!` also returns an anonymous type implementing the |
| //! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The |
| //! `try_stream!` macro supports using `?` notiation as part of the |
| //! implementation. |
| //! |
| //! # Usage |
| //! |
| //! A basic stream yielding numbers. Values are yielded using the `yield` |
| //! keyword. The stream block must return `()`. |
| //! |
| //! ```rust |
| //! use async_stream::stream; |
| //! |
| //! use futures_util::pin_mut; |
| //! use futures_util::stream::StreamExt; |
| //! |
| //! #[tokio::main] |
| //! async fn main() { |
| //! let s = stream! { |
| //! for i in 0..3 { |
| //! yield i; |
| //! } |
| //! }; |
| //! |
| //! pin_mut!(s); // needed for iteration |
| //! |
| //! while let Some(value) = s.next().await { |
| //! println!("got {}", value); |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! Streams may be returned by using `impl Stream<Item = T>`: |
| //! |
| //! ```rust |
| //! use async_stream::stream; |
| //! |
| //! use futures_core::stream::Stream; |
| //! use futures_util::pin_mut; |
| //! use futures_util::stream::StreamExt; |
| //! |
| //! fn zero_to_three() -> impl Stream<Item = u32> { |
| //! stream! { |
| //! for i in 0..3 { |
| //! yield i; |
| //! } |
| //! } |
| //! } |
| //! |
| //! #[tokio::main] |
| //! async fn main() { |
| //! let s = zero_to_three(); |
| //! pin_mut!(s); // needed for iteration |
| //! |
| //! while let Some(value) = s.next().await { |
| //! println!("got {}", value); |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! Streams may be implemented in terms of other streams - `async-stream` provides `for await` |
| //! syntax to assist with this: |
| //! |
| //! ```rust |
| //! use async_stream::stream; |
| //! |
| //! use futures_core::stream::Stream; |
| //! use futures_util::pin_mut; |
| //! use futures_util::stream::StreamExt; |
| //! |
| //! fn zero_to_three() -> impl Stream<Item = u32> { |
| //! stream! { |
| //! for i in 0..3 { |
| //! yield i; |
| //! } |
| //! } |
| //! } |
| //! |
| //! fn double<S: Stream<Item = u32>>(input: S) |
| //! -> impl Stream<Item = u32> |
| //! { |
| //! stream! { |
| //! for await value in input { |
| //! yield value * 2; |
| //! } |
| //! } |
| //! } |
| //! |
| //! #[tokio::main] |
| //! async fn main() { |
| //! let s = double(zero_to_three()); |
| //! pin_mut!(s); // needed for iteration |
| //! |
| //! while let Some(value) = s.next().await { |
| //! println!("got {}", value); |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` |
| //! of the returned stream is `Result` with `Ok` being the value yielded and |
| //! `Err` the error type returned by `?`. |
| //! |
| //! ```rust |
| //! use tokio::net::{TcpListener, TcpStream}; |
| //! |
| //! use async_stream::try_stream; |
| //! use futures_core::stream::Stream; |
| //! |
| //! use std::io; |
| //! use std::net::SocketAddr; |
| //! |
| //! fn bind_and_accept(addr: SocketAddr) |
| //! -> impl Stream<Item = io::Result<TcpStream>> |
| //! { |
| //! try_stream! { |
| //! let mut listener = TcpListener::bind(addr).await?; |
| //! |
| //! loop { |
| //! let (stream, addr) = listener.accept().await?; |
| //! println!("received on {:?}", addr); |
| //! yield stream; |
| //! } |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! # Implementation |
| //! |
| //! The `stream!` and `try_stream!` macros are implemented using proc macros. |
| //! The macro searches the syntax tree for instances of `sender.send($expr)` and |
| //! transforms them into `sender.send($expr).await`. |
| //! |
| //! The stream uses a lightweight sender to send values from the stream |
| //! implementation to the caller. When entering the stream, an `Option<T>` is |
| //! stored on the stack. A pointer to the cell is stored in a thread local and |
| //! `poll` is called on the async block. When `poll` returns. |
| //! `sender.send(value)` stores the value that cell and yields back to the |
| //! caller. |
| //! |
| //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html |
| |
| mod async_stream; |
| mod next; |
| #[doc(hidden)] |
| pub mod yielder; |
| |
| // Used by the macro, but not intended to be accessed publicly. |
| #[doc(hidden)] |
| pub use crate::async_stream::AsyncStream; |
| |
| #[doc(hidden)] |
| pub use async_stream_impl; |
| |
| /// Asynchronous stream |
| /// |
| /// See [crate](index.html) documentation for more details. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use async_stream::stream; |
| /// |
| /// use futures_util::pin_mut; |
| /// use futures_util::stream::StreamExt; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let s = stream! { |
| /// for i in 0..3 { |
| /// yield i; |
| /// } |
| /// }; |
| /// |
| /// pin_mut!(s); // needed for iteration |
| /// |
| /// while let Some(value) = s.next().await { |
| /// println!("got {}", value); |
| /// } |
| /// } |
| /// ``` |
| #[macro_export] |
| macro_rules! stream { |
| ($($tt:tt)*) => { |
| $crate::async_stream_impl::stream_inner!(($crate) $($tt)*) |
| } |
| } |
| |
| /// Asynchronous fallible stream |
| /// |
| /// See [crate](index.html) documentation for more details. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio::net::{TcpListener, TcpStream}; |
| /// |
| /// use async_stream::try_stream; |
| /// use futures_core::stream::Stream; |
| /// |
| /// use std::io; |
| /// use std::net::SocketAddr; |
| /// |
| /// fn bind_and_accept(addr: SocketAddr) |
| /// -> impl Stream<Item = io::Result<TcpStream>> |
| /// { |
| /// try_stream! { |
| /// let mut listener = TcpListener::bind(addr).await?; |
| /// |
| /// loop { |
| /// let (stream, addr) = listener.accept().await?; |
| /// println!("received on {:?}", addr); |
| /// yield stream; |
| /// } |
| /// } |
| /// } |
| /// ``` |
| #[macro_export] |
| macro_rules! try_stream { |
| ($($tt:tt)*) => { |
| $crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*) |
| } |
| } |
| |
| #[doc(hidden)] |
| pub mod reexport { |
| #[doc(hidden)] |
| pub use crate::next::next; |
| } |