Upgrade rust/crates/futures-channel to 0.3.13 am: 7980c2c7f6 am: 55a47278c9 am: c3055f4268
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-channel/+/1662920
Change-Id: I79dac153ca76e3c5d5dabe79f80eade45669a4b0
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 4fd4ba3..f3ad3ab 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6"
+ "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
}
}
diff --git a/Android.bp b/Android.bp
index 355f9d8..eb938c1 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,5 @@
// This file is generated by cargo2android.py --run --dependencies --device --patch=patches/Android.bp.patch.
+// Do not modify this file as changes will be overridden on upgrade.
package {
default_applicable_licenses: [
@@ -61,4 +62,4 @@
}
// dependent_library ["feature_list"]
-// futures-core-0.3.12 "alloc,std"
+// futures-core-0.3.13 "alloc,std"
diff --git a/Cargo.toml b/Cargo.toml
index 494deb5..30ee771 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "futures-channel"
-version = "0.3.12"
+version = "0.3.13"
authors = ["Alex Crichton <[email protected]>"]
description = "Channels for asynchronous communication using futures-rs.\n"
homepage = "https://rust-lang.github.io/futures-rs"
@@ -24,11 +24,11 @@
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-core]
-version = "0.3.12"
+version = "0.3.13"
default-features = false
[dependencies.futures-sink]
-version = "0.3.12"
+version = "0.3.13"
optional = true
default-features = false
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 5b4c639..9a33320 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
[package]
name = "futures-channel"
edition = "2018"
-version = "0.3.12"
+version = "0.3.13"
authors = ["Alex Crichton <[email protected]>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
@@ -24,8 +24,8 @@
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.12", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true }
[dev-dependencies]
futures = { path = "../futures", default-features = true }
diff --git a/METADATA b/METADATA
index 7481020..b53d840 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.12.crate"
+ value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate"
}
- version: "0.3.12"
+ version: "0.3.13"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 2
- day: 9
+ month: 4
+ day: 1
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 7e10dd0..6798806 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -2,7 +2,55 @@
{
"presubmit": [
{
+ "name": "anyhow_device_test_tests_test_boxed"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_convert"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_ffi"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_repr"
+ },
+ {
+ "name": "tokio-test_device_test_tests_block_on"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_chain"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_source"
+ },
+ {
+ "name": "tokio-test_device_test_tests_io"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_autotrait"
+ },
+ {
+ "name": "anyhow_device_test_src_lib"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_context"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_downcast"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_macros"
+ },
+ {
"name": "futures-util_device_test_src_lib"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_fmt"
+ },
+ {
+ "name": "tokio-test_device_test_tests_macros"
+ },
+ {
+ "name": "tokio-test_device_test_src_lib"
}
]
}
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
index 494c97b..dd50343 100644
--- a/src/mpsc/mod.rs
+++ b/src/mpsc/mod.rs
@@ -86,6 +86,7 @@
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
+use std::thread;
use crate::mpsc::queue::Queue;
@@ -1047,7 +1048,12 @@
}
None => {
let state = decode_state(inner.state.load(SeqCst));
- if state.is_open || state.num_messages != 0 {
+ if state.is_closed() {
+ // If closed flag is set AND there are no pending messages
+ // it means end of stream
+ self.inner = None;
+ Poll::Ready(None)
+ } else {
// If queue is open, we need to return Pending
// to be woken up when new messages arrive.
// If queue is closed but num_messages is non-zero,
@@ -1056,11 +1062,6 @@
// so we need to park until sender unparks the task
// after queueing the message.
Poll::Pending
- } else {
- // If closed flag is set AND there are no pending messages
- // it means end of stream
- self.inner = None;
- Poll::Ready(None)
}
}
}
@@ -1126,8 +1127,26 @@
// Drain the channel of all pending messages
self.close();
if self.inner.is_some() {
- while let Poll::Ready(Some(..)) = self.next_message() {
- // ...
+ loop {
+ match self.next_message() {
+ Poll::Ready(Some(_)) => {}
+ Poll::Ready(None) => break,
+ Poll::Pending => {
+ let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+ // If the channel is closed, then there is no need to park.
+ if state.is_closed() {
+ break;
+ }
+
+ // TODO: Spinning isn't ideal; it might be worth
+ // investigating using a condvar or some other strategy
+ // here. That said, if this case is hit, then another thread
+ // is about to push the value into the queue and this isn't
+ // the only spinlock in the impl right now.
+ thread::yield_now();
+ }
+ }
}
}
}
@@ -1173,7 +1192,12 @@
}
None => {
let state = decode_state(inner.state.load(SeqCst));
- if state.is_open || state.num_messages != 0 {
+ if state.is_closed() {
+ // If closed flag is set AND there are no pending messages
+ // it means end of stream
+ self.inner = None;
+ Poll::Ready(None)
+ } else {
// If queue is open, we need to return Pending
// to be woken up when new messages arrive.
// If queue is closed but num_messages is non-zero,
@@ -1182,11 +1206,6 @@
// so we need to park until sender unparks the task
// after queueing the message.
Poll::Pending
- } else {
- // If closed flag is set AND there are no pending messages
- // it means end of stream
- self.inner = None;
- Poll::Ready(None)
}
}
}
@@ -1240,8 +1259,26 @@
// Drain the channel of all pending messages
self.close();
if self.inner.is_some() {
- while let Poll::Ready(Some(..)) = self.next_message() {
- // ...
+ loop {
+ match self.next_message() {
+ Poll::Ready(Some(_)) => {}
+ Poll::Ready(None) => break,
+ Poll::Pending => {
+ let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+ // If the channel is closed, then there is no need to park.
+ if state.is_closed() {
+ break;
+ }
+
+ // TODO: Spinning isn't ideal; it might be worth
+ // investigating using a condvar or some other strategy
+ // here. That said, if this case is hit, then another thread
+ // is about to push the value into the queue and this isn't
+ // the only spinlock in the impl right now.
+ thread::yield_now();
+ }
+ }
}
}
}
@@ -1289,6 +1326,12 @@
unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
+impl State {
+ fn is_closed(&self) -> bool {
+ !self.is_open && self.num_messages == 0
+ }
+}
+
/*
*
* ===== Helpers =====
diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs
index 50852eb..9eb5296 100644
--- a/tests/mpsc-close.rs
+++ b/tests/mpsc-close.rs
@@ -1,9 +1,13 @@
use futures::channel::mpsc;
use futures::executor::block_on;
+use futures::future::Future;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
-use std::sync::Arc;
+use futures::task::{Context, Poll};
+use std::pin::Pin;
+use std::sync::{Arc, Weak};
use std::thread;
+use std::time::{Duration, Instant};
#[test]
fn smoke() {
@@ -142,3 +146,133 @@
assert!(sender.is_closed());
}
}
+
+// Stress test that `try_send()`s occurring concurrently with receiver
+// close/drops don't appear as successful sends.
+#[test]
+fn stress_try_send_as_receiver_closes() {
+ const AMT: usize = 10000;
+ // To provide variable timing characteristics (in the hopes of
+ // reproducing the collision that leads to a race), we busy-re-poll
+ // the test MPSC receiver a variable number of times before actually
+ // stopping. We vary this countdown between 0 and one less than the
+ // following value (see `i % MAX_COUNTDOWN` below).
+ const MAX_COUNTDOWN: usize = 20;
+ // When we detect that a successfully sent item is still in the
+ // queue after a disconnect, we spin for up to `SPIN_TIMEOUT_S` seconds
+ // to confirm that it is a persistent condition and not a concurrency illusion.
+ const SPIN_TIMEOUT_S: u64 = 10;
+ const SPIN_SLEEP_MS: u64 = 10;
+ struct TestRx {
+ rx: mpsc::Receiver<Arc<()>>,
+ // The number of times to query `rx` before dropping it.
+ poll_count: usize
+ }
+ struct TestTask {
+ command_rx: mpsc::Receiver<TestRx>,
+ test_rx: Option<mpsc::Receiver<Arc<()>>>,
+ countdown: usize,
+ }
+ impl TestTask {
+ /// Create a new TestTask
+ fn new() -> (TestTask, mpsc::Sender<TestRx>) {
+ let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
+ (
+ TestTask {
+ command_rx,
+ test_rx: None,
+ countdown: 0, // 0 means no countdown is in progress.
+ },
+ command_tx,
+ )
+ }
+ }
+ impl Future for TestTask {
+ type Output = ();
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ // Poll the test channel, if one is present.
+ if let Some(rx) = &mut self.test_rx {
+ if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
+ let _ = v.expect("test finished unexpectedly!");
+ }
+ self.countdown -= 1;
+ // Busy-poll until the countdown is finished.
+ cx.waker().wake_by_ref();
+ }
+ // Accept any newly submitted MPSC channels for testing.
+ match self.command_rx.poll_next_unpin(cx) {
+ Poll::Ready(Some(TestRx { rx, poll_count })) => {
+ self.test_rx = Some(rx);
+ self.countdown = poll_count;
+ cx.waker().wake_by_ref();
+ },
+ Poll::Ready(None) => return Poll::Ready(()),
+ Poll::Pending => {},
+ }
+ if self.countdown == 0 {
+ // Countdown complete -- drop the Receiver.
+ self.test_rx = None;
+ }
+ Poll::Pending
+ }
+ }
+ let (f, mut cmd_tx) = TestTask::new();
+ let bg = thread::spawn(move || block_on(f));
+ for i in 0..AMT {
+ let (mut test_tx, rx) = mpsc::channel(0);
+ let poll_count = i % MAX_COUNTDOWN;
+ cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
+ let mut prev_weak: Option<Weak<()>> = None;
+ let mut attempted_sends = 0;
+ let mut successful_sends = 0;
+ loop {
+ // Create a test item.
+ let item = Arc::new(());
+ let weak = Arc::downgrade(&item);
+ match test_tx.try_send(item) {
+ Ok(_) => {
+ prev_weak = Some(weak);
+ successful_sends += 1;
+ }
+ Err(ref e) if e.is_full() => {}
+ Err(ref e) if e.is_disconnected() => {
+ // Test for evidence of the race condition.
+ if let Some(prev_weak) = prev_weak {
+ if prev_weak.upgrade().is_some() {
+ // The previously sent item is still allocated.
+ // However, there appears to be some aspect of the
+ // concurrency that can legitimately cause the Arc
+ // to be momentarily valid. Spin for up to
+ // `SPIN_TIMEOUT_S` seconds waiting for the previously
+ // sent item to be dropped.
+ let t0 = Instant::now();
+ let mut spins = 0;
+ loop {
+ if prev_weak.upgrade().is_none() {
+ break;
+ }
+ assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+ "item not dropped on iteration {} after \
+ {} sends ({} successful). spin=({})",
+ i, attempted_sends, successful_sends, spins
+ );
+ spins += 1;
+ thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
+ }
+ }
+ }
+ break;
+ }
+ Err(ref e) => panic!("unexpected error: {}", e),
+ }
+ attempted_sends += 1;
+ }
+ }
+ drop(cmd_tx);
+ bg.join()
+ .expect("background thread join");
+}