Upgrade tokio-stream to 0.1.14

This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update external/rust/crates/tokio-stream
For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md

Test: TreeHugger
Change-Id: I3b4ee2f213083dd174d39257fd6dad9fd811fa2d
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index d866201..a31b22b 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
 {
   "git": {
-    "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0"
+    "sha1": "398dfda56d3ee4b0d4d9e86abe15039e86979d83"
   },
   "path_in_vcs": "tokio-stream"
 }
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 357bd1c..8083c73 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,9 +23,9 @@
     host_supported: true,
     crate_name: "tokio_stream",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.1.12",
+    cargo_pkg_version: "0.1.14",
     srcs: ["src/lib.rs"],
-    edition: "2018",
+    edition: "2021",
     features: [
         "fs",
         "io-util",
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c475c7c..c14ad07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,31 @@
-# 0.1.12 (January 20, 2022)
+# 0.1.14 (April 26th, 2023)
+
+This bugfix release bumps the minimum version of Tokio to 1.15, which is
+necessary for `timeout_repeating` to compile. ([#5657])
+
+[#5657]: https://github.com/tokio-rs/tokio/pull/5657
+
+# 0.1.13 (April 25th, 2023)
+
+This release bumps the MSRV of tokio-stream to 1.56.
+
+- stream: add "full" feature flag ([#5639])
+- stream: add `StreamExt::timeout_repeating` ([#5577])
+- stream: add `StreamNotifyClose` ([#4851])
+
+[#4851]: https://github.com/tokio-rs/tokio/pull/4851
+[#5577]: https://github.com/tokio-rs/tokio/pull/5577
+[#5639]: https://github.com/tokio-rs/tokio/pull/5639
+
+# 0.1.12 (January 20, 2023)
 
 - time: remove `Unpin` bound on `Throttle` methods ([#5105])
 - time: document that `throttle` operates on ms granularity ([#5101])
+- sync: add `WatchStream::from_changes` ([#5432])
 
 [#5105]: https://github.com/tokio-rs/tokio/pull/5105
 [#5101]: https://github.com/tokio-rs/tokio/pull/5101
+[#5432]: https://github.com/tokio-rs/tokio/pull/5432
 
 # 0.1.11 (October 11, 2022)
 
diff --git a/Cargo.toml b/Cargo.toml
index 5a3542e..897ba84 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,10 +10,10 @@
 # See Cargo.toml.orig for the original contents.
 
 [package]
-edition = "2018"
-rust-version = "1.49"
+edition = "2021"
+rust-version = "1.56"
 name = "tokio-stream"
-version = "0.1.12"
+version = "0.1.14"
 authors = ["Tokio Contributors <[email protected]>"]
 description = """
 Utilities to work with `Stream` and `tokio`.
@@ -22,14 +22,15 @@
 categories = ["asynchronous"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
+resolver = "1"
 
 [package.metadata.docs.rs]
 all-features = true
-rustdoc-args = [
+rustc-args = [
     "--cfg",
     "docsrs",
 ]
-rustc-args = [
+rustdoc-args = [
     "--cfg",
     "docsrs",
 ]
@@ -41,7 +42,7 @@
 version = "0.2.0"
 
 [dependencies.tokio]
-version = "1.8.0"
+version = "1.15.0"
 features = ["sync"]
 
 [dependencies.tokio-util]
@@ -68,6 +69,14 @@
 [features]
 default = ["time"]
 fs = ["tokio/fs"]
+full = [
+    "time",
+    "net",
+    "io-util",
+    "fs",
+    "sync",
+    "signal",
+]
 io-util = ["tokio/io-util"]
 net = ["tokio/net"]
 signal = ["tokio/signal"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index f87b59a..9a90cd3 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,9 +4,9 @@
 # - Remove path dependencies
 # - Update CHANGELOG.md.
 # - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.12"
-edition = "2018"
-rust-version = "1.49"
+version = "0.1.14"
+edition = "2021"
+rust-version = "1.56"
 authors = ["Tokio Contributors <[email protected]>"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
@@ -18,6 +18,16 @@
 
 [features]
 default = ["time"]
+
+full = [
+    "time",
+    "net",
+    "io-util",
+    "fs",
+    "sync",
+    "signal"
+]
+
 time = ["tokio/time"]
 net = ["tokio/net"]
 io-util = ["tokio/io-util"]
@@ -28,7 +38,7 @@
 [dependencies]
 futures-core = { version = "0.3.0" }
 pin-project-lite = "0.2.0"
-tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
+tokio = { version = "1.15.0", path = "../tokio", features = ["sync"] }
 tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true }
 
 [dev-dependencies]
diff --git a/METADATA b/METADATA
index 390ea84..5a3e237 100644
--- a/METADATA
+++ b/METADATA
@@ -1,23 +1,20 @@
 # This project was upgraded with external_updater.
-# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream
-# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+# Usage: tools/external_updater/updater.sh update external/rust/crates/tokio-stream
+# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md
 
 name: "tokio-stream"
 description: "Utilities to work with `Stream` and `tokio`."
 third_party {
-  url {
-    type: HOMEPAGE
-    value: "https://crates.io/crates/tokio-stream"
-  }
-  url {
-    type: ARCHIVE
-    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate"
-  }
-  version: "0.1.12"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2023
-    month: 3
-    day: 30
+    year: 2024
+    month: 2
+    day: 5
+  }
+  homepage: "https://crates.io/crates/tokio-stream"
+  identifier {
+    type: "Archive"
+    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.14.crate"
+    version: "0.1.14"
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index bbd4cef..351c77e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -96,5 +96,8 @@
 mod stream_map;
 pub use stream_map::StreamMap;
 
+mod stream_close;
+pub use stream_close::StreamNotifyClose;
+
 #[doc(no_inline)]
 pub use futures_core::Stream;
diff --git a/src/stream_close.rs b/src/stream_close.rs
new file mode 100644
index 0000000..735acf0
--- /dev/null
+++ b/src/stream_close.rs
@@ -0,0 +1,93 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+    /// A `Stream` that wraps the values in an `Option`.
+    ///
+    /// Whenever the wrapped stream yields an item, this stream yields that item
+    /// wrapped in `Some`. When the inner stream ends, then this stream first
+    /// yields a `None` item, and then this stream will also end.
+    ///
+    /// # Example
+    ///
+    /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
+    ///
+    /// ```
+    /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let mut map = StreamMap::new();
+    ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+    ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+    ///     map.insert(0, stream);
+    ///     map.insert(1, stream2);
+    ///     while let Some((key, val)) = map.next().await {
+    ///         match val {
+    ///             Some(val) => println!("got {val:?} from stream {key:?}"),
+    ///             None => println!("stream {key:?} closed"),
+    ///         }
+    ///     }
+    /// }
+    /// ```
+    #[must_use = "streams do nothing unless polled"]
+    pub struct StreamNotifyClose<S> {
+        #[pin]
+        inner: Option<S>,
+    }
+}
+
+impl<S> StreamNotifyClose<S> {
+    /// Create a new `StreamNotifyClose`.
+    pub fn new(stream: S) -> Self {
+        Self {
+            inner: Some(stream),
+        }
+    }
+
+    /// Get back the inner `Stream`.
+    ///
+    /// Returns `None` if the stream has reached its end.
+    pub fn into_inner(self) -> Option<S> {
+        self.inner
+    }
+}
+
+impl<S> Stream for StreamNotifyClose<S>
+where
+    S: Stream,
+{
+    type Item = Option<S::Item>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // We can't invoke poll_next after it ended, so we unset the inner stream as a marker.
+        match self
+            .as_mut()
+            .project()
+            .inner
+            .as_pin_mut()
+            .map(|stream| S::poll_next(stream, cx))
+        {
+            Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))),
+            Some(Poll::Ready(None)) => {
+                self.project().inner.set(None);
+                Poll::Ready(Some(None))
+            }
+            Some(Poll::Pending) => Poll::Pending,
+            None => Poll::Ready(None),
+        }
+    }
+
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        if let Some(inner) = &self.inner {
+            // We always add 1 because while the inner stream is present there is at least one more item.
+            let (l, u) = inner.size_hint();
+            (l.saturating_add(1), u.and_then(|u| u.checked_add(1)))
+        } else {
+            (0, Some(0))
+        }
+    }
+}
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 52d3202..a4ab8a0 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -57,8 +57,10 @@
 
 cfg_time! {
     pub(crate) mod timeout;
+    pub(crate) mod timeout_repeating;
     use timeout::Timeout;
-    use tokio::time::Duration;
+    use timeout_repeating::TimeoutRepeating;
+    use tokio::time::{Duration, Interval};
     mod throttle;
     use throttle::{throttle, Throttle};
     mod chunks_timeout;
@@ -924,7 +926,9 @@
     /// If the wrapped stream yields a value before the deadline is reached, the
     /// value is returned. Otherwise, an error is returned. The caller may decide
     /// to continue consuming the stream and will eventually get the next source
-    /// stream value once it becomes available.
+    /// stream value once it becomes available. See
+    /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
+    /// where the timeouts will repeat.
     ///
     /// # Notes
     ///
@@ -971,6 +975,25 @@
     /// assert_eq!(int_stream.try_next().await, Ok(None));
     /// # }
     /// ```
+    ///
+    /// Once a timeout error is received, no further events will be received
+    /// unless the wrapped stream yields a value (timeouts do not repeat).
+    ///
+    /// ```
+    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+    /// # async fn main() {
+    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
+    /// use std::time::Duration;
+    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
+    /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
+    /// tokio::pin!(timeout_stream);
+    ///
+    /// // Only one timeout will be received between values in the source stream.
+    /// assert!(timeout_stream.try_next().await.is_ok());
+    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
+    /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
+    /// # }
+    /// ```
     #[cfg(all(feature = "time"))]
     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
     fn timeout(self, duration: Duration) -> Timeout<Self>
@@ -980,6 +1003,95 @@
         Timeout::new(self, duration)
     }
 
+    /// Applies a per-item timeout to the passed stream.
+    ///
+    /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
+    /// controls the time each element of the stream has to complete before
+    /// timing out.
+    ///
+    /// If the wrapped stream yields a value before the deadline is reached, the
+    /// value is returned. Otherwise, an error is returned. The caller may decide
+    /// to continue consuming the stream and will eventually get the next source
+    /// stream value once it becomes available. Unlike `timeout()`, if no value
+    /// becomes available before the deadline is reached, additional errors are
+    /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
+    /// for an alternative where the timeouts do not repeat.
+    ///
+    /// # Notes
+    ///
+    /// This function consumes the stream passed into it and returns a
+    /// wrapped version of it.
+    ///
+    /// Polling the returned stream will continue to poll the inner stream even
+    /// if one or more items time out.
+    ///
+    /// # Examples
+    ///
+    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    /// use std::time::Duration;
+    /// # let int_stream = stream::iter(1..=3);
+    ///
+    /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
+    /// tokio::pin!(int_stream);
+    ///
+    /// // When no items time out, we get the 3 elements in succession:
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+    /// assert_eq!(int_stream.try_next().await, Ok(None));
+    ///
+    /// // If the second item times out, we get an error and continue polling the stream:
+    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+    /// assert!(int_stream.try_next().await.is_err());
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+    /// assert_eq!(int_stream.try_next().await, Ok(None));
+    ///
+    /// // If we want to stop consuming the source stream the first time an
+    /// // element times out, we can use the `take_while` operator:
+    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+    /// let mut int_stream = int_stream.take_while(Result::is_ok);
+    ///
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+    /// assert_eq!(int_stream.try_next().await, Ok(None));
+    /// # }
+    /// ```
+    ///
+    /// Timeout errors will be continuously produced at the specified interval
+    /// until the wrapped stream yields a value.
+    ///
+    /// ```
+    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+    /// # async fn main() {
+    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
+    /// use std::time::Duration;
+    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
+    /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
+    /// tokio::pin!(timeout_stream);
+    ///
+    /// // Multiple timeouts will be received between values in the source stream.
+    /// assert!(timeout_stream.try_next().await.is_ok());
+    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
+    /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
+    /// // Will eventually receive another value from the source stream...
+    /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
+    /// # }
+    /// ```
+    #[cfg(all(feature = "time"))]
+    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
+    where
+        Self: Sized,
+    {
+        TimeoutRepeating::new(self, interval)
+    }
+
     /// Slows down a stream by enforcing a delay between items.
     ///
     /// The underlying timer behind this utility has a granularity of one millisecond.
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index a440d20..17d1349 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -23,7 +23,7 @@
     }
 }
 
-/// Error returned by `Timeout`.
+/// Error returned by `Timeout` and `TimeoutRepeating`.
 #[derive(Debug, PartialEq, Eq)]
 pub struct Elapsed(());
 
diff --git a/src/stream_ext/timeout_repeating.rs b/src/stream_ext/timeout_repeating.rs
new file mode 100644
index 0000000..253d2fd
--- /dev/null
+++ b/src/stream_ext/timeout_repeating.rs
@@ -0,0 +1,56 @@
+use crate::stream_ext::Fuse;
+use crate::{Elapsed, Stream};
+use tokio::time::Interval;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
+    #[must_use = "streams do nothing unless polled"]
+    #[derive(Debug)]
+    pub struct TimeoutRepeating<S> {
+        #[pin]
+        stream: Fuse<S>,
+        #[pin]
+        interval: Interval,
+    }
+}
+
+impl<S: Stream> TimeoutRepeating<S> {
+    pub(super) fn new(stream: S, interval: Interval) -> Self {
+        TimeoutRepeating {
+            stream: Fuse::new(stream),
+            interval,
+        }
+    }
+}
+
+impl<S: Stream> Stream for TimeoutRepeating<S> {
+    type Item = Result<S::Item, Elapsed>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut me = self.project();
+
+        match me.stream.poll_next(cx) {
+            Poll::Ready(v) => {
+                if v.is_some() {
+                    me.interval.reset();
+                }
+                return Poll::Ready(v.map(Ok));
+            }
+            Poll::Pending => {}
+        };
+
+        ready!(me.interval.poll_tick(cx));
+        Poll::Ready(Some(Err(Elapsed::new())))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let (lower, _) = self.stream.size_hint();
+
+        // The timeout stream may insert an error an infinite number of times.
+        (lower, None)
+    }
+}
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 2159804..0c11bf1 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -42,10 +42,18 @@
 /// to be merged, it may be advisable to use tasks sending values on a shared
 /// [`mpsc`] channel.
 ///
+/// # Notes
+///
+/// `StreamMap` removes finished streams automatically, without alerting the user.
+/// In some scenarios, the caller may want to be notified when a stream closes.
+/// To do this, use [`StreamNotifyClose`] as a wrapper around your stream.
+/// It will yield `None` once the wrapped stream is closed.
+///
 /// [`StreamExt::merge`]: crate::StreamExt::merge
 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
 /// [`Box::pin`]: std::boxed::Box::pin
+/// [`StreamNotifyClose`]: crate::StreamNotifyClose
 ///
 /// # Examples
 ///
@@ -170,6 +178,28 @@
 ///     }
 /// }
 /// ```
+///
+/// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
+///
+/// ```
+/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut map = StreamMap::new();
+///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+///     map.insert(0, stream);
+///     map.insert(1, stream2);
+///     while let Some((key, val)) = map.next().await {
+///         match val {
+///             Some(val) => println!("got {val:?} from stream {key:?}"),
+///             None => println!("stream {key:?} closed"),
+///         }
+///     }
+/// }
+/// ```
+
 #[derive(Debug)]
 pub struct StreamMap<K, V> {
     /// Streams stored in the map
@@ -568,7 +598,7 @@
     }
 }
 
-impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
+impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
 where
     K: Hash + Eq,
 {
diff --git a/tests/stream_close.rs b/tests/stream_close.rs
new file mode 100644
index 0000000..9ddb565
--- /dev/null
+++ b/tests/stream_close.rs
@@ -0,0 +1,11 @@
+use tokio_stream::{StreamExt, StreamNotifyClose};
+
+#[tokio::test]
+async fn basic_usage() {
+    let mut stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
+
+    assert_eq!(stream.next().await, Some(Some(0)));
+    assert_eq!(stream.next().await, Some(Some(1)));
+    assert_eq!(stream.next().await, Some(None));
+    assert_eq!(stream.next().await, None);
+}