| use futures_core::Stream; |
| |
| mod all; |
| use all::AllFuture; |
| |
| mod any; |
| use any::AnyFuture; |
| |
| mod chain; |
| use chain::Chain; |
| |
| pub(crate) mod collect; |
| use collect::{Collect, FromStream}; |
| |
| mod filter; |
| use filter::Filter; |
| |
| mod filter_map; |
| use filter_map::FilterMap; |
| |
| mod fold; |
| use fold::FoldFuture; |
| |
| mod fuse; |
| use fuse::Fuse; |
| |
| mod map; |
| use map::Map; |
| |
| mod merge; |
| use merge::Merge; |
| |
| mod next; |
| use next::Next; |
| |
| mod skip; |
| use skip::Skip; |
| |
| mod skip_while; |
| use skip_while::SkipWhile; |
| |
| mod try_next; |
| use try_next::TryNext; |
| |
| mod take; |
| use take::Take; |
| |
| mod take_while; |
| use take_while::TakeWhile; |
| |
| cfg_time! { |
| mod timeout; |
| use timeout::Timeout; |
| use tokio::time::Duration; |
| mod throttle; |
| use throttle::{throttle, Throttle}; |
| } |
| |
| /// An extension trait for the [`Stream`] trait that provides a variety of |
| /// convenient combinator functions. |
| /// |
| /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found |
| /// in the [futures] crate, however both Tokio and futures provide separate |
| /// `StreamExt` utility traits, and some utilities are only available on one of |
| /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` |
| /// trait in the futures crate. |
| /// |
| /// If you need utilities from both `StreamExt` traits, you should prefer to |
| /// import one of them, and use the other through the fully qualified call |
| /// syntax. For example: |
| /// ``` |
| /// // import one of the traits: |
| /// use futures::stream::StreamExt; |
| /// # #[tokio::main(flavor = "current_thread")] |
| /// # async fn main() { |
| /// |
| /// let a = tokio_stream::iter(vec![1, 3, 5]); |
| /// let b = tokio_stream::iter(vec![2, 4, 6]); |
| /// |
| /// // use the fully qualified call syntax for the other trait: |
| /// let merged = tokio_stream::StreamExt::merge(a, b); |
| /// |
| /// // use normal call notation for futures::stream::StreamExt::collect |
| /// let output: Vec<_> = merged.collect().await; |
| /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); |
| /// # } |
| /// ``` |
| /// |
| /// [`Stream`]: crate::Stream |
| /// [futures]: https://docs.rs/futures |
| /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html |
| pub trait StreamExt: Stream { |
| /// Consumes and returns the next value in the stream or `None` if the |
| /// stream is finished. |
| /// |
| /// Equivalent to: |
| /// |
| /// ```ignore |
| /// async fn next(&mut self) -> Option<Self::Item>; |
| /// ``` |
| /// |
| /// Note that because `next` doesn't take ownership over the stream, |
| /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a |
| /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can |
| /// be done by boxing the stream using [`Box::pin`] or |
| /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` |
| /// crate. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let mut stream = stream::iter(1..=3); |
| /// |
| /// assert_eq!(stream.next().await, Some(1)); |
| /// assert_eq!(stream.next().await, Some(2)); |
| /// assert_eq!(stream.next().await, Some(3)); |
| /// assert_eq!(stream.next().await, None); |
| /// # } |
| /// ``` |
| fn next(&mut self) -> Next<'_, Self> |
| where |
| Self: Unpin, |
| { |
| Next::new(self) |
| } |
| |
| /// Consumes and returns the next item in the stream. If an error is |
| /// encountered before the next item, the error is returned instead. |
| /// |
| /// Equivalent to: |
| /// |
| /// ```ignore |
| /// async fn try_next(&mut self) -> Result<Option<T>, E>; |
| /// ``` |
| /// |
| /// This is similar to the [`next`](StreamExt::next) combinator, |
| /// but returns a [`Result<Option<T>, E>`](Result) rather than |
| /// an [`Option<Result<T, E>>`](Option), making for easy use |
| /// with the [`?`](std::ops::Try) operator. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); |
| /// |
| /// assert_eq!(stream.try_next().await, Ok(Some(1))); |
| /// assert_eq!(stream.try_next().await, Ok(Some(2))); |
| /// assert_eq!(stream.try_next().await, Err("nope")); |
| /// # } |
| /// ``` |
| fn try_next<T, E>(&mut self) -> TryNext<'_, Self> |
| where |
| Self: Stream<Item = Result<T, E>> + Unpin, |
| { |
| TryNext::new(self) |
| } |
| |
| /// Maps this stream's items to a different type, returning a new stream of |
| /// the resulting type. |
| /// |
| /// The provided closure is executed over all elements of this stream as |
| /// they are made available. It is executed inline with calls to |
| /// [`poll_next`](Stream::poll_next). |
| /// |
| /// Note that this function consumes the stream passed into it and returns a |
| /// wrapped version of it, similar to the existing `map` methods in the |
| /// standard library. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let stream = stream::iter(1..=3); |
| /// let mut stream = stream.map(|x| x + 3); |
| /// |
| /// assert_eq!(stream.next().await, Some(4)); |
| /// assert_eq!(stream.next().await, Some(5)); |
| /// assert_eq!(stream.next().await, Some(6)); |
| /// # } |
| /// ``` |
| fn map<T, F>(self, f: F) -> Map<Self, F> |
| where |
| F: FnMut(Self::Item) -> T, |
| Self: Sized, |
| { |
| Map::new(self, f) |
| } |
| |
| /// Combine two streams into one by interleaving the output of both as it |
| /// is produced. |
| /// |
| /// Values are produced from the merged stream in the order they arrive from |
| /// the two source streams. If both source streams provide values |
| /// simultaneously, the merge stream alternates between them. This provides |
| /// some level of fairness. You should not chain calls to `merge`, as this |
| /// will break the fairness of the merging. |
| /// |
| /// The merged stream completes once **both** source streams complete. When |
| /// one source stream completes before the other, the merge stream |
| /// exclusively polls the remaining stream. |
| /// |
| /// For merging multiple streams, consider using [`StreamMap`] instead. |
| /// |
| /// [`StreamMap`]: crate::StreamMap |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio_stream::{StreamExt, Stream}; |
| /// use tokio::sync::mpsc; |
| /// use tokio::time; |
| /// |
| /// use std::time::Duration; |
| /// use std::pin::Pin; |
| /// |
| /// # /* |
| /// #[tokio::main] |
| /// # */ |
| /// # #[tokio::main(flavor = "current_thread")] |
| /// async fn main() { |
| /// # time::pause(); |
| /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); |
| /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); |
| /// |
| /// // Convert the channels to a `Stream`. |
| /// let rx1 = Box::pin(async_stream::stream! { |
| /// while let Some(item) = rx1.recv().await { |
| /// yield item; |
| /// } |
| /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
| /// |
| /// let rx2 = Box::pin(async_stream::stream! { |
| /// while let Some(item) = rx2.recv().await { |
| /// yield item; |
| /// } |
| /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
| /// |
| /// let mut rx = rx1.merge(rx2); |
| /// |
| /// tokio::spawn(async move { |
| /// // Send some values immediately |
| /// tx1.send(1).await.unwrap(); |
| /// tx1.send(2).await.unwrap(); |
| /// |
| /// // Let the other task send values |
| /// time::sleep(Duration::from_millis(20)).await; |
| /// |
| /// tx1.send(4).await.unwrap(); |
| /// }); |
| /// |
| /// tokio::spawn(async move { |
| /// // Wait for the first task to send values |
| /// time::sleep(Duration::from_millis(5)).await; |
| /// |
| /// tx2.send(3).await.unwrap(); |
| /// |
| /// time::sleep(Duration::from_millis(25)).await; |
| /// |
| /// // Send the final value |
| /// tx2.send(5).await.unwrap(); |
| /// }); |
| /// |
| /// assert_eq!(1, rx.next().await.unwrap()); |
| /// assert_eq!(2, rx.next().await.unwrap()); |
| /// assert_eq!(3, rx.next().await.unwrap()); |
| /// assert_eq!(4, rx.next().await.unwrap()); |
| /// assert_eq!(5, rx.next().await.unwrap()); |
| /// |
| /// // The merged stream is consumed |
| /// assert!(rx.next().await.is_none()); |
| /// } |
| /// ``` |
| fn merge<U>(self, other: U) -> Merge<Self, U> |
| where |
| U: Stream<Item = Self::Item>, |
| Self: Sized, |
| { |
| Merge::new(self, other) |
| } |
| |
| /// Filters the values produced by this stream according to the provided |
| /// predicate. |
| /// |
| /// As values of this stream are made available, the provided predicate `f` |
| /// will be run against them. If the predicate |
| /// resolves to `true`, then the stream will yield the value, but if the |
| /// predicate resolves to `false`, then the value |
| /// will be discarded and the next value will be produced. |
| /// |
| /// Note that this function consumes the stream passed into it and returns a |
| /// wrapped version of it, similar to [`Iterator::filter`] method in the |
| /// standard library. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let stream = stream::iter(1..=8); |
| /// let mut evens = stream.filter(|x| x % 2 == 0); |
| /// |
| /// assert_eq!(Some(2), evens.next().await); |
| /// assert_eq!(Some(4), evens.next().await); |
| /// assert_eq!(Some(6), evens.next().await); |
| /// assert_eq!(Some(8), evens.next().await); |
| /// assert_eq!(None, evens.next().await); |
| /// # } |
| /// ``` |
| fn filter<F>(self, f: F) -> Filter<Self, F> |
| where |
| F: FnMut(&Self::Item) -> bool, |
| Self: Sized, |
| { |
| Filter::new(self, f) |
| } |
| |
| /// Filters the values produced by this stream while simultaneously mapping |
| /// them to a different type according to the provided closure. |
| /// |
| /// As values of this stream are made available, the provided function will |
| /// be run on them. If the predicate `f` resolves to |
| /// [`Some(item)`](Some) then the stream will yield the value `item`, but if |
| /// it resolves to [`None`], then the value will be skipped. |
| /// |
| /// Note that this function consumes the stream passed into it and returns a |
| /// wrapped version of it, similar to [`Iterator::filter_map`] method in the |
| /// standard library. |
| /// |
| /// # Examples |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let stream = stream::iter(1..=8); |
| /// let mut evens = stream.filter_map(|x| { |
| /// if x % 2 == 0 { Some(x + 1) } else { None } |
| /// }); |
| /// |
| /// assert_eq!(Some(3), evens.next().await); |
| /// assert_eq!(Some(5), evens.next().await); |
| /// assert_eq!(Some(7), evens.next().await); |
| /// assert_eq!(Some(9), evens.next().await); |
| /// assert_eq!(None, evens.next().await); |
| /// # } |
| /// ``` |
| fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> |
| where |
| F: FnMut(Self::Item) -> Option<T>, |
| Self: Sized, |
| { |
| FilterMap::new(self, f) |
| } |
| |
| /// Creates a stream which ends after the first `None`. |
| /// |
| /// After a stream returns `None`, behavior is undefined. Future calls to |
| /// `poll_next` may or may not return `Some(T)` again or they may panic. |
| /// `fuse()` adapts a stream, ensuring that after `None` is given, it will |
| /// return `None` forever. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio_stream::{Stream, StreamExt}; |
| /// |
| /// use std::pin::Pin; |
| /// use std::task::{Context, Poll}; |
| /// |
| /// // a stream which alternates between Some and None |
| /// struct Alternate { |
| /// state: i32, |
| /// } |
| /// |
| /// impl Stream for Alternate { |
| /// type Item = i32; |
| /// |
| /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { |
| /// let val = self.state; |
| /// self.state = self.state + 1; |
| /// |
| /// // if it's even, Some(i32), else None |
| /// if val % 2 == 0 { |
| /// Poll::Ready(Some(val)) |
| /// } else { |
| /// Poll::Ready(None) |
| /// } |
| /// } |
| /// } |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let mut stream = Alternate { state: 0 }; |
| /// |
| /// // the stream goes back and forth |
| /// assert_eq!(stream.next().await, Some(0)); |
| /// assert_eq!(stream.next().await, None); |
| /// assert_eq!(stream.next().await, Some(2)); |
| /// assert_eq!(stream.next().await, None); |
| /// |
| /// // however, once it is fused |
| /// let mut stream = stream.fuse(); |
| /// |
| /// assert_eq!(stream.next().await, Some(4)); |
| /// assert_eq!(stream.next().await, None); |
| /// |
| /// // it will always return `None` after the first time. |
| /// assert_eq!(stream.next().await, None); |
| /// assert_eq!(stream.next().await, None); |
| /// assert_eq!(stream.next().await, None); |
| /// } |
| /// ``` |
| fn fuse(self) -> Fuse<Self> |
| where |
| Self: Sized, |
| { |
| Fuse::new(self) |
| } |
| |
| /// Creates a new stream of at most `n` items of the underlying stream. |
| /// |
| /// Once `n` items have been yielded from this stream then it will always |
| /// return that the stream is done. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let mut stream = stream::iter(1..=10).take(3); |
| /// |
| /// assert_eq!(Some(1), stream.next().await); |
| /// assert_eq!(Some(2), stream.next().await); |
| /// assert_eq!(Some(3), stream.next().await); |
| /// assert_eq!(None, stream.next().await); |
| /// # } |
| /// ``` |
| fn take(self, n: usize) -> Take<Self> |
| where |
| Self: Sized, |
| { |
| Take::new(self, n) |
| } |
| |
| /// Take elements from this stream while the provided predicate |
| /// resolves to `true`. |
| /// |
| /// This function, like `Iterator::take_while`, will take elements from the |
| /// stream until the predicate `f` resolves to `false`. Once one element |
| /// returns false it will always return that the stream is done. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); |
| /// |
| /// assert_eq!(Some(1), stream.next().await); |
| /// assert_eq!(Some(2), stream.next().await); |
| /// assert_eq!(Some(3), stream.next().await); |
| /// assert_eq!(None, stream.next().await); |
| /// # } |
| /// ``` |
| fn take_while<F>(self, f: F) -> TakeWhile<Self, F> |
| where |
| F: FnMut(&Self::Item) -> bool, |
| Self: Sized, |
| { |
| TakeWhile::new(self, f) |
| } |
| |
| /// Creates a new stream that will skip the `n` first items of the |
| /// underlying stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let mut stream = stream::iter(1..=10).skip(7); |
| /// |
| /// assert_eq!(Some(8), stream.next().await); |
| /// assert_eq!(Some(9), stream.next().await); |
| /// assert_eq!(Some(10), stream.next().await); |
| /// assert_eq!(None, stream.next().await); |
| /// # } |
| /// ``` |
| fn skip(self, n: usize) -> Skip<Self> |
| where |
| Self: Sized, |
| { |
| Skip::new(self, n) |
| } |
| |
| /// Skip elements from the underlying stream while the provided predicate |
| /// resolves to `true`. |
| /// |
| /// This function, like [`Iterator::skip_while`], will ignore elements from the |
| /// stream until the predicate `f` resolves to `false`. Once one element |
| /// returns false, the rest of the elements will be yielded. |
| /// |
| /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); |
| /// |
| /// assert_eq!(Some(3), stream.next().await); |
| /// assert_eq!(Some(4), stream.next().await); |
| /// assert_eq!(Some(1), stream.next().await); |
| /// assert_eq!(None, stream.next().await); |
| /// # } |
| /// ``` |
| fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> |
| where |
| F: FnMut(&Self::Item) -> bool, |
| Self: Sized, |
| { |
| SkipWhile::new(self, f) |
| } |
| |
| /// Tests if every element of the stream matches a predicate. |
| /// |
| /// Equivalent to: |
| /// |
| /// ```ignore |
| /// async fn all<F>(&mut self, f: F) -> bool; |
| /// ``` |
| /// |
| /// `all()` takes a closure that returns `true` or `false`. It applies |
| /// this closure to each element of the stream, and if they all return |
| /// `true`, then so does `all`. If any of them return `false`, it |
| /// returns `false`. An empty stream returns `true`. |
| /// |
| /// `all()` is short-circuiting; in other words, it will stop processing |
| /// as soon as it finds a `false`, given that no matter what else happens, |
| /// the result will also be `false`. |
| /// |
| /// An empty stream returns `true`. |
| /// |
| /// # Examples |
| /// |
| /// Basic usage: |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let a = [1, 2, 3]; |
| /// |
| /// assert!(stream::iter(&a).all(|&x| x > 0).await); |
| /// |
| /// assert!(!stream::iter(&a).all(|&x| x > 2).await); |
| /// # } |
| /// ``` |
| /// |
| /// Stopping at the first `false`: |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let a = [1, 2, 3]; |
| /// |
| /// let mut iter = stream::iter(&a); |
| /// |
| /// assert!(!iter.all(|&x| x != 2).await); |
| /// |
| /// // we can still use `iter`, as there are more elements. |
| /// assert_eq!(iter.next().await, Some(&3)); |
| /// # } |
| /// ``` |
| fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> |
| where |
| Self: Unpin, |
| F: FnMut(Self::Item) -> bool, |
| { |
| AllFuture::new(self, f) |
| } |
| |
| /// Tests if any element of the stream matches a predicate. |
| /// |
| /// Equivalent to: |
| /// |
| /// ```ignore |
| /// async fn any<F>(&mut self, f: F) -> bool; |
| /// ``` |
| /// |
| /// `any()` takes a closure that returns `true` or `false`. It applies |
| /// this closure to each element of the stream, and if any of them return |
| /// `true`, then so does `any()`. If they all return `false`, it |
| /// returns `false`. |
| /// |
| /// `any()` is short-circuiting; in other words, it will stop processing |
| /// as soon as it finds a `true`, given that no matter what else happens, |
| /// the result will also be `true`. |
| /// |
| /// An empty stream returns `false`. |
| /// |
| /// Basic usage: |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let a = [1, 2, 3]; |
| /// |
| /// assert!(stream::iter(&a).any(|&x| x > 0).await); |
| /// |
| /// assert!(!stream::iter(&a).any(|&x| x > 5).await); |
| /// # } |
| /// ``` |
| /// |
| /// Stopping at the first `true`: |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// let a = [1, 2, 3]; |
| /// |
| /// let mut iter = stream::iter(&a); |
| /// |
| /// assert!(iter.any(|&x| x != 2).await); |
| /// |
| /// // we can still use `iter`, as there are more elements. |
| /// assert_eq!(iter.next().await, Some(&2)); |
| /// # } |
| /// ``` |
| fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> |
| where |
| Self: Unpin, |
| F: FnMut(Self::Item) -> bool, |
| { |
| AnyFuture::new(self, f) |
| } |
| |
| /// Combine two streams into one by first returning all values from the |
| /// first stream then all values from the second stream. |
| /// |
| /// As long as `self` still has values to emit, no values from `other` are |
| /// emitted, even if some are ready. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let one = stream::iter(vec![1, 2, 3]); |
| /// let two = stream::iter(vec![4, 5, 6]); |
| /// |
| /// let mut stream = one.chain(two); |
| /// |
| /// assert_eq!(stream.next().await, Some(1)); |
| /// assert_eq!(stream.next().await, Some(2)); |
| /// assert_eq!(stream.next().await, Some(3)); |
| /// assert_eq!(stream.next().await, Some(4)); |
| /// assert_eq!(stream.next().await, Some(5)); |
| /// assert_eq!(stream.next().await, Some(6)); |
| /// assert_eq!(stream.next().await, None); |
| /// } |
| /// ``` |
| fn chain<U>(self, other: U) -> Chain<Self, U> |
| where |
| U: Stream<Item = Self::Item>, |
| Self: Sized, |
| { |
| Chain::new(self, other) |
| } |
| |
| /// A combinator that applies a function to every element in a stream |
| /// producing a single, final value. |
| /// |
| /// Equivalent to: |
| /// |
| /// ```ignore |
| /// async fn fold<B, F>(self, init: B, f: F) -> B; |
| /// ``` |
| /// |
| /// # Examples |
| /// Basic usage: |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, *}; |
| /// |
| /// let s = stream::iter(vec![1u8, 2, 3]); |
| /// let sum = s.fold(0, |acc, x| acc + x).await; |
| /// |
| /// assert_eq!(sum, 6); |
| /// # } |
| /// ``` |
| fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> |
| where |
| Self: Sized, |
| F: FnMut(B, Self::Item) -> B, |
| { |
| FoldFuture::new(self, init, f) |
| } |
| |
| /// Drain stream pushing all emitted values into a collection. |
| /// |
| /// Equivalent to: |
| /// |
| /// ```ignore |
| /// async fn collect<T>(self) -> T; |
| /// ``` |
| /// |
| /// `collect` streams all values, awaiting as needed. Values are pushed into |
| /// a collection. A number of different target collection types are |
| /// supported, including [`Vec`](std::vec::Vec), |
| /// [`String`](std::string::String), and [`Bytes`]. |
| /// |
| /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html |
| /// |
| /// # `Result` |
| /// |
| /// `collect()` can also be used with streams of type `Result<T, E>` where |
| /// `T: FromStream<_>`. In this case, `collect()` will stream as long as |
| /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, |
| /// streaming is terminated and `collect()` returns the `Err`. |
| /// |
| /// # Notes |
| /// |
| /// `FromStream` is currently a sealed trait. Stabilization is pending |
| /// enhancements to the Rust language. |
| /// |
| /// # Examples |
| /// |
| /// Basic usage: |
| /// |
| /// ``` |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let doubled: Vec<i32> = |
| /// stream::iter(vec![1, 2, 3]) |
| /// .map(|x| x * 2) |
| /// .collect() |
| /// .await; |
| /// |
| /// assert_eq!(vec![2, 4, 6], doubled); |
| /// } |
| /// ``` |
| /// |
| /// Collecting a stream of `Result` values |
| /// |
| /// ``` |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// // A stream containing only `Ok` values will be collected |
| /// let values: Result<Vec<i32>, &str> = |
| /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) |
| /// .collect() |
| /// .await; |
| /// |
| /// assert_eq!(Ok(vec![1, 2, 3]), values); |
| /// |
| /// // A stream containing `Err` values will return the first error. |
| /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; |
| /// |
| /// let values: Result<Vec<i32>, &str> = |
| /// stream::iter(results) |
| /// .collect() |
| /// .await; |
| /// |
| /// assert_eq!(Err("no"), values); |
| /// } |
| /// ``` |
| fn collect<T>(self) -> Collect<Self, T> |
| where |
| T: FromStream<Self::Item>, |
| Self: Sized, |
| { |
| Collect::new(self) |
| } |
| |
| /// Applies a per-item timeout to the passed stream. |
| /// |
| /// `timeout()` takes a `Duration` that represents the maximum amount of |
| /// time each element of the stream has to complete before timing out. |
| /// |
| /// If the wrapped stream yields a value before the deadline is reached, the |
| /// value is returned. Otherwise, an error is returned. The caller may decide |
| /// to continue consuming the stream and will eventually get the next source |
| /// stream value once it becomes available. |
| /// |
| /// # Notes |
| /// |
| /// This function consumes the stream passed into it and returns a |
| /// wrapped version of it. |
| /// |
| /// Polling the returned stream will continue to poll the inner stream even |
| /// if one or more items time out. |
| /// |
| /// # Examples |
| /// |
| /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): |
| /// |
| /// ``` |
| /// # #[tokio::main] |
| /// # async fn main() { |
| /// use tokio_stream::{self as stream, StreamExt}; |
| /// use std::time::Duration; |
| /// # let int_stream = stream::iter(1..=3); |
| /// |
| /// let int_stream = int_stream.timeout(Duration::from_secs(1)); |
| /// tokio::pin!(int_stream); |
| /// |
| /// // When no items time out, we get the 3 elements in succession: |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); |
| /// assert_eq!(int_stream.try_next().await, Ok(None)); |
| /// |
| /// // If the second item times out, we get an error and continue polling the stream: |
| /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); |
| /// assert!(int_stream.try_next().await.is_err()); |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); |
| /// assert_eq!(int_stream.try_next().await, Ok(None)); |
| /// |
| /// // If we want to stop consuming the source stream the first time an |
| /// // element times out, we can use the `take_while` operator: |
| /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); |
| /// let mut int_stream = int_stream.take_while(Result::is_ok); |
| /// |
| /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); |
| /// assert_eq!(int_stream.try_next().await, Ok(None)); |
| /// # } |
| /// ``` |
| #[cfg(all(feature = "time"))] |
| #[cfg_attr(docsrs, doc(cfg(feature = "time")))] |
| fn timeout(self, duration: Duration) -> Timeout<Self> |
| where |
| Self: Sized, |
| { |
| Timeout::new(self, duration) |
| } |
| |
| /// Slows down a stream by enforcing a delay between items. |
| /// |
| /// # Example |
| /// |
| /// Create a throttled stream. |
| /// ```rust,no_run |
| /// use std::time::Duration; |
| /// use tokio_stream::StreamExt; |
| /// |
| /// # async fn dox() { |
| /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); |
| /// tokio::pin!(item_stream); |
| /// |
| /// loop { |
| /// // The string will be produced at most every 2 seconds |
| /// println!("{:?}", item_stream.next().await); |
| /// } |
| /// # } |
| /// ``` |
| #[cfg(all(feature = "time"))] |
| #[cfg_attr(docsrs, doc(cfg(feature = "time")))] |
| fn throttle(self, duration: Duration) -> Throttle<Self> |
| where |
| Self: Sized, |
| { |
| throttle(duration, self) |
| } |
| } |
| |
| impl<St: ?Sized> StreamExt for St where St: Stream {} |
| |
| /// Merge the size hints from two streams. |
| fn merge_size_hints( |
| (left_low, left_high): (usize, Option<usize>), |
| (right_low, right_hign): (usize, Option<usize>), |
| ) -> (usize, Option<usize>) { |
| let low = left_low.saturating_add(right_low); |
| let high = match (left_high, right_hign) { |
| (Some(h1), Some(h2)) => h1.checked_add(h2), |
| _ => None, |
| }; |
| (low, high) |
| } |