| use crate::sync::Notify; |
| |
| use loom::future::block_on; |
| use loom::sync::Arc; |
| use loom::thread; |
| |
| use tokio_test::{assert_pending, assert_ready}; |
| |
| /// `util::wake_list::NUM_WAKERS` |
| const WAKE_LIST_SIZE: usize = 32; |
| |
| #[test] |
| fn notify_one() { |
| loom::model(|| { |
| let tx = Arc::new(Notify::new()); |
| let rx = tx.clone(); |
| |
| let th = thread::spawn(move || { |
| block_on(async { |
| rx.notified().await; |
| }); |
| }); |
| |
| tx.notify_one(); |
| th.join().unwrap(); |
| }); |
| } |
| |
| #[test] |
| fn notify_waiters() { |
| loom::model(|| { |
| let notify = Arc::new(Notify::new()); |
| let tx = notify.clone(); |
| let notified1 = notify.notified(); |
| let notified2 = notify.notified(); |
| |
| let th = thread::spawn(move || { |
| tx.notify_waiters(); |
| }); |
| |
| block_on(async { |
| notified1.await; |
| notified2.await; |
| }); |
| |
| th.join().unwrap(); |
| }); |
| } |
| |
| #[test] |
| fn notify_waiters_and_one() { |
| loom::model(|| { |
| let notify = Arc::new(Notify::new()); |
| let tx1 = notify.clone(); |
| let tx2 = notify.clone(); |
| |
| let th1 = thread::spawn(move || { |
| tx1.notify_waiters(); |
| }); |
| |
| let th2 = thread::spawn(move || { |
| tx2.notify_one(); |
| }); |
| |
| let th3 = thread::spawn(move || { |
| let notified = notify.notified(); |
| |
| block_on(async { |
| notified.await; |
| }); |
| }); |
| |
| th1.join().unwrap(); |
| th2.join().unwrap(); |
| th3.join().unwrap(); |
| }); |
| } |
| |
| #[test] |
| fn notify_multi() { |
| loom::model(|| { |
| let notify = Arc::new(Notify::new()); |
| |
| let mut ths = vec![]; |
| |
| for _ in 0..2 { |
| let notify = notify.clone(); |
| |
| ths.push(thread::spawn(move || { |
| block_on(async { |
| notify.notified().await; |
| notify.notify_one(); |
| }) |
| })); |
| } |
| |
| notify.notify_one(); |
| |
| for th in ths.drain(..) { |
| th.join().unwrap(); |
| } |
| |
| block_on(async { |
| notify.notified().await; |
| }); |
| }); |
| } |
| |
| #[test] |
| fn notify_drop() { |
| use crate::future::poll_fn; |
| use std::future::Future; |
| use std::task::Poll; |
| |
| loom::model(|| { |
| let notify = Arc::new(Notify::new()); |
| let rx1 = notify.clone(); |
| let rx2 = notify.clone(); |
| |
| let th1 = thread::spawn(move || { |
| let mut recv = Box::pin(rx1.notified()); |
| |
| block_on(poll_fn(|cx| { |
| if recv.as_mut().poll(cx).is_ready() { |
| rx1.notify_one(); |
| } |
| Poll::Ready(()) |
| })); |
| }); |
| |
| let th2 = thread::spawn(move || { |
| block_on(async { |
| rx2.notified().await; |
| // Trigger second notification |
| rx2.notify_one(); |
| rx2.notified().await; |
| }); |
| }); |
| |
| notify.notify_one(); |
| |
| th1.join().unwrap(); |
| th2.join().unwrap(); |
| }); |
| } |
| |
| /// Polls two `Notified` futures and checks if poll results are consistent |
| /// with each other. If the first future is notified by a `notify_waiters` |
| /// call, then the second one must be notified as well. |
| #[test] |
| fn notify_waiters_poll_consistency() { |
| fn notify_waiters_poll_consistency_variant(poll_setting: [bool; 2]) { |
| let notify = Arc::new(Notify::new()); |
| let mut notified = [ |
| tokio_test::task::spawn(notify.notified()), |
| tokio_test::task::spawn(notify.notified()), |
| ]; |
| for i in 0..2 { |
| if poll_setting[i] { |
| assert_pending!(notified[i].poll()); |
| } |
| } |
| |
| let tx = notify.clone(); |
| let th = thread::spawn(move || { |
| tx.notify_waiters(); |
| }); |
| |
| let res1 = notified[0].poll(); |
| let res2 = notified[1].poll(); |
| |
| // If res1 is ready, then res2 must also be ready. |
| assert!(res1.is_pending() || res2.is_ready()); |
| |
| th.join().unwrap(); |
| } |
| |
| // We test different scenarios in which pending futures had or had not |
| // been polled before the call to `notify_waiters`. |
| loom::model(|| notify_waiters_poll_consistency_variant([false, false])); |
| loom::model(|| notify_waiters_poll_consistency_variant([true, false])); |
| loom::model(|| notify_waiters_poll_consistency_variant([false, true])); |
| loom::model(|| notify_waiters_poll_consistency_variant([true, true])); |
| } |
| |
| /// Polls two `Notified` futures and checks if poll results are consistent |
| /// with each other. If the first future is notified by a `notify_waiters` |
| /// call, then the second one must be notified as well. |
| /// |
| /// Here we also add other `Notified` futures in between to force the two |
| /// tested futures to end up in different chunks. |
| #[test] |
| fn notify_waiters_poll_consistency_many() { |
| fn notify_waiters_poll_consistency_many_variant(order: [usize; 2]) { |
| let notify = Arc::new(Notify::new()); |
| |
| let mut futs = (0..WAKE_LIST_SIZE + 1) |
| .map(|_| tokio_test::task::spawn(notify.notified())) |
| .collect::<Vec<_>>(); |
| |
| assert_pending!(futs[order[0]].poll()); |
| for i in 2..futs.len() { |
| assert_pending!(futs[i].poll()); |
| } |
| assert_pending!(futs[order[1]].poll()); |
| |
| let tx = notify.clone(); |
| let th = thread::spawn(move || { |
| tx.notify_waiters(); |
| }); |
| |
| let res1 = futs[0].poll(); |
| let res2 = futs[1].poll(); |
| |
| // If res1 is ready, then res2 must also be ready. |
| assert!(res1.is_pending() || res2.is_ready()); |
| |
| th.join().unwrap(); |
| } |
| |
| // We test different scenarios in which futures are polled in different order. |
| loom::model(|| notify_waiters_poll_consistency_many_variant([0, 1])); |
| loom::model(|| notify_waiters_poll_consistency_many_variant([1, 0])); |
| } |
| |
| /// Checks if a call to `notify_waiters` is observed as atomic when combined |
| /// with a concurrent call to `notify_one`. |
| #[test] |
| fn notify_waiters_is_atomic() { |
| fn notify_waiters_is_atomic_variant(tested_fut_index: usize) { |
| let notify = Arc::new(Notify::new()); |
| |
| let mut futs = (0..WAKE_LIST_SIZE + 1) |
| .map(|_| tokio_test::task::spawn(notify.notified())) |
| .collect::<Vec<_>>(); |
| |
| for fut in &mut futs { |
| assert_pending!(fut.poll()); |
| } |
| |
| let tx = notify.clone(); |
| let th = thread::spawn(move || { |
| tx.notify_waiters(); |
| }); |
| |
| block_on(async { |
| // If awaiting one of the futures completes, then we should be |
| // able to assume that all pending futures are notified. Therefore |
| // a notification from a subsequent `notify_one` call should not |
| // be consumed by an old future. |
| futs.remove(tested_fut_index).await; |
| |
| let mut new_fut = tokio_test::task::spawn(notify.notified()); |
| assert_pending!(new_fut.poll()); |
| |
| notify.notify_one(); |
| |
| // `new_fut` must consume the notification from `notify_one`. |
| assert_ready!(new_fut.poll()); |
| }); |
| |
| th.join().unwrap(); |
| } |
| |
| // We test different scenarios in which the tested future is at the beginning |
| // or at the end of the waiters queue used by `Notify`. |
| loom::model(|| notify_waiters_is_atomic_variant(0)); |
| loom::model(|| notify_waiters_is_atomic_variant(32)); |
| } |
| |
| /// Checks if a single call to `notify_waiters` does not get through two `Notified` |
| /// futures created and awaited sequentially like this: |
| /// ```ignore |
| /// notify.notified().await; |
| /// notify.notified().await; |
| /// ``` |
| #[test] |
| fn notify_waiters_sequential_notified_await() { |
| use crate::sync::oneshot; |
| |
| loom::model(|| { |
| let notify = Arc::new(Notify::new()); |
| |
| let (tx_fst, rx_fst) = oneshot::channel(); |
| let (tx_snd, rx_snd) = oneshot::channel(); |
| |
| let receiver = thread::spawn({ |
| let notify = notify.clone(); |
| move || { |
| block_on(async { |
| // Poll the first `Notified` to put it as the first waiter |
| // in the queue. |
| let mut first_notified = tokio_test::task::spawn(notify.notified()); |
| assert_pending!(first_notified.poll()); |
| |
| // Create additional waiters to force `notify_waiters` to |
| // release the lock at least once. |
| let _task_pile = (0..WAKE_LIST_SIZE + 1) |
| .map(|_| { |
| let mut fut = tokio_test::task::spawn(notify.notified()); |
| assert_pending!(fut.poll()); |
| fut |
| }) |
| .collect::<Vec<_>>(); |
| |
| // We are ready for the notify_waiters call. |
| tx_fst.send(()).unwrap(); |
| |
| first_notified.await; |
| |
| // Poll the second `Notified` future to try to insert |
| // it to the waiters queue. |
| let mut second_notified = tokio_test::task::spawn(notify.notified()); |
| assert_pending!(second_notified.poll()); |
| |
| // Wait for the `notify_waiters` to end and check if we |
| // are woken up. |
| rx_snd.await.unwrap(); |
| assert_pending!(second_notified.poll()); |
| }); |
| } |
| }); |
| |
| // Wait for the signal and call `notify_waiters`. |
| block_on(rx_fst).unwrap(); |
| notify.notify_waiters(); |
| tx_snd.send(()).unwrap(); |
| |
| receiver.join().unwrap(); |
| }); |
| } |