| /*! |
| This module provides two `std::io::Read` implementations: |
| |
| * [`read::FrameDecoder`](struct.FrameDecoder.html) |
wraps another `std::io::Read` implementation, and decompresses data encoded
| using the Snappy frame format. Use this if you have a compressed data source |
| and wish to read it as uncompressed data. |
| * [`read::FrameEncoder`](struct.FrameEncoder.html) |
wraps another `std::io::Read` implementation, and compresses data using the
Snappy frame format. Use this if you have an uncompressed data source and
wish to read it as compressed data.
| |
| Typically, `read::FrameDecoder` is the version that you'll want. |
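
Here is a quick sketch (assuming this crate is compiled under its usual name,
`snap`) of a round trip that compresses some bytes by reading them through
`FrameEncoder`, then decompresses them again through `FrameDecoder`:

```
use std::io::Read;

// This example assumes the crate is available as `snap`.
let data = b"some bytes to compress and read back";

// Compress `data` by reading it through a `FrameEncoder`.
let mut compressed = Vec::new();
snap::read::FrameEncoder::new(&data[..])
    .read_to_end(&mut compressed)
    .unwrap();

// Decompress again by reading through a `FrameDecoder`.
let mut decompressed = Vec::new();
snap::read::FrameDecoder::new(&compressed[..])
    .read_to_end(&mut decompressed)
    .unwrap();

assert_eq!(&data[..], &decompressed[..]);
```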
| */ |
| |
| use std::cmp; |
| use std::fmt; |
| use std::io; |
| |
| use crate::bytes; |
| use crate::compress::Encoder; |
| use crate::crc32::CheckSummer; |
| use crate::decompress::{decompress_len, Decoder}; |
| use crate::error::Error; |
| use crate::frame::{ |
| compress_frame, ChunkType, CHUNK_HEADER_AND_CRC_SIZE, |
| MAX_COMPRESS_BLOCK_SIZE, STREAM_BODY, STREAM_IDENTIFIER, |
| }; |
| use crate::MAX_BLOCK_SIZE; |
| |
| /// The maximum size of a compressed block, including the header and stream |
| /// identifier, that can be emitted by FrameEncoder. |
| const MAX_READ_FRAME_ENCODER_BLOCK_SIZE: usize = STREAM_IDENTIFIER.len() |
| + CHUNK_HEADER_AND_CRC_SIZE |
| + MAX_COMPRESS_BLOCK_SIZE; |
| |
| /// A reader for decompressing a Snappy stream. |
| /// |
| /// This `FrameDecoder` wraps any other reader that implements `std::io::Read`. |
| /// Bytes read from this reader are decompressed using the |
| /// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt) |
| /// (file extension `sz`, MIME type `application/x-snappy-framed`). |
| /// |
/// This reader can potentially make many small reads from the underlying
/// stream depending on its format; therefore, passing in a buffered reader
/// may be beneficial.
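///
/// For instance, a minimal sketch of decompressing a framed Snappy file
/// (assuming this crate is available as `snap` and that a file named
/// `data.sz` exists):
///
/// ```no_run
/// use std::fs::File;
/// use std::io::{BufReader, Read};
///
/// // `data.sz` is a hypothetical input file containing Snappy framed data.
/// // The `BufReader` avoids issuing many small reads directly against the
/// // file.
/// let file = File::open("data.sz").unwrap();
/// let mut rdr = snap::read::FrameDecoder::new(BufReader::new(file));
/// let mut decompressed = Vec::new();
/// rdr.read_to_end(&mut decompressed).unwrap();
/// ```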
| pub struct FrameDecoder<R: io::Read> { |
| /// The underlying reader. |
| r: R, |
/// A reusable Snappy decoder that does the actual block-based
/// decompression.
| dec: Decoder, |
| /// A CRC32 checksummer that is configured to either use the portable |
| /// fallback version or the SSE4.2 accelerated version when the right CPU |
| /// features are available. |
| checksummer: CheckSummer, |
| /// The compressed bytes buffer, taken from the underlying reader. |
| src: Vec<u8>, |
| /// The decompressed bytes buffer. Bytes are decompressed from src to dst |
| /// before being passed back to the caller. |
| dst: Vec<u8>, |
| /// Index into dst: starting point of bytes not yet given back to caller. |
| dsts: usize, |
| /// Index into dst: ending point of bytes not yet given back to caller. |
| dste: usize, |
| /// Whether we've read the special stream header or not. |
| read_stream_ident: bool, |
| } |
| |
| impl<R: io::Read> FrameDecoder<R> { |
| /// Create a new reader for streaming Snappy decompression. |
| pub fn new(rdr: R) -> FrameDecoder<R> { |
| FrameDecoder { |
| r: rdr, |
| dec: Decoder::new(), |
| checksummer: CheckSummer::new(), |
| src: vec![0; MAX_COMPRESS_BLOCK_SIZE], |
| dst: vec![0; MAX_BLOCK_SIZE], |
| dsts: 0, |
| dste: 0, |
| read_stream_ident: false, |
| } |
| } |
| |
| /// Gets a reference to the underlying reader in this decoder. |
| pub fn get_ref(&self) -> &R { |
| &self.r |
| } |
| |
| /// Gets a mutable reference to the underlying reader in this decoder. |
| /// |
/// Note that mutating the stream may result in surprising behavior if this
/// decoder continues to be used.
| pub fn get_mut(&mut self) -> &mut R { |
| &mut self.r |
| } |
| |
| /// Gets the underlying reader of this decoder. |
| pub fn into_inner(self) -> R { |
| self.r |
| } |
| } |
| |
| impl<R: io::Read> io::Read for FrameDecoder<R> { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| macro_rules! fail { |
| ($err:expr) => { |
| return Err(io::Error::from($err)) |
| }; |
| } |
| loop { |
| if self.dsts < self.dste { |
| let len = cmp::min(self.dste - self.dsts, buf.len()); |
| let dste = self.dsts.checked_add(len).unwrap(); |
| buf[0..len].copy_from_slice(&self.dst[self.dsts..dste]); |
| self.dsts = dste; |
| return Ok(len); |
| } |
| if !read_exact_eof(&mut self.r, &mut self.src[0..4])? { |
| return Ok(0); |
| } |
| let ty = ChunkType::from_u8(self.src[0]); |
| if !self.read_stream_ident { |
| if ty != Ok(ChunkType::Stream) { |
| fail!(Error::StreamHeader { byte: self.src[0] }); |
| } |
| self.read_stream_ident = true; |
| } |
| let len64 = bytes::read_u24_le(&self.src[1..]) as u64; |
| if len64 > self.src.len() as u64 { |
| fail!(Error::UnsupportedChunkLength { |
| len: len64, |
| header: false, |
| }); |
| } |
| let len = len64 as usize; |
| match ty { |
| Err(b) if 0x02 <= b && b <= 0x7F => { |
| // Spec says that chunk types 0x02-0x7F are reserved and |
| // conformant decoders must return an error. |
| fail!(Error::UnsupportedChunkType { byte: b }); |
| } |
| Err(b) if 0x80 <= b && b <= 0xFD => { |
| // Spec says that chunk types 0x80-0xFD are reserved but |
| // skippable. |
| self.r.read_exact(&mut self.src[0..len])?; |
| } |
| Err(b) => { |
| // Can never happen. 0x02-0x7F and 0x80-0xFD are handled |
| // above in the error case. That leaves 0x00, 0x01, 0xFE |
| // and 0xFF, each of which correspond to one of the four |
| // defined chunk types. |
| unreachable!("BUG: unhandled chunk type: {}", b); |
| } |
| Ok(ChunkType::Padding) => { |
| // Just read and move on. |
| self.r.read_exact(&mut self.src[0..len])?; |
| } |
| Ok(ChunkType::Stream) => { |
| if len != STREAM_BODY.len() { |
| fail!(Error::UnsupportedChunkLength { |
| len: len64, |
| header: true, |
| }) |
| } |
| self.r.read_exact(&mut self.src[0..len])?; |
| if &self.src[0..len] != STREAM_BODY { |
| fail!(Error::StreamHeaderMismatch { |
| bytes: self.src[0..len].to_vec(), |
| }); |
| } |
| } |
| Ok(ChunkType::Uncompressed) => { |
| if len < 4 { |
| fail!(Error::UnsupportedChunkLength { |
| len: len as u64, |
| header: false, |
| }); |
| } |
| let expected_sum = bytes::io_read_u32_le(&mut self.r)?; |
| let n = len - 4; |
| if n > self.dst.len() { |
| fail!(Error::UnsupportedChunkLength { |
| len: n as u64, |
| header: false, |
| }); |
| } |
| self.r.read_exact(&mut self.dst[0..n])?; |
| let got_sum = |
| self.checksummer.crc32c_masked(&self.dst[0..n]); |
| if expected_sum != got_sum { |
| fail!(Error::Checksum { |
| expected: expected_sum, |
| got: got_sum, |
| }); |
| } |
| self.dsts = 0; |
| self.dste = n; |
| } |
| Ok(ChunkType::Compressed) => { |
| if len < 4 { |
| fail!(Error::UnsupportedChunkLength { |
| len: len as u64, |
| header: false, |
| }); |
| } |
| let expected_sum = bytes::io_read_u32_le(&mut self.r)?; |
| let sn = len - 4; |
| if sn > self.src.len() { |
| fail!(Error::UnsupportedChunkLength { |
| len: len64, |
| header: false, |
| }); |
| } |
| self.r.read_exact(&mut self.src[0..sn])?; |
| let dn = decompress_len(&self.src)?; |
| if dn > self.dst.len() { |
| fail!(Error::UnsupportedChunkLength { |
| len: dn as u64, |
| header: false, |
| }); |
| } |
| self.dec |
| .decompress(&self.src[0..sn], &mut self.dst[0..dn])?; |
| let got_sum = |
| self.checksummer.crc32c_masked(&self.dst[0..dn]); |
| if expected_sum != got_sum { |
| fail!(Error::Checksum { |
| expected: expected_sum, |
| got: got_sum, |
| }); |
| } |
| self.dsts = 0; |
| self.dste = dn; |
| } |
| } |
| } |
| } |
| } |
| |
| impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("FrameDecoder") |
| .field("r", &self.r) |
| .field("dec", &self.dec) |
| .field("checksummer", &self.checksummer) |
| .field("src", &"[...]") |
| .field("dst", &"[...]") |
| .field("dsts", &self.dsts) |
| .field("dste", &self.dste) |
| .field("read_stream_ident", &self.read_stream_ident) |
| .finish() |
| } |
| } |
| |
/// A reader for compressing data using Snappy as it is read.
| /// |
| /// This `FrameEncoder` wraps any other reader that implements `std::io::Read`. |
| /// Bytes read from this reader are compressed using the |
| /// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt) |
| /// (file extension `sz`, MIME type `application/x-snappy-framed`). |
| /// |
| /// Usually you'll want |
| /// [`read::FrameDecoder`](struct.FrameDecoder.html) |
| /// (for decompressing while reading) or |
| /// [`write::FrameEncoder`](../write/struct.FrameEncoder.html) |
| /// (for compressing while writing) instead. |
| /// |
| /// Unlike `FrameDecoder`, this will attempt to make large reads roughly |
| /// equivalent to the size of a single Snappy block. Therefore, callers may not |
| /// benefit from using a buffered reader. |
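///
/// As a minimal sketch (assuming this crate is available as `snap`), reading
/// from a `FrameEncoder` wrapped around an in-memory buffer yields the
/// compressed form of that buffer:
///
/// ```
/// use std::io::Read;
///
/// // In-memory input; any `std::io::Read` implementation works here.
/// let data = b"some uncompressed bytes";
/// let mut compressed = Vec::new();
/// snap::read::FrameEncoder::new(&data[..])
///     .read_to_end(&mut compressed)
///     .unwrap();
/// assert!(!compressed.is_empty());
/// ```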
| pub struct FrameEncoder<R: io::Read> { |
| /// Internally, we split `FrameEncoder` in two to keep the borrow checker |
| /// happy. The `inner` member contains everything that `read_frame` needs |
| /// to fetch a frame's worth of data and compress it. |
| inner: Inner<R>, |
| /// Data that we've encoded and are ready to return to our caller. |
| dst: Vec<u8>, |
| /// Starting point of bytes in `dst` not yet given back to the caller. |
| dsts: usize, |
| /// Ending point of bytes in `dst` that we want to give to our caller. |
| dste: usize, |
| } |
| |
| struct Inner<R: io::Read> { |
| /// The underlying data source. |
| r: R, |
/// A reusable encoder that does the actual block-based compression.
| enc: Encoder, |
| /// A CRC32 checksummer that is configured to either use the portable |
| /// fallback version or the SSE4.2 accelerated version when the right CPU |
| /// features are available. |
| checksummer: CheckSummer, |
| /// Data taken from the underlying `r`, and not yet compressed. |
| src: Vec<u8>, |
/// Have we written the standard Snappy header to `dst` yet?
| wrote_stream_ident: bool, |
| } |
| |
| impl<R: io::Read> FrameEncoder<R> { |
| /// Create a new reader for streaming Snappy compression. |
| pub fn new(rdr: R) -> FrameEncoder<R> { |
| FrameEncoder { |
| inner: Inner { |
| r: rdr, |
| enc: Encoder::new(), |
| checksummer: CheckSummer::new(), |
| src: vec![0; MAX_BLOCK_SIZE], |
| wrote_stream_ident: false, |
| }, |
| dst: vec![0; MAX_READ_FRAME_ENCODER_BLOCK_SIZE], |
| dsts: 0, |
| dste: 0, |
| } |
| } |
| |
/// Gets a reference to the underlying reader in this encoder.
| pub fn get_ref(&self) -> &R { |
| &self.inner.r |
| } |
| |
/// Gets a mutable reference to the underlying reader in this encoder.
///
/// Note that mutating the stream may result in surprising behavior if this
/// encoder continues to be used.
| pub fn get_mut(&mut self) -> &mut R { |
| &mut self.inner.r |
| } |
| |
/// Copies previously compressed data from `self.dst` into `buf`, returning
/// the number of bytes copied. If no compressed bytes remain in `self.dst`,
/// returns 0.
| fn read_from_dst(&mut self, buf: &mut [u8]) -> usize { |
| let available_bytes = self.dste - self.dsts; |
| let count = cmp::min(available_bytes, buf.len()); |
| buf[..count].copy_from_slice(&self.dst[self.dsts..self.dsts + count]); |
| self.dsts += count; |
| count |
| } |
| } |
| |
| impl<R: io::Read> io::Read for FrameEncoder<R> { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| // Try reading previously compressed bytes from our `dst` buffer, if |
| // any. |
| let count = self.read_from_dst(buf); |
| |
| if count > 0 { |
| // We had some bytes in our `dst` buffer that we used. |
| Ok(count) |
| } else if buf.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE { |
| // Our output `buf` is big enough that we can directly write into |
| // it, so bypass `dst` entirely. |
| self.inner.read_frame(buf) |
| } else { |
| // We need to refill `self.dst`, and then return some bytes from |
| // that. |
| let count = self.inner.read_frame(&mut self.dst)?; |
| self.dsts = 0; |
| self.dste = count; |
| Ok(self.read_from_dst(buf)) |
| } |
| } |
| } |
| |
| impl<R: io::Read> Inner<R> { |
| /// Read from `self.r`, and create a new frame, writing it to `dst`, which |
| /// must be at least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in size. |
| fn read_frame(&mut self, dst: &mut [u8]) -> io::Result<usize> { |
| debug_assert!(dst.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE); |
| |
| // We make one read to the underlying reader. If the underlying reader |
| // doesn't fill the buffer but there are still bytes to be read, then |
| // compression won't be optimal. The alternative would be to block |
| // until our buffer is maximally full (or we see EOF), but this seems |
| // more surprising. In general, io::Read implementations should try to |
| // fill the caller's buffer as much as they can, so this seems like the |
| // better choice. |
| let nread = self.r.read(&mut self.src)?; |
| if nread == 0 { |
| return Ok(0); |
| } |
| |
| // If we haven't yet written the stream header to `dst`, write it. |
| let mut dst_write_start = 0; |
| if !self.wrote_stream_ident { |
| dst[0..STREAM_IDENTIFIER.len()].copy_from_slice(STREAM_IDENTIFIER); |
| dst_write_start += STREAM_IDENTIFIER.len(); |
| self.wrote_stream_ident = true; |
| } |
| |
| // Reserve space for our chunk header. We need to use `split_at_mut` so |
| // that we can get two mutable slices pointing at non-overlapping parts |
| // of `dst`. |
| let (chunk_header, remaining_dst) = |
| dst[dst_write_start..].split_at_mut(CHUNK_HEADER_AND_CRC_SIZE); |
| dst_write_start += CHUNK_HEADER_AND_CRC_SIZE; |
| |
| // Compress our frame if possible, telling `compress_frame` to always |
| // put the output in `dst`. |
| let frame_data = compress_frame( |
| &mut self.enc, |
| self.checksummer, |
| &self.src[..nread], |
| chunk_header, |
| remaining_dst, |
| true, |
| )?; |
| Ok(dst_write_start + frame_data.len()) |
| } |
| } |
| |
| impl<R: fmt::Debug + io::Read> fmt::Debug for FrameEncoder<R> { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("FrameEncoder") |
| .field("inner", &self.inner) |
| .field("dst", &"[...]") |
| .field("dsts", &self.dsts) |
| .field("dste", &self.dste) |
| .finish() |
| } |
| } |
| |
| impl<R: fmt::Debug + io::Read> fmt::Debug for Inner<R> { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("Inner") |
| .field("r", &self.r) |
| .field("enc", &self.enc) |
| .field("checksummer", &self.checksummer) |
| .field("src", &"[...]") |
| .field("wrote_stream_ident", &self.wrote_stream_ident) |
| .finish() |
| } |
| } |
| |
// read_exact_eof is like Read::read_exact, except that it returns Ok(false)
// instead of an error when the reader is already at EOF.
//
// If `buf` was filled successfully, it returns Ok(true).
| fn read_exact_eof<R: io::Read>( |
| rdr: &mut R, |
| buf: &mut [u8], |
| ) -> io::Result<bool> { |
| match rdr.read(buf) { |
| // EOF |
| Ok(0) => Ok(false), |
| // Read everything w/ the read call |
| Ok(i) if i == buf.len() => Ok(true), |
// There are some bytes left to fill, which can be deferred to read_exact
| Ok(i) => { |
| rdr.read_exact(&mut buf[i..])?; |
| Ok(true) |
| } |
| Err(e) => Err(e), |
| } |
| } |