| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "full")] |
| |
| // https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_writer.rs |
| |
| use futures::task::{Context, Poll}; |
| use std::io::{self, Cursor}; |
| use std::pin::Pin; |
| use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom}; |
| |
| use futures::future; |
| use tokio_test::assert_ok; |
| |
| use std::cmp; |
| use std::io::IoSlice; |
| |
| mod support { |
| pub(crate) mod io_vec; |
| } |
| use support::io_vec::IoBufs; |
| |
/// In-memory sink used by the tests below to exercise the `Poll::Pending` path.
///
/// `inner` collects the bytes that were written; `ready` is the toggle the
/// `AsyncWrite` implementation flips on every `poll_write` call.
struct MaybePending {
    inner: Vec<u8>,
    ready: bool,
}

impl MaybePending {
    /// Wraps `inner` with the toggle cleared, so the first `poll_write`
    /// reports `Poll::Pending`.
    fn new(inner: Vec<u8>) -> Self {
        let ready = false;
        MaybePending { inner, ready }
    }
}
| |
| impl AsyncWrite for MaybePending { |
| fn poll_write( |
| mut self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<io::Result<usize>> { |
| if self.ready { |
| self.ready = false; |
| Pin::new(&mut self.inner).poll_write(cx, buf) |
| } else { |
| self.ready = true; |
| cx.waker().wake_by_ref(); |
| Poll::Pending |
| } |
| } |
| |
| fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| Pin::new(&mut self.inner).poll_flush(cx) |
| } |
| |
| fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| Pin::new(&mut self.inner).poll_shutdown(cx) |
| } |
| } |
| |
| async fn write_vectored<W>(writer: &mut W, bufs: &[IoSlice<'_>]) -> io::Result<usize> |
| where |
| W: AsyncWrite + Unpin, |
| { |
| let mut writer = Pin::new(writer); |
| future::poll_fn(|cx| writer.as_mut().poll_write_vectored(cx, bufs)).await |
| } |
| |
| #[tokio::test] |
| async fn buf_writer() { |
| let mut writer = BufWriter::with_capacity(2, Vec::new()); |
| |
| assert_eq!(writer.write(&[0, 1]).await.unwrap(), 2); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(*writer.get_ref(), [0, 1]); |
| |
| assert_eq!(writer.write(&[2]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [2]); |
| assert_eq!(*writer.get_ref(), [0, 1]); |
| |
| assert_eq!(writer.write(&[3]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [2, 3]); |
| assert_eq!(*writer.get_ref(), [0, 1]); |
| |
| writer.flush().await.unwrap(); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); |
| |
| assert_eq!(writer.write(&[4]).await.unwrap(), 1); |
| assert_eq!(writer.write(&[5]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [4, 5]); |
| assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); |
| |
| assert_eq!(writer.write(&[6]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [6]); |
| assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); |
| |
| assert_eq!(writer.write(&[7, 8]).await.unwrap(), 2); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); |
| |
| assert_eq!(writer.write(&[9, 10, 11]).await.unwrap(), 3); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); |
| |
| writer.flush().await.unwrap(); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); |
| } |
| |
| #[tokio::test] |
| async fn buf_writer_inner_flushes() { |
| let mut w = BufWriter::with_capacity(3, Vec::new()); |
| assert_eq!(w.write(&[0, 1]).await.unwrap(), 2); |
| assert_eq!(*w.get_ref(), []); |
| w.flush().await.unwrap(); |
| let w = w.into_inner(); |
| assert_eq!(w, [0, 1]); |
| } |
| |
| #[tokio::test] |
| async fn buf_writer_seek() { |
| let mut w = BufWriter::with_capacity(3, Cursor::new(Vec::new())); |
| w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); |
| w.write_all(&[6, 7]).await.unwrap(); |
| assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8); |
| assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); |
| assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2); |
| w.write_all(&[8, 9]).await.unwrap(); |
| w.flush().await.unwrap(); |
| assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); |
| } |
| |
| #[tokio::test] |
| async fn maybe_pending_buf_writer() { |
| let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); |
| |
| assert_eq!(writer.write(&[0, 1]).await.unwrap(), 2); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(&writer.get_ref().inner, &[0, 1]); |
| |
| assert_eq!(writer.write(&[2]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [2]); |
| assert_eq!(&writer.get_ref().inner, &[0, 1]); |
| |
| assert_eq!(writer.write(&[3]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [2, 3]); |
| assert_eq!(&writer.get_ref().inner, &[0, 1]); |
| |
| writer.flush().await.unwrap(); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); |
| |
| assert_eq!(writer.write(&[4]).await.unwrap(), 1); |
| assert_eq!(writer.write(&[5]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [4, 5]); |
| assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); |
| |
| assert_eq!(writer.write(&[6]).await.unwrap(), 1); |
| assert_eq!(writer.buffer(), [6]); |
| assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); |
| |
| assert_eq!(writer.write(&[7, 8]).await.unwrap(), 2); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); |
| |
| assert_eq!(writer.write(&[9, 10, 11]).await.unwrap(), 3); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!( |
| writer.get_ref().inner, |
| &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] |
| ); |
| |
| writer.flush().await.unwrap(); |
| assert_eq!(writer.buffer(), []); |
| assert_eq!( |
| &writer.get_ref().inner, |
| &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] |
| ); |
| } |
| |
| #[tokio::test] |
| async fn maybe_pending_buf_writer_inner_flushes() { |
| let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); |
| assert_eq!(w.write(&[0, 1]).await.unwrap(), 2); |
| assert_eq!(&w.get_ref().inner, &[]); |
| w.flush().await.unwrap(); |
| let w = w.into_inner().inner; |
| assert_eq!(w, [0, 1]); |
| } |
| |
| #[tokio::test] |
| async fn maybe_pending_buf_writer_seek() { |
| struct MaybePendingSeek { |
| inner: Cursor<Vec<u8>>, |
| ready_write: bool, |
| ready_seek: bool, |
| seek_res: Option<io::Result<()>>, |
| } |
| |
| impl MaybePendingSeek { |
| fn new(inner: Vec<u8>) -> Self { |
| Self { |
| inner: Cursor::new(inner), |
| ready_write: false, |
| ready_seek: false, |
| seek_res: None, |
| } |
| } |
| } |
| |
| impl AsyncWrite for MaybePendingSeek { |
| fn poll_write( |
| mut self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<io::Result<usize>> { |
| if self.ready_write { |
| self.ready_write = false; |
| Pin::new(&mut self.inner).poll_write(cx, buf) |
| } else { |
| self.ready_write = true; |
| cx.waker().wake_by_ref(); |
| Poll::Pending |
| } |
| } |
| |
| fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| Pin::new(&mut self.inner).poll_flush(cx) |
| } |
| |
| fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| Pin::new(&mut self.inner).poll_shutdown(cx) |
| } |
| } |
| |
| impl AsyncSeek for MaybePendingSeek { |
| fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { |
| self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos)); |
| Ok(()) |
| } |
| fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
| if self.ready_seek { |
| self.ready_seek = false; |
| self.seek_res.take().unwrap_or(Ok(()))?; |
| Pin::new(&mut self.inner).poll_complete(cx) |
| } else { |
| self.ready_seek = true; |
| cx.waker().wake_by_ref(); |
| Poll::Pending |
| } |
| } |
| } |
| |
| let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(Vec::new())); |
| w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); |
| w.write_all(&[6, 7]).await.unwrap(); |
| assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8); |
| assert_eq!( |
| &w.get_ref().inner.get_ref()[..], |
| &[0, 1, 2, 3, 4, 5, 6, 7][..] |
| ); |
| assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2); |
| w.write_all(&[8, 9]).await.unwrap(); |
| w.flush().await.unwrap(); |
| assert_eq!( |
| &w.into_inner().inner.into_inner()[..], |
| &[0, 1, 8, 9, 4, 5, 6, 7] |
| ); |
| } |
| |
/// Recording sink for the vectored-write tests.
///
/// `data` holds every byte written so far, `write_len` is the limit that the
/// `AsyncWrite` implementation passes to `write_up_to` per call, and
/// `vectored` is the value reported by `is_write_vectored`.
struct MockWriter {
    data: Vec<u8>,
    write_len: usize,
    vectored: bool,
}

impl MockWriter {
    /// Creates an empty writer that does not advertise vectored writes.
    fn new(write_len: usize) -> Self {
        Self {
            data: Vec::new(),
            write_len,
            vectored: false,
        }
    }

    /// Creates an empty writer that advertises vectored writes.
    fn vectored(write_len: usize) -> Self {
        Self {
            vectored: true,
            ..Self::new(write_len)
        }
    }

    /// Appends at most `limit` leading bytes of `buf` to `data` and returns
    /// how many bytes were appended.
    fn write_up_to(&mut self, buf: &[u8], limit: usize) -> usize {
        let accepted = cmp::min(limit, buf.len());
        let (head, _rest) = buf.split_at(accepted);
        self.data.extend_from_slice(head);
        accepted
    }
}
| |
| impl AsyncWrite for MockWriter { |
| fn poll_write( |
| self: Pin<&mut Self>, |
| _: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<Result<usize, io::Error>> { |
| let this = self.get_mut(); |
| let n = this.write_up_to(buf, this.write_len); |
| Ok(n).into() |
| } |
| |
| fn poll_write_vectored( |
| self: Pin<&mut Self>, |
| _: &mut Context<'_>, |
| bufs: &[IoSlice<'_>], |
| ) -> Poll<Result<usize, io::Error>> { |
| let this = self.get_mut(); |
| let mut total_written = 0; |
| for buf in bufs { |
| let n = this.write_up_to(buf, this.write_len - total_written); |
| total_written += n; |
| if total_written == this.write_len { |
| break; |
| } |
| } |
| Ok(total_written).into() |
| } |
| |
| fn is_write_vectored(&self) -> bool { |
| self.vectored |
| } |
| |
| fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
| Ok(()).into() |
| } |
| |
| fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
| Ok(()).into() |
| } |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_empty_on_non_vectored() { |
| let mut w = BufWriter::new(MockWriter::new(4)); |
| let n = assert_ok!(write_vectored(&mut w, &[]).await); |
| assert_eq!(n, 0); |
| |
| let io_vec = [IoSlice::new(&[]); 3]; |
| let n = assert_ok!(write_vectored(&mut w, &io_vec).await); |
| assert_eq!(n, 0); |
| |
| assert_ok!(w.flush().await); |
| assert!(w.get_ref().data.is_empty()); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_empty_on_vectored() { |
| let mut w = BufWriter::new(MockWriter::vectored(4)); |
| let n = assert_ok!(write_vectored(&mut w, &[]).await); |
| assert_eq!(n, 0); |
| |
| let io_vec = [IoSlice::new(&[]); 3]; |
| let n = assert_ok!(write_vectored(&mut w, &io_vec).await); |
| assert_eq!(n, 0); |
| |
| assert_ok!(w.flush().await); |
| assert!(w.get_ref().data.is_empty()); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_basic_on_non_vectored() { |
| let msg = b"foo bar baz"; |
| let bufs = [ |
| IoSlice::new(&msg[0..4]), |
| IoSlice::new(&msg[4..8]), |
| IoSlice::new(&msg[8..]), |
| ]; |
| let mut w = BufWriter::new(MockWriter::new(4)); |
| let n = assert_ok!(write_vectored(&mut w, &bufs).await); |
| assert_eq!(n, msg.len()); |
| assert!(w.buffer() == &msg[..]); |
| assert_ok!(w.flush().await); |
| assert_eq!(w.get_ref().data, msg); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_basic_on_vectored() { |
| let msg = b"foo bar baz"; |
| let bufs = [ |
| IoSlice::new(&msg[0..4]), |
| IoSlice::new(&msg[4..8]), |
| IoSlice::new(&msg[8..]), |
| ]; |
| let mut w = BufWriter::new(MockWriter::vectored(4)); |
| let n = assert_ok!(write_vectored(&mut w, &bufs).await); |
| assert_eq!(n, msg.len()); |
| assert!(w.buffer() == &msg[..]); |
| assert_ok!(w.flush().await); |
| assert_eq!(w.get_ref().data, msg); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_large_total_on_non_vectored() { |
| let msg = b"foo bar baz"; |
| let mut bufs = [ |
| IoSlice::new(&msg[0..4]), |
| IoSlice::new(&msg[4..8]), |
| IoSlice::new(&msg[8..]), |
| ]; |
| let io_vec = IoBufs::new(&mut bufs); |
| let mut w = BufWriter::with_capacity(8, MockWriter::new(4)); |
| let n = assert_ok!(write_vectored(&mut w, &io_vec).await); |
| assert_eq!(n, 8); |
| assert!(w.buffer() == &msg[..8]); |
| let io_vec = io_vec.advance(n); |
| let n = assert_ok!(write_vectored(&mut w, &io_vec).await); |
| assert_eq!(n, 3); |
| assert!(w.get_ref().data.as_slice() == &msg[..8]); |
| assert!(w.buffer() == &msg[8..]); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_large_total_on_vectored() { |
| let msg = b"foo bar baz"; |
| let mut bufs = [ |
| IoSlice::new(&msg[0..4]), |
| IoSlice::new(&msg[4..8]), |
| IoSlice::new(&msg[8..]), |
| ]; |
| let io_vec = IoBufs::new(&mut bufs); |
| let mut w = BufWriter::with_capacity(8, MockWriter::vectored(10)); |
| let n = assert_ok!(write_vectored(&mut w, &io_vec).await); |
| assert_eq!(n, 10); |
| assert!(w.buffer().is_empty()); |
| let io_vec = io_vec.advance(n); |
| let n = assert_ok!(write_vectored(&mut w, &io_vec).await); |
| assert_eq!(n, 1); |
| assert!(w.get_ref().data.as_slice() == &msg[..10]); |
| assert!(w.buffer() == &msg[10..]); |
| } |
| |
| struct VectoredWriteHarness { |
| writer: BufWriter<MockWriter>, |
| buf_capacity: usize, |
| } |
| |
| impl VectoredWriteHarness { |
| fn new(buf_capacity: usize) -> Self { |
| VectoredWriteHarness { |
| writer: BufWriter::with_capacity(buf_capacity, MockWriter::new(4)), |
| buf_capacity, |
| } |
| } |
| |
| fn with_vectored_backend(buf_capacity: usize) -> Self { |
| VectoredWriteHarness { |
| writer: BufWriter::with_capacity(buf_capacity, MockWriter::vectored(4)), |
| buf_capacity, |
| } |
| } |
| |
| async fn write_all<'a, 'b>(&mut self, mut io_vec: IoBufs<'a, 'b>) -> usize { |
| let mut total_written = 0; |
| while !io_vec.is_empty() { |
| let n = assert_ok!(write_vectored(&mut self.writer, &io_vec).await); |
| assert!(n != 0); |
| assert!(self.writer.buffer().len() <= self.buf_capacity); |
| total_written += n; |
| io_vec = io_vec.advance(n); |
| } |
| total_written |
| } |
| |
| async fn flush(&mut self) -> &[u8] { |
| assert_ok!(self.writer.flush().await); |
| &self.writer.get_ref().data |
| } |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_odd_on_non_vectored() { |
| let msg = b"foo bar baz"; |
| let mut bufs = [ |
| IoSlice::new(&msg[0..4]), |
| IoSlice::new(&[]), |
| IoSlice::new(&msg[4..9]), |
| IoSlice::new(&msg[9..]), |
| ]; |
| let mut h = VectoredWriteHarness::new(8); |
| let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; |
| assert_eq!(bytes_written, msg.len()); |
| assert_eq!(h.flush().await, msg); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_odd_on_vectored() { |
| let msg = b"foo bar baz"; |
| let mut bufs = [ |
| IoSlice::new(&msg[0..4]), |
| IoSlice::new(&[]), |
| IoSlice::new(&msg[4..9]), |
| IoSlice::new(&msg[9..]), |
| ]; |
| let mut h = VectoredWriteHarness::with_vectored_backend(8); |
| let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; |
| assert_eq!(bytes_written, msg.len()); |
| assert_eq!(h.flush().await, msg); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_large_slice_on_non_vectored() { |
| let msg = b"foo bar baz"; |
| let mut bufs = [ |
| IoSlice::new(&[]), |
| IoSlice::new(&msg[..9]), |
| IoSlice::new(&msg[9..]), |
| ]; |
| let mut h = VectoredWriteHarness::new(8); |
| let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; |
| assert_eq!(bytes_written, msg.len()); |
| assert_eq!(h.flush().await, msg); |
| } |
| |
| #[tokio::test] |
| async fn write_vectored_large_slice_on_vectored() { |
| let msg = b"foo bar baz"; |
| let mut bufs = [ |
| IoSlice::new(&[]), |
| IoSlice::new(&msg[..9]), |
| IoSlice::new(&msg[9..]), |
| ]; |
| let mut h = VectoredWriteHarness::with_vectored_backend(8); |
| let bytes_written = h.write_all(IoBufs::new(&mut bufs)).await; |
| assert_eq!(bytes_written, msg.len()); |
| assert_eq!(h.flush().await, msg); |
| } |