| #![allow(clippy::cognitive_complexity)] |
| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "sync")] |
| |
| #[cfg(all(target_family = "wasm", not(target_os = "wasi")))] |
| use wasm_bindgen_test::wasm_bindgen_test as test; |
| |
| use tokio::sync::watch; |
| use tokio_test::task::spawn; |
| use tokio_test::{ |
| assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok, |
| }; |
| |
| #[test] |
| fn single_rx_recv() { |
| let (tx, mut rx) = watch::channel("one"); |
| |
| { |
| // Not initially notified |
| let mut t = spawn(rx.changed()); |
| assert_pending!(t.poll()); |
| } |
| assert_eq!(*rx.borrow(), "one"); |
| |
| { |
| let mut t = spawn(rx.changed()); |
| assert_pending!(t.poll()); |
| |
| tx.send("two").unwrap(); |
| |
| assert!(t.is_woken()); |
| |
| assert_ready_ok!(t.poll()); |
| } |
| assert_eq!(*rx.borrow(), "two"); |
| |
| { |
| let mut t = spawn(rx.changed()); |
| assert_pending!(t.poll()); |
| |
| drop(tx); |
| |
| assert!(t.is_woken()); |
| assert_ready_err!(t.poll()); |
| } |
| assert_eq!(*rx.borrow(), "two"); |
| } |
| |
| #[test] |
| fn rx_version_underflow() { |
| let (_tx, mut rx) = watch::channel("one"); |
| |
| // Version starts at 2, validate we do not underflow |
| rx.mark_changed(); |
| rx.mark_changed(); |
| } |
| |
| #[test] |
| fn rx_mark_changed() { |
| let (tx, mut rx) = watch::channel("one"); |
| |
| let mut rx2 = rx.clone(); |
| let mut rx3 = rx.clone(); |
| let mut rx4 = rx.clone(); |
| { |
| rx.mark_changed(); |
| assert!(rx.has_changed().unwrap()); |
| |
| let mut t = spawn(rx.changed()); |
| assert_ready_ok!(t.poll()); |
| } |
| |
| { |
| assert!(!rx2.has_changed().unwrap()); |
| |
| let mut t = spawn(rx2.changed()); |
| assert_pending!(t.poll()); |
| } |
| |
| { |
| rx3.mark_changed(); |
| assert_eq!(*rx3.borrow(), "one"); |
| |
| assert!(rx3.has_changed().unwrap()); |
| |
| assert_eq!(*rx3.borrow_and_update(), "one"); |
| |
| assert!(!rx3.has_changed().unwrap()); |
| |
| let mut t = spawn(rx3.changed()); |
| assert_pending!(t.poll()); |
| } |
| |
| { |
| tx.send("two").unwrap(); |
| assert!(rx4.has_changed().unwrap()); |
| assert_eq!(*rx4.borrow_and_update(), "two"); |
| |
| rx4.mark_changed(); |
| assert!(rx4.has_changed().unwrap()); |
| assert_eq!(*rx4.borrow_and_update(), "two") |
| } |
| |
| assert_eq!(*rx.borrow(), "two"); |
| } |
| |
| #[test] |
| fn rx_mark_unchanged() { |
| let (tx, mut rx) = watch::channel("one"); |
| |
| let mut rx2 = rx.clone(); |
| |
| { |
| assert!(!rx.has_changed().unwrap()); |
| |
| rx.mark_changed(); |
| assert!(rx.has_changed().unwrap()); |
| |
| rx.mark_unchanged(); |
| assert!(!rx.has_changed().unwrap()); |
| |
| let mut t = spawn(rx.changed()); |
| assert_pending!(t.poll()); |
| } |
| |
| { |
| assert!(!rx2.has_changed().unwrap()); |
| |
| tx.send("two").unwrap(); |
| assert!(rx2.has_changed().unwrap()); |
| |
| rx2.mark_unchanged(); |
| assert!(!rx2.has_changed().unwrap()); |
| assert_eq!(*rx2.borrow_and_update(), "two"); |
| } |
| |
| assert_eq!(*rx.borrow(), "two"); |
| } |
| |
| #[test] |
| fn multi_rx() { |
| let (tx, mut rx1) = watch::channel("one"); |
| let mut rx2 = rx1.clone(); |
| |
| { |
| let mut t1 = spawn(rx1.changed()); |
| let mut t2 = spawn(rx2.changed()); |
| |
| assert_pending!(t1.poll()); |
| assert_pending!(t2.poll()); |
| } |
| assert_eq!(*rx1.borrow(), "one"); |
| assert_eq!(*rx2.borrow(), "one"); |
| |
| let mut t2 = spawn(rx2.changed()); |
| |
| { |
| let mut t1 = spawn(rx1.changed()); |
| |
| assert_pending!(t1.poll()); |
| assert_pending!(t2.poll()); |
| |
| tx.send("two").unwrap(); |
| |
| assert!(t1.is_woken()); |
| assert!(t2.is_woken()); |
| |
| assert_ready_ok!(t1.poll()); |
| } |
| assert_eq!(*rx1.borrow(), "two"); |
| |
| { |
| let mut t1 = spawn(rx1.changed()); |
| |
| assert_pending!(t1.poll()); |
| |
| tx.send("three").unwrap(); |
| |
| assert!(t1.is_woken()); |
| assert!(t2.is_woken()); |
| |
| assert_ready_ok!(t1.poll()); |
| assert_ready_ok!(t2.poll()); |
| } |
| assert_eq!(*rx1.borrow(), "three"); |
| |
| drop(t2); |
| |
| assert_eq!(*rx2.borrow(), "three"); |
| |
| { |
| let mut t1 = spawn(rx1.changed()); |
| let mut t2 = spawn(rx2.changed()); |
| |
| assert_pending!(t1.poll()); |
| assert_pending!(t2.poll()); |
| |
| tx.send("four").unwrap(); |
| |
| assert_ready_ok!(t1.poll()); |
| assert_ready_ok!(t2.poll()); |
| } |
| assert_eq!(*rx1.borrow(), "four"); |
| assert_eq!(*rx2.borrow(), "four"); |
| } |
| |
| #[test] |
| fn rx_observes_final_value() { |
| // Initial value |
| |
| let (tx, mut rx) = watch::channel("one"); |
| drop(tx); |
| |
| { |
| let mut t1 = spawn(rx.changed()); |
| assert_ready_err!(t1.poll()); |
| } |
| assert_eq!(*rx.borrow(), "one"); |
| |
| // Sending a value |
| |
| let (tx, mut rx) = watch::channel("one"); |
| |
| tx.send("two").unwrap(); |
| |
| { |
| let mut t1 = spawn(rx.changed()); |
| assert_ready_ok!(t1.poll()); |
| } |
| assert_eq!(*rx.borrow(), "two"); |
| |
| { |
| let mut t1 = spawn(rx.changed()); |
| assert_pending!(t1.poll()); |
| |
| tx.send("three").unwrap(); |
| drop(tx); |
| |
| assert!(t1.is_woken()); |
| |
| assert_ready_ok!(t1.poll()); |
| } |
| assert_eq!(*rx.borrow(), "three"); |
| |
| { |
| let mut t1 = spawn(rx.changed()); |
| assert_ready_err!(t1.poll()); |
| } |
| assert_eq!(*rx.borrow(), "three"); |
| } |
| |
| #[test] |
| fn poll_close() { |
| let (tx, rx) = watch::channel("one"); |
| |
| { |
| let mut t = spawn(tx.closed()); |
| assert_pending!(t.poll()); |
| |
| drop(rx); |
| |
| assert!(t.is_woken()); |
| assert_ready!(t.poll()); |
| } |
| |
| assert!(tx.send("two").is_err()); |
| } |
| |
| #[test] |
| fn borrow_and_update() { |
| let (tx, mut rx) = watch::channel("one"); |
| |
| assert!(!rx.has_changed().unwrap()); |
| |
| tx.send("two").unwrap(); |
| assert!(rx.has_changed().unwrap()); |
| assert_ready!(spawn(rx.changed()).poll()).unwrap(); |
| assert_pending!(spawn(rx.changed()).poll()); |
| assert!(!rx.has_changed().unwrap()); |
| |
| tx.send("three").unwrap(); |
| assert!(rx.has_changed().unwrap()); |
| assert_eq!(*rx.borrow_and_update(), "three"); |
| assert_pending!(spawn(rx.changed()).poll()); |
| assert!(!rx.has_changed().unwrap()); |
| |
| drop(tx); |
| assert_eq!(*rx.borrow_and_update(), "three"); |
| assert_ready!(spawn(rx.changed()).poll()).unwrap_err(); |
| assert!(rx.has_changed().is_err()); |
| } |
| |
| #[test] |
| fn reopened_after_subscribe() { |
| let (tx, rx) = watch::channel("one"); |
| assert!(!tx.is_closed()); |
| |
| drop(rx); |
| assert!(tx.is_closed()); |
| |
| let rx = tx.subscribe(); |
| assert!(!tx.is_closed()); |
| |
| drop(rx); |
| assert!(tx.is_closed()); |
| } |
| |
| #[test] |
| #[cfg(panic = "unwind")] |
| #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding |
| fn send_modify_panic() { |
| let (tx, mut rx) = watch::channel("one"); |
| |
| tx.send_modify(|old| *old = "two"); |
| assert_eq!(*rx.borrow_and_update(), "two"); |
| |
| let mut rx2 = rx.clone(); |
| assert_eq!(*rx2.borrow_and_update(), "two"); |
| |
| let mut task = spawn(rx2.changed()); |
| |
| let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { |
| tx.send_modify(|old| { |
| *old = "panicked"; |
| panic!(); |
| }) |
| })); |
| assert!(result.is_err()); |
| |
| assert_pending!(task.poll()); |
| assert_eq!(*rx.borrow(), "panicked"); |
| |
| tx.send_modify(|old| *old = "three"); |
| assert_ready_ok!(task.poll()); |
| assert_eq!(*rx.borrow_and_update(), "three"); |
| } |
| |
| #[tokio::test] |
| async fn multiple_sender() { |
| let (tx1, mut rx) = watch::channel(0); |
| let tx2 = tx1.clone(); |
| |
| let mut t = spawn(async { |
| rx.changed().await.unwrap(); |
| let v1 = *rx.borrow_and_update(); |
| rx.changed().await.unwrap(); |
| let v2 = *rx.borrow_and_update(); |
| (v1, v2) |
| }); |
| |
| tx1.send(1).unwrap(); |
| assert_pending!(t.poll()); |
| tx2.send(2).unwrap(); |
| assert_ready_eq!(t.poll(), (1, 2)); |
| } |
| |
| #[tokio::test] |
| async fn receiver_is_notified_when_last_sender_is_dropped() { |
| let (tx1, mut rx) = watch::channel(0); |
| let tx2 = tx1.clone(); |
| |
| let mut t = spawn(rx.changed()); |
| assert_pending!(t.poll()); |
| |
| drop(tx1); |
| assert!(!t.is_woken()); |
| drop(tx2); |
| |
| assert!(t.is_woken()); |
| } |
| |
| #[tokio::test] |
| async fn receiver_changed_is_cooperative() { |
| let (tx, mut rx) = watch::channel(()); |
| |
| drop(tx); |
| |
| tokio::select! { |
| biased; |
| _ = async { |
| loop { |
| assert!(rx.changed().await.is_err()); |
| } |
| } => {}, |
| _ = tokio::task::yield_now() => {}, |
| } |
| } |
| |
| #[tokio::test] |
| async fn receiver_changed_is_cooperative_ok() { |
| let (tx, mut rx) = watch::channel(()); |
| |
| tokio::select! { |
| biased; |
| _ = async { |
| loop { |
| assert!(tx.send(()).is_ok()); |
| assert!(rx.changed().await.is_ok()); |
| } |
| } => {}, |
| _ = tokio::task::yield_now() => {}, |
| } |
| } |
| |
| #[tokio::test] |
| async fn receiver_wait_for_is_cooperative() { |
| let (tx, mut rx) = watch::channel(0); |
| |
| drop(tx); |
| |
| tokio::select! { |
| biased; |
| _ = async { |
| loop { |
| assert!(rx.wait_for(|val| *val == 1).await.is_err()); |
| } |
| } => {}, |
| _ = tokio::task::yield_now() => {}, |
| } |
| } |
| |
| #[tokio::test] |
| async fn receiver_wait_for_is_cooperative_ok() { |
| let (tx, mut rx) = watch::channel(0); |
| |
| tokio::select! { |
| biased; |
| _ = async { |
| loop { |
| assert!(tx.send(1).is_ok()); |
| assert!(rx.wait_for(|val| *val == 1).await.is_ok()); |
| } |
| } => {}, |
| _ = tokio::task::yield_now() => {}, |
| } |
| } |
| |
| #[tokio::test] |
| async fn sender_closed_is_cooperative() { |
| let (tx, rx) = watch::channel(()); |
| |
| drop(rx); |
| |
| tokio::select! { |
| _ = async { |
| loop { |
| tx.closed().await; |
| } |
| } => {}, |
| _ = tokio::task::yield_now() => {}, |
| } |
| } |