| use core::pin::Pin; |
| |
| use futures::{ |
| stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, |
| task::Poll, |
| Stream, |
| }; |
| use futures_executor::block_on; |
| use futures_task::Context; |
| use futures_test::task::noop_context; |
| |
| #[test] |
| fn try_filter_map_after_err() { |
| let cx = &mut noop_context(); |
| let mut s = stream::iter(1..=3) |
| .map(Ok) |
| .try_filter_map(|v| async move { Err::<Option<()>, _>(v) }) |
| .filter_map(|r| async move { r.ok() }) |
| .boxed(); |
| assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); |
| } |
| |
| #[test] |
| fn try_skip_while_after_err() { |
| let cx = &mut noop_context(); |
| let mut s = stream::iter(1..=3) |
| .map(Ok) |
| .try_skip_while(|_| async move { Err::<_, ()>(()) }) |
| .filter_map(|r| async move { r.ok() }) |
| .boxed(); |
| assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); |
| } |
| |
| #[test] |
| fn try_take_while_after_err() { |
| let cx = &mut noop_context(); |
| let mut s = stream::iter(1..=3) |
| .map(Ok) |
| .try_take_while(|_| async move { Err::<_, ()>(()) }) |
| .filter_map(|r| async move { r.ok() }) |
| .boxed(); |
| assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); |
| } |
| |
| #[test] |
| fn try_flatten_unordered() { |
| let test_st = stream::iter(1..7) |
| .map(|val: u32| { |
| if val % 2 == 0 { |
| Ok(stream::unfold((val, 1), |(val, pow)| async move { |
| Some((val.pow(pow), (val, pow + 1))) |
| }) |
| .take(3) |
| .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) })) |
| } else { |
| Err(val) |
| } |
| }) |
| .map_ok(Box::pin) |
| .try_flatten_unordered(None); |
| |
| block_on(async move { |
| assert_eq!( |
| // All numbers can be divided by 16 and odds must be `Err` |
| // For all basic evens we must have powers from 1 to 3 |
| vec![ |
| Err(1), |
| Err(3), |
| Err(5), |
| Ok(2), |
| Ok(4), |
| Ok(6), |
| Ok(4), |
| Err(16), |
| Ok(36), |
| Ok(8), |
| Err(64), |
| Ok(216) |
| ], |
| test_st.collect::<Vec<_>>().await |
| ) |
| }); |
| |
| #[derive(Clone, Debug)] |
| struct ErrorStream { |
| error_after: usize, |
| polled: usize, |
| } |
| |
| impl Stream for ErrorStream { |
| type Item = Result<Repeat<Result<(), ()>>, ()>; |
| |
| fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> { |
| if self.polled > self.error_after { |
| panic!("Polled after error"); |
| } else { |
| let out = |
| if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) }; |
| self.polled += 1; |
| Poll::Ready(Some(out)) |
| } |
| } |
| } |
| |
| block_on(async move { |
| let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None); |
| let mut ctr = 0; |
| while (st.try_next().await).is_ok() { |
| ctr += 1; |
| } |
| assert_eq!(ctr, 0); |
| |
| assert_eq!( |
| ErrorStream { error_after: 10, polled: 0 } |
| .try_flatten_unordered(None) |
| .inspect_ok(|_| panic!("Unexpected `Ok`")) |
| .try_collect::<Vec<_>>() |
| .await, |
| Err(()) |
| ); |
| |
| let mut taken = 0; |
| assert_eq!( |
| ErrorStream { error_after: 10, polled: 0 } |
| .map_ok(|st| st.take(3)) |
| .try_flatten_unordered(1) |
| .inspect(|_| taken += 1) |
| .try_fold((), |(), res| async move { Ok(res) }) |
| .await, |
| Err(()) |
| ); |
| assert_eq!(taken, 31); |
| }) |
| } |