blob: 93dcc58949ee2c6bc2c0397d9080d5607394f61a [file] [log] [blame]
pub mod sync {
use std::io::Read;
pub fn to_vec(mut read: impl Read) -> Vec<u8> {
let mut output = vec![];
read.read_to_end(&mut output).unwrap();
output
}
}
#[cfg(feature = "futures-io")]
pub mod futures {
pub mod bufread {
pub use futures::io::AsyncBufRead;
use crate::utils::{InputStream, TrackEof};
use futures::stream::{StreamExt as _, TryStreamExt as _};
pub fn from(input: &InputStream) -> impl AsyncBufRead {
// By using the stream here we ensure that each chunk will require a separate
// read/poll_fill_buf call to process to help test reading multiple chunks.
TrackEof::new(input.stream().map(Ok).into_async_read())
}
}
pub mod read {
use crate::utils::{block_on, pin_mut};
use futures::io::{copy_buf, AsyncRead, AsyncReadExt, BufReader, Cursor};
pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
// TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510
// All current test cases are < 100kB
let mut output = Cursor::new(vec![0; 102_400]);
pin_mut!(read);
let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
let mut output = output.into_inner();
output.truncate(len as usize);
output
}
pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
pin_mut!(reader);
block_on(reader.read(output))
}
}
pub mod write {
use crate::utils::{block_on, Pin, TrackClosed};
use futures::io::{AsyncWrite, AsyncWriteExt as _};
use futures_test::io::AsyncWriteTestExt as _;
pub fn to_vec(
input: &[Vec<u8>],
create_writer: impl for<'a> FnOnce(
&'a mut (dyn AsyncWrite + Unpin),
) -> Pin<Box<dyn AsyncWrite + 'a>>,
limit: usize,
) -> Vec<u8> {
let mut output = Vec::new();
{
let mut test_writer = TrackClosed::new(
(&mut output)
.limited_write(limit)
.interleave_pending_write(),
);
{
let mut writer = create_writer(&mut test_writer);
for chunk in input {
block_on(writer.write_all(chunk)).unwrap();
block_on(writer.flush()).unwrap();
}
block_on(writer.close()).unwrap();
}
assert!(test_writer.is_closed());
}
output
}
}
}
#[cfg(feature = "tokio")]
pub mod tokio {
pub mod bufread {
use crate::utils::{InputStream, TrackEof};
use bytes::Bytes;
use futures::stream::StreamExt;
pub use tokio::io::AsyncBufRead;
use tokio_util::io::StreamReader;
pub fn from(input: &InputStream) -> impl AsyncBufRead {
// By using the stream here we ensure that each chunk will require a separate
// read/poll_fill_buf call to process to help test reading multiple chunks.
TrackEof::new(StreamReader::new(
input.stream().map(Bytes::from).map(std::io::Result::Ok),
))
}
}
pub mod read {
use crate::utils::{block_on, pin_mut, tokio_ext::copy_buf};
use std::io::Cursor;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
let mut output = Cursor::new(vec![0; 102_400]);
pin_mut!(read);
let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
let mut output = output.into_inner();
output.truncate(len as usize);
output
}
pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
pin_mut!(reader);
block_on(reader.read(output))
}
}
pub mod write {
use crate::utils::{
block_on, tokio_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
};
use std::io::Cursor;
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
pub fn to_vec(
input: &[Vec<u8>],
create_writer: impl for<'a> FnOnce(
&'a mut (dyn AsyncWrite + Unpin),
) -> Pin<Box<dyn AsyncWrite + 'a>>,
limit: usize,
) -> Vec<u8> {
let mut output = Cursor::new(Vec::new());
{
let mut test_writer = TrackClosed::new(
(&mut output)
.limited_write(limit)
.interleave_pending_write(),
);
{
let mut writer = create_writer(&mut test_writer);
for chunk in input {
block_on(writer.write_all(chunk)).unwrap();
block_on(writer.flush()).unwrap();
}
block_on(writer.shutdown()).unwrap();
}
assert!(test_writer.is_closed());
}
output.into_inner()
}
}
}