| use async_stream::stream; |
| |
| use futures_core::stream::{FusedStream, Stream}; |
| use futures_util::pin_mut; |
| use futures_util::stream::StreamExt; |
| use tokio::sync::mpsc; |
| use tokio_test::assert_ok; |
| |
| #[tokio::test] |
| async fn noop_stream() { |
| let s = stream! {}; |
| pin_mut!(s); |
| |
| while let Some(_) = s.next().await { |
| unreachable!(); |
| } |
| } |
| |
| #[tokio::test] |
| async fn empty_stream() { |
| let mut ran = false; |
| |
| { |
| let r = &mut ran; |
| let s = stream! { |
| *r = true; |
| println!("hello world!"); |
| }; |
| pin_mut!(s); |
| |
| while let Some(_) = s.next().await { |
| unreachable!(); |
| } |
| } |
| |
| assert!(ran); |
| } |
| |
| #[tokio::test] |
| async fn yield_single_value() { |
| let s = stream! { |
| yield "hello"; |
| }; |
| |
| let values: Vec<_> = s.collect().await; |
| |
| assert_eq!(1, values.len()); |
| assert_eq!("hello", values[0]); |
| } |
| |
| #[tokio::test] |
| async fn fused() { |
| let s = stream! { |
| yield "hello"; |
| }; |
| pin_mut!(s); |
| |
| assert!(!s.is_terminated()); |
| assert_eq!(s.next().await, Some("hello")); |
| assert_eq!(s.next().await, None); |
| |
| assert!(s.is_terminated()); |
| // This should return None from now on |
| assert_eq!(s.next().await, None); |
| } |
| |
| #[tokio::test] |
| async fn yield_multi_value() { |
| let s = stream! { |
| yield "hello"; |
| yield "world"; |
| yield "dizzy"; |
| }; |
| |
| let values: Vec<_> = s.collect().await; |
| |
| assert_eq!(3, values.len()); |
| assert_eq!("hello", values[0]); |
| assert_eq!("world", values[1]); |
| assert_eq!("dizzy", values[2]); |
| } |
| |
| #[tokio::test] |
| async fn unit_yield_in_select() { |
| use tokio::select; |
| |
| async fn do_stuff_async() {} |
| |
| let s = stream! { |
| select! { |
| _ = do_stuff_async() => yield, |
| else => yield, |
| } |
| }; |
| |
| let values: Vec<_> = s.collect().await; |
| assert_eq!(values.len(), 1); |
| } |
| |
| #[tokio::test] |
| async fn yield_with_select() { |
| use tokio::select; |
| |
| async fn do_stuff_async() {} |
| async fn more_async_work() {} |
| |
| let s = stream! { |
| select! { |
| _ = do_stuff_async() => yield "hey", |
| _ = more_async_work() => yield "hey", |
| else => yield "hey", |
| } |
| }; |
| |
| let values: Vec<_> = s.collect().await; |
| assert_eq!(values, vec!["hey"]); |
| } |
| |
| #[tokio::test] |
| async fn return_stream() { |
| fn build_stream() -> impl Stream<Item = u32> { |
| stream! { |
| yield 1; |
| yield 2; |
| yield 3; |
| } |
| } |
| |
| let s = build_stream(); |
| |
| let values: Vec<_> = s.collect().await; |
| assert_eq!(3, values.len()); |
| assert_eq!(1, values[0]); |
| assert_eq!(2, values[1]); |
| assert_eq!(3, values[2]); |
| } |
| |
| #[tokio::test] |
| async fn consume_channel() { |
| let (tx, mut rx) = mpsc::channel(10); |
| |
| let s = stream! { |
| while let Some(v) = rx.recv().await { |
| yield v; |
| } |
| }; |
| |
| pin_mut!(s); |
| |
| for i in 0..3 { |
| assert_ok!(tx.send(i).await); |
| assert_eq!(Some(i), s.next().await); |
| } |
| |
| drop(tx); |
| assert_eq!(None, s.next().await); |
| } |
| |
| #[tokio::test] |
| async fn borrow_self() { |
| struct Data(String); |
| |
| impl Data { |
| fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a { |
| stream! { |
| yield &self.0[..]; |
| } |
| } |
| } |
| |
| let data = Data("hello".to_string()); |
| let s = data.stream(); |
| pin_mut!(s); |
| |
| assert_eq!(Some("hello"), s.next().await); |
| } |
| |
| #[tokio::test] |
| async fn stream_in_stream() { |
| let s = stream! { |
| let s = stream! { |
| for i in 0..3 { |
| yield i; |
| } |
| }; |
| |
| pin_mut!(s); |
| while let Some(v) = s.next().await { |
| yield v; |
| } |
| }; |
| |
| let values: Vec<_> = s.collect().await; |
| assert_eq!(3, values.len()); |
| } |
| |
| #[tokio::test] |
| async fn yield_non_unpin_value() { |
| let s: Vec<_> = stream! { |
| for i in 0..3 { |
| yield async move { i }; |
| } |
| } |
| .buffered(1) |
| .collect() |
| .await; |
| |
| assert_eq!(s, vec![0, 1, 2]); |
| } |
| |
/// Checks that a `try_stream!` nested in a `select!` arm inside `stream!` is
/// accepted, together with a trailing unit `yield` that has no semicolon.
///
/// This is a plain `#[test]`: the outer stream is built and discarded without
/// ever being awaited.
#[test]
fn inner_try_stream() {
    use async_stream::try_stream;
    use tokio::select;

    async fn do_stuff_async() {}

    // Bound to `_`, so the stream is dropped immediately.
    let _ = stream! {
        select! {
            _ = do_stuff_async() => {
                let inner = try_stream! {
                    yield;
                };
                // The annotation fixes the item type of the inner `try_stream!`.
                let _: Result<(), ()> = Box::pin(inner).next().await.unwrap();
            },
            else => {},
        }
        yield
    };
}
| |
/// Registers every file matching `tests/ui/*.rs` as a trybuild case that is
/// expected to fail to compile.
#[test]
fn test() {
    trybuild::TestCases::new().compile_fail("tests/ui/*.rs");
}