blob: 43c90b7838a87023f80e509b0636939ed5a1e066 [file] [log] [blame]
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use futures::stream::{Fuse, Stream};
/// Extension trait that adds the `select_with_weak` combinator to streams.
///
/// A blanket implementation in this file makes it available on every type
/// that implements [`Stream`].
pub trait SelectWithWeakExt: Stream {
/// Merges `self` with `other` into a single stream of the shared item type.
///
/// `self` acts as the "strong" stream and `other` as the "weak" one: the
/// merged stream finishes when the strong stream finishes, while the weak
/// stream running out does not end it.
fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
where
S: Stream<Item = Self::Item>,
Self: Sized;
}
/// Blanket implementation: every [`Stream`] gets `select_with_weak`.
impl<T: Stream> SelectWithWeakExt for T {
    /// Combines `self` (strong) and `other` (weak) into a [`SelectWithWeak`].
    ///
    /// The returned stream ends when `self` ends; `other` ending on its own
    /// does not terminate it.
    fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
    where
        Self: Sized,
        S: Stream<Item = Self::Item>,
    {
        // The receiver is the strong side, the argument is the weak side.
        new(self, other)
    }
}
/// An adapter for merging the output of two streams with the same item type.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available. The two streams take turns being polled first, so
/// a stream that is always ready does not permanently shadow the other one.
///
/// The streams are not equal partners: the merged stream finishes as soon as
/// the *strong* stream finishes, even if the *weak* stream could still yield
/// items. The weak stream finishing on its own does not end the merged stream.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SelectWithWeak<S1, S2> {
/// Strong stream (fused); its end is the end of the merged stream.
strong: Fuse<S1>,
/// Weak stream (fused); its end is ignored by the merged stream.
weak: Fuse<S2>,
/// `true` when the strong stream is polled next, `false` when it is the
/// weak stream's turn.
use_strong: bool,
}
/// Builds a [`SelectWithWeak`] with `stream1` as the strong stream and
/// `stream2` as the weak stream.
///
/// Both inputs are fused, so they can be polled again after they have
/// finished. The weak stream gets the first turn when polling starts.
fn new<S1, S2>(stream1: S1, stream2: S2) -> SelectWithWeak<S1, S2>
where
    S1: Stream,
    S2: Stream<Item = S1::Item>,
{
    // Only the `fuse` method is needed, so the trait is imported anonymously.
    use futures::StreamExt as _;

    let strong = stream1.fuse();
    let weak = stream2.fuse();

    SelectWithWeak {
        strong,
        weak,
        use_strong: false,
    }
}
impl<S1, S2> Stream for SelectWithWeak<S1, S2>
where
    S1: Stream + Unpin,
    S2: Stream<Item = S1::Item> + Unpin,
{
    type Item = S1::Item;

    /// Polls the underlying streams, alternating which one is asked first.
    ///
    /// Returns:
    /// - `Poll::Ready(Some(item))` as soon as either stream yields an item;
    /// - `Poll::Ready(None)` once the strong stream has finished;
    /// - `Poll::Pending` when the strong stream is pending and the weak
    ///   stream has nothing to offer (pending or finished).
    ///
    /// Each underlying stream is polled at most once per call. The strong
    /// stream is always polled before `Poll::Pending` is returned, so the
    /// waker in `cx` is registered with it.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);

        // At most two turns: the stream whose turn it is goes first and, if it
        // has nothing to offer, the other stream gets the second turn.
        for _ in 0..2 {
            if this.use_strong {
                match Pin::new(&mut this.strong).poll_next(cx) {
                    Poll::Ready(Some(item)) => {
                        // The weak stream gets the first turn on the next call.
                        this.use_strong = false;
                        return Poll::Ready(Some(item));
                    }
                    // The strong stream finishing finishes the merged stream.
                    Poll::Ready(None) => return Poll::Ready(None),
                    Poll::Pending => {
                        // Hand the remaining turn (if any) to the weak stream.
                        this.use_strong = false;
                    }
                }
            } else {
                // Whatever the weak stream answers, the strong one is next.
                this.use_strong = true;
                if let Poll::Ready(Some(item)) = Pin::new(&mut this.weak).poll_next(cx) {
                    return Poll::Ready(Some(item));
                }
            }
        }

        // Neither stream produced an item. Start with the strong stream on
        // the next call.
        this.use_strong = true;
        Poll::Pending
    }
}