Snap for 8564071 from 3ccd22c1882fcff9f5442478123ad654ce0b9105 to mainline-permission-release

Change-Id: I5107db5eef080aada461304a4f442b9e4ad725db
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f3ad3ab..deccf0d 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
 {
   "git": {
-    "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
-  }
-}
+    "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+  },
+  "path_in_vcs": "futures-channel"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 61e78bc..c7bc955 100644
--- a/Android.bp
+++ b/Android.bp
@@ -45,6 +45,8 @@
     name: "libfutures_channel",
     host_supported: true,
     crate_name: "futures_channel",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.3.21",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
@@ -57,11 +59,9 @@
     ],
     apex_available: [
         "//apex_available:platform",
+        "com.android.bluetooth",
         "com.android.resolv",
         "com.android.virt",
     ],
     min_sdk_version: "29",
 }
-
-// dependent_library ["feature_list"]
-//   futures-core-0.3.14 "alloc,std"
diff --git a/Cargo.toml b/Cargo.toml
index 30ee771..d0a13f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,32 +3,37 @@
 # When uploading crates to the registry Cargo will automatically
 # "normalize" Cargo.toml files for maximal compatibility
 # with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
 #
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
 
 [package]
 edition = "2018"
+rust-version = "1.45"
 name = "futures-channel"
-version = "0.3.13"
-authors = ["Alex Crichton <[email protected]>"]
-description = "Channels for asynchronous communication using futures-rs.\n"
+version = "0.3.21"
+description = """
+Channels for asynchronous communication using futures-rs.
+"""
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-channel/0.3"
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
+
 [package.metadata.docs.rs]
 all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+    "--cfg",
+    "docsrs",
+]
+
 [dependencies.futures-core]
-version = "0.3.13"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.13"
+version = "0.3.21"
 optional = true
 default-features = false
 
@@ -36,8 +41,11 @@
 
 [features]
 alloc = ["futures-core/alloc"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
+cfg-target-has-atomic = []
 default = ["std"]
 sink = ["futures-sink"]
-std = ["alloc", "futures-core/std"]
-unstable = ["futures-core/unstable"]
+std = [
+    "alloc",
+    "futures-core/std",
+]
+unstable = []
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 9a33320..f356eab 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,11 @@
 [package]
 name = "futures-channel"
+version = "0.3.21"
 edition = "2018"
-version = "0.3.13"
-authors = ["Alex Crichton <[email protected]>"]
+rust-version = "1.45"
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-channel/0.3"
 description = """
 Channels for asynchronous communication using futures-rs.
 """
@@ -17,15 +16,14 @@
 alloc = ["futures-core/alloc"]
 sink = ["futures-sink"]
 
-# Unstable features
-# These features are outside of the normal semver guarantees and require the
-# `unstable` feature as an explicit opt-in to unstable API.
-unstable = ["futures-core/unstable"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
+# These features are no longer used.
+# TODO: remove in the next major version.
+unstable = []
+cfg-target-has-atomic = []
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true }
 
 [dev-dependencies]
 futures = { path = "../futures", default-features = true }
diff --git a/METADATA b/METADATA
index b53d840..29bf06e 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate"
+    value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.21.crate"
   }
-  version: "0.3.13"
+  version: "0.3.21"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2021
-    month: 4
+    year: 2022
+    month: 3
     day: 1
   }
 }
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..3287be9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# futures-channel
+
+Channels for asynchronous communication using futures-rs.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-channel = "0.3"
+```
+
+The current `futures-channel` requires Rust 1.45 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 6798806..5ef61de 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,56 +1,45 @@
 // Generated by update_crate_tests.py for tests that depend on this crate.
 {
+  "imports": [
+    {
+      "path": "external/rust/crates/anyhow"
+    },
+    {
+      "path": "external/rust/crates/futures-util"
+    },
+    {
+      "path": "external/rust/crates/tokio"
+    },
+    {
+      "path": "external/rust/crates/tokio-test"
+    }
+  ],
   "presubmit": [
     {
-      "name": "anyhow_device_test_tests_test_boxed"
+      "name": "ZipFuseTest"
     },
     {
-      "name": "anyhow_device_test_tests_test_convert"
+      "name": "authfs_device_test_src_lib"
     },
     {
-      "name": "anyhow_device_test_tests_test_ffi"
+      "name": "doh_unit_test"
     },
     {
-      "name": "anyhow_device_test_tests_test_repr"
+      "name": "virtualizationservice_device_test"
+    }
+  ],
+  "presubmit-rust": [
+    {
+      "name": "ZipFuseTest"
     },
     {
-      "name": "tokio-test_device_test_tests_block_on"
+      "name": "authfs_device_test_src_lib"
     },
     {
-      "name": "anyhow_device_test_tests_test_chain"
+      "name": "doh_unit_test"
     },
     {
-      "name": "anyhow_device_test_tests_test_source"
-    },
-    {
-      "name": "tokio-test_device_test_tests_io"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_autotrait"
-    },
-    {
-      "name": "anyhow_device_test_src_lib"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_context"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_downcast"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_macros"
-    },
-    {
-      "name": "futures-util_device_test_src_lib"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_fmt"
-    },
-    {
-      "name": "tokio-test_device_test_tests_macros"
-    },
-    {
-      "name": "tokio-test_device_test_src_lib"
+      "name": "virtualizationservice_device_test"
     }
   ]
 }
diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs
index e22fe60..7c3c3d3 100644
--- a/benches/sync_mpsc.rs
+++ b/benches/sync_mpsc.rs
@@ -7,8 +7,8 @@
     futures::{
         channel::mpsc::{self, Sender, UnboundedSender},
         ready,
-        stream::{Stream, StreamExt},
         sink::Sink,
+        stream::{Stream, StreamExt},
         task::{Context, Poll},
     },
     futures_test::task::noop_context,
@@ -25,7 +25,6 @@
         // 1000 iterations to avoid measuring overhead of initialization
         // Result should be divided by 1000
         for i in 0..1000 {
-
             // Poll, not ready, park
             assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
 
@@ -73,7 +72,6 @@
     })
 }
 
-
 /// A Stream that continuously sends incrementing number of the queue
 struct TestSender {
     tx: Sender<u32>,
@@ -84,9 +82,7 @@
 impl Stream for TestSender {
     type Item = u32;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-        -> Poll<Option<Self::Item>>
-    {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = &mut *self;
         let mut tx = Pin::new(&mut this.tx);
 
@@ -123,12 +119,7 @@
         // Each sender can send one item after specified capacity
         let (tx, mut rx) = mpsc::channel(0);
 
-        let mut tx: Vec<_> = (0..100).map(|_| {
-            TestSender {
-                tx: tx.clone(),
-                last: 0
-            }
-        }).collect();
+        let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
 
         for i in 0..10 {
             for x in &mut tx {
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..05e0496
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,41 @@
+// The rustc-cfgs listed below are considered public API, but they are
+// *unstable* and outside of the normal semver guarantees:
+//
+// - `futures_no_atomic_cas`
+//      Assume the target does *not* support atomic CAS operations.
+//      This is usually detected automatically by the build script, but you may
+//      need to enable it manually when building for custom targets or using
+//      non-cargo build systems that don't run the build script.
+//
+// With the exceptions mentioned above, the rustc-cfgs emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms, single_use_lifetimes)]
+
+use std::env;
+
+include!("no_atomic_cas.rs");
+
+fn main() {
+    let target = match env::var("TARGET") {
+        Ok(target) => target,
+        Err(e) => {
+            println!(
+                "cargo:warning={}: unable to get TARGET environment variable: {}",
+                env!("CARGO_PKG_NAME"),
+                e
+            );
+            return;
+        }
+    };
+
+    // Note that this is `no_*`, not `has_*`. This allows treating
+    // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
+    // run. This is needed for compatibility with non-cargo build systems that
+    // don't run the build script.
+    if NO_ATOMIC_CAS.contains(&&*target) {
+        println!("cargo:rustc-cfg=futures_no_atomic_cas");
+    }
+
+    println!("cargo:rerun-if-changed=no_atomic_cas.rs");
+}
diff --git a/cargo2android.json b/cargo2android.json
index 01465d0..a7e2a4b 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,11 +1,12 @@
 {
   "apex-available": [
     "//apex_available:platform",
+    "com.android.bluetooth",
     "com.android.resolv",
     "com.android.virt"
   ],
-  "min_sdk_version": "29",
   "dependencies": true,
   "device": true,
+  "min-sdk-version": "29",
   "run": true
-}
\ No newline at end of file
+}
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
new file mode 100644
index 0000000..9b05d4b
--- /dev/null
+++ b/no_atomic_cas.rs
@@ -0,0 +1,13 @@
+// This file is @generated by no_atomic_cas.sh.
+// It is not intended for manual editing.
+
+const NO_ATOMIC_CAS: &[&str] = &[
+    "avr-unknown-gnu-atmega328",
+    "bpfeb-unknown-none",
+    "bpfel-unknown-none",
+    "msp430-none-elf",
+    "riscv32i-unknown-none-elf",
+    "riscv32imc-unknown-none-elf",
+    "thumbv4t-none-eabi",
+    "thumbv6m-none-eabi",
+];
diff --git a/src/lib.rs b/src/lib.rs
index 22d90d8..4cd936d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,34 +11,32 @@
 //! All items are only available when the `std` or `alloc` feature of this
 //! library is activated, and it is activated by default.
 
-#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
-
 #![cfg_attr(not(feature = "std"), no_std)]
+#![warn(
+    missing_debug_implementations,
+    missing_docs,
+    rust_2018_idioms,
+    single_use_lifetimes,
+    unreachable_pub
+)]
+#![doc(test(
+    no_crate_inject,
+    attr(
+        deny(warnings, rust_2018_idioms, single_use_lifetimes),
+        allow(dead_code, unused_assignments, unused_variables)
+    )
+))]
 
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
-// It cannot be included in the published code because this lints have false positives in the minimum required version.
-#![cfg_attr(test, warn(single_use_lifetimes))]
-#![warn(clippy::all)]
-#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+extern crate alloc;
 
-#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
-compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
-macro_rules! cfg_target_has_atomic {
-    ($($item:item)*) => {$(
-        #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
-        $item
-    )*};
-}
-
-cfg_target_has_atomic! {
-    #[cfg(feature = "alloc")]
-    extern crate alloc;
-
-    #[cfg(feature = "alloc")]
-    mod lock;
-    #[cfg(feature = "std")]
-    pub mod mpsc;
-    #[cfg(feature = "alloc")]
-    pub mod oneshot;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod lock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "std")]
+pub mod mpsc;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub mod oneshot;
diff --git a/src/lock.rs b/src/lock.rs
index 5eecdd9..b328d0f 100644
--- a/src/lock.rs
+++ b/src/lock.rs
@@ -6,8 +6,8 @@
 
 use core::cell::UnsafeCell;
 use core::ops::{Deref, DerefMut};
-use core::sync::atomic::Ordering::SeqCst;
 use core::sync::atomic::AtomicBool;
+use core::sync::atomic::Ordering::SeqCst;
 
 /// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
 ///
@@ -37,10 +37,7 @@
 impl<T> Lock<T> {
     /// Creates a new lock around the given value.
     pub(crate) fn new(t: T) -> Self {
-        Self {
-            locked: AtomicBool::new(false),
-            data: UnsafeCell::new(t),
-        }
+        Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) }
     }
 
     /// Attempts to acquire this lock, returning whether the lock was acquired or
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
index dd50343..44834b7 100644
--- a/src/mpsc/mod.rs
+++ b/src/mpsc/mod.rs
@@ -79,13 +79,13 @@
 // by the queue structure.
 
 use futures_core::stream::{FusedStream, Stream};
-use futures_core::task::{Context, Poll, Waker};
 use futures_core::task::__internal::AtomicWaker;
+use futures_core::task::{Context, Poll, Waker};
 use std::fmt;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
 use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
 use std::thread;
 
 use crate::mpsc::queue::Queue;
@@ -209,9 +209,7 @@
 
 impl<T> fmt::Debug for TrySendError<T> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("TrySendError")
-            .field("kind", &self.err.kind)
-            .finish()
+        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
     }
 }
 
@@ -251,8 +249,7 @@
 
 impl fmt::Debug for TryRecvError {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_tuple("TryRecvError")
-            .finish()
+        f.debug_tuple("TryRecvError").finish()
     }
 }
 
@@ -335,10 +332,7 @@
 
 impl SenderTask {
     fn new() -> Self {
-        Self {
-            task: None,
-            is_parked: false,
-        }
+        Self { task: None, is_parked: false }
     }
 
     fn notify(&mut self) {
@@ -381,9 +375,7 @@
         maybe_parked: false,
     };
 
-    let rx = Receiver {
-        inner: Some(inner),
-    };
+    let rx = Receiver { inner: Some(inner) };
 
     (Sender(Some(tx)), rx)
 }
@@ -399,7 +391,6 @@
 /// the channel. Using an `unbounded` channel has the ability of causing the
 /// process to run out of memory. In this case, the process will be aborted.
 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
-
     let inner = Arc::new(UnboundedInner {
         state: AtomicUsize::new(INIT_STATE),
         message_queue: Queue::new(),
@@ -407,13 +398,9 @@
         recv_task: AtomicWaker::new(),
     });
 
-    let tx = UnboundedSenderInner {
-        inner: inner.clone(),
-    };
+    let tx = UnboundedSenderInner { inner: inner.clone() };
 
-    let rx = UnboundedReceiver {
-        inner: Some(inner),
-    };
+    let rx = UnboundedReceiver { inner: Some(inner) };
 
     (UnboundedSender(Some(tx)), rx)
 }
@@ -430,13 +417,10 @@
         if state.is_open {
             Poll::Ready(Ok(()))
         } else {
-            Poll::Ready(Err(SendError {
-                kind: SendErrorKind::Disconnected,
-            }))
+            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
         }
     }
 
-
     // Push message to the queue and signal to the receiver
     fn queue_push_and_signal(&self, msg: T) {
         // Push the message onto the message queue
@@ -462,16 +446,17 @@
             // This probably is never hit? Odds are the process will run out of
             // memory first. It may be worth to return something else in this
             // case?
-            assert!(state.num_messages < MAX_CAPACITY, "buffer space \
-                    exhausted; sending this messages would overflow the state");
+            assert!(
+                state.num_messages < MAX_CAPACITY,
+                "buffer space \
+                    exhausted; sending this messages would overflow the state"
+            );
 
             state.num_messages += 1;
 
             let next = encode_state(&state);
             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
-                Ok(_) => {
-                    return Some(state.num_messages)
-                }
+                Ok(_) => return Some(state.num_messages),
                 Err(actual) => curr = actual,
             }
         }
@@ -516,12 +501,7 @@
     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
         // If the sender is currently blocked, reject the message
         if !self.poll_unparked(None).is_ready() {
-            return Err(TrySendError {
-                err: SendError {
-                    kind: SendErrorKind::Full,
-                },
-                val: msg,
-            });
+            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
         }
 
         // The channel has capacity to accept the message, so send it
@@ -530,11 +510,8 @@
 
     // Do the send without failing.
     // Can be called only by bounded sender.
-    #[allow(clippy::debug_assert_with_mut_call)]
-    fn do_send_b(&mut self, msg: T)
-        -> Result<(), TrySendError<T>>
-    {
-        // Anyone callig do_send *should* make sure there is room first,
+    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+        // Anyone calling do_send *should* make sure there is room first,
         // but assert here for tests as a sanity check.
         debug_assert!(self.poll_unparked(None).is_ready());
 
@@ -551,12 +528,12 @@
                 // the configured buffer size
                 num_messages > self.inner.buffer
             }
-            None => return Err(TrySendError {
-                err: SendError {
-                    kind: SendErrorKind::Disconnected,
-                },
-                val: msg,
-            }),
+            None => {
+                return Err(TrySendError {
+                    err: SendError { kind: SendErrorKind::Disconnected },
+                    val: msg,
+                })
+            }
         };
 
         // If the channel has reached capacity, then the sender task needs to
@@ -600,16 +577,17 @@
             // This probably is never hit? Odds are the process will run out of
             // memory first. It may be worth to return something else in this
             // case?
-            assert!(state.num_messages < MAX_CAPACITY, "buffer space \
-                    exhausted; sending this messages would overflow the state");
+            assert!(
+                state.num_messages < MAX_CAPACITY,
+                "buffer space \
+                    exhausted; sending this messages would overflow the state"
+            );
 
             state.num_messages += 1;
 
             let next = encode_state(&state);
             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
-                Ok(_) => {
-                    return Some(state.num_messages)
-                }
+                Ok(_) => return Some(state.num_messages),
                 Err(actual) => curr = actual,
             }
         }
@@ -644,15 +622,10 @@
     ///   capacity, in which case the current task is queued to be notified once
     ///   capacity is available;
     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
-    fn poll_ready(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), SendError>> {
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
         let state = decode_state(self.inner.state.load(SeqCst));
         if !state.is_open {
-            return Poll::Ready(Err(SendError {
-                kind: SendErrorKind::Disconnected,
-            }));
+            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
         }
 
         self.poll_unparked(Some(cx)).map(Ok)
@@ -699,7 +672,7 @@
 
             if !task.is_parked {
                 self.maybe_parked = false;
-                return Poll::Ready(())
+                return Poll::Ready(());
             }
 
             // At this point, an unpark request is pending, so there will be an
@@ -724,12 +697,7 @@
         if let Some(inner) = &mut self.0 {
             inner.try_send(msg)
         } else {
-            Err(TrySendError {
-                err: SendError {
-                    kind: SendErrorKind::Disconnected,
-                },
-                val: msg,
-            })
+            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
         }
     }
 
@@ -739,8 +707,7 @@
     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
     /// ready to receive a message.
     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
-        self.try_send(msg)
-            .map_err(|e| e.err)
+        self.try_send(msg).map_err(|e| e.err)
     }
 
     /// Polls the channel to determine if there is guaranteed capacity to send
@@ -755,13 +722,8 @@
     ///   capacity, in which case the current task is queued to be notified once
     ///   capacity is available;
     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
-    pub fn poll_ready(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), SendError>> {
-        let inner = self.0.as_mut().ok_or(SendError {
-            kind: SendErrorKind::Disconnected,
-        })?;
+    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
         inner.poll_ready(cx)
     }
 
@@ -799,7 +761,10 @@
     }
 
     /// Hashes the receiver into the provided hasher
-    pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+    pub fn hash_receiver<H>(&self, hasher: &mut H)
+    where
+        H: std::hash::Hasher,
+    {
         use std::hash::Hash;
 
         let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -809,13 +774,8 @@
 
 impl<T> UnboundedSender<T> {
     /// Check if the channel is ready to receive a message.
-    pub fn poll_ready(
-        &self,
-        _: &mut Context<'_>,
-    ) -> Poll<Result<(), SendError>> {
-        let inner = self.0.as_ref().ok_or(SendError {
-            kind: SendErrorKind::Disconnected,
-        })?;
+    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
         inner.poll_ready_nb()
     }
 
@@ -845,12 +805,7 @@
             }
         }
 
-        Err(TrySendError {
-            err: SendError {
-                kind: SendErrorKind::Disconnected,
-            },
-            val: msg,
-        })
+        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
     }
 
     /// Send a message on the channel.
@@ -858,8 +813,7 @@
     /// This method should only be called after `poll_ready` has been used to
     /// verify that the channel is ready to receive a message.
     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
-        self.do_send_nb(msg)
-            .map_err(|e| e.err)
+        self.do_send_nb(msg).map_err(|e| e.err)
     }
 
     /// Sends a message along this channel.
@@ -888,7 +842,10 @@
     }
 
     /// Hashes the receiver into the provided hasher
-    pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+    pub fn hash_receiver<H>(&self, hasher: &mut H)
+    where
+        H: std::hash::Hasher,
+    {
         use std::hash::Hash;
 
         let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -928,9 +885,7 @@
                 Ok(_) => {
                     // The ABA problem doesn't matter here. We only care that the
                     // number of senders never exceeds the maximum.
-                    return Self {
-                        inner: self.inner.clone(),
-                    };
+                    return Self { inner: self.inner.clone() };
                 }
                 Err(actual) => curr = actual,
             }
@@ -1021,19 +976,22 @@
     /// only when you've otherwise arranged to be notified when the channel is
     /// no longer empty.
     ///
-    /// This function will panic if called after `try_next` or `poll_next` has
-    /// returned `None`.
+    /// This function returns:
+    /// * `Ok(Some(t))` when a message is fetched
+    /// * `Ok(None)` when the channel is closed and no messages are left in the queue
+    /// * `Err(e)` when there are no messages available, but the channel is not yet closed
     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
         match self.next_message() {
-            Poll::Ready(msg) => {
-                Ok(msg)
-            },
+            Poll::Ready(msg) => Ok(msg),
             Poll::Pending => Err(TryRecvError { _priv: () }),
         }
     }
 
     fn next_message(&mut self) -> Poll<Option<T>> {
-        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+        let inner = match self.inner.as_mut() {
+            None => return Poll::Ready(None),
+            Some(inner) => inner,
+        };
         // Pop off a message
         match unsafe { inner.message_queue.pop_spin() } {
             Some(msg) => {
@@ -1098,18 +1056,15 @@
 impl<T> Stream for Receiver<T> {
     type Item = T;
 
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<T>> {
-            // Try to read a message off of the message queue.
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        // Try to read a message off of the message queue.
         match self.next_message() {
             Poll::Ready(msg) => {
                 if msg.is_none() {
                     self.inner = None;
                 }
                 Poll::Ready(msg)
-            },
+            }
             Poll::Pending => {
                 // There are no messages to read, in this case, park.
                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1169,19 +1124,22 @@
     /// only when you've otherwise arranged to be notified when the channel is
     /// no longer empty.
     ///
-    /// This function will panic if called after `try_next` or `poll_next` has
-    /// returned `None`.
+    /// This function returns:
+    /// * `Ok(Some(t))` when a message is fetched
+    /// * `Ok(None)` when the channel is closed and no messages are left in the queue
+    /// * `Err(e)` when there are no messages available, but the channel is not yet closed
     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
         match self.next_message() {
-            Poll::Ready(msg) => {
-                Ok(msg)
-            },
+            Poll::Ready(msg) => Ok(msg),
             Poll::Pending => Err(TryRecvError { _priv: () }),
         }
     }
 
     fn next_message(&mut self) -> Poll<Option<T>> {
-        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+        let inner = match self.inner.as_mut() {
+            None => return Poll::Ready(None),
+            Some(inner) => inner,
+        };
         // Pop off a message
         match unsafe { inner.message_queue.pop_spin() } {
             Some(msg) => {
@@ -1230,10 +1188,7 @@
 impl<T> Stream for UnboundedReceiver<T> {
     type Item = T;
 
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<T>> {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
         // Try to read a message off of the message queue.
         match self.next_message() {
             Poll::Ready(msg) => {
@@ -1241,7 +1196,7 @@
                     self.inner = None;
                 }
                 Poll::Ready(msg)
-            },
+            }
             Poll::Pending => {
                 // There are no messages to read, in this case, park.
                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1339,10 +1294,7 @@
  */
 
 fn decode_state(num: usize) -> State {
-    State {
-        is_open: num & OPEN_MASK == OPEN_MASK,
-        num_messages: num & MAX_CAPACITY,
-    }
+    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
 }
 
 fn encode_state(state: &State) -> usize {
diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs
index b00e1b1..57dc7f5 100644
--- a/src/mpsc/queue.rs
+++ b/src/mpsc/queue.rs
@@ -43,10 +43,10 @@
 
 pub(super) use self::PopResult::*;
 
-use std::thread;
 use std::cell::UnsafeCell;
 use std::ptr;
 use std::sync::atomic::{AtomicPtr, Ordering};
+use std::thread;
 
 /// A result of the `pop` function.
 pub(super) enum PopResult<T> {
@@ -76,15 +76,12 @@
     tail: UnsafeCell<*mut Node<T>>,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send> Send for Queue<T> {}
+unsafe impl<T: Send> Sync for Queue<T> {}
 
 impl<T> Node<T> {
     unsafe fn new(v: Option<T>) -> *mut Self {
-        Box::into_raw(Box::new(Self {
-            next: AtomicPtr::new(ptr::null_mut()),
-            value: v,
-        }))
+        Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v }))
     }
 }
 
@@ -93,10 +90,7 @@
     /// one consumer.
     pub(super) fn new() -> Self {
         let stub = unsafe { Node::new(None) };
-        Self {
-            head: AtomicPtr::new(stub),
-            tail: UnsafeCell::new(stub),
-        }
+        Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
     }
 
     /// Pushes a new value onto this queue.
@@ -133,7 +127,11 @@
             return Data(ret);
         }
 
-        if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+        if self.head.load(Ordering::Acquire) == tail {
+            Empty
+        } else {
+            Inconsistent
+        }
     }
 
     /// Pop an element similarly to `pop` function, but spin-wait on inconsistent
diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs
index 4ce66b4..1be2016 100644
--- a/src/mpsc/sink_impl.rs
+++ b/src/mpsc/sink_impl.rs
@@ -6,24 +6,15 @@
 impl<T> Sink<T> for Sender<T> {
     type Error = SendError;
 
-    fn poll_ready(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         (*self).poll_ready(cx)
     }
 
-    fn start_send(
-        mut self: Pin<&mut Self>,
-        msg: T,
-    ) -> Result<(), Self::Error> {
+    fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
         (*self).start_send(msg)
     }
 
-    fn poll_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         match (*self).poll_ready(cx) {
             Poll::Ready(Err(ref e)) if e.is_disconnected() => {
                 // If the receiver disconnected, we consider the sink to be flushed.
@@ -33,10 +24,7 @@
         }
     }
 
-    fn poll_close(
-        mut self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.disconnect();
         Poll::Ready(Ok(()))
     }
@@ -45,31 +33,19 @@
 impl<T> Sink<T> for UnboundedSender<T> {
     type Error = SendError;
 
-    fn poll_ready(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Self::poll_ready(&*self, cx)
     }
 
-    fn start_send(
-        mut self: Pin<&mut Self>,
-        msg: T,
-    ) -> Result<(), Self::Error> {
+    fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
         Self::start_send(&mut *self, msg)
     }
 
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
     }
 
-    fn poll_close(
-        mut self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.disconnect();
         Poll::Ready(Ok(()))
     }
@@ -78,29 +54,19 @@
 impl<T> Sink<T> for &UnboundedSender<T> {
     type Error = SendError;
 
-    fn poll_ready(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         UnboundedSender::poll_ready(*self, cx)
     }
 
     fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
-        self.unbounded_send(msg)
-            .map_err(TrySendError::into_send_error)
+        self.unbounded_send(msg).map_err(TrySendError::into_send_error)
     }
 
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
     }
 
-    fn poll_close(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.close_channel();
         Poll::Ready(Ok(()))
     }
diff --git a/src/oneshot.rs b/src/oneshot.rs
index dbbce81..5af651b 100644
--- a/src/oneshot.rs
+++ b/src/oneshot.rs
@@ -7,7 +7,7 @@
 use core::pin::Pin;
 use core::sync::atomic::AtomicBool;
 use core::sync::atomic::Ordering::SeqCst;
-use futures_core::future::{Future, FusedFuture};
+use futures_core::future::{FusedFuture, Future};
 use futures_core::task::{Context, Poll, Waker};
 
 use crate::lock::Lock;
@@ -16,7 +16,6 @@
 ///
 /// This is created by the [`channel`](channel) function.
 #[must_use = "futures do nothing unless you `.await` or poll them"]
-#[derive(Debug)]
 pub struct Receiver<T> {
     inner: Arc<Inner<T>>,
 }
@@ -24,7 +23,6 @@
 /// A means of transmitting a single value to another task.
 ///
 /// This is created by the [`channel`](channel) function.
-#[derive(Debug)]
 pub struct Sender<T> {
     inner: Arc<Inner<T>>,
 }
@@ -35,7 +33,6 @@
 
 /// Internal state of the `Receiver`/`Sender` pair above. This is all used as
 /// the internal synchronization between the two for send/recv operations.
-#[derive(Debug)]
 struct Inner<T> {
     /// Indicates whether this oneshot is complete yet. This is filled in both
     /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
@@ -106,12 +103,8 @@
 /// ```
 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
     let inner = Arc::new(Inner::new());
-    let receiver = Receiver {
-        inner: inner.clone(),
-    };
-    let sender = Sender {
-        inner,
-    };
+    let receiver = Receiver { inner: inner.clone() };
+    let sender = Sender { inner };
     (sender, receiver)
 }
 
@@ -127,7 +120,7 @@
 
     fn send(&self, t: T) -> Result<(), T> {
         if self.complete.load(SeqCst) {
-            return Err(t)
+            return Err(t);
         }
 
         // Note that this lock acquisition may fail if the receiver
@@ -164,7 +157,7 @@
         // destructor, but our destructor hasn't run yet so if it's set then the
         // oneshot is gone.
         if self.complete.load(SeqCst) {
-            return Poll::Ready(())
+            return Poll::Ready(());
         }
 
         // If our other half is not gone then we need to park our current task
@@ -273,7 +266,10 @@
         } else {
             let task = cx.waker().clone();
             match self.rx_task.try_lock() {
-                Some(mut slot) => { *slot = Some(task); false },
+                Some(mut slot) => {
+                    *slot = Some(task);
+                    false
+                }
                 None => true,
             }
         };
@@ -394,6 +390,12 @@
     }
 }
 
+impl<T: fmt::Debug> fmt::Debug for Sender<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Sender").field("complete", &self.inner.complete).finish()
+    }
+}
+
 /// A future that resolves when the receiving end of a channel has hung up.
 ///
 /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
@@ -453,10 +455,7 @@
 impl<T> Future for Receiver<T> {
     type Output = Result<T, Canceled>;
 
-    fn poll(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<T, Canceled>> {
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
         self.inner.recv(cx)
     }
 }
@@ -481,3 +480,9 @@
         self.inner.drop_rx()
     }
 }
+
+impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Receiver").field("complete", &self.inner.complete).finish()
+    }
+}
diff --git a/tests/channel.rs b/tests/channel.rs
index 73dac64..5f01a8e 100644
--- a/tests/channel.rs
+++ b/tests/channel.rs
@@ -1,8 +1,8 @@
 use futures::channel::mpsc;
 use futures::executor::block_on;
 use futures::future::poll_fn;
-use futures::stream::StreamExt;
 use futures::sink::SinkExt;
+use futures::stream::StreamExt;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::thread;
 
@@ -11,9 +11,7 @@
     let (tx, rx) = mpsc::channel(1);
 
     let amt = 20;
-    let t = thread::spawn(move || {
-        block_on(send_sequence(amt, tx))
-    });
+    let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
     let list: Vec<_> = block_on(rx.collect());
     let mut list = list.into_iter();
     for i in (1..=amt).rev() {
@@ -34,9 +32,7 @@
 fn drop_sender() {
     let (tx, mut rx) = mpsc::channel::<u32>(1);
     drop(tx);
-    let f = poll_fn(|cx| {
-        rx.poll_next_unpin(cx)
-    });
+    let f = poll_fn(|cx| rx.poll_next_unpin(cx));
     assert_eq!(block_on(f), None)
 }
 
diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs
index 9eb5296..1a14067 100644
--- a/tests/mpsc-close.rs
+++ b/tests/mpsc-close.rs
@@ -13,9 +13,7 @@
 fn smoke() {
     let (mut sender, receiver) = mpsc::channel(1);
 
-    let t = thread::spawn(move || {
-        while let Ok(()) = block_on(sender.send(42)) {}
-    });
+    let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
 
     // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
     block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
@@ -149,6 +147,7 @@
 
 // Stress test that `try_send()`s occurring concurrently with receiver
 // close/drops don't appear as successful sends.
+#[cfg_attr(miri, ignore)] // Miri is too slow
 #[test]
 fn stress_try_send_as_receiver_closes() {
     const AMT: usize = 10000;
@@ -166,7 +165,7 @@
     struct TestRx {
         rx: mpsc::Receiver<Arc<()>>,
         // The number of times to query `rx` before dropping it.
-        poll_count: usize
+        poll_count: usize,
     }
     struct TestTask {
         command_rx: mpsc::Receiver<TestRx>,
@@ -190,14 +189,11 @@
     impl Future for TestTask {
         type Output = ();
 
-        fn poll(
-            mut self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Self::Output> {
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
             // Poll the test channel, if one is present.
             if let Some(rx) = &mut self.test_rx {
                 if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
-                   let _ = v.expect("test finished unexpectedly!");
+                    let _ = v.expect("test finished unexpectedly!");
                 }
                 self.countdown -= 1;
                 // Busy-poll until the countdown is finished.
@@ -209,9 +205,9 @@
                     self.test_rx = Some(rx);
                     self.countdown = poll_count;
                     cx.waker().wake_by_ref();
-                },
+                }
                 Poll::Ready(None) => return Poll::Ready(()),
-                Poll::Pending => {},
+                Poll::Pending => {}
             }
             if self.countdown == 0 {
                 // Countdown complete -- drop the Receiver.
@@ -255,10 +251,14 @@
                                 if prev_weak.upgrade().is_none() {
                                     break;
                                 }
-                                assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+                                assert!(
+                                    t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
                                     "item not dropped on iteration {} after \
                                      {} sends ({} successful). spin=({})",
-                                    i, attempted_sends, successful_sends, spins
+                                    i,
+                                    attempted_sends,
+                                    successful_sends,
+                                    spins
                                 );
                                 spins += 1;
                                 thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
@@ -273,6 +273,27 @@
         }
     }
     drop(cmd_tx);
-    bg.join()
-        .expect("background thread join");
+    bg.join().expect("background thread join");
+}
+
+#[test]
+fn unbounded_try_next_after_none() {
+    let (tx, mut rx) = mpsc::unbounded::<String>();
+    // Drop the sender, close the channel.
+    drop(tx);
+    // Receive the end of channel.
+    assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+    // None received, check we can call `try_next` again.
+    assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+}
+
+#[test]
+fn bounded_try_next_after_none() {
+    let (tx, mut rx) = mpsc::channel::<String>(17);
+    // Drop the sender, close the channel.
+    drop(tx);
+    // Receive the end of channel.
+    assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+    // None received, check we can call `try_next` again.
+    assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
 }
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index 61c5a50..da0899d 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -1,13 +1,13 @@
 use futures::channel::{mpsc, oneshot};
 use futures::executor::{block_on, block_on_stream};
-use futures::future::{FutureExt, poll_fn};
-use futures::stream::{Stream, StreamExt};
-use futures::sink::{Sink, SinkExt};
-use futures::task::{Context, Poll};
+use futures::future::{poll_fn, FutureExt};
 use futures::pin_mut;
+use futures::sink::{Sink, SinkExt};
+use futures::stream::{Stream, StreamExt};
+use futures::task::{Context, Poll};
 use futures_test::task::{new_count_waker, noop_context};
-use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
 use std::thread;
 
 trait AssertSend: Send {}
@@ -77,7 +77,7 @@
 fn send_recv_threads() {
     let (mut tx, rx) = mpsc::channel::<i32>(16);
 
-    let t = thread::spawn(move|| {
+    let t = thread::spawn(move || {
         block_on(tx.send(1)).unwrap();
     });
 
@@ -200,11 +200,14 @@
 
 #[test]
 fn stress_shared_unbounded() {
+    #[cfg(miri)]
+    const AMT: u32 = 100;
+    #[cfg(not(miri))]
     const AMT: u32 = 10000;
     const NTHREADS: u32 = 8;
     let (tx, rx) = mpsc::unbounded::<i32>();
 
-    let t = thread::spawn(move|| {
+    let t = thread::spawn(move || {
         let result: Vec<_> = block_on(rx.collect());
         assert_eq!(result.len(), (AMT * NTHREADS) as usize);
         for item in result {
@@ -215,7 +218,7 @@
     for _ in 0..NTHREADS {
         let tx = tx.clone();
 
-        thread::spawn(move|| {
+        thread::spawn(move || {
             for _ in 0..AMT {
                 tx.unbounded_send(1).unwrap();
             }
@@ -229,11 +232,14 @@
 
 #[test]
 fn stress_shared_bounded_hard() {
+    #[cfg(miri)]
+    const AMT: u32 = 100;
+    #[cfg(not(miri))]
     const AMT: u32 = 10000;
     const NTHREADS: u32 = 8;
     let (tx, rx) = mpsc::channel::<i32>(0);
 
-    let t = thread::spawn(move|| {
+    let t = thread::spawn(move || {
         let result: Vec<_> = block_on(rx.collect());
         assert_eq!(result.len(), (AMT * NTHREADS) as usize);
         for item in result {
@@ -259,6 +265,9 @@
 #[allow(clippy::same_item_push)]
 #[test]
 fn stress_receiver_multi_task_bounded_hard() {
+    #[cfg(miri)]
+    const AMT: usize = 100;
+    #[cfg(not(miri))]
     const AMT: usize = 10_000;
     const NTHREADS: u32 = 2;
 
@@ -297,9 +306,9 @@
                             }
                             Poll::Ready(None) => {
                                 *rx_opt = None;
-                                break
-                            },
-                            Poll::Pending => {},
+                                break;
+                            }
+                            Poll::Pending => {}
                         }
                     }
                 } else {
@@ -311,7 +320,6 @@
         th.push(t);
     }
 
-
     for i in 0..AMT {
         block_on(tx.send(i)).unwrap();
     }
@@ -328,7 +336,12 @@
 /// after sender dropped.
 #[test]
 fn stress_drop_sender() {
-    fn list() -> impl Stream<Item=i32> {
+    #[cfg(miri)]
+    const ITER: usize = 100;
+    #[cfg(not(miri))]
+    const ITER: usize = 10000;
+
+    fn list() -> impl Stream<Item = i32> {
         let (tx, rx) = mpsc::channel(1);
         thread::spawn(move || {
             block_on(send_one_two_three(tx));
@@ -336,7 +349,7 @@
         rx
     }
 
-    for _ in 0..10000 {
+    for _ in 0..ITER {
         let v: Vec<_> = block_on(list().collect());
         assert_eq!(v, vec![1, 2, 3]);
     }
@@ -381,9 +394,12 @@
     }
 }
 
+#[cfg_attr(miri, ignore)] // Miri is too slow
 #[test]
 fn stress_close_receiver() {
-    for _ in 0..10000 {
+    const ITER: usize = 10000;
+
+    for _ in 0..ITER {
         stress_close_receiver_iter();
     }
 }
@@ -398,6 +414,9 @@
 #[allow(clippy::same_item_push)]
 #[test]
 fn stress_poll_ready() {
+    #[cfg(miri)]
+    const AMT: u32 = 100;
+    #[cfg(not(miri))]
     const AMT: u32 = 1000;
     const NTHREADS: u32 = 8;
 
@@ -407,9 +426,7 @@
         let mut threads = Vec::new();
         for _ in 0..NTHREADS {
             let sender = tx.clone();
-            threads.push(thread::spawn(move || {
-                block_on(stress_poll_ready_sender(sender, AMT))
-            }));
+            threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
         }
         drop(tx);
 
@@ -427,6 +444,7 @@
     stress(16);
 }
 
+#[cfg_attr(miri, ignore)] // Miri is too slow
 #[test]
 fn try_send_1() {
     const N: usize = 3000;
@@ -436,7 +454,7 @@
         for i in 0..N {
             loop {
                 if tx.try_send(i).is_ok() {
-                    break
+                    break;
                 }
             }
         }
@@ -542,8 +560,8 @@
 
 #[test]
 fn hash_receiver() {
-    use std::hash::Hasher;
     use std::collections::hash_map::DefaultHasher;
+    use std::hash::Hasher;
 
     let mut hasher_a1 = DefaultHasher::new();
     let mut hasher_a2 = DefaultHasher::new();
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
index a22d039..c9f5508 100644
--- a/tests/oneshot.rs
+++ b/tests/oneshot.rs
@@ -1,6 +1,6 @@
 use futures::channel::oneshot::{self, Sender};
 use futures::executor::block_on;
-use futures::future::{FutureExt, poll_fn};
+use futures::future::{poll_fn, FutureExt};
 use futures::task::{Context, Poll};
 use futures_test::task::panic_waker_ref;
 use std::sync::mpsc;
@@ -35,6 +35,11 @@
 
 #[test]
 fn cancel_lots() {
+    #[cfg(miri)]
+    const N: usize = 100;
+    #[cfg(not(miri))]
+    const N: usize = 20000;
+
     let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
     let t = thread::spawn(move || {
         for (mut tx, tx2) in rx {
@@ -43,7 +48,7 @@
         }
     });
 
-    for _ in 0..20000 {
+    for _ in 0..N {
         let (otx, orx) = oneshot::channel::<u32>();
         let (tx2, rx2) = mpsc::channel();
         tx.send((otx, tx2)).unwrap();
@@ -70,7 +75,7 @@
     rx.close();
     block_on(poll_fn(|cx| {
         match rx.poll_unpin(cx) {
-            Poll::Ready(Err(_)) => {},
+            Poll::Ready(Err(_)) => {}
             _ => panic!(),
         };
         assert!(tx.poll_canceled(cx).is_ready());
@@ -101,6 +106,11 @@
 
 #[test]
 fn cancel_sends() {
+    #[cfg(miri)]
+    const N: usize = 100;
+    #[cfg(not(miri))]
+    const N: usize = 20000;
+
     let (tx, rx) = mpsc::channel::<Sender<_>>();
     let t = thread::spawn(move || {
         for otx in rx {
@@ -108,7 +118,7 @@
         }
     });
 
-    for _ in 0..20000 {
+    for _ in 0..N {
         let (otx, mut orx) = oneshot::channel::<u32>();
         tx.send(otx).unwrap();