| use std::io; |
| use std::io::prelude::*; |
| use std::mem; |
| |
| use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status}; |
| |
| #[derive(Debug)] |
| pub struct Writer<W: Write, D: Ops> { |
| obj: Option<W>, |
| pub data: D, |
| buf: Vec<u8>, |
| } |
| |
| pub trait Ops { |
| type Flush: Flush; |
| fn total_in(&self) -> u64; |
| fn total_out(&self) -> u64; |
| fn run( |
| &mut self, |
| input: &[u8], |
| output: &mut [u8], |
| flush: Self::Flush, |
| ) -> Result<Status, DecompressError>; |
| fn run_vec( |
| &mut self, |
| input: &[u8], |
| output: &mut Vec<u8>, |
| flush: Self::Flush, |
| ) -> Result<Status, DecompressError>; |
| } |
| |
| impl Ops for Compress { |
| type Flush = FlushCompress; |
| fn total_in(&self) -> u64 { |
| self.total_in() |
| } |
| fn total_out(&self) -> u64 { |
| self.total_out() |
| } |
| fn run( |
| &mut self, |
| input: &[u8], |
| output: &mut [u8], |
| flush: FlushCompress, |
| ) -> Result<Status, DecompressError> { |
| Ok(self.compress(input, output, flush).unwrap()) |
| } |
| fn run_vec( |
| &mut self, |
| input: &[u8], |
| output: &mut Vec<u8>, |
| flush: FlushCompress, |
| ) -> Result<Status, DecompressError> { |
| Ok(self.compress_vec(input, output, flush).unwrap()) |
| } |
| } |
| |
| impl Ops for Decompress { |
| type Flush = FlushDecompress; |
| fn total_in(&self) -> u64 { |
| self.total_in() |
| } |
| fn total_out(&self) -> u64 { |
| self.total_out() |
| } |
| fn run( |
| &mut self, |
| input: &[u8], |
| output: &mut [u8], |
| flush: FlushDecompress, |
| ) -> Result<Status, DecompressError> { |
| self.decompress(input, output, flush) |
| } |
| fn run_vec( |
| &mut self, |
| input: &[u8], |
| output: &mut Vec<u8>, |
| flush: FlushDecompress, |
| ) -> Result<Status, DecompressError> { |
| self.decompress_vec(input, output, flush) |
| } |
| } |
| |
/// Constructors for the three flush modes the generic reader/writer code
/// needs, so it can name them without knowing the concrete flush enum.
pub trait Flush {
    /// The mode used for ordinary calls that simply feed more input.
    fn none() -> Self;

    /// The mode used when the writer is asked to flush.
    fn sync() -> Self;

    /// The mode used once no further input will be supplied.
    fn finish() -> Self;
}
| |
| impl Flush for FlushCompress { |
| fn none() -> Self { |
| FlushCompress::None |
| } |
| |
| fn sync() -> Self { |
| FlushCompress::Sync |
| } |
| |
| fn finish() -> Self { |
| FlushCompress::Finish |
| } |
| } |
| |
| impl Flush for FlushDecompress { |
| fn none() -> Self { |
| FlushDecompress::None |
| } |
| |
| fn sync() -> Self { |
| FlushDecompress::Sync |
| } |
| |
| fn finish() -> Self { |
| FlushDecompress::Finish |
| } |
| } |
| |
| pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> |
| where |
| R: BufRead, |
| D: Ops, |
| { |
| loop { |
| let (read, consumed, ret, eof); |
| { |
| let input = obj.fill_buf()?; |
| eof = input.is_empty(); |
| let before_out = data.total_out(); |
| let before_in = data.total_in(); |
| let flush = if eof { |
| D::Flush::finish() |
| } else { |
| D::Flush::none() |
| }; |
| ret = data.run(input, dst, flush); |
| read = (data.total_out() - before_out) as usize; |
| consumed = (data.total_in() - before_in) as usize; |
| } |
| obj.consume(consumed); |
| |
| match ret { |
| // If we haven't ready any data and we haven't hit EOF yet, |
| // then we need to keep asking for more data because if we |
| // return that 0 bytes of data have been read then it will |
| // be interpreted as EOF. |
| Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue, |
| Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read), |
| |
| Err(..) => { |
| return Err(io::Error::new( |
| io::ErrorKind::InvalidInput, |
| "corrupt deflate stream", |
| )) |
| } |
| } |
| } |
| } |
| |
| impl<W: Write, D: Ops> Writer<W, D> { |
| pub fn new(w: W, d: D) -> Writer<W, D> { |
| Writer { |
| obj: Some(w), |
| data: d, |
| buf: Vec::with_capacity(32 * 1024), |
| } |
| } |
| |
| pub fn finish(&mut self) -> io::Result<()> { |
| loop { |
| self.dump()?; |
| |
| let before = self.data.total_out(); |
| self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?; |
| if before == self.data.total_out() { |
| return Ok(()); |
| } |
| } |
| } |
| |
| pub fn replace(&mut self, w: W) -> W { |
| self.buf.truncate(0); |
| mem::replace(self.get_mut(), w) |
| } |
| |
| pub fn get_ref(&self) -> &W { |
| self.obj.as_ref().unwrap() |
| } |
| |
| pub fn get_mut(&mut self) -> &mut W { |
| self.obj.as_mut().unwrap() |
| } |
| |
| // Note that this should only be called if the outer object is just about |
| // to be consumed! |
| // |
| // (e.g. an implementation of `into_inner`) |
| pub fn take_inner(&mut self) -> W { |
| self.obj.take().unwrap() |
| } |
| |
| pub fn is_present(&self) -> bool { |
| self.obj.is_some() |
| } |
| |
| // Returns total written bytes and status of underlying codec |
| pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> { |
| // miniz isn't guaranteed to actually write any of the buffer provided, |
| // it may be in a flushing mode where it's just giving us data before |
| // we're actually giving it any data. We don't want to spuriously return |
| // `Ok(0)` when possible as it will cause calls to write_all() to fail. |
| // As a result we execute this in a loop to ensure that we try our |
| // darndest to write the data. |
| loop { |
| self.dump()?; |
| |
| let before_in = self.data.total_in(); |
| let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none()); |
| let written = (self.data.total_in() - before_in) as usize; |
| let is_stream_end = matches!(ret, Ok(Status::StreamEnd)); |
| |
| if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end { |
| continue; |
| } |
| return match ret { |
| Ok(st) => match st { |
| Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)), |
| }, |
| Err(..) => Err(io::Error::new( |
| io::ErrorKind::InvalidInput, |
| "corrupt deflate stream", |
| )), |
| }; |
| } |
| } |
| |
| fn dump(&mut self) -> io::Result<()> { |
| // TODO: should manage this buffer not with `drain` but probably more of |
| // a deque-like strategy. |
| while !self.buf.is_empty() { |
| let n = self.obj.as_mut().unwrap().write(&self.buf)?; |
| if n == 0 { |
| return Err(io::ErrorKind::WriteZero.into()); |
| } |
| self.buf.drain(..n); |
| } |
| Ok(()) |
| } |
| } |
| |
| impl<W: Write, D: Ops> Write for Writer<W, D> { |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| self.write_with_status(buf).map(|res| res.0) |
| } |
| |
| fn flush(&mut self) -> io::Result<()> { |
| self.data |
| .run_vec(&[], &mut self.buf, D::Flush::sync()) |
| .unwrap(); |
| |
| // Unfortunately miniz doesn't actually tell us when we're done with |
| // pulling out all the data from the internal stream. To remedy this we |
| // have to continually ask the stream for more memory until it doesn't |
| // give us a chunk of memory the same size as our own internal buffer, |
| // at which point we assume it's reached the end. |
| loop { |
| self.dump()?; |
| let before = self.data.total_out(); |
| self.data |
| .run_vec(&[], &mut self.buf, D::Flush::none()) |
| .unwrap(); |
| if before == self.data.total_out() { |
| break; |
| } |
| } |
| |
| self.obj.as_mut().unwrap().flush() |
| } |
| } |
| |
| impl<W: Write, D: Ops> Drop for Writer<W, D> { |
| fn drop(&mut self) { |
| if self.obj.is_some() { |
| let _ = self.finish(); |
| } |
| } |
| } |