| #![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] |
| |
| use tokio::time::{self, sleep, Duration}; |
| use tokio_stream::{self, StreamExt}; |
| use tokio_test::*; |
| |
| use futures::stream; |
| |
| async fn maybe_sleep(idx: i32) -> i32 { |
| if idx % 2 == 0 { |
| sleep(ms(200)).await; |
| } |
| idx |
| } |
| |
| fn ms(n: u64) -> Duration { |
| Duration::from_millis(n) |
| } |
| |
| #[tokio::test] |
| async fn basic_usage() { |
| time::pause(); |
| |
| // Items 2 and 4 time out. If we run the stream until it completes, |
| // we end up with the following items: |
| // |
| // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] |
| |
| let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); |
| let mut stream = task::spawn(stream); |
| |
| // First item completes immediately |
| assert_ready_eq!(stream.poll_next(), Some(Ok(1))); |
| |
| // Second item is delayed 200ms, times out after 100ms |
| assert_pending!(stream.poll_next()); |
| |
| time::advance(ms(150)).await; |
| let v = assert_ready!(stream.poll_next()); |
| assert!(v.unwrap().is_err()); |
| |
| assert_pending!(stream.poll_next()); |
| |
| time::advance(ms(100)).await; |
| assert_ready_eq!(stream.poll_next(), Some(Ok(2))); |
| |
| // Third item is ready immediately |
| assert_ready_eq!(stream.poll_next(), Some(Ok(3))); |
| |
| // Fourth item is delayed 200ms, times out after 100ms |
| assert_pending!(stream.poll_next()); |
| |
| time::advance(ms(60)).await; |
| assert_pending!(stream.poll_next()); // nothing ready yet |
| |
| time::advance(ms(60)).await; |
| let v = assert_ready!(stream.poll_next()); |
| assert!(v.unwrap().is_err()); // timeout! |
| |
| time::advance(ms(120)).await; |
| assert_ready_eq!(stream.poll_next(), Some(Ok(4))); |
| |
| // Done. |
| assert_ready_eq!(stream.poll_next(), None); |
| } |
| |
| #[tokio::test] |
| async fn return_elapsed_errors_only_once() { |
| time::pause(); |
| |
| let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); |
| let mut stream = task::spawn(stream); |
| |
| // First item completes immediately |
| assert_ready_eq!(stream.poll_next(), Some(Ok(1))); |
| |
| // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` |
| // error is returned. |
| assert_pending!(stream.poll_next()); |
| // |
| time::advance(ms(51)).await; |
| let v = assert_ready!(stream.poll_next()); |
| assert!(v.unwrap().is_err()); // timeout! |
| |
| // deadline elapses again, but no error is returned |
| time::advance(ms(50)).await; |
| assert_pending!(stream.poll_next()); |
| |
| time::advance(ms(100)).await; |
| assert_ready_eq!(stream.poll_next(), Some(Ok(2))); |
| assert_ready_eq!(stream.poll_next(), Some(Ok(3))); |
| |
| // Done |
| assert_ready_eq!(stream.poll_next(), None); |
| } |
| |
| #[tokio::test] |
| async fn no_timeouts() { |
| let stream = stream::iter(vec![1, 3, 5]) |
| .then(maybe_sleep) |
| .timeout(ms(100)); |
| |
| let mut stream = task::spawn(stream); |
| |
| assert_ready_eq!(stream.poll_next(), Some(Ok(1))); |
| assert_ready_eq!(stream.poll_next(), Some(Ok(3))); |
| assert_ready_eq!(stream.poll_next(), Some(Ok(5))); |
| assert_ready_eq!(stream.poll_next(), None); |
| } |