| use futures::channel::mpsc; |
| use futures::executor::block_on; |
| use futures::stream::{abortable, Stream, StreamExt}; |
| use futures::task::{Context, Poll}; |
| use futures::SinkExt; |
| use futures_test::task::new_count_waker; |
| use std::pin::Pin; |
| |
| #[test] |
| fn abortable_works() { |
| let (_tx, a_rx) = mpsc::channel::<()>(1); |
| let (mut abortable_rx, abort_handle) = abortable(a_rx); |
| |
| abort_handle.abort(); |
| assert!(abortable_rx.is_aborted()); |
| assert_eq!(None, block_on(abortable_rx.next())); |
| } |
| |
| #[test] |
| fn abortable_awakens() { |
| let (_tx, a_rx) = mpsc::channel::<()>(1); |
| let (mut abortable_rx, abort_handle) = abortable(a_rx); |
| |
| let (waker, counter) = new_count_waker(); |
| let mut cx = Context::from_waker(&waker); |
| |
| assert_eq!(counter, 0); |
| assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); |
| assert_eq!(counter, 0); |
| |
| abort_handle.abort(); |
| assert_eq!(counter, 1); |
| assert!(abortable_rx.is_aborted()); |
| assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); |
| } |
| |
| #[test] |
| fn abortable_resolves() { |
| let (mut tx, a_rx) = mpsc::channel::<()>(1); |
| let (mut abortable_rx, _abort_handle) = abortable(a_rx); |
| |
| block_on(tx.send(())).unwrap(); |
| |
| assert!(!abortable_rx.is_aborted()); |
| assert_eq!(Some(()), block_on(abortable_rx.next())); |
| } |