Initial import of version 0.3.21 am: 991128c83b am: d49f50682b am: bc1346748c am: ace98515a5
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-test/+/2143657
Change-Id: I9a3c12a67fc06ebde75a4405f3b3ca929f99d21e
Signed-off-by: Automerger Merge Worker <[email protected]>
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..77d8405
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,6 @@
+{
+ "git": {
+ "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ },
+ "path_in_vcs": "futures-test"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
new file mode 100644
index 0000000..796fcb0
--- /dev/null
+++ b/Android.bp
@@ -0,0 +1,60 @@
+// This file is generated by cargo2android.py --config cargo2android.json.
+// Do not modify this file as changes will be overridden on upgrade.
+
+
+
+rust_test {
+ name: "futures-test_test_src_lib",
+ host_supported: true,
+ crate_name: "futures_test",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.21",
+ srcs: ["src/lib.rs"],
+ test_suites: ["general-tests"],
+ auto_gen_config: true,
+ test_options: {
+ unit_test: true,
+ },
+ edition: "2018",
+ features: [
+ "default",
+ "std",
+ ],
+ rustlibs: [
+ "libfutures",
+ "libfutures_core",
+ "libfutures_executor",
+ "libfutures_io",
+ "libfutures_sink",
+ "libfutures_task",
+ "libfutures_util",
+ "libpin_project",
+ "libpin_utils",
+ ],
+ proc_macros: ["libfutures_macro"],
+}
+
+rust_library {
+ name: "libfutures_test",
+ host_supported: true,
+ crate_name: "futures_test",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.21",
+ srcs: ["src/lib.rs"],
+ edition: "2018",
+ features: [
+ "default",
+ "std",
+ ],
+ rustlibs: [
+ "libfutures_core",
+ "libfutures_executor",
+ "libfutures_io",
+ "libfutures_sink",
+ "libfutures_task",
+ "libfutures_util",
+ "libpin_project",
+ "libpin_utils",
+ ],
+ proc_macros: ["libfutures_macro"],
+}
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..41e5541
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,73 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+rust-version = "1.45"
+name = "futures-test"
+version = "0.3.21"
+description = """
+Common utilities for testing components built off futures-rs.
+"""
+homepage = "https://rust-lang.github.io/futures-rs"
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/rust-lang/futures-rs"
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies.futures-core]
+version = "0.3.21"
+default-features = false
+
+[dependencies.futures-executor]
+version = "0.3.21"
+default-features = false
+
+[dependencies.futures-io]
+version = "0.3.21"
+default-features = false
+
+[dependencies.futures-macro]
+version = "=0.3.21"
+default-features = false
+
+[dependencies.futures-sink]
+version = "0.3.21"
+default-features = false
+
+[dependencies.futures-task]
+version = "0.3.21"
+default-features = false
+
+[dependencies.futures-util]
+version = "0.3.21"
+default-features = false
+
+[dependencies.pin-project]
+version = "1.0.1"
+
+[dependencies.pin-utils]
+version = "0.1.0"
+default-features = false
+
+[dev-dependencies]
+
+[features]
+default = ["std"]
+std = [
+ "futures-core/std",
+ "futures-task/std",
+ "futures-io/std",
+ "futures-util/std",
+ "futures-util/io",
+ "futures-executor/std",
+]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..b5aa8a7
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,32 @@
+[package]
+name = "futures-test"
+version = "0.3.21"
+edition = "2018"
+rust-version = "1.45"
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/rust-lang/futures-rs"
+homepage = "https://rust-lang.github.io/futures-rs"
+description = """
+Common utilities for testing components built off futures-rs.
+"""
+
+[dependencies]
+futures-core = { version = "0.3.21", path = "../futures-core", default-features = false }
+futures-task = { version = "0.3.21", path = "../futures-task", default-features = false }
+futures-io = { version = "0.3.21", path = "../futures-io", default-features = false }
+futures-util = { version = "0.3.21", path = "../futures-util", default-features = false }
+futures-executor = { version = "0.3.21", path = "../futures-executor", default-features = false }
+futures-sink = { version = "0.3.21", path = "../futures-sink", default-features = false }
+futures-macro = { version = "=0.3.21", path = "../futures-macro", default-features = false }
+pin-utils = { version = "0.1.0", default-features = false }
+pin-project = "1.0.1"
+
+[dev-dependencies]
+futures = { path = "../futures", default-features = false, features = ["std", "executor"] }
+
+[features]
+default = ["std"]
+std = ["futures-core/std", "futures-task/std", "futures-io/std", "futures-util/std", "futures-util/io", "futures-executor/std"]
+
+[package.metadata.docs.rs]
+all-features = true
diff --git a/LICENSE b/LICENSE
new file mode 120000
index 0000000..6b579aa
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1 @@
+LICENSE-APACHE
\ No newline at end of file
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..9eb0b09
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,202 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 0000000..8ad082e
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,26 @@
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..098ff76
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,20 @@
+name: "futures-test"
+description: "Common utilities for testing components built off futures-rs."
+third_party {
+ url {
+ type: HOMEPAGE
+ value: "https://crates.io/crates/futures-test"
+ }
+ url {
+ type: ARCHIVE
+ value: "https://static.crates.io/crates/futures-test/futures-test-0.3.21.crate"
+ }
+ version: "0.3.21"
+ # Dual-licensed, using the least restrictive per go/thirdpartylicenses#same.
+ license_type: NOTICE
+ last_upgrade_date {
+ year: 2022
+ month: 6
+ day: 22
+ }
+}
diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_APACHE2
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..45dc4dd
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1 @@
+include platform/prebuilts/rust:master:/OWNERS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b3c30e5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# futures-test
+
+Common utilities for testing components built off futures-rs.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-test = "0.3"
+```
+
+The current `futures-test` requires Rust 1.45 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/TEST_MAPPING b/TEST_MAPPING
new file mode 100644
index 0000000..bbd34fa
--- /dev/null
+++ b/TEST_MAPPING
@@ -0,0 +1,13 @@
+// Generated by update_crate_tests.py for tests that depend on this crate.
+{
+ "presubmit": [
+ {
+ "name": "futures-test_test_src_lib"
+ }
+ ],
+ "presubmit-rust": [
+ {
+ "name": "futures-test_test_src_lib"
+ }
+ ]
+}
diff --git a/cargo2android.json b/cargo2android.json
new file mode 100644
index 0000000..84d8f2d
--- /dev/null
+++ b/cargo2android.json
@@ -0,0 +1,7 @@
+{
+ "dependencies": true,
+ "device": true,
+ "run": true,
+ "tests": true,
+ "orig-cargo-toml": true
+}
diff --git a/src/assert.rs b/src/assert.rs
new file mode 100644
index 0000000..75d7832
--- /dev/null
+++ b/src/assert.rs
@@ -0,0 +1,121 @@
+use futures_core::stream::Stream;
+
+#[doc(hidden)]
+pub fn assert_is_unpin_stream<S: Stream + Unpin>(_: &mut S) {}
+
+/// Assert that the next poll to the provided stream will return
+/// [`Poll::Pending`](futures_core::task::Poll::Pending).
+///
+/// # Examples
+///
+/// ```
+/// use futures::stream;
+/// use futures_test::future::FutureTestExt;
+/// use futures_test::{
+/// assert_stream_pending, assert_stream_next, assert_stream_done,
+/// };
+/// use futures::pin_mut;
+///
+/// let stream = stream::once((async { 5 }).pending_once());
+/// pin_mut!(stream);
+///
+/// assert_stream_pending!(stream);
+/// assert_stream_next!(stream, 5);
+/// assert_stream_done!(stream);
+/// ```
+#[macro_export]
+macro_rules! assert_stream_pending {
+ ($stream:expr) => {{
+ let mut stream = &mut $stream;
+ $crate::__private::assert::assert_is_unpin_stream(stream);
+ let stream = $crate::__private::Pin::new(stream);
+ let mut cx = $crate::task::noop_context();
+ let poll = $crate::__private::stream::Stream::poll_next(stream, &mut cx);
+ if poll.is_ready() {
+ panic!("assertion failed: stream is not pending");
+ }
+ }};
+}
+
+/// Assert that the next poll to the provided stream will return
+/// [`Poll::Ready`](futures_core::task::Poll::Ready) with the provided item.
+///
+/// # Examples
+///
+/// ```
+/// use futures::stream;
+/// use futures_test::future::FutureTestExt;
+/// use futures_test::{
+/// assert_stream_pending, assert_stream_next, assert_stream_done,
+/// };
+/// use futures::pin_mut;
+///
+/// let stream = stream::once((async { 5 }).pending_once());
+/// pin_mut!(stream);
+///
+/// assert_stream_pending!(stream);
+/// assert_stream_next!(stream, 5);
+/// assert_stream_done!(stream);
+/// ```
+#[macro_export]
+macro_rules! assert_stream_next {
+ ($stream:expr, $item:expr) => {{
+ let mut stream = &mut $stream;
+ $crate::__private::assert::assert_is_unpin_stream(stream);
+ let stream = $crate::__private::Pin::new(stream);
+ let mut cx = $crate::task::noop_context();
+ match $crate::__private::stream::Stream::poll_next(stream, &mut cx) {
+ $crate::__private::task::Poll::Ready($crate::__private::Some(x)) => {
+ assert_eq!(x, $item);
+ }
+ $crate::__private::task::Poll::Ready($crate::__private::None) => {
+ panic!(
+ "assertion failed: expected stream to provide item but stream is at its end"
+ );
+ }
+ $crate::__private::task::Poll::Pending => {
+ panic!("assertion failed: expected stream to provide item but stream wasn't ready");
+ }
+ }
+ }};
+}
+
+/// Assert that the next poll to the provided stream will return an empty
+/// [`Poll::Ready`](futures_core::task::Poll::Ready) signalling the
+/// completion of the stream.
+///
+/// # Examples
+///
+/// ```
+/// use futures::stream;
+/// use futures_test::future::FutureTestExt;
+/// use futures_test::{
+/// assert_stream_pending, assert_stream_next, assert_stream_done,
+/// };
+/// use futures::pin_mut;
+///
+/// let stream = stream::once((async { 5 }).pending_once());
+/// pin_mut!(stream);
+///
+/// assert_stream_pending!(stream);
+/// assert_stream_next!(stream, 5);
+/// assert_stream_done!(stream);
+/// ```
+#[macro_export]
+macro_rules! assert_stream_done {
+ ($stream:expr) => {{
+ let mut stream = &mut $stream;
+ $crate::__private::assert::assert_is_unpin_stream(stream);
+ let stream = $crate::__private::Pin::new(stream);
+ let mut cx = $crate::task::noop_context();
+ match $crate::__private::stream::Stream::poll_next(stream, &mut cx) {
+ $crate::__private::task::Poll::Ready($crate::__private::Some(_)) => {
+ panic!("assertion failed: expected stream to be done but had more elements");
+ }
+ $crate::__private::task::Poll::Ready($crate::__private::None) => {}
+ $crate::__private::task::Poll::Pending => {
+ panic!("assertion failed: expected stream to be done but was pending");
+ }
+ }
+ }};
+}
diff --git a/src/assert_unmoved.rs b/src/assert_unmoved.rs
new file mode 100644
index 0000000..95d9a09
--- /dev/null
+++ b/src/assert_unmoved.rs
@@ -0,0 +1,218 @@
+use futures_core::future::{FusedFuture, Future};
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use futures_io::{
+ self as io, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom,
+};
+use futures_sink::Sink;
+use pin_project::{pin_project, pinned_drop};
+use std::pin::Pin;
+use std::ptr;
+use std::thread::panicking;
+
+/// Combinator that asserts that the underlying type is not moved after being polled.
+///
+/// See the `assert_unmoved` methods on:
+/// * [`FutureTestExt`](crate::future::FutureTestExt::assert_unmoved)
+/// * [`StreamTestExt`](crate::stream::StreamTestExt::assert_unmoved)
+/// * [`SinkTestExt`](crate::sink::SinkTestExt::assert_unmoved_sink)
+/// * [`AsyncReadTestExt`](crate::io::AsyncReadTestExt::assert_unmoved)
+/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::assert_unmoved_write)
+#[pin_project(PinnedDrop, !Unpin)]
+#[derive(Debug, Clone)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct AssertUnmoved<T> {
+ #[pin]
+ inner: T,
+ this_ptr: *const Self,
+}
+
+// Safety: having a raw pointer in a struct makes it `!Send` and `!Sync`; however, the
+// pointer is only ever compared, never dereferenced, so implementing both is sound.
+unsafe impl<T: Send> Send for AssertUnmoved<T> {}
+unsafe impl<T: Sync> Sync for AssertUnmoved<T> {}
+
+impl<T> AssertUnmoved<T> {
+ pub(crate) fn new(inner: T) -> Self {
+ Self { inner, this_ptr: ptr::null() }
+ }
+
+ fn poll_with<'a, U>(mut self: Pin<&'a mut Self>, f: impl FnOnce(Pin<&'a mut T>) -> U) -> U {
+ let cur_this = &*self as *const Self;
+ if self.this_ptr.is_null() {
+ // First time being polled
+ *self.as_mut().project().this_ptr = cur_this;
+ } else {
+ assert_eq!(self.this_ptr, cur_this, "AssertUnmoved moved between poll calls");
+ }
+ f(self.project().inner)
+ }
+}
+
+impl<Fut: Future> Future for AssertUnmoved<Fut> {
+ type Output = Fut::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.poll_with(|f| f.poll(cx))
+ }
+}
+
+impl<Fut: FusedFuture> FusedFuture for AssertUnmoved<Fut> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_terminated()
+ }
+}
+
+impl<St: Stream> Stream for AssertUnmoved<St> {
+ type Item = St::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.poll_with(|s| s.poll_next(cx))
+ }
+}
+
+impl<St: FusedStream> FusedStream for AssertUnmoved<St> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_terminated()
+ }
+}
+
+impl<Si: Sink<Item>, Item> Sink<Item> for AssertUnmoved<Si> {
+ type Error = Si::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_with(|s| s.poll_ready(cx))
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ self.poll_with(|s| s.start_send(item))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_with(|s| s.poll_flush(cx))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_with(|s| s.poll_close(cx))
+ }
+}
+
+impl<R: AsyncRead> AsyncRead for AssertUnmoved<R> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_with(|r| r.poll_read(cx, buf))
+ }
+
+ fn poll_read_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_with(|r| r.poll_read_vectored(cx, bufs))
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for AssertUnmoved<W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_with(|w| w.poll_write(cx, buf))
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_with(|w| w.poll_write_vectored(cx, bufs))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_with(|w| w.poll_flush(cx))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_with(|w| w.poll_close(cx))
+ }
+}
+
+impl<S: AsyncSeek> AsyncSeek for AssertUnmoved<S> {
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ self.poll_with(|s| s.poll_seek(cx, pos))
+ }
+}
+
+impl<R: AsyncBufRead> AsyncBufRead for AssertUnmoved<R> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ self.poll_with(|r| r.poll_fill_buf(cx))
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ self.poll_with(|r| r.consume(amt))
+ }
+}
+
+#[pinned_drop]
+impl<T> PinnedDrop for AssertUnmoved<T> {
+ fn drop(self: Pin<&mut Self>) {
+ // If the thread is panicking then we can't panic again as that will
+ // cause the process to be aborted.
+ if !panicking() && !self.this_ptr.is_null() {
+ let cur_this = &*self as *const Self;
+ assert_eq!(self.this_ptr, cur_this, "AssertUnmoved moved before drop");
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use futures_core::future::Future;
+ use futures_core::task::{Context, Poll};
+ use futures_util::future::pending;
+ use futures_util::task::noop_waker;
+ use std::pin::Pin;
+
+ use super::AssertUnmoved;
+
+ #[test]
+ fn assert_send_sync() {
+ fn assert<T: Send + Sync>() {}
+ assert::<AssertUnmoved<()>>();
+ }
+
+ #[test]
+ fn dont_panic_when_not_polled() {
+ // This shouldn't panic.
+ let future = AssertUnmoved::new(pending::<()>());
+ drop(future);
+ }
+
+ #[test]
+ #[should_panic(expected = "AssertUnmoved moved between poll calls")]
+ fn dont_double_panic() {
+ // This test should only panic, not abort the process.
+ let waker = noop_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ // First we allocate the future on the stack and poll it.
+ let mut future = AssertUnmoved::new(pending::<()>());
+ let pinned_future = unsafe { Pin::new_unchecked(&mut future) };
+ assert_eq!(pinned_future.poll(&mut cx), Poll::Pending);
+
+ // Next we move it to the heap and poll it again. This second call
+ // should panic (as the future is moved), but we shouldn't panic again
+ // whilst dropping `AssertUnmoved`.
+ let mut future = Box::new(future);
+ let pinned_boxed_future = unsafe { Pin::new_unchecked(&mut *future) };
+ assert_eq!(pinned_boxed_future.poll(&mut cx), Poll::Pending);
+ }
+}
diff --git a/src/future/mod.rs b/src/future/mod.rs
new file mode 100644
index 0000000..ee5c6dd
--- /dev/null
+++ b/src/future/mod.rs
@@ -0,0 +1,108 @@
+//! Additional combinators for testing futures.
+
+mod pending_once;
+pub use self::pending_once::PendingOnce;
+
+use futures_core::future::Future;
+use std::thread;
+
+pub use crate::assert_unmoved::AssertUnmoved;
+pub use crate::interleave_pending::InterleavePending;
+
+/// Additional combinators for testing futures.
+pub trait FutureTestExt: Future {
+ /// Asserts that the given future is not moved after being polled.
+ ///
+ /// A check for movement is performed each time the future is polled
+ /// and when `Drop` is called.
+ ///
+ /// Aside from keeping track of the location at which the future was first
+ /// polled and providing assertions, this future adds no runtime behavior
+ /// and simply delegates to the child future.
+ fn assert_unmoved(self) -> AssertUnmoved<Self>
+ where
+ Self: Sized,
+ {
+ AssertUnmoved::new(self)
+ }
+
+ /// Introduces one [`Poll::Pending`](futures_core::task::Poll::Pending)
+ /// before polling the given future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::future::FutureExt;
+ /// use futures_test::task::noop_context;
+ /// use futures_test::future::FutureTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let future = (async { 5 }).pending_once();
+ /// pin_mut!(future);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// assert_eq!(future.poll_unpin(&mut cx), Poll::Pending);
+ /// assert_eq!(future.poll_unpin(&mut cx), Poll::Ready(5));
+ /// ```
+ fn pending_once(self) -> PendingOnce<Self>
+ where
+ Self: Sized,
+ {
+ PendingOnce::new(self)
+ }
+
+ /// Runs this future on a dedicated executor running in a background thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::oneshot;
+ /// use futures_test::future::FutureTestExt;
+ ///
+ /// let (tx, rx) = oneshot::channel::<i32>();
+ ///
+ /// (async { tx.send(5).unwrap() }).run_in_background();
+ ///
+ /// assert_eq!(rx.await, Ok(5));
+ /// # });
+ /// ```
+ fn run_in_background(self)
+ where
+ Self: Sized + Send + 'static,
+ Self::Output: Send,
+ {
+ thread::spawn(|| futures_executor::block_on(self));
+ }
+
+ /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
+ /// in between each call to poll.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::future::{self, Future};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::future::FutureTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let future = future::ready(1).interleave_pending();
+ /// pin_mut!(future);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// assert_eq!(future.as_mut().poll(&mut cx), Poll::Pending);
+ /// assert_eq!(future.as_mut().poll(&mut cx), Poll::Ready(1));
+ /// ```
+ fn interleave_pending(self) -> InterleavePending<Self>
+ where
+ Self: Sized,
+ {
+ InterleavePending::new(self)
+ }
+}
+
+impl<Fut: Future> FutureTestExt for Fut {} // blanket impl: every `Future` gets these combinators
diff --git a/src/future/pending_once.rs b/src/future/pending_once.rs
new file mode 100644
index 0000000..0fc3ef0
--- /dev/null
+++ b/src/future/pending_once.rs
@@ -0,0 +1,46 @@
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll};
+use pin_project::pin_project;
+use std::pin::Pin;
+
+/// Combinator that guarantees one [`Poll::Pending`] before polling its inner
+/// future.
+///
+/// This is created by the
+/// [`FutureTestExt::pending_once`](super::FutureTestExt::pending_once)
+/// method.
+#[pin_project]
+#[derive(Debug, Clone)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct PendingOnce<Fut> {
+ #[pin]
+ future: Fut, // the wrapped future; polled only once `polled_before` is true
+ polled_before: bool, // false until the first call to `poll`, which sets it and returns `Pending`
+}
+
+impl<Fut: Future> PendingOnce<Fut> {
+    pub(super) fn new(future: Fut) -> Self {
+        PendingOnce { polled_before: false, future } // starts in the "not yet polled" state
+    }
+}
+
+impl<Fut: Future> Future for PendingOnce<Fut> {
+    type Output = Fut::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let me = self.project();
+        if *me.polled_before {
+            return me.future.poll(cx); // the initial `Pending` was already handed out
+        }
+        // First call: remember it, ask to be polled again, and report `Pending`.
+        *me.polled_before = true;
+        cx.waker().wake_by_ref();
+        Poll::Pending
+    }
+}
+
+impl<F: FusedFuture> FusedFuture for PendingOnce<F> {
+    fn is_terminated(&self) -> bool {
+        self.polled_before && F::is_terminated(&self.future) // never terminated before the first poll
+    }
+}
diff --git a/src/interleave_pending.rs b/src/interleave_pending.rs
new file mode 100644
index 0000000..9164077
--- /dev/null
+++ b/src/interleave_pending.rs
@@ -0,0 +1,191 @@
+use futures_core::future::{FusedFuture, Future};
+use futures_core::stream::{FusedStream, Stream};
+use futures_io::{
+ self as io, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom,
+};
+use futures_sink::Sink;
+use pin_project::pin_project;
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// Wrapper that interleaves [`Poll::Pending`] in calls to poll.
+///
+/// See the `interleave_pending` methods on:
+/// * [`FutureTestExt`](crate::future::FutureTestExt::interleave_pending)
+/// * [`StreamTestExt`](crate::stream::StreamTestExt::interleave_pending)
+/// * [`SinkTestExt`](crate::sink::SinkTestExt::interleave_pending_sink)
+/// * [`AsyncReadTestExt`](crate::io::AsyncReadTestExt::interleave_pending)
+/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::interleave_pending_write)
+#[pin_project]
+#[derive(Debug)]
+pub struct InterleavePending<T> {
+ #[pin]
+ inner: T, // the wrapped future, stream, sink or I/O object
+ pended: bool, // true after a `Pending` was injected; cleared once the wrapped value reports ready
+}
+
+impl<T> InterleavePending<T> {
+    pub(crate) fn new(inner: T) -> Self {
+        InterleavePending { pended: false, inner }
+    }
+
+    /// Borrows the value wrapped by this adaptor (an I/O object, future,
+    /// stream or sink).
+    pub fn get_ref(&self) -> &T {
+        &self.inner
+    }
+
+    /// Mutably borrows the value wrapped by this adaptor; the interleaving
+    /// state (`pended`) is left untouched.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+
+    /// Projects a pinned mutable reference to this adaptor onto the wrapped
+    /// value.
+    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
+        self.project().inner
+    }
+
+    /// Unwraps this adaptor, handing back the wrapped value.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    fn poll_with<'a, U>(
+        self: Pin<&'a mut Self>,
+        cx: &mut Context<'_>,
+        f: impl FnOnce(Pin<&'a mut T>, &mut Context<'_>) -> Poll<U>,
+    ) -> Poll<U> {
+        let state = self.project();
+        if !*state.pended {
+            // Inject one `Pending`: schedule a wake-up first, then note it.
+            cx.waker().wake_by_ref();
+            *state.pended = true;
+            return Poll::Pending;
+        }
+        let outcome = f(state.inner, cx);
+        if outcome.is_ready() {
+            *state.pended = false; // re-arm so the next operation starts with a `Pending` again
+        }
+        outcome
+    }
+}
+
+impl<F: Future> Future for InterleavePending<F> {
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.poll_with(cx, |fut, cx| fut.poll(cx)) // `poll_with` decides when `Pending` is injected
+    }
+}
+
+impl<F: FusedFuture> FusedFuture for InterleavePending<F> {
+    fn is_terminated(&self) -> bool {
+        F::is_terminated(&self.inner) // termination is reported by the wrapped future alone
+    }
+}
+
+impl<S: Stream> Stream for InterleavePending<S> {
+    type Item = S::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.poll_with(cx, |stream, cx| stream.poll_next(cx))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        S::size_hint(&self.inner) // the hint comes straight from the wrapped stream
+    }
+}
+
+impl<S: FusedStream> FusedStream for InterleavePending<S> {
+    fn is_terminated(&self) -> bool {
+        S::is_terminated(&self.inner) // termination is reported by the wrapped stream alone
+    }
+}
+
+impl<Item, S: Sink<Item>> Sink<Item> for InterleavePending<S> {
+    type Error = S::Error;
+
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.poll_with(cx, |sink, cx| sink.poll_ready(cx))
+    }
+
+    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+        self.get_pin_mut().start_send(item) // not a poll: forwarded without an injected `Pending`
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.poll_with(cx, |sink, cx| sink.poll_flush(cx))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.poll_with(cx, |sink, cx| sink.poll_close(cx))
+    }
+}
+
+impl<Rd: AsyncRead> AsyncRead for InterleavePending<Rd> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        self.poll_with(cx, |reader, cx| Rd::poll_read(reader, cx, buf))
+    }
+
+    fn poll_read_vectored(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &mut [IoSliceMut<'_>],
+    ) -> Poll<io::Result<usize>> {
+        self.poll_with(cx, |reader, cx| Rd::poll_read_vectored(reader, cx, bufs))
+    }
+}
+
+impl<Wr: AsyncWrite> AsyncWrite for InterleavePending<Wr> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        self.poll_with(cx, |writer, cx| Wr::poll_write(writer, cx, buf))
+    }
+
+    fn poll_write_vectored(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>],
+    ) -> Poll<io::Result<usize>> {
+        self.poll_with(cx, |writer, cx| Wr::poll_write_vectored(writer, cx, bufs))
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.poll_with(cx, |writer, cx| writer.poll_flush(cx))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.poll_with(cx, |writer, cx| writer.poll_close(cx))
+    }
+}
+
+impl<Sk: AsyncSeek> AsyncSeek for InterleavePending<Sk> {
+    fn poll_seek(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<io::Result<u64>> {
+        self.poll_with(cx, |seeker, cx| Sk::poll_seek(seeker, cx, pos))
+    }
+}
+
+impl<B: AsyncBufRead> AsyncBufRead for InterleavePending<B> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+        self.poll_with(cx, B::poll_fill_buf)
+    }
+
+    fn consume(self: Pin<&mut Self>, amount: usize) {
+        self.get_pin_mut().consume(amount) // not a poll: forwarded without an injected `Pending`
+    }
+}
diff --git a/src/io/limited.rs b/src/io/limited.rs
new file mode 100644
index 0000000..34b72a5
--- /dev/null
+++ b/src/io/limited.rs
@@ -0,0 +1,91 @@
+use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite};
+use pin_project::pin_project;
+use std::{
+ cmp,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// I/O wrapper that limits the number of bytes written or read on each call.
+///
+/// See the [`limited`] and [`limited_write`] methods.
+///
+/// [`limited`]: super::AsyncReadTestExt::limited
+/// [`limited_write`]: super::AsyncWriteTestExt::limited_write
+#[pin_project]
+#[derive(Debug)]
+pub struct Limited<Io> {
+ #[pin]
+ io: Io, // the wrapped reader/writer
+ limit: usize, // upper bound on the bytes passed to a single `poll_read`/`poll_write` call
+}
+
+impl<Io> Limited<Io> {
+    pub(crate) fn new(io: Io, limit: usize) -> Self {
+        Limited { limit, io }
+    }
+
+    /// Borrows the I/O object wrapped by this adaptor.
+    /// The configured limit is not involved.
+    pub fn get_ref(&self) -> &Io {
+        &self.io
+    }
+
+    /// Mutably borrows the I/O object wrapped by this adaptor.
+    /// The configured limit is not involved.
+    pub fn get_mut(&mut self) -> &mut Io {
+        &mut self.io
+    }
+
+    /// Projects a pinned mutable reference to this adaptor onto the wrapped
+    /// I/O object.
+    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Io> {
+        self.project().io
+    }
+
+    /// Unwraps this adaptor, handing back the wrapped I/O object.
+    pub fn into_inner(self) -> Io {
+        self.io
+    }
+}
+
+impl<W: AsyncWrite> AsyncWrite for Limited<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        let cap = cmp::min(self.limit, buf.len()); // never offer more than `limit` bytes
+        self.get_pin_mut().poll_write(cx, &buf[..cap])
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.get_pin_mut().poll_flush(cx)
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.get_pin_mut().poll_close(cx)
+    }
+}
+
+impl<R: AsyncRead> AsyncRead for Limited<R> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        let cap = cmp::min(self.limit, buf.len()); // never expose more than `limit` bytes
+        let window = &mut buf[..cap];
+        self.get_pin_mut().poll_read(cx, window)
+    }
+}
+
+impl<B: AsyncBufRead> AsyncBufRead for Limited<B> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+        self.get_pin_mut().poll_fill_buf(cx) // forwarded as-is; `limit` is not applied here
+    }
+
+    fn consume(self: Pin<&mut Self>, amount: usize) {
+        self.get_pin_mut().consume(amount) // forwarded as-is; `limit` is not applied here
+    }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
new file mode 100644
index 0000000..0382784
--- /dev/null
+++ b/src/io/mod.rs
@@ -0,0 +1,9 @@
+//! Additional combinators for testing async IO.
+
+mod limited;
+
+pub mod read;
+pub use read::AsyncReadTestExt;
+
+pub mod write;
+pub use write::AsyncWriteTestExt;
diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs
new file mode 100644
index 0000000..cb5f1d3
--- /dev/null
+++ b/src/io/read/mod.rs
@@ -0,0 +1,126 @@
+//! Additional combinators for testing async readers.
+
+use futures_io::AsyncRead;
+
+pub use super::limited::Limited;
+pub use crate::assert_unmoved::AssertUnmoved;
+pub use crate::interleave_pending::InterleavePending;
+
+/// Additional combinators for testing async readers.
+pub trait AsyncReadTestExt: AsyncRead {
+ /// Asserts that the given reader is not moved after being polled.
+ ///
+ /// A check for movement is performed each time the reader is polled
+ /// and when `Drop` is called.
+ ///
+ /// Aside from keeping track of the location at which the reader was first
+ /// polled and providing assertions, this reader adds no runtime behavior
+ /// and simply delegates to the child reader.
+ fn assert_unmoved(self) -> AssertUnmoved<Self>
+ where
+ Self: Sized,
+ {
+ AssertUnmoved::new(self)
+ }
+
+ /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
+ /// in between each read of the reader.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::io::{AsyncRead, Cursor};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::io::AsyncReadTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let reader = Cursor::new(&[1, 2, 3]).interleave_pending();
+ /// pin_mut!(reader);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// let mut buf = [0, 0];
+ ///
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(2));
+ /// assert_eq!(buf, [1, 2]);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(1));
+ /// assert_eq!(buf, [3, 2]);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(0));
+ ///
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ ///
+ /// ## `AsyncBufRead`
+ ///
+ /// The returned reader will also implement `AsyncBufRead` if the underlying reader does.
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::io::{AsyncBufRead, Cursor};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::io::AsyncReadTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let reader = Cursor::new(&[1, 2, 3]).interleave_pending();
+ /// pin_mut!(reader);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending);
+ /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3][..]));
+ /// reader.as_mut().consume(2);
+ /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending);
+ /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[3][..]));
+ /// reader.as_mut().consume(1);
+ /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending);
+ /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[][..]));
+ ///
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ fn interleave_pending(self) -> InterleavePending<Self>
+ where
+ Self: Sized,
+ {
+ InterleavePending::new(self)
+ }
+
+ /// Limit the number of bytes allowed to be read on each call to `poll_read`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::io::{AsyncRead, Cursor};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::io::AsyncReadTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let reader = Cursor::new(&[1, 2, 3, 4, 5]).limited(2);
+ /// pin_mut!(reader);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// let mut buf = [0; 10];
+ ///
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf)?, Poll::Ready(2));
+ /// assert_eq!(&buf[..2], &[1, 2]);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf)?, Poll::Ready(2));
+ /// assert_eq!(&buf[..2], &[3, 4]);
+ /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf)?, Poll::Ready(1));
+ /// assert_eq!(&buf[..1], &[5]);
+ ///
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ fn limited(self, limit: usize) -> Limited<Self>
+ where
+ Self: Sized,
+ {
+ Limited::new(self, limit)
+ }
+}
+
+impl<R: AsyncRead> AsyncReadTestExt for R {} // blanket impl: every `AsyncRead` gets these combinators
diff --git a/src/io/write/mod.rs b/src/io/write/mod.rs
new file mode 100644
index 0000000..01ca4b2
--- /dev/null
+++ b/src/io/write/mod.rs
@@ -0,0 +1,141 @@
+//! Additional combinators for testing async writers.
+
+use futures_io::AsyncWrite;
+
+pub use super::limited::Limited;
+pub use crate::assert_unmoved::AssertUnmoved;
+pub use crate::interleave_pending::InterleavePending;
+pub use crate::track_closed::TrackClosed;
+
+/// Additional combinators for testing async writers.
+pub trait AsyncWriteTestExt: AsyncWrite {
+ /// Asserts that the given writer is not moved after being polled.
+ ///
+ /// A check for movement is performed each time the writer is polled
+ /// and when `Drop` is called.
+ ///
+ /// Aside from keeping track of the location at which the writer was first
+ /// polled and providing assertions, this writer adds no runtime behavior
+ /// and simply delegates to the child writer.
+ fn assert_unmoved_write(self) -> AssertUnmoved<Self>
+ where
+ Self: Sized,
+ {
+ AssertUnmoved::new(self)
+ }
+
+ /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
+ /// in between each operation on the writer.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::io::{AsyncWrite, Cursor};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::io::AsyncWriteTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let writer = Cursor::new(vec![0u8; 4].into_boxed_slice()).interleave_pending_write();
+ /// pin_mut!(writer);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[1, 2])?, Poll::Pending);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[1, 2])?, Poll::Ready(2));
+ /// assert_eq!(&writer.get_ref().get_ref()[..], [1, 2, 0, 0]);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[3, 4])?, Poll::Pending);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[3, 4])?, Poll::Ready(2));
+ /// assert_eq!(&writer.get_ref().get_ref()[..], [1, 2, 3, 4]);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[5, 6])?, Poll::Pending);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[5, 6])?, Poll::Ready(0));
+ ///
+ /// assert_eq!(writer.as_mut().poll_flush(&mut cx)?, Poll::Pending);
+ /// assert_eq!(writer.as_mut().poll_flush(&mut cx)?, Poll::Ready(()));
+ ///
+ /// assert_eq!(writer.as_mut().poll_close(&mut cx)?, Poll::Pending);
+ /// assert_eq!(writer.as_mut().poll_close(&mut cx)?, Poll::Ready(()));
+ ///
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ fn interleave_pending_write(self) -> InterleavePending<Self>
+ where
+ Self: Sized,
+ {
+ InterleavePending::new(self)
+ }
+
+ /// Limit the number of bytes allowed to be written on each call to `poll_write`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::io::{AsyncWrite, Cursor};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::io::AsyncWriteTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let writer = Cursor::new(vec![0u8; 4].into_boxed_slice()).limited_write(2);
+ /// pin_mut!(writer);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[1, 2])?, Poll::Ready(2));
+ /// assert_eq!(&writer.get_ref().get_ref()[..], [1, 2, 0, 0]);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[3])?, Poll::Ready(1));
+ /// assert_eq!(&writer.get_ref().get_ref()[..], [1, 2, 3, 0]);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[4, 5])?, Poll::Ready(1));
+ /// assert_eq!(&writer.get_ref().get_ref()[..], [1, 2, 3, 4]);
+ /// assert_eq!(writer.as_mut().poll_write(&mut cx, &[5])?, Poll::Ready(0));
+ ///
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ fn limited_write(self, limit: usize) -> Limited<Self>
+ where
+ Self: Sized,
+ {
+ Limited::new(self, limit)
+ }
+
+ /// Tracks whether this writer has been closed and returns an error if it is used after closing.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncWriteExt, Cursor};
+ /// use futures_test::io::AsyncWriteTestExt;
+ ///
+ /// let mut writer = Cursor::new(vec![0u8; 4]).track_closed();
+ ///
+ /// writer.write_all(&[1, 2]).await?;
+ /// assert!(!writer.is_closed());
+ /// writer.close().await?;
+ /// assert!(writer.is_closed());
+ ///
+ /// # Ok::<(), std::io::Error>(()) })?;
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncWriteExt, Cursor};
+ /// use futures_test::io::AsyncWriteTestExt;
+ ///
+ /// let mut writer = Cursor::new(vec![0u8; 4]).track_closed();
+ ///
+ /// writer.close().await?;
+ /// assert!(writer.write_all(&[1, 2]).await.is_err());
+ /// # Ok::<(), std::io::Error>(()) })?;
+ /// # Ok::<(), std::io::Error>(())
+ /// ```
+ fn track_closed(self) -> TrackClosed<Self>
+ where
+ Self: Sized,
+ {
+ TrackClosed::new(self)
+ }
+}
+
+impl<W: AsyncWrite> AsyncWriteTestExt for W {} // blanket impl: every `AsyncWrite` gets these combinators
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..2eb4a1c
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,86 @@
+//! Utilities to make testing [`Future`s](futures_core::future::Future) easier
+
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
+
+#[cfg(not(feature = "std"))]
+compile_error!(
+ "`futures-test` must have the `std` feature activated, this is a default-active feature"
+);
+
+// Not public API: hidden from the docs; presumably re-exports used by this crate's macros (TODO confirm).
+#[doc(hidden)]
+#[cfg(feature = "std")]
+pub mod __private {
+ pub use futures_core::{future, stream, task};
+ pub use futures_executor::block_on;
+ pub use std::{
+ option::Option::{None, Some},
+ pin::Pin,
+ result::Result::{Err, Ok},
+ };
+
+ pub mod assert {
+ pub use crate::assert::*;
+ }
+}
+
+#[macro_use]
+#[cfg(feature = "std")]
+mod assert;
+
+#[cfg(feature = "std")]
+pub mod task;
+
+#[cfg(feature = "std")]
+pub mod future;
+
+#[cfg(feature = "std")]
+pub mod stream;
+
+#[cfg(feature = "std")]
+pub mod sink;
+
+#[cfg(feature = "std")]
+pub mod io;
+
+mod assert_unmoved;
+mod interleave_pending;
+mod track_closed;
+
+/// Enables an `async` test function. The generated future will be run to completion with
+/// [`futures_executor::block_on`](futures_executor::block_on).
+///
+/// ```
+/// #[futures_test::test]
+/// async fn my_test() {
+/// let fut = async { true };
+/// assert!(fut.await);
+/// }
+/// ```
+///
+/// This is equivalent to the following code:
+///
+/// ```
+/// #[test]
+/// fn my_test() {
+/// futures::executor::block_on(async move {
+/// let fut = async { true };
+/// assert!(fut.await);
+/// })
+/// }
+/// ```
+#[cfg(feature = "std")]
+pub use futures_macro::test_internal as test;
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
new file mode 100644
index 0000000..eb5a6ef
--- /dev/null
+++ b/src/sink/mod.rs
@@ -0,0 +1,82 @@
+//! Additional combinators for testing sinks.
+
+use futures_sink::Sink;
+
+pub use crate::assert_unmoved::AssertUnmoved;
+pub use crate::interleave_pending::InterleavePending;
+pub use crate::track_closed::TrackClosed;
+
+/// Additional combinators for testing sinks.
+pub trait SinkTestExt<Item>: Sink<Item> {
+ /// Asserts that the given sink is not moved after being polled.
+ ///
+ /// A check for movement is performed each time the sink is polled
+ /// and when `Drop` is called.
+ ///
+ /// Aside from keeping track of the location at which the sink was first
+ /// polled and providing assertions, this sink adds no runtime behavior
+ /// and simply delegates to the child sink.
+ fn assert_unmoved_sink(self) -> AssertUnmoved<Self>
+ where
+ Self: Sized,
+ {
+ AssertUnmoved::new(self)
+ }
+
+ /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
+ /// in between each operation on the sink.
+ fn interleave_pending_sink(self) -> InterleavePending<Self>
+ where
+ Self: Sized,
+ {
+ InterleavePending::new(self)
+ }
+
+ /// Tracks whether this sink has been closed and panics if it is used after closing.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::sink::{SinkExt, drain};
+ /// use futures_test::sink::SinkTestExt;
+ ///
+ /// let mut sink = drain::<i32>().track_closed();
+ ///
+ /// sink.send(1).await?;
+ /// assert!(!sink.is_closed());
+ /// sink.close().await?;
+ /// assert!(sink.is_closed());
+ ///
+ /// # Ok::<(), std::convert::Infallible>(()) })?;
+ /// # Ok::<(), std::convert::Infallible>(())
+ /// ```
+ ///
+ /// Note: Unlike [`AsyncWriteTestExt::track_closed`] when
+ /// used as a sink the adaptor will panic if closed too early as there's no easy way to
+ /// integrate as an error.
+ ///
+ /// [`AsyncWriteTestExt::track_closed`]: crate::io::AsyncWriteTestExt::track_closed
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use std::panic::AssertUnwindSafe;
+ /// use futures::{sink::{SinkExt, drain}, future::FutureExt};
+ /// use futures_test::sink::SinkTestExt;
+ ///
+ /// let mut sink = drain::<i32>().track_closed();
+ ///
+ /// sink.close().await?;
+ /// assert!(AssertUnwindSafe(sink.send(1)).catch_unwind().await.is_err());
+ /// # Ok::<(), std::convert::Infallible>(()) })?;
+ /// # Ok::<(), std::convert::Infallible>(())
+ /// ```
+ fn track_closed(self) -> TrackClosed<Self>
+ where
+ Self: Sized,
+ {
+ TrackClosed::new(self)
+ }
+}
+
+impl<Item, S: Sink<Item>> SinkTestExt<Item> for S {} // blanket impl: every `Sink<Item>` gets these combinators
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
new file mode 100644
index 0000000..9151a21
--- /dev/null
+++ b/src/stream/mod.rs
@@ -0,0 +1,57 @@
+//! Additional combinators for testing streams.
+
+use futures_core::stream::Stream;
+
+pub use crate::assert_unmoved::AssertUnmoved;
+pub use crate::interleave_pending::InterleavePending;
+
+/// Additional combinators for testing streams.
+pub trait StreamTestExt: Stream {
+ /// Asserts that the given stream is not moved after being polled.
+ ///
+ /// A check for movement is performed each time the stream is polled
+ /// and when `Drop` is called.
+ ///
+ /// Aside from keeping track of the location at which the stream was first
+ /// polled and providing assertions, this stream adds no runtime behavior
+ /// and simply delegates to the child stream.
+ fn assert_unmoved(self) -> AssertUnmoved<Self>
+ where
+ Self: Sized,
+ {
+ AssertUnmoved::new(self)
+ }
+
+ /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
+ /// in between each item of the stream.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::task::Poll;
+ /// use futures::stream::{self, Stream};
+ /// use futures_test::task::noop_context;
+ /// use futures_test::stream::StreamTestExt;
+ /// use futures::pin_mut;
+ ///
+ /// let stream = stream::iter(vec![1, 2]).interleave_pending();
+ /// pin_mut!(stream);
+ ///
+ /// let mut cx = noop_context();
+ ///
+ /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
+ /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(1)));
+ /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
+ /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(2)));
+ /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
+ /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None));
+ /// ```
+ fn interleave_pending(self) -> InterleavePending<Self>
+ where
+ Self: Sized,
+ {
+ InterleavePending::new(self)
+ }
+}
+
+impl<St: Stream> StreamTestExt for St {} // blanket impl: every `Stream` gets these combinators
diff --git a/src/task/context.rs b/src/task/context.rs
new file mode 100644
index 0000000..b2b0dfe
--- /dev/null
+++ b/src/task/context.rs
@@ -0,0 +1,37 @@
+use crate::task::{noop_waker_ref, panic_waker_ref};
+use futures_core::task::Context;
+
+/// Builds a [`Context`](core::task::Context) whose [waker](core::task::Context::waker) panics if used.
+///
+/// # Examples
+///
+/// ```should_panic
+/// use futures_test::task::panic_context;
+///
+/// let cx = panic_context();
+/// cx.waker().wake_by_ref(); // Will panic
+/// ```
+pub fn panic_context() -> Context<'static> {
+    let waker = panic_waker_ref();
+    Context::from_waker(waker)
+}
+
+/// Builds a [`Context`](core::task::Context) whose [waker](core::task::Context::waker) ignores any use.
+///
+/// # Examples
+///
+/// ```
+/// use futures::future::Future;
+/// use futures::task::Poll;
+/// use futures_test::task::noop_context;
+/// use futures::pin_mut;
+///
+/// let future = async { 5 };
+/// pin_mut!(future);
+///
+/// assert_eq!(future.poll(&mut noop_context()), Poll::Ready(5));
+/// ```
+pub fn noop_context() -> Context<'static> {
+    let waker = noop_waker_ref();
+    Context::from_waker(waker)
+}
diff --git a/src/task/mod.rs b/src/task/mod.rs
new file mode 100644
index 0000000..cec645d
--- /dev/null
+++ b/src/task/mod.rs
@@ -0,0 +1,60 @@
+// TODO: note that paths like futures_core::task::Context actually get redirected to core::task::Context
+// in the rendered docs. Is this desirable? If so, should we change the paths here?
+//
+// Also, there are cross-crate links in here. They are not going to work anytime soon. Do we put https links
+// in here? to here: https://rust-lang.github.io/futures-api-docs? The problem is these have a
+// version hardcoded in the url: 0.3.0-alpha.16 We could link to docs.rs, but currently that says:
+// docs.rs failed to build futures-0.3.0-alpha.16 -> ok the reason seems to be that they are on
+// 2019-04-17 which does still have futures-api unstable feature, so that should get solved.
+//
+//! Task related testing utilities.
+//!
+//! This module provides utilities for creating test
+//! [`Context`](futures_core::task::Context)s,
+//! [`Waker`](futures_core::task::Waker)s and
+//! [`Spawn`](futures_task::Spawn) implementations.
+//!
+//! Test contexts:
+//! - [`noop_context`](crate::task::noop_context) creates a context that ignores calls to
+//! [`cx.waker().wake_by_ref()`](futures_core::task::Waker).
+//! - [`panic_context`](crate::task::panic_context) creates a context that panics when
+//! [`cx.waker().wake_by_ref()`](futures_core::task::Waker) is called.
+//!
+//! Test wakers:
+//! - [`noop_waker`](crate::task::noop_waker) creates a waker that ignores calls to
+//! [`wake`](futures_core::task::Waker).
+//! - [`panic_waker`](crate::task::panic_waker) creates a waker that panics when
+//! [`wake`](futures_core::task::Waker) is called.
+//! - [`new_count_waker`](crate::task::new_count_waker) creates a waker that increments a counter whenever
+//! [`wake`](futures_core::task::Waker) is called.
+//!
+//! Test spawners:
+//! - [`NoopSpawner`](crate::task::NoopSpawner) ignores calls to
+//! [`spawn`](futures_util::task::SpawnExt::spawn)
+//! - [`PanicSpawner`](crate::task::PanicSpawner) panics if [`spawn`](futures_util::task::SpawnExt::spawn) is
+//! called.
+//! - [`RecordSpawner`](crate::task::RecordSpawner) records the spawned futures.
+//!
+//! For convenience there additionally exist various functions that directly
+//! return waker/spawner references: [`noop_waker_ref`](crate::task::noop_waker_ref),
+//! [`panic_waker_ref`](crate::task::panic_waker_ref), [`noop_spawner_mut`](crate::task::noop_spawner_mut) and [`panic_spawner_mut`](crate::task::panic_spawner_mut).
+
+mod context;
+pub use self::context::{noop_context, panic_context};
+
+mod noop_spawner;
+pub use self::noop_spawner::{noop_spawner_mut, NoopSpawner};
+
+pub use futures_util::task::{noop_waker, noop_waker_ref};
+
+mod panic_spawner;
+pub use self::panic_spawner::{panic_spawner_mut, PanicSpawner};
+
+mod panic_waker;
+pub use self::panic_waker::{panic_waker, panic_waker_ref};
+
+mod record_spawner;
+pub use self::record_spawner::RecordSpawner;
+
+mod wake_counter;
+pub use self::wake_counter::{new_count_waker, AwokenCount};
diff --git a/src/task/noop_spawner.rs b/src/task/noop_spawner.rs
new file mode 100644
index 0000000..8967f91
--- /dev/null
+++ b/src/task/noop_spawner.rs
@@ -0,0 +1,52 @@
+use futures_task::{FutureObj, Spawn, SpawnError};
+
+/// An implementation of [`Spawn`](futures_task::Spawn) that
+/// discards spawned futures when used.
+///
+/// # Examples
+///
+/// ```
+/// use futures::task::SpawnExt;
+/// use futures_test::task::NoopSpawner;
+///
+/// let spawner = NoopSpawner::new();
+/// spawner.spawn(async { }).unwrap();
+/// ```
+#[derive(Debug)]
+pub struct NoopSpawner {
+ _reserved: (), // private unit field: other modules must construct via `new()` / `default()`
+}
+
+impl NoopSpawner {
+ /// Create a new `NoopSpawner`.
+ pub fn new() -> Self {
+ Self { _reserved: () }
+ }
+}
+
+impl Spawn for NoopSpawner {
+ fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ Ok(()) // `_future` is dropped here without ever being polled
+ }
+}
+
+impl Default for NoopSpawner {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Get a `'static` mutable reference to a [`NoopSpawner`]; each call leaks a fresh instance.
+///
+/// # Examples
+///
+/// ```
+/// use futures::task::SpawnExt;
+/// use futures_test::task::noop_spawner_mut;
+///
+/// let spawner = noop_spawner_mut();
+/// spawner.spawn(async { }).unwrap();
+/// ```
+pub fn noop_spawner_mut() -> &'static mut NoopSpawner {
+ Box::leak(Box::new(NoopSpawner::new())) // NOTE(review): zero-sized type, so presumably no heap memory is lost
+}
diff --git a/src/task/panic_spawner.rs b/src/task/panic_spawner.rs
new file mode 100644
index 0000000..e29463d
--- /dev/null
+++ b/src/task/panic_spawner.rs
@@ -0,0 +1,54 @@
+use futures_task::{FutureObj, Spawn, SpawnError};
+
+/// An implementation of [`Spawn`](futures_task::Spawn) that panics
+/// when used.
+///
+/// # Examples
+///
+/// ```should_panic
+/// use futures::task::SpawnExt;
+/// use futures_test::task::PanicSpawner;
+///
+/// let spawn = PanicSpawner::new();
+/// spawn.spawn(async { })?; // Will panic
+/// # Ok::<(), Box<dyn std::error::Error>>(())
+/// ```
+#[derive(Debug)]
+pub struct PanicSpawner {
+ _reserved: (), // private unit field: other modules must construct via `new()` / `default()`
+}
+
+impl PanicSpawner {
+ /// Create a new `PanicSpawner`.
+ pub fn new() -> Self {
+ Self { _reserved: () }
+ }
+}
+
+impl Spawn for PanicSpawner {
+ fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ panic!("should not spawn") // panics on every call; no `Result` is ever returned
+ }
+}
+
+impl Default for PanicSpawner {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Get a `'static` mutable reference to a [`PanicSpawner`]; each call leaks a fresh instance.
+///
+/// # Examples
+///
+/// ```should_panic
+/// use futures::task::SpawnExt;
+/// use futures_test::task::panic_spawner_mut;
+///
+/// let spawner = panic_spawner_mut();
+/// spawner.spawn(async { })?; // Will panic
+/// # Ok::<(), Box<dyn std::error::Error>>(())
+/// ```
+pub fn panic_spawner_mut() -> &'static mut PanicSpawner {
+ Box::leak(Box::new(PanicSpawner::new())) // NOTE(review): zero-sized type, so presumably no heap memory is lost
+}
diff --git a/src/task/panic_waker.rs b/src/task/panic_waker.rs
new file mode 100644
index 0000000..38e2443
--- /dev/null
+++ b/src/task/panic_waker.rs
@@ -0,0 +1,70 @@
+use core::ptr::null;
+use futures_core::task::{RawWaker, RawWakerVTable, Waker};
+
+unsafe fn clone_panic_waker(_data: *const ()) -> RawWaker {
+ raw_panic_waker() // a clone gets the same null data pointer and the same vtable
+}
+
+unsafe fn noop(_data: *const ()) {} // installed as the drop hook below
+
+unsafe fn wake_panic(_data: *const ()) {
+ if !std::thread::panicking() { // NOTE(review): presumably avoids a second panic while already unwinding
+ panic!("should not be woken");
+ }
+}
+
+const PANIC_WAKER_VTABLE: RawWakerVTable =
+ RawWakerVTable::new(clone_panic_waker, wake_panic, wake_panic, noop); // order: clone, wake, wake_by_ref, drop
+
+const fn raw_panic_waker() -> RawWaker {
+ RawWaker::new(null(), &PANIC_WAKER_VTABLE) // null data pointer; none of the hooks above reads it
+}
+
+/// Create a new [`Waker`](futures_core::task::Waker) which will
+/// panic when `wake()` or `wake_by_ref()` is called on it. Clones of the
+/// returned [`Waker`] behave the same way.
+///
+/// # Examples
+///
+/// ```should_panic
+/// use futures_test::task::panic_waker;
+///
+/// let waker = panic_waker();
+/// waker.wake(); // Will panic
+/// ```
+pub fn panic_waker() -> Waker {
+ // FIXME: Since 1.46.0 we can use transmute in consts, allowing this function to be const.
+ unsafe { Waker::from_raw(raw_panic_waker()) } // SAFETY: the vtable hooks ignore the (null) data pointer
+}
+
+/// Get a global reference to a singleton
+/// [`Waker`](futures_core::task::Waker) which panics when woken.
+/// Every call returns a reference to the same `static` instance.
+///
+/// # Examples
+///
+/// ```should_panic
+/// use futures_test::task::panic_waker_ref;
+///
+/// let waker = panic_waker_ref();
+/// waker.wake_by_ref(); // Will panic
+/// ```
+pub fn panic_waker_ref() -> &'static Waker {
+ struct SyncRawWaker(RawWaker);
+ // SAFETY: the wrapped `RawWaker` has a null data pointer and a vtable of plain functions.
+ unsafe impl Sync for SyncRawWaker {}
+ static PANIC_WAKER_INSTANCE: SyncRawWaker = SyncRawWaker(raw_panic_waker());
+
+ // SAFETY: `Waker` is #[repr(transparent)] over its `RawWaker`.
+ unsafe { &*(&PANIC_WAKER_INSTANCE.0 as *const RawWaker as *const Waker) }
+}
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ #[should_panic(expected = "should not be woken")]
+ fn issue_2091_cross_thread_segfault() {
+ let waker = std::thread::spawn(super::panic_waker_ref).join().unwrap(); // reference obtained on another thread
+ waker.wake_by_ref(); // woken on this thread: must panic with the expected message
+ }
+}
diff --git a/src/task/record_spawner.rs b/src/task/record_spawner.rs
new file mode 100644
index 0000000..59539fa
--- /dev/null
+++ b/src/task/record_spawner.rs
@@ -0,0 +1,39 @@
+use futures_task::{FutureObj, Spawn, SpawnError};
+use std::cell::{Ref, RefCell};
+
+/// An implementation of [`Spawn`](futures_task::Spawn) that records
+/// any [`Future`](futures_core::future::Future)s spawned on it.
+///
+/// # Examples
+///
+/// ```
+/// use futures::task::SpawnExt;
+/// use futures_test::task::RecordSpawner;
+///
+/// let recorder = RecordSpawner::new();
+/// recorder.spawn(async { }).unwrap();
+/// assert_eq!(recorder.spawned().len(), 1);
+/// ```
+#[derive(Debug, Default)]
+pub struct RecordSpawner {
+ spawned: RefCell<Vec<FutureObj<'static, ()>>>, // in spawn order; `RefCell` because `spawn_obj` only gets `&self`
+}
+
+impl RecordSpawner {
+ /// Create a new, empty recorder.
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Inspect any futures that were spawned onto this [`Spawn`].
+ pub fn spawned(&self) -> Ref<'_, Vec<FutureObj<'static, ()>>> {
+ self.spawned.borrow() // shared borrow; it lasts as long as the returned `Ref`
+ }
+}
+
+impl Spawn for RecordSpawner {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ self.spawned.borrow_mut().push(future); // panics if a `Ref` from `spawned()` is still alive
+ Ok(())
+ }
+}
diff --git a/src/task/wake_counter.rs b/src/task/wake_counter.rs
new file mode 100644
index 0000000..52c63e1
--- /dev/null
+++ b/src/task/wake_counter.rs
@@ -0,0 +1,59 @@
+use futures_core::task::Waker;
+use futures_util::task::{self, ArcWake};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+/// Number of times the waker was awoken.
+///
+/// See [`new_count_waker`] for usage.
+#[derive(Debug)]
+pub struct AwokenCount {
+ inner: Arc<WakerInner>, // counter state; `new_count_waker` gives a clone of this `Arc` to the waker
+}
+
+impl AwokenCount {
+ /// Get the current count.
+ pub fn get(&self) -> usize {
+ self.inner.count.load(Ordering::SeqCst)
+ }
+}
+
+impl PartialEq<usize> for AwokenCount {
+ fn eq(&self, other: &usize) -> bool {
+ self.get() == *other // lets callers compare directly, e.g. `assert_eq!(count, 2)`
+ }
+}
+
+#[derive(Debug)]
+struct WakerInner {
+ count: AtomicUsize,
+}
+
+impl ArcWake for WakerInner {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let _ = arc_self.count.fetch_add(1, Ordering::SeqCst); // the previous value is not needed
+ }
+}
+
+/// Create a new [`Waker`] that counts the number of times it's awoken.
+///
+/// [`Waker`]: futures_core::task::Waker
+///
+/// # Examples
+///
+/// ```
+/// use futures_test::task::new_count_waker;
+///
+/// let (waker, count) = new_count_waker();
+///
+/// assert_eq!(count, 0);
+///
+/// waker.wake_by_ref();
+/// waker.wake();
+///
+/// assert_eq!(count, 2);
+/// ```
+pub fn new_count_waker() -> (Waker, AwokenCount) {
+ let inner = Arc::new(WakerInner { count: AtomicUsize::new(0) }); // the count starts at zero
+ (task::waker(inner.clone()), AwokenCount { inner }) // waker and handle share the same `WakerInner`
+}
diff --git a/src/track_closed.rs b/src/track_closed.rs
new file mode 100644
index 0000000..be883b1
--- /dev/null
+++ b/src/track_closed.rs
@@ -0,0 +1,143 @@
+use futures_io::AsyncWrite;
+use futures_sink::Sink;
+use std::{
+ io::{self, IoSlice},
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// Async wrapper that tracks whether it has been closed.
+///
+/// See the `track_closed` methods on:
+/// * [`SinkTestExt`](crate::sink::SinkTestExt::track_closed)
+/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::track_closed)
+#[pin_project::pin_project]
+#[derive(Debug)]
+pub struct TrackClosed<T> {
+ #[pin]
+ inner: T,
+ closed: bool, // set to `true` once the wrapped `poll_close` returns `Poll::Ready(Ok(()))`
+}
+
+impl<T> TrackClosed<T> {
+ pub(crate) fn new(inner: T) -> Self {
+ Self { inner, closed: false } // a fresh wrapper starts out open
+ }
+
+ /// Check whether a close has completed successfully on this object.
+ pub fn is_closed(&self) -> bool {
+ self.closed
+ }
+
+ /// Acquires a reference to the underlying object that this adaptor is
+ /// wrapping.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Acquires a mutable reference to the underlying object that this
+ /// adaptor is wrapping.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Acquires a pinned mutable reference to the underlying object that
+ /// this adaptor is wrapping.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
+ self.project().inner
+ }
+
+ /// Consumes this adaptor returning the underlying object.
+ pub fn into_inner(self) -> T {
+ self.inner // the `closed` flag is discarded along with the wrapper
+ }
+}
+
+impl<T: AsyncWrite> AsyncWrite for TrackClosed<T> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.is_closed() {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Attempted to write after stream was closed",
+ )));
+ }
+ self.project().inner.poll_write(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ if self.is_closed() {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Attempted to flush after stream was closed",
+ )));
+ }
+ // No assertion needed here: the closed case has already returned an error above.
+ self.project().inner.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ if self.is_closed() {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Attempted to close after stream was closed",
+ )));
+ }
+ let this = self.project();
+ match this.inner.poll_close(cx) {
+ Poll::Ready(Ok(())) => {
+ *this.closed = true; // only a successful close marks the stream closed
+ Poll::Ready(Ok(()))
+ }
+ other => other, // `Pending` and errors leave the flag unset, so close can be polled again
+ }
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ if self.is_closed() {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Attempted to write after stream was closed",
+ )));
+ }
+ self.project().inner.poll_write_vectored(cx, bufs)
+ }
+}
+
+impl<Item, T: Sink<Item>> Sink<Item> for TrackClosed<T> {
+ type Error = T::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ assert!(!self.is_closed()); // use after a successful close panics; no error value is produced
+ self.project().inner.poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ assert!(!self.is_closed()); // same rule: sending into a closed sink panics
+ self.project().inner.start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ assert!(!self.is_closed()); // same rule: flushing a closed sink panics
+ self.project().inner.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ assert!(!self.is_closed()); // closing again after a successful close panics
+ let this = self.project();
+ match this.inner.poll_close(cx) {
+ Poll::Ready(Ok(())) => {
+ *this.closed = true; // only a successful close marks the sink closed
+ Poll::Ready(Ok(()))
+ }
+ other => other, // `Pending` and errors leave the flag unset
+ }
+ }
+}