blob: fba2736939a084648b8617909ac5d3e092c77bdb [file] [log] [blame] [edit]
//! A unidirectional pipe for bytes, analogous to a Unix pipe. Available with the `io-pipe` feature toggle.
/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
#[cfg(feature = "io-pipe")]
pub mod pipe {
use std::io;
use bytes::{Buf, BufMut, BytesMut};
/// The write-end of the pipe, receiving items to become available in the [`Reader`].
/// It's commonly used with the [`std::io::Write`] trait it implements.
pub struct Writer {
/// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
buf: BytesMut,
/// The read-end of the pipe, implementing the [`std::io::Read`] trait.
pub struct Reader {
channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
buf: BytesMut,
// Buffered-read access to the `Reader`, backed by its internal `buf`.
// NOTE(review): this impl looks truncated in this view — the `match` scrutinee, the block
// terminators, the value returned by `fill_buf` and the body of `consume` are not visible.
impl io::BufRead for Reader {
/// Replaces the internal buffer with a newly received chunk, but only once the buffer is empty.
///
/// # Errors
///
/// An `io::Error` that was sent through the pipe in place of bytes is returned to the caller.
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// Bytes left over from an earlier chunk are kept; nothing new is fetched in that case.
if self.buf.is_empty() {
// NOTE(review): the scrutinee is missing. The arm patterns fit the result of receiving from
// `self.channel` (a `Receiver<io::Result<BytesMut>>`) — confirm against the original file.
match {
// A chunk of bytes arrived and becomes the new internal buffer.
Ok(Ok(buf)) => self.buf = buf,
// An error was passed through the pipe instead of bytes: surface it.
Ok(Err(err)) => return Err(err),
// The outer failure is deliberately ignored, leaving the buffer empty
// (presumably the write-end is gone, so the caller sees end-of-input — TODO confirm).
Err(_) => {}
// NOTE(review): the body is not visible here; `amt` is presumably the number of buffered
// bytes to discard, per the `BufRead::consume` contract — TODO confirm.
fn consume(&mut self, amt: usize) {
// `io::Read` for the read-end of the pipe.
// NOTE(review): this impl looks truncated in this view — the `match` scrutinee, the statement
// that copies into `to_write`, the block terminators and the returned count are not visible.
impl io::Read for Reader {
/// Works through `out` chunk by chunk, counting the handled bytes in `written`.
///
/// # Errors
///
/// An `io::Error` that was sent through the pipe in place of bytes is returned immediately.
fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
// Running total of the bytes accounted for so far.
let mut written = 0;
// `out` is re-sliced to its remaining tail on every iteration, so an empty `out` means done.
while !out.is_empty() {
// A new chunk is only fetched once the current one is used up.
if self.buf.is_empty() {
// NOTE(review): the scrutinee is missing. The arm patterns fit the result of receiving from
// `self.channel` (a `Receiver<io::Result<BytesMut>>`) — confirm against the original file.
match {
// A chunk of bytes arrived and becomes the new internal buffer.
Ok(Ok(buf)) => self.buf = buf,
// An error was passed through the pipe instead of bytes: surface it.
Ok(Err(err)) => return Err(err),
// The outer failure ends the loop without being reported as an error
// (presumably the write-end is gone — TODO confirm).
Err(_) => break,
// Never take more than is buffered, nor more than still fits into the output.
let bytes_to_write = self.buf.len().min(out.len());
// NOTE(review): `to_write` is not used in the visible lines; the statement moving bytes from
// `self.buf` into it appears to be missing.
let (to_write, rest) = out.split_at_mut(bytes_to_write);
// Continue with the part of the output that has not been handled yet.
out = rest;
written += bytes_to_write;
// `io::Write` for the write-end of the pipe.
// NOTE(review): this impl looks truncated in this view — the expression that `.map_err` is chained
// onto, the value returned by `write` and the whole body of `flush` are not visible.
impl io::Write for Writer {
/// # Errors
///
/// Fails with [`io::ErrorKind::BrokenPipe`] if the (not visible) preceding call fails.
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// The underlying failure is kept as the payload of the `BrokenPipe` error rather than dropped.
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
// NOTE(review): body not visible in this view.
fn flush(&mut self) -> io::Result<()> {
/// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe.
/// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing.
/// If `0`, the `write` end will always block until the `read` end consumes the transferred bytes.
pub fn unidirectional(in_flight_writes: usize) -> (Writer, Reader) {
let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes);
Writer {
channel: tx,
buf: BytesMut::with_capacity(4096),
Reader {
channel: rx,
buf: BytesMut::new(),