Merge remote-tracking branch 'origin/upstream'
diff --git a/Android.bp b/Android.bp
new file mode 100644
index 0000000..61a2804
--- /dev/null
+++ b/Android.bp
@@ -0,0 +1,79 @@
+// This file is generated by cargo_embargo.
+// Do not modify this file after the first "rust_*" or "genrule" module
+// because the changes will be overridden on upgrade.
+// Content before the first "rust_*" or "genrule" module is preserved.
+
+package {
+    default_applicable_licenses: ["external_rust_crates_libhttp_body_license"],
+}
+
+license {
+    name: "external_rust_crates_libhttp_body_license",
+    visibility: [":__subpackages__"],
+    license_kinds: ["SPDX-license-identifier-MIT"],
+    license_text: ["LICENSE"],
+}
+
+rust_test {
+    name: "http-body_test_src_lib",
+    host_supported: true,
+    crate_name: "http_body",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.4.6",
+    crate_root: "src/lib.rs",
+    test_suites: ["general-tests"],
+    auto_gen_config: true,
+    test_options: {
+        unit_test: true,
+    },
+    edition: "2018",
+    rustlibs: [
+        "libbytes",
+        "libhttp",
+        "libpin_project_lite",
+        "libtokio",
+    ],
+}
+
+rust_test {
+    name: "http-body_test_tests_is_end_stream",
+    host_supported: true,
+    crate_name: "is_end_stream",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.4.6",
+    crate_root: "tests/is_end_stream.rs",
+    test_suites: ["general-tests"],
+    auto_gen_config: true,
+    test_options: {
+        unit_test: true,
+    },
+    edition: "2018",
+    rustlibs: [
+        "libbytes",
+        "libhttp",
+        "libhttp_body",
+        "libpin_project_lite",
+        "libtokio",
+    ],
+}
+
+rust_library {
+    name: "libhttp_body",
+    host_supported: true,
+    crate_name: "http_body",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.4.6",
+    crate_root: "src/lib.rs",
+    edition: "2018",
+    rustlibs: [
+        "libbytes",
+        "libhttp",
+        "libpin_project_lite",
+    ],
+    apex_available: [
+        "//apex_available:platform",
+        "//apex_available:anyapex",
+    ],
+    product_available: true,
+    vendor_available: true,
+}
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..bdb3a25
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,62 @@
+# 0.4.6 (December 8, 2023)
+
+- Add `Collect` combinator (backported from http-body-util).
+
+# 0.4.5 (May 20, 2022)
+
+- Add `String` impl for `Body`.
+- Add `Limited` body implementation.
+
+# 0.4.4 (October 22, 2021)
+
+- Add `UnsyncBoxBody` and `Body::boxed_unsync`.
+
+# 0.4.3 (August 8, 2021)
+
+- Implement `Default` for `BoxBody`.
+
+# 0.4.2 (May 8, 2021)
+
+- Correctly override `Body::size_hint` and `Body::is_end_stream` for `Empty`.
+- Add `Full` which is a body that consists of a single chunk.
+
+# 0.4.1 (March 18, 2021)
+
+- Add combinators to `Body`:
+  - `map_data`: Change the `Data` chunks produced by the body.
+  - `map_err`: Change the `Error`s produced by the body.
+  - `boxed`: Convert the `Body` into a boxed trait object.
+- Add `Empty`.
+
+# 0.4.0 (December 23, 2020)
+
+- Update `bytes` to v1.0.
+
+# 0.3.1 (December 13, 2019)
+
+- Implement `Body` for `http::Request<impl Body>` and `http::Response<impl Body>`.
+
+# 0.3.0 (December 4, 2019)
+
+- Rename `next` combinator to `data`.
+
+# 0.2.0 (December 3, 2019)
+
+- Update `http` to v0.2.
+- Update `bytes` to v0.5.
+
+# 0.2.0-alpha.3 (October 1, 2019)
+
+- Fix `Body` to be object-safe.
+
+# 0.2.0-alpha.2 (October 1, 2019)
+
+- Add `next` and `trailers` combinator methods.
+
+# 0.2.0-alpha.1 (August 20, 2019)
+
+- Update to use `Pin` in `poll_data` and `poll_trailers`.
+
+# 0.1.0 (May 7, 2019)
+
+- Initial release
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..8a6a94b
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,45 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+name = "http-body"
+version = "0.4.6"
+authors = [
+    "Carl Lerche <[email protected]>",
+    "Lucio Franco <[email protected]>",
+    "Sean McArthur <[email protected]>",
+]
+description = """
+Trait representing an asynchronous, streaming, HTTP request or response body.
+"""
+documentation = "https://docs.rs/http-body"
+readme = "README.md"
+keywords = ["http"]
+categories = ["web-programming"]
+license = "MIT"
+repository = "https://github.com/hyperium/http-body"
+
+[dependencies.bytes]
+version = "1"
+
+[dependencies.http]
+version = "0.2"
+
+[dependencies.pin-project-lite]
+version = "0.2"
+
+[dev-dependencies.tokio]
+version = "1"
+features = [
+    "macros",
+    "rt",
+]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..e0db028
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,34 @@
+[package]
+name = "http-body"
+# When releasing to crates.io:
+# - Remove path dependencies
+# - Update html_root_url.
+# - Update doc url
+#   - Cargo.toml
+#   - README.md
+# - Update CHANGELOG.md.
+# - Create "vx.y.z" git tag.
+version = "0.4.6"
+authors = [
+  "Carl Lerche <[email protected]>",
+  "Lucio Franco <[email protected]>",
+  "Sean McArthur <[email protected]>",
+]
+edition = "2018"
+readme = "README.md"
+documentation = "https://docs.rs/http-body"
+repository = "https://github.com/hyperium/http-body"
+license = "MIT"
+description = """
+Trait representing an asynchronous, streaming, HTTP request or response body.
+"""
+keywords = ["http"]
+categories = ["web-programming"]
+
+[dependencies]
+bytes = "1"
+http = "0.2"
+pin-project-lite = "0.2"
+
+[dev-dependencies]
+tokio = { version = "1", features = ["macros", "rt"] }
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..27b08f2
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2019 Hyper Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..af770ae
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,20 @@
+name: "http-body"
+description: "Trait representing an asynchronous, streaming, HTTP request or response body."
+third_party {
+  identifier {
+    type: "crates.io"
+    value: "http-body"
+  }
+  identifier {
+    type: "Archive"
+    value: "https://static.crates.io/crates/http-body/http-body-0.4.6.crate"
+    primary_source: true
+  }
+  version: "0.4.6"
+  license_type: NOTICE
+  last_upgrade_date {
+    year: 2024
+    month: 5
+    day: 27
+  }
+}
diff --git a/MODULE_LICENSE_MIT b/MODULE_LICENSE_MIT
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_MIT
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..48bea6e
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1,2 @@
+# Bug component: 688011
+include platform/prebuilts/rust:main:/OWNERS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c82ba29
--- /dev/null
+++ b/README.md
@@ -0,0 +1,27 @@
+# HTTP Body
+
+A trait representing asynchronous operations on an HTTP body.
+
+[![crates.io][crates-badge]][crates-url]
+[![documentation][docs-badge]][docs-url]
+[![MIT License][mit-badge]][mit-url]
+[![CI Status][ci-badge]][ci-url]
+
+[crates-badge]: https://img.shields.io/crates/v/http-body.svg
+[crates-url]: https://crates.io/crates/http-body
+[docs-badge]: https://docs.rs/http-body/badge.svg
+[docs-url]: https://docs.rs/http-body
+[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
+[mit-url]: LICENSE
+[ci-badge]: https://github.com/hyperium/http-body/workflows/CI/badge.svg
+[ci-url]: https://github.com/hyperium/http-body/actions?query=workflow%3ACI
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in `http-body` by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/cargo_embargo.json b/cargo_embargo.json
new file mode 100644
index 0000000..c8842d1
--- /dev/null
+++ b/cargo_embargo.json
@@ -0,0 +1,4 @@
+{
+  "run_cargo": false,
+  "tests": true
+}
diff --git a/src/collect.rs b/src/collect.rs
new file mode 100644
index 0000000..b065fff
--- /dev/null
+++ b/src/collect.rs
@@ -0,0 +1,222 @@
+use std::{
+    collections::VecDeque,
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use super::Body;
+
+use bytes::{Buf, Bytes};
+use http::HeaderMap;
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future that resolves into a [`Collected`].
+    pub struct Collect<T>
+    where
+        T: Body,
+    {
+        #[pin]
+        body: T,
+        collected: Option<Collected<T::Data>>,
+        is_data_done: bool,
+    }
+}
+
+impl<T: Body> Collect<T> {
+    pub(crate) fn new(body: T) -> Self {
+        Self {
+            body,
+            collected: Some(Collected::default()),
+            is_data_done: false,
+        }
+    }
+}
+
+impl<T: Body> Future for Collect<T> {
+    type Output = Result<Collected<T::Data>, T::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut me = self.project();
+
+        loop {
+            if !*me.is_data_done {
+                match me.body.as_mut().poll_data(cx) {
+                    Poll::Ready(Some(Ok(data))) => {
+                        me.collected.as_mut().unwrap().push_data(data);
+                    }
+                    Poll::Ready(Some(Err(err))) => {
+                        return Poll::Ready(Err(err));
+                    }
+                    Poll::Ready(None) => {
+                        *me.is_data_done = true;
+                    }
+                    Poll::Pending => return Poll::Pending,
+                }
+            } else {
+                match me.body.as_mut().poll_trailers(cx) {
+                    Poll::Ready(Ok(Some(trailers))) => {
+                        me.collected.as_mut().unwrap().push_trailers(trailers);
+                        break;
+                    }
+                    Poll::Ready(Err(err)) => {
+                        return Poll::Ready(Err(err));
+                    }
+                    Poll::Ready(Ok(None)) => break,
+                    Poll::Pending => return Poll::Pending,
+                }
+            }
+        }
+
+        Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
+    }
+}
+
+/// A collected body produced by [`Body::collect`] which collects all the DATA frames
+/// and trailers.
+#[derive(Debug)]
+pub struct Collected<B> {
+    bufs: BufList<B>,
+    trailers: Option<HeaderMap>,
+}
+
+impl<B: Buf> Collected<B> {
+    /// If there is a trailers frame buffered, returns a reference to it.
+    ///
+    /// Returns `None` if the body contained no trailers.
+    pub fn trailers(&self) -> Option<&HeaderMap> {
+        self.trailers.as_ref()
+    }
+
+    /// Aggregate this buffered into a [`Buf`].
+    pub fn aggregate(self) -> impl Buf {
+        self.bufs
+    }
+
+    /// Convert this body into a [`Bytes`].
+    pub fn to_bytes(mut self) -> Bytes {
+        self.bufs.copy_to_bytes(self.bufs.remaining())
+    }
+
+    fn push_data(&mut self, data: B) {
+        // Only push this frame if it has some data in it, to avoid crashing on
+        // `BufList::push`.
+        if data.has_remaining() {
+            self.bufs.push(data);
+        }
+    }
+
+    fn push_trailers(&mut self, trailers: HeaderMap) {
+        if let Some(current) = &mut self.trailers {
+            current.extend(trailers);
+        } else {
+            self.trailers = Some(trailers);
+        }
+    }
+}
+
+impl<B> Default for Collected<B> {
+    fn default() -> Self {
+        Self {
+            bufs: BufList::default(),
+            trailers: None,
+        }
+    }
+}
+
+impl<B> Unpin for Collected<B> {}
+
+#[derive(Debug)]
+struct BufList<T> {
+    bufs: VecDeque<T>,
+}
+
+impl<T: Buf> BufList<T> {
+    #[inline]
+    pub(crate) fn push(&mut self, buf: T) {
+        debug_assert!(buf.has_remaining());
+        self.bufs.push_back(buf);
+    }
+
+    /*
+    #[inline]
+    pub(crate) fn pop(&mut self) -> Option<T> {
+        self.bufs.pop_front()
+    }
+    */
+}
+
+impl<T: Buf> Buf for BufList<T> {
+    #[inline]
+    fn remaining(&self) -> usize {
+        self.bufs.iter().map(|buf| buf.remaining()).sum()
+    }
+
+    #[inline]
+    fn chunk(&self) -> &[u8] {
+        self.bufs.front().map(Buf::chunk).unwrap_or_default()
+    }
+
+    #[inline]
+    fn advance(&mut self, mut cnt: usize) {
+        while cnt > 0 {
+            {
+                let front = &mut self.bufs[0];
+                let rem = front.remaining();
+                if rem > cnt {
+                    front.advance(cnt);
+                    return;
+                } else {
+                    front.advance(rem);
+                    cnt -= rem;
+                }
+            }
+            self.bufs.pop_front();
+        }
+    }
+
+    #[inline]
+    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
+        if dst.is_empty() {
+            return 0;
+        }
+        let mut vecs = 0;
+        for buf in &self.bufs {
+            vecs += buf.chunks_vectored(&mut dst[vecs..]);
+            if vecs == dst.len() {
+                break;
+            }
+        }
+        vecs
+    }
+
+    #[inline]
+    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
+        use bytes::{BufMut, BytesMut};
+        // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
+        // request can be fulfilled by the front buffer, we can take advantage.
+        match self.bufs.front_mut() {
+            Some(front) if front.remaining() == len => {
+                let b = front.copy_to_bytes(len);
+                self.bufs.pop_front();
+                b
+            }
+            Some(front) if front.remaining() > len => front.copy_to_bytes(len),
+            _ => {
+                assert!(len <= self.remaining(), "`len` greater than remaining");
+                let mut bm = BytesMut::with_capacity(len);
+                bm.put(self.take(len));
+                bm.freeze()
+            }
+        }
+    }
+}
+
+impl<T> Default for BufList<T> {
+    fn default() -> Self {
+        BufList {
+            bufs: VecDeque::new(),
+        }
+    }
+}
diff --git a/src/combinators/box_body.rs b/src/combinators/box_body.rs
new file mode 100644
index 0000000..97c8313
--- /dev/null
+++ b/src/combinators/box_body.rs
@@ -0,0 +1,134 @@
+use crate::Body;
+use bytes::Buf;
+use std::{
+    fmt,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+/// A boxed [`Body`] trait object.
+pub struct BoxBody<D, E> {
+    inner: Pin<Box<dyn Body<Data = D, Error = E> + Send + Sync + 'static>>,
+}
+
+/// A boxed [`Body`] trait object that is !Sync.
+pub struct UnsyncBoxBody<D, E> {
+    inner: Pin<Box<dyn Body<Data = D, Error = E> + Send + 'static>>,
+}
+
+impl<D, E> BoxBody<D, E> {
+    /// Create a new `BoxBody`.
+    pub fn new<B>(body: B) -> Self
+    where
+        B: Body<Data = D, Error = E> + Send + Sync + 'static,
+        D: Buf,
+    {
+        Self {
+            inner: Box::pin(body),
+        }
+    }
+}
+
+impl<D, E> fmt::Debug for BoxBody<D, E> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("BoxBody").finish()
+    }
+}
+
+impl<D, E> Body for BoxBody<D, E>
+where
+    D: Buf,
+{
+    type Data = D;
+    type Error = E;
+
+    fn poll_data(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        self.inner.as_mut().poll_data(cx)
+    }
+
+    fn poll_trailers(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        self.inner.as_mut().poll_trailers(cx)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.inner.is_end_stream()
+    }
+
+    fn size_hint(&self) -> crate::SizeHint {
+        self.inner.size_hint()
+    }
+}
+
+impl<D, E> Default for BoxBody<D, E>
+where
+    D: Buf + 'static,
+{
+    fn default() -> Self {
+        BoxBody::new(crate::Empty::new().map_err(|err| match err {}))
+    }
+}
+
+// === UnsyncBoxBody ===
+impl<D, E> UnsyncBoxBody<D, E> {
+    /// Create a new `BoxBody`.
+    pub fn new<B>(body: B) -> Self
+    where
+        B: Body<Data = D, Error = E> + Send + 'static,
+        D: Buf,
+    {
+        Self {
+            inner: Box::pin(body),
+        }
+    }
+}
+
+impl<D, E> fmt::Debug for UnsyncBoxBody<D, E> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("UnsyncBoxBody").finish()
+    }
+}
+
+impl<D, E> Body for UnsyncBoxBody<D, E>
+where
+    D: Buf,
+{
+    type Data = D;
+    type Error = E;
+
+    fn poll_data(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        self.inner.as_mut().poll_data(cx)
+    }
+
+    fn poll_trailers(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        self.inner.as_mut().poll_trailers(cx)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.inner.is_end_stream()
+    }
+
+    fn size_hint(&self) -> crate::SizeHint {
+        self.inner.size_hint()
+    }
+}
+
+impl<D, E> Default for UnsyncBoxBody<D, E>
+where
+    D: Buf + 'static,
+{
+    fn default() -> Self {
+        UnsyncBoxBody::new(crate::Empty::new().map_err(|err| match err {}))
+    }
+}
diff --git a/src/combinators/map_data.rs b/src/combinators/map_data.rs
new file mode 100644
index 0000000..6d9c5a8
--- /dev/null
+++ b/src/combinators/map_data.rs
@@ -0,0 +1,94 @@
+use crate::Body;
+use bytes::Buf;
+use pin_project_lite::pin_project;
+use std::{
+    any::type_name,
+    fmt,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+pin_project! {
+    /// Body returned by the [`map_data`] combinator.
+    ///
+    /// [`map_data`]: crate::util::BodyExt::map_data
+    #[derive(Clone, Copy)]
+    pub struct MapData<B, F> {
+        #[pin]
+        inner: B,
+        f: F
+    }
+}
+
+impl<B, F> MapData<B, F> {
+    #[inline]
+    pub(crate) fn new(body: B, f: F) -> Self {
+        Self { inner: body, f }
+    }
+
+    /// Get a reference to the inner body
+    pub fn get_ref(&self) -> &B {
+        &self.inner
+    }
+
+    /// Get a mutable reference to the inner body
+    pub fn get_mut(&mut self) -> &mut B {
+        &mut self.inner
+    }
+
+    /// Get a pinned mutable reference to the inner body
+    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut B> {
+        self.project().inner
+    }
+
+    /// Consume `self`, returning the inner body
+    pub fn into_inner(self) -> B {
+        self.inner
+    }
+}
+
+impl<B, F, B2> Body for MapData<B, F>
+where
+    B: Body,
+    F: FnMut(B::Data) -> B2,
+    B2: Buf,
+{
+    type Data = B2;
+    type Error = B::Error;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        let this = self.project();
+        match this.inner.poll_data(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok((this.f)(data)))),
+            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
+        }
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        self.project().inner.poll_trailers(cx)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.inner.is_end_stream()
+    }
+}
+
+impl<B, F> fmt::Debug for MapData<B, F>
+where
+    B: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("MapData")
+            .field("inner", &self.inner)
+            .field("f", &type_name::<F>())
+            .finish()
+    }
+}
diff --git a/src/combinators/map_err.rs b/src/combinators/map_err.rs
new file mode 100644
index 0000000..c77168d
--- /dev/null
+++ b/src/combinators/map_err.rs
@@ -0,0 +1,97 @@
+use crate::Body;
+use pin_project_lite::pin_project;
+use std::{
+    any::type_name,
+    fmt,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+pin_project! {
+    /// Body returned by the [`map_err`] combinator.
+    ///
+    /// [`map_err`]: crate::util::BodyExt::map_err
+    #[derive(Clone, Copy)]
+    pub struct MapErr<B, F> {
+        #[pin]
+        inner: B,
+        f: F
+    }
+}
+
+impl<B, F> MapErr<B, F> {
+    #[inline]
+    pub(crate) fn new(body: B, f: F) -> Self {
+        Self { inner: body, f }
+    }
+
+    /// Get a reference to the inner body
+    pub fn get_ref(&self) -> &B {
+        &self.inner
+    }
+
+    /// Get a mutable reference to the inner body
+    pub fn get_mut(&mut self) -> &mut B {
+        &mut self.inner
+    }
+
+    /// Get a pinned mutable reference to the inner body
+    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut B> {
+        self.project().inner
+    }
+
+    /// Consume `self`, returning the inner body
+    pub fn into_inner(self) -> B {
+        self.inner
+    }
+}
+
+impl<B, F, E> Body for MapErr<B, F>
+where
+    B: Body,
+    F: FnMut(B::Error) -> E,
+{
+    type Data = B::Data;
+    type Error = E;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        let this = self.project();
+        match this.inner.poll_data(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(data))),
+            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err((this.f)(err)))),
+        }
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        let this = self.project();
+        this.inner.poll_trailers(cx).map_err(this.f)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.inner.is_end_stream()
+    }
+
+    fn size_hint(&self) -> crate::SizeHint {
+        self.inner.size_hint()
+    }
+}
+
+impl<B, F> fmt::Debug for MapErr<B, F>
+where
+    B: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("MapErr")
+            .field("inner", &self.inner)
+            .field("f", &type_name::<F>())
+            .finish()
+    }
+}
diff --git a/src/combinators/mod.rs b/src/combinators/mod.rs
new file mode 100644
index 0000000..c52f367
--- /dev/null
+++ b/src/combinators/mod.rs
@@ -0,0 +1,11 @@
+//! Combinators for the `Body` trait.
+
+mod box_body;
+mod map_data;
+mod map_err;
+
+pub use self::{
+    box_body::{BoxBody, UnsyncBoxBody},
+    map_data::MapData,
+    map_err::MapErr,
+};
diff --git a/src/empty.rs b/src/empty.rs
new file mode 100644
index 0000000..7d63ceb
--- /dev/null
+++ b/src/empty.rs
@@ -0,0 +1,75 @@
+use super::{Body, SizeHint};
+use bytes::Buf;
+use http::HeaderMap;
+use std::{
+    convert::Infallible,
+    fmt,
+    marker::PhantomData,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+/// A body that is always empty.
+pub struct Empty<D> {
+    _marker: PhantomData<fn() -> D>,
+}
+
+impl<D> Empty<D> {
+    /// Create a new `Empty`.
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl<D: Buf> Body for Empty<D> {
+    type Data = D;
+    type Error = Infallible;
+
+    #[inline]
+    fn poll_data(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        Poll::Ready(None)
+    }
+
+    #[inline]
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Poll::Ready(Ok(None))
+    }
+
+    fn is_end_stream(&self) -> bool {
+        true
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        SizeHint::with_exact(0)
+    }
+}
+
+impl<D> fmt::Debug for Empty<D> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Empty").finish()
+    }
+}
+
+impl<D> Default for Empty<D> {
+    fn default() -> Self {
+        Self {
+            _marker: PhantomData,
+        }
+    }
+}
+
+impl<D> Clone for Empty<D> {
+    fn clone(&self) -> Self {
+        Self {
+            _marker: PhantomData,
+        }
+    }
+}
+
+impl<D> Copy for Empty<D> {}
diff --git a/src/full.rs b/src/full.rs
new file mode 100644
index 0000000..f1d063b
--- /dev/null
+++ b/src/full.rs
@@ -0,0 +1,151 @@
+use crate::{Body, SizeHint};
+use bytes::{Buf, Bytes};
+use http::HeaderMap;
+use pin_project_lite::pin_project;
+use std::borrow::Cow;
+use std::convert::{Infallible, TryFrom};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+    /// A body that consists of a single chunk.
+    #[derive(Clone, Copy, Debug)]
+    pub struct Full<D> {
+        data: Option<D>,
+    }
+}
+
+impl<D> Full<D>
+where
+    D: Buf,
+{
+    /// Create a new `Full`.
+    pub fn new(data: D) -> Self {
+        let data = if data.has_remaining() {
+            Some(data)
+        } else {
+            None
+        };
+        Full { data }
+    }
+}
+
+impl<D> Body for Full<D>
+where
+    D: Buf,
+{
+    type Data = D;
+    type Error = Infallible;
+
+    fn poll_data(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<D, Self::Error>>> {
+        Poll::Ready(self.data.take().map(Ok))
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Poll::Ready(Ok(None))
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.data.is_none()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.data
+            .as_ref()
+            .map(|data| SizeHint::with_exact(u64::try_from(data.remaining()).unwrap()))
+            .unwrap_or_else(|| SizeHint::with_exact(0))
+    }
+}
+
+impl<D> Default for Full<D>
+where
+    D: Buf,
+{
+    /// Create an empty `Full`.
+    fn default() -> Self {
+        Full { data: None }
+    }
+}
+
+impl<D> From<Bytes> for Full<D>
+where
+    D: Buf + From<Bytes>,
+{
+    fn from(bytes: Bytes) -> Self {
+        Full::new(D::from(bytes))
+    }
+}
+
+impl<D> From<Vec<u8>> for Full<D>
+where
+    D: Buf + From<Vec<u8>>,
+{
+    fn from(vec: Vec<u8>) -> Self {
+        Full::new(D::from(vec))
+    }
+}
+
+impl<D> From<&'static [u8]> for Full<D>
+where
+    D: Buf + From<&'static [u8]>,
+{
+    fn from(slice: &'static [u8]) -> Self {
+        Full::new(D::from(slice))
+    }
+}
+
+impl<D, B> From<Cow<'static, B>> for Full<D>
+where
+    D: Buf + From<&'static B> + From<B::Owned>,
+    B: ToOwned + ?Sized,
+{
+    fn from(cow: Cow<'static, B>) -> Self {
+        match cow {
+            Cow::Borrowed(b) => Full::new(D::from(b)),
+            Cow::Owned(o) => Full::new(D::from(o)),
+        }
+    }
+}
+
+impl<D> From<String> for Full<D>
+where
+    D: Buf + From<String>,
+{
+    fn from(s: String) -> Self {
+        Full::new(D::from(s))
+    }
+}
+
+impl<D> From<&'static str> for Full<D>
+where
+    D: Buf + From<&'static str>,
+{
+    fn from(slice: &'static str) -> Self {
+        Full::new(D::from(slice))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn full_returns_some() {
+        let mut full = Full::new(&b"hello"[..]);
+        assert_eq!(full.size_hint().exact(), Some(b"hello".len() as u64));
+        assert_eq!(full.data().await, Some(Ok(&b"hello"[..])));
+        assert!(full.data().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn empty_full_returns_none() {
+        assert!(Full::<&[u8]>::default().data().await.is_none());
+        assert!(Full::new(&b""[..]).data().await.is_none());
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..2535cda
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,335 @@
+#![doc(html_root_url = "https://docs.rs/http-body/0.4.6")]
+#![deny(
+    missing_debug_implementations,
+    missing_docs,
+    unreachable_pub,
+    broken_intra_doc_links
+)]
+#![cfg_attr(test, deny(warnings))]
+
+//! Asynchronous HTTP request or response body.
+//!
+//! See [`Body`] for more details.
+//!
+//! [`Body`]: trait.Body.html
+
+mod collect;
+mod empty;
+mod full;
+mod limited;
+mod next;
+mod size_hint;
+
+pub mod combinators;
+
+pub use self::collect::Collected;
+pub use self::empty::Empty;
+pub use self::full::Full;
+pub use self::limited::{LengthLimitError, Limited};
+pub use self::next::{Data, Trailers};
+pub use self::size_hint::SizeHint;
+
+use self::combinators::{BoxBody, MapData, MapErr, UnsyncBoxBody};
+use bytes::{Buf, Bytes};
+use http::HeaderMap;
+use std::convert::Infallible;
+use std::ops;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Trait representing a streaming body of a Request or Response.
+///
+/// Data is streamed via the `poll_data` function, which asynchronously yields `T: Buf` values. The
+/// `size_hint` function provides insight into the total number of bytes that will be streamed.
+///
+/// The `poll_trailers` function returns an optional set of trailers used to finalize the request /
+/// response exchange. This is mostly used when using the HTTP/2.0 protocol.
+///
+pub trait Body {
+    /// Values yielded by the `Body`.
+    type Data: Buf;
+
+    /// The error type this `Body` might generate.
+    type Error;
+
+    /// Attempt to pull out the next data buffer of this stream.
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>>;
+
+    /// Poll for an optional **single** `HeaderMap` of trailers.
+    ///
+    /// This function should only be called once `poll_data` returns `None`.
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>>;
+
+    /// Returns `true` when the end of stream has been reached.
+    ///
+    /// An end of stream means that both `poll_data` and `poll_trailers` will
+    /// return `None`.
+    ///
+    /// A return value of `false` **does not** guarantee that a value will be
+    /// returned from `poll_stream` or `poll_trailers`.
+    fn is_end_stream(&self) -> bool {
+        false
+    }
+
+    /// Returns the bounds on the remaining length of the stream.
+    ///
+    /// When the **exact** remaining length of the stream is known, the upper bound will be set and
+    /// will equal the lower bound.
+    fn size_hint(&self) -> SizeHint {
+        SizeHint::default()
+    }
+
+    /// Returns future that resolves to next data chunk, if any.
+    fn data(&mut self) -> Data<'_, Self>
+    where
+        Self: Unpin + Sized,
+    {
+        Data(self)
+    }
+
+    /// Returns future that resolves to trailers, if any.
+    fn trailers(&mut self) -> Trailers<'_, Self>
+    where
+        Self: Unpin + Sized,
+    {
+        Trailers(self)
+    }
+
+    /// Maps this body's data value to a different value.
+    fn map_data<F, B>(self, f: F) -> MapData<Self, F>
+    where
+        Self: Sized,
+        F: FnMut(Self::Data) -> B,
+        B: Buf,
+    {
+        MapData::new(self, f)
+    }
+
+    /// Maps this body's error value to a different value.
+    fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
+    where
+        Self: Sized,
+        F: FnMut(Self::Error) -> E,
+    {
+        MapErr::new(self, f)
+    }
+
+    /// Turn this body into [`Collected`] body which will collect all the DATA frames
+    /// and trailers.
+    fn collect(self) -> crate::collect::Collect<Self>
+    where
+        Self: Sized,
+    {
+        collect::Collect::new(self)
+    }
+
+    /// Turn this body into a boxed trait object.
+    fn boxed(self) -> BoxBody<Self::Data, Self::Error>
+    where
+        Self: Sized + Send + Sync + 'static,
+    {
+        BoxBody::new(self)
+    }
+
+    /// Turn this body into a boxed trait object that is !Sync.
+    fn boxed_unsync(self) -> UnsyncBoxBody<Self::Data, Self::Error>
+    where
+        Self: Sized + Send + 'static,
+    {
+        UnsyncBoxBody::new(self)
+    }
+}
+
+impl<T: Body + Unpin + ?Sized> Body for &mut T {
+    type Data = T::Data;
+    type Error = T::Error;
+
+    fn poll_data(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        Pin::new(&mut **self).poll_data(cx)
+    }
+
+    fn poll_trailers(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Pin::new(&mut **self).poll_trailers(cx)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        Pin::new(&**self).is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        Pin::new(&**self).size_hint()
+    }
+}
+
+impl<P> Body for Pin<P>
+where
+    P: Unpin + ops::DerefMut,
+    P::Target: Body,
+{
+    type Data = <<P as ops::Deref>::Target as Body>::Data;
+    type Error = <<P as ops::Deref>::Target as Body>::Error;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        Pin::get_mut(self).as_mut().poll_data(cx)
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Pin::get_mut(self).as_mut().poll_trailers(cx)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.as_ref().is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.as_ref().size_hint()
+    }
+}
+
+impl<T: Body + Unpin + ?Sized> Body for Box<T> {
+    type Data = T::Data;
+    type Error = T::Error;
+
+    fn poll_data(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        Pin::new(&mut **self).poll_data(cx)
+    }
+
+    fn poll_trailers(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Pin::new(&mut **self).poll_trailers(cx)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.as_ref().is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.as_ref().size_hint()
+    }
+}
+
+impl<B: Body> Body for http::Request<B> {
+    type Data = B::Data;
+    type Error = B::Error;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        unsafe {
+            self.map_unchecked_mut(http::Request::body_mut)
+                .poll_data(cx)
+        }
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        unsafe {
+            self.map_unchecked_mut(http::Request::body_mut)
+                .poll_trailers(cx)
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.body().is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.body().size_hint()
+    }
+}
+
+impl<B: Body> Body for http::Response<B> {
+    type Data = B::Data;
+    type Error = B::Error;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        unsafe {
+            self.map_unchecked_mut(http::Response::body_mut)
+                .poll_data(cx)
+        }
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        unsafe {
+            self.map_unchecked_mut(http::Response::body_mut)
+                .poll_trailers(cx)
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.body().is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.body().size_hint()
+    }
+}
+
+impl Body for String {
+    type Data = Bytes;
+    type Error = Infallible;
+
+    fn poll_data(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        if !self.is_empty() {
+            let s = std::mem::take(&mut *self);
+            Poll::Ready(Some(Ok(s.into_bytes().into())))
+        } else {
+            Poll::Ready(None)
+        }
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Poll::Ready(Ok(None))
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.is_empty()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        SizeHint::with_exact(self.len() as u64)
+    }
+}
+
+#[cfg(test)]
+fn _assert_bounds() {
+    fn can_be_trait_object(_: &dyn Body<Data = std::io::Cursor<Vec<u8>>, Error = std::io::Error>) {}
+}
diff --git a/src/limited.rs b/src/limited.rs
new file mode 100644
index 0000000..a40add9
--- /dev/null
+++ b/src/limited.rs
@@ -0,0 +1,299 @@
+use crate::{Body, SizeHint};
+use bytes::Buf;
+use http::HeaderMap;
+use pin_project_lite::pin_project;
+use std::error::Error;
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+    /// A length limited body.
+    ///
+    /// This body will return an error if more than the configured number
+    /// of bytes are returned on polling the wrapped body.
+    #[derive(Clone, Copy, Debug)]
+    pub struct Limited<B> {
+        remaining: usize,
+        #[pin]
+        inner: B,
+    }
+}
+
+impl<B> Limited<B> {
+    /// Create a new `Limited`.
+    pub fn new(inner: B, limit: usize) -> Self {
+        Self {
+            remaining: limit,
+            inner,
+        }
+    }
+}
+
+impl<B> Body for Limited<B>
+where
+    B: Body,
+    B::Error: Into<Box<dyn Error + Send + Sync>>,
+{
+    type Data = B::Data;
+    type Error = Box<dyn Error + Send + Sync>;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        let this = self.project();
+        let res = match this.inner.poll_data(cx) {
+            Poll::Pending => return Poll::Pending,
+            Poll::Ready(None) => None,
+            Poll::Ready(Some(Ok(data))) => {
+                if data.remaining() > *this.remaining {
+                    *this.remaining = 0;
+                    Some(Err(LengthLimitError.into()))
+                } else {
+                    *this.remaining -= data.remaining();
+                    Some(Ok(data))
+                }
+            }
+            Poll::Ready(Some(Err(err))) => Some(Err(err.into())),
+        };
+
+        Poll::Ready(res)
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        let this = self.project();
+        let res = match this.inner.poll_trailers(cx) {
+            Poll::Pending => return Poll::Pending,
+            Poll::Ready(Ok(data)) => Ok(data),
+            Poll::Ready(Err(err)) => Err(err.into()),
+        };
+
+        Poll::Ready(res)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.inner.is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        use std::convert::TryFrom;
+        match u64::try_from(self.remaining) {
+            Ok(n) => {
+                let mut hint = self.inner.size_hint();
+                if hint.lower() >= n {
+                    hint.set_exact(n)
+                } else if let Some(max) = hint.upper() {
+                    hint.set_upper(n.min(max))
+                } else {
+                    hint.set_upper(n)
+                }
+                hint
+            }
+            Err(_) => self.inner.size_hint(),
+        }
+    }
+}
+
+/// An error returned when body length exceeds the configured limit.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct LengthLimitError;
+
+impl fmt::Display for LengthLimitError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str("length limit exceeded")
+    }
+}
+
+impl Error for LengthLimitError {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Full;
+    use bytes::Bytes;
+    use std::convert::Infallible;
+
+    #[tokio::test]
+    async fn read_for_body_under_limit_returns_data() {
+        const DATA: &[u8] = b"testing";
+        let inner = Full::new(Bytes::from(DATA));
+        let body = &mut Limited::new(inner, 8);
+
+        let mut hint = SizeHint::new();
+        hint.set_upper(7);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let data = body.data().await.unwrap().unwrap();
+        assert_eq!(data, DATA);
+        hint.set_upper(0);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        assert!(matches!(body.data().await, None));
+    }
+
+    #[tokio::test]
+    async fn read_for_body_over_limit_returns_error() {
+        const DATA: &[u8] = b"testing a string that is too long";
+        let inner = Full::new(Bytes::from(DATA));
+        let body = &mut Limited::new(inner, 8);
+
+        let mut hint = SizeHint::new();
+        hint.set_upper(8);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let error = body.data().await.unwrap().unwrap_err();
+        assert!(matches!(error.downcast_ref(), Some(LengthLimitError)));
+    }
+
+    struct Chunky(&'static [&'static [u8]]);
+
+    impl Body for Chunky {
+        type Data = &'static [u8];
+        type Error = Infallible;
+
+        fn poll_data(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+            let mut this = self;
+            match this.0.split_first().map(|(&head, tail)| (Ok(head), tail)) {
+                Some((data, new_tail)) => {
+                    this.0 = new_tail;
+
+                    Poll::Ready(Some(data))
+                }
+                None => Poll::Ready(None),
+            }
+        }
+
+        fn poll_trailers(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+        ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+            Poll::Ready(Ok(Some(HeaderMap::new())))
+        }
+    }
+
+    #[tokio::test]
+    async fn read_for_chunked_body_around_limit_returns_first_chunk_but_returns_error_on_over_limit_chunk(
+    ) {
+        const DATA: &[&[u8]] = &[b"testing ", b"a string that is too long"];
+        let inner = Chunky(DATA);
+        let body = &mut Limited::new(inner, 8);
+
+        let mut hint = SizeHint::new();
+        hint.set_upper(8);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let data = body.data().await.unwrap().unwrap();
+        assert_eq!(data, DATA[0]);
+        hint.set_upper(0);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let error = body.data().await.unwrap().unwrap_err();
+        assert!(matches!(error.downcast_ref(), Some(LengthLimitError)));
+    }
+
+    #[tokio::test]
+    async fn read_for_chunked_body_over_limit_on_first_chunk_returns_error() {
+        const DATA: &[&[u8]] = &[b"testing a string", b" that is too long"];
+        let inner = Chunky(DATA);
+        let body = &mut Limited::new(inner, 8);
+
+        let mut hint = SizeHint::new();
+        hint.set_upper(8);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let error = body.data().await.unwrap().unwrap_err();
+        assert!(matches!(error.downcast_ref(), Some(LengthLimitError)));
+    }
+
+    #[tokio::test]
+    async fn read_for_chunked_body_under_limit_is_okay() {
+        const DATA: &[&[u8]] = &[b"test", b"ing!"];
+        let inner = Chunky(DATA);
+        let body = &mut Limited::new(inner, 8);
+
+        let mut hint = SizeHint::new();
+        hint.set_upper(8);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let data = body.data().await.unwrap().unwrap();
+        assert_eq!(data, DATA[0]);
+        hint.set_upper(4);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        let data = body.data().await.unwrap().unwrap();
+        assert_eq!(data, DATA[1]);
+        hint.set_upper(0);
+        assert_eq!(body.size_hint().upper(), hint.upper());
+
+        assert!(matches!(body.data().await, None));
+    }
+
+    #[tokio::test]
+    async fn read_for_trailers_propagates_inner_trailers() {
+        const DATA: &[&[u8]] = &[b"test", b"ing!"];
+        let inner = Chunky(DATA);
+        let body = &mut Limited::new(inner, 8);
+        let trailers = body.trailers().await.unwrap();
+        assert_eq!(trailers, Some(HeaderMap::new()))
+    }
+
+    #[derive(Debug)]
+    enum ErrorBodyError {
+        Data,
+        Trailers,
+    }
+
+    impl fmt::Display for ErrorBodyError {
+        fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
+            Ok(())
+        }
+    }
+
+    impl Error for ErrorBodyError {}
+
+    struct ErrorBody;
+
+    impl Body for ErrorBody {
+        type Data = &'static [u8];
+        type Error = ErrorBodyError;
+
+        fn poll_data(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+            Poll::Ready(Some(Err(ErrorBodyError::Data)))
+        }
+
+        fn poll_trailers(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+        ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+            Poll::Ready(Err(ErrorBodyError::Trailers))
+        }
+    }
+
+    #[tokio::test]
+    async fn read_for_body_returning_error_propagates_error() {
+        let body = &mut Limited::new(ErrorBody, 8);
+        let error = body.data().await.unwrap().unwrap_err();
+        assert!(matches!(error.downcast_ref(), Some(ErrorBodyError::Data)));
+    }
+
+    #[tokio::test]
+    async fn trailers_for_body_returning_error_propagates_error() {
+        let body = &mut Limited::new(ErrorBody, 8);
+        let error = body.trailers().await.unwrap_err();
+        assert!(matches!(
+            error.downcast_ref(),
+            Some(ErrorBodyError::Trailers)
+        ));
+    }
+}
diff --git a/src/next.rs b/src/next.rs
new file mode 100644
index 0000000..fc87ffc
--- /dev/null
+++ b/src/next.rs
@@ -0,0 +1,31 @@
+use crate::Body;
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task;
+
+#[must_use = "futures don't do anything unless polled"]
+#[derive(Debug)]
+/// Future that resolves to the next data chunk from `Body`
+pub struct Data<'a, T: ?Sized>(pub(crate) &'a mut T);
+
+impl<'a, T: Body + Unpin + ?Sized> Future for Data<'a, T> {
+    type Output = Option<Result<T::Data, T::Error>>;
+
+    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+        Pin::new(&mut self.0).poll_data(ctx)
+    }
+}
+
+#[must_use = "futures don't do anything unless polled"]
+#[derive(Debug)]
+/// Future that resolves to the optional trailers from `Body`
+pub struct Trailers<'a, T: ?Sized>(pub(crate) &'a mut T);
+
+impl<'a, T: Body + Unpin + ?Sized> Future for Trailers<'a, T> {
+    type Output = Result<Option<http::HeaderMap>, T::Error>;
+
+    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+        Pin::new(&mut self.0).poll_trailers(ctx)
+    }
+}
diff --git a/src/size_hint.rs b/src/size_hint.rs
new file mode 100644
index 0000000..00a8f19
--- /dev/null
+++ b/src/size_hint.rs
@@ -0,0 +1,86 @@
+use std::u64;
+
+/// A `Body` size hint
+///
+/// The default implementation returns:
+///
+/// * 0 for `lower`
+/// * `None` for `upper`.
+#[derive(Debug, Default, Clone)]
+pub struct SizeHint {
+    lower: u64,
+    upper: Option<u64>,
+}
+
+impl SizeHint {
+    /// Returns a new `SizeHint` with default values
+    #[inline]
+    pub fn new() -> SizeHint {
+        SizeHint::default()
+    }
+
+    /// Returns a new `SizeHint` with both upper and lower bounds set to the
+    /// given value.
+    #[inline]
+    pub fn with_exact(value: u64) -> SizeHint {
+        SizeHint {
+            lower: value,
+            upper: Some(value),
+        }
+    }
+
+    /// Returns the lower bound of data that the `Body` will yield before
+    /// completing.
+    #[inline]
+    pub fn lower(&self) -> u64 {
+        self.lower
+    }
+
+    /// Set the value of the `lower` hint.
+    ///
+    /// # Panics
+    ///
+    /// The function panics if `value` is greater than `upper`.
+    #[inline]
+    pub fn set_lower(&mut self, value: u64) {
+        assert!(value <= self.upper.unwrap_or(u64::MAX));
+        self.lower = value;
+    }
+
+    /// Returns the upper bound of data the `Body` will yield before
+    /// completing, or `None` if the value is unknown.
+    #[inline]
+    pub fn upper(&self) -> Option<u64> {
+        self.upper
+    }
+
+    /// Set the value of the `upper` hint value.
+    ///
+    /// # Panics
+    ///
+    /// This function panics if `value` is less than `lower`.
+    #[inline]
+    pub fn set_upper(&mut self, value: u64) {
+        assert!(value >= self.lower, "`value` is less than than `lower`");
+
+        self.upper = Some(value);
+    }
+
+    /// Returns the exact size of data that will be yielded **if** the
+    /// `lower` and `upper` bounds are equal.
+    #[inline]
+    pub fn exact(&self) -> Option<u64> {
+        if Some(self.lower) == self.upper {
+            self.upper
+        } else {
+            None
+        }
+    }
+
+    /// Set the value of the `lower` and `upper` bounds to exactly the same.
+    #[inline]
+    pub fn set_exact(&mut self, value: u64) {
+        self.lower = value;
+        self.upper = Some(value);
+    }
+}
diff --git a/tests/is_end_stream.rs b/tests/is_end_stream.rs
new file mode 100644
index 0000000..beaeb0b
--- /dev/null
+++ b/tests/is_end_stream.rs
@@ -0,0 +1,79 @@
+use http::HeaderMap;
+use http_body::{Body, SizeHint};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct Mock {
+    size_hint: SizeHint,
+}
+
+impl Body for Mock {
+    type Data = ::std::io::Cursor<Vec<u8>>;
+    type Error = ();
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        Poll::Ready(None)
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Poll::Ready(Ok(None))
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.size_hint.clone()
+    }
+}
+
+#[test]
+fn is_end_stream_true() {
+    let combos = [
+        (None, None, false),
+        (Some(123), None, false),
+        (Some(0), Some(123), false),
+        (Some(123), Some(123), false),
+        (Some(0), Some(0), false),
+    ];
+
+    for &(lower, upper, is_end_stream) in &combos {
+        let mut size_hint = SizeHint::new();
+        assert_eq!(0, size_hint.lower());
+        assert!(size_hint.upper().is_none());
+
+        if let Some(lower) = lower {
+            size_hint.set_lower(lower);
+        }
+
+        if let Some(upper) = upper {
+            size_hint.set_upper(upper);
+        }
+
+        let mut mock = Mock { size_hint };
+
+        assert_eq!(
+            is_end_stream,
+            Pin::new(&mut mock).is_end_stream(),
+            "size_hint = {:?}",
+            mock.size_hint.clone()
+        );
+    }
+}
+
+#[test]
+fn is_end_stream_default_false() {
+    let mut mock = Mock {
+        size_hint: SizeHint::default(),
+    };
+
+    assert_eq!(
+        false,
+        Pin::new(&mut mock).is_end_stream(),
+        "size_hint = {:?}",
+        mock.size_hint.clone()
+    );
+}