| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "full")] |
| |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| |
| use futures::{Stream, StreamExt}; |
| use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior}; |
| use tokio_test::{assert_pending, assert_ready, assert_ready_eq, task}; |
| |
// Drives a spawned `Interval` through a sequence of expected ticks.
//
// Arguments: the identifier of the spawned `Interval` task, the identifier of
// the `start` instant, and zero or more time deltas in milliseconds (a
// trailing comma is allowed). For every delta the interval is polled once and
// must be ready with exactly `start + ms(delta)`. After the deltas are
// exhausted, one more poll must be pending, i.e. no further tick is due yet.
// With no deltas, only the pending check runs.
macro_rules! check_interval_poll {
    ($i:ident, $start:ident $(, $delta:expr)* $(,)?) => {
        $(
            assert_ready_eq!(poll_next(&mut $i), $start + ms($delta));
        )*
        assert_pending!(poll_next(&mut $i));
    };
}
| |
| #[tokio::test] |
| #[should_panic] |
| async fn interval_zero_duration() { |
| let _ = time::interval_at(Instant::now(), ms(0)); |
| } |
| |
| // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | |
| // Actual ticks: | work -----| delay | work | work | work -| work -----| |
| // Poll behavior: | | | | | | | | |
| // | | | | | | | | |
| // Ready(s) | | Ready(s + 2p) | | | | |
| // Pending | Ready(s + 3p) | | | |
| // Ready(s + p) Ready(s + 4p) | | |
| // Ready(s + 5p) | |
| // Ready(s + 6p) |
| #[tokio::test(start_paused = true)] |
| async fn burst() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(650)).await; |
| check_interval_poll!(i, start, 600, 900); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start, 1200); |
| |
| time::advance(ms(250)).await; |
| check_interval_poll!(i, start, 1500); |
| |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start, 1800); |
| } |
| |
| // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | |
| // Actual ticks: | work -----| delay | work -----| work -----| work -----| |
| // Poll behavior: | | | | | | | | |
| // | | | | | | | | |
| // Ready(s) | | Ready(s + 2p) | | | | |
| // Pending | Pending | | | |
| // Ready(s + p) Ready(s + 2p + d) | | |
| // Ready(s + 3p + d) | |
| // Ready(s + 4p + d) |
| #[tokio::test(start_paused = true)] |
| async fn delay() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| i.set_missed_tick_behavior(MissedTickBehavior::Delay); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(650)).await; |
| check_interval_poll!(i, start, 600); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| // We have to add one here for the same reason as is above. |
| // Because `Interval` has reset its timer according to `Instant::now()`, |
| // we have to go forward 1 more millisecond than is expected so that the |
| // runtime realizes that it's time to resolve the timer. |
| time::advance(ms(201)).await; |
| // We add one because when using the `Delay` behavior, `Interval` |
| // adds the `period` from `Instant::now()`, which will always be off by one |
| // because we have to advance time by 1 (see above). |
| check_interval_poll!(i, start, 1251); |
| |
| time::advance(ms(300)).await; |
| // Again, we add one. |
| check_interval_poll!(i, start, 1551); |
| |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start, 1851); |
| } |
| |
| // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | |
| // Actual ticks: | work -----| delay | work ---| work -----| work -----| |
| // Poll behavior: | | | | | | | |
| // | | | | | | | |
| // Ready(s) | | Ready(s + 2p) | | | |
| // Pending | Ready(s + 4p) | | |
| // Ready(s + p) Ready(s + 5p) | |
| // Ready(s + 6p) |
| #[tokio::test(start_paused = true)] |
| async fn skip() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| i.set_missed_tick_behavior(MissedTickBehavior::Skip); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(650)).await; |
| check_interval_poll!(i, start, 600); |
| |
| time::advance(ms(250)).await; |
| check_interval_poll!(i, start, 1200); |
| |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start, 1500); |
| |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start, 1800); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn reset() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| i.reset(); |
| |
| time::advance(ms(250)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(50)).await; |
| // We add one because when using `reset` method, `Interval` adds the |
| // `period` from `Instant::now()`, which will always be off by one |
| check_interval_poll!(i, start, 701); |
| |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start, 1001); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn reset_immediately() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| i.reset_immediately(); |
| |
| // We add one because when using `reset` method, `Interval` adds the |
| // `period` from `Instant::now()`, which will always be off by one |
| check_interval_poll!(i, start, 401); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 701); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn reset_after() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| i.reset_after(Duration::from_millis(20)); |
| |
| // We add one because when using `reset` method, `Interval` adds the |
| // `period` from `Instant::now()`, which will always be off by one |
| time::advance(ms(20)).await; |
| check_interval_poll!(i, start, 421); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 721); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn reset_at() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| i.reset_at(Instant::now() + Duration::from_millis(40)); |
| |
| // We add one because when using `reset` method, `Interval` adds the |
| // `period` from `Instant::now()`, which will always be off by one |
| time::advance(ms(40)).await; |
| check_interval_poll!(i, start, 441); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 741); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn reset_at_bigger_than_interval() { |
| let start = Instant::now(); |
| |
| // This is necessary because the timer is only so granular, and in order for |
| // all our ticks to resolve, the time needs to be 1ms ahead of what we |
| // expect, so that the runtime will see that it is time to resolve the timer |
| time::advance(ms(1)).await; |
| |
| let mut i = task::spawn(time::interval_at(start, ms(300))); |
| |
| check_interval_poll!(i, start, 0); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| time::advance(ms(200)).await; |
| check_interval_poll!(i, start, 300); |
| |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start); |
| |
| i.reset_at(Instant::now() + Duration::from_millis(1000)); |
| |
| // Validate the interval does not tick until 1000ms have passed |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start); |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start); |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start); |
| |
| // We add one because when using `reset` method, `Interval` adds the |
| // `period` from `Instant::now()`, which will always be off by one |
| time::advance(ms(100)).await; |
| check_interval_poll!(i, start, 1401); |
| |
| time::advance(ms(300)).await; |
| check_interval_poll!(i, start, 1701); |
| } |
| |
| fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> { |
| interval.enter(|cx, mut interval| interval.poll_tick(cx)) |
| } |
| |
/// Shorthand for a `Duration` of `n` milliseconds.
fn ms(n: u64) -> Duration {
    let millis = n;
    Duration::from_millis(millis)
}
| |
| /// Helper struct to test the [tokio::time::Interval::poll_tick()] method. |
| /// |
| /// `poll_tick()` should register the waker in the context only if it returns |
| /// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an |
| /// interval timer and counts up on every tick when used as stream. When the |
| /// counter is a multiple of four, it yields the current counter value. |
| /// Depending on the value for `wake_on_pending`, it will reschedule itself when |
| /// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`, |
| /// we expect that the stream stalls because the timer will **not** reschedule |
| /// the next wake-up itself once it returned `Poll::Ready`. |
| struct IntervalStreamer { |
| counter: u32, |
| timer: Interval, |
| wake_on_pending: bool, |
| } |
| |
| impl Stream for IntervalStreamer { |
| type Item = u32; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = Pin::into_inner(self); |
| |
| if this.counter > 12 { |
| return Poll::Ready(None); |
| } |
| |
| match this.timer.poll_tick(cx) { |
| Poll::Pending => Poll::Pending, |
| Poll::Ready(_) => { |
| this.counter += 1; |
| if this.counter % 4 == 0 { |
| Poll::Ready(Some(this.counter)) |
| } else { |
| if this.wake_on_pending { |
| // Schedule this task for wake-up |
| cx.waker().wake_by_ref(); |
| } |
| Poll::Pending |
| } |
| } |
| } |
| } |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn stream_with_interval_poll_tick_self_waking() { |
| let stream = IntervalStreamer { |
| counter: 0, |
| timer: tokio::time::interval(tokio::time::Duration::from_millis(10)), |
| wake_on_pending: true, |
| }; |
| |
| let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12); |
| |
| // Wrap task in timeout so that it will finish eventually even if the stream |
| // stalls. |
| tokio::spawn(tokio::time::timeout( |
| tokio::time::Duration::from_millis(150), |
| async move { |
| tokio::pin!(stream); |
| |
| while let Some(item) = stream.next().await { |
| res_tx.send(item).await.ok(); |
| } |
| }, |
| )); |
| |
| let mut items = Vec::with_capacity(3); |
| while let Some(result) = res_rx.recv().await { |
| items.push(result); |
| } |
| |
| // We expect the stream to yield normally and thus three items. |
| assert_eq!(items, vec![4, 8, 12]); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn stream_with_interval_poll_tick_no_waking() { |
| let stream = IntervalStreamer { |
| counter: 0, |
| timer: tokio::time::interval(tokio::time::Duration::from_millis(10)), |
| wake_on_pending: false, |
| }; |
| |
| let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12); |
| |
| // Wrap task in timeout so that it will finish eventually even if the stream |
| // stalls. |
| tokio::spawn(tokio::time::timeout( |
| tokio::time::Duration::from_millis(150), |
| async move { |
| tokio::pin!(stream); |
| |
| while let Some(item) = stream.next().await { |
| res_tx.send(item).await.ok(); |
| } |
| }, |
| )); |
| |
| let mut items = Vec::with_capacity(0); |
| while let Some(result) = res_rx.recv().await { |
| items.push(result); |
| } |
| |
| // We expect the stream to stall because it does not reschedule itself on |
| // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the |
| // task when returning `Poll::Ready`. |
| assert_eq!(items, vec![]); |
| } |
| |
| #[tokio::test(start_paused = true)] |
| async fn interval_doesnt_panic_max_duration_when_polling() { |
| let mut timer = task::spawn(time::interval(Duration::MAX)); |
| assert_ready!(timer.enter(|cx, mut timer| timer.poll_tick(cx))); |
| } |