| use std::{ |
| convert::TryInto, |
| fmt, |
| hash::Hasher, |
| io::{self, BufRead, ErrorKind}, |
| mem::size_of, |
| }; |
| use twox_hash::XxHash32; |
| |
| use super::header::{ |
| BlockInfo, BlockMode, FrameInfo, LZ4F_LEGACY_MAGIC_NUMBER, MAGIC_NUMBER_SIZE, |
| MAX_FRAME_INFO_SIZE, MIN_FRAME_INFO_SIZE, |
| }; |
| use super::Error; |
| use crate::{ |
| block::WINDOW_SIZE, |
| sink::{vec_sink_for_decompression, SliceSink}, |
| }; |
| |
| /// A reader for decompressing the LZ4 frame format |
| /// |
| /// This Decoder wraps any other reader that implements `io::Read`. |
| /// Bytes read will be decompressed according to the [LZ4 frame format]( |
| /// https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md). |
| /// |
| /// # Example 1 |
| /// Deserializing json values out of a compressed file. |
| /// |
| /// ```no_run |
| /// let compressed_input = std::fs::File::open("datafile").unwrap(); |
/// let decompressed_input = lz4_flex::frame::FrameDecoder::new(compressed_input);
| /// let json: serde_json::Value = serde_json::from_reader(decompressed_input).unwrap(); |
| /// ``` |
| /// |
/// # Example 2
/// Deserializing multiple json values out of a compressed file.
| /// |
| /// ```no_run |
| /// let compressed_input = std::fs::File::open("datafile").unwrap(); |
| /// let mut decompressed_input = lz4_flex::frame::FrameDecoder::new(compressed_input); |
| /// loop { |
| /// match serde_json::from_reader::<_, serde_json::Value>(&mut decompressed_input) { |
| /// Ok(json) => { println!("json {:?}", json); } |
| /// Err(e) if e.is_eof() => break, |
| /// Err(e) => panic!("{}", e), |
| /// } |
| /// } |
| /// ``` |
| pub struct FrameDecoder<R: io::Read> { |
| /// The underlying reader. |
| r: R, |
| /// The FrameInfo of the frame currently being decoded. |
    /// It starts as `None` and is set once the FrameInfo is read from the input.
| /// It's reset to `None` once the frame EndMarker is read from the input. |
| current_frame_info: Option<FrameInfo>, |
| /// Xxhash32 used when content checksum is enabled. |
| content_hasher: XxHash32, |
| /// Total length of decompressed output for the current frame. |
| content_len: u64, |
| /// The compressed bytes buffer, taken from the underlying reader. |
| src: Vec<u8>, |
| /// The decompressed bytes buffer. Bytes are decompressed from src to dst |
| /// before being passed back to the caller. |
| dst: Vec<u8>, |
| /// Index into dst and length: starting point of bytes previously output |
| /// that are still part of the decompressor window. |
| ext_dict_offset: usize, |
| ext_dict_len: usize, |
| /// Index into dst: starting point of bytes not yet read by caller. |
| dst_start: usize, |
| /// Index into dst: ending point of bytes not yet read by caller. |
| dst_end: usize, |
| } |
| |
| impl<R: io::Read> FrameDecoder<R> { |
| /// Creates a new Decoder for the specified reader. |
| pub fn new(rdr: R) -> FrameDecoder<R> { |
| FrameDecoder { |
| r: rdr, |
| src: Default::default(), |
| dst: Default::default(), |
| ext_dict_offset: 0, |
| ext_dict_len: 0, |
| dst_start: 0, |
| dst_end: 0, |
| current_frame_info: None, |
| content_hasher: XxHash32::with_seed(0), |
| content_len: 0, |
| } |
| } |
| |
| /// Gets a reference to the underlying reader in this decoder. |
| pub fn get_ref(&self) -> &R { |
| &self.r |
| } |
| |
| /// Gets a mutable reference to the underlying reader in this decoder. |
| /// |
    /// Note that mutating the stream may produce surprising results if this
    /// decoder continues to be used.
| pub fn get_mut(&mut self) -> &mut R { |
| &mut self.r |
| } |
| |
| /// Consumes the FrameDecoder and returns the underlying reader. |
| pub fn into_inner(self) -> R { |
| self.r |
| } |
| |
| fn read_frame_info(&mut self) -> Result<usize, io::Error> { |
| let mut buffer = [0u8; MAX_FRAME_INFO_SIZE]; |
| |
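        // Read the magic number first; a clean EOF here means there are no
        // more frames in the input.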
| match self.r.read(&mut buffer[..MAGIC_NUMBER_SIZE])? { |
| 0 => return Ok(0), |
| MAGIC_NUMBER_SIZE => (), |
| read => self.r.read_exact(&mut buffer[read..MAGIC_NUMBER_SIZE])?, |
| } |
| |
| if u32::from_le_bytes(buffer[0..MAGIC_NUMBER_SIZE].try_into().unwrap()) |
| != LZ4F_LEGACY_MAGIC_NUMBER |
| { |
| match self |
| .r |
| .read(&mut buffer[MAGIC_NUMBER_SIZE..MIN_FRAME_INFO_SIZE])? |
| { |
| 0 => return Ok(0), |
| MIN_FRAME_INFO_SIZE => (), |
| read => self |
| .r |
| .read_exact(&mut buffer[MAGIC_NUMBER_SIZE + read..MIN_FRAME_INFO_SIZE])?, |
| } |
| } |
| let required = FrameInfo::read_size(&buffer[..MIN_FRAME_INFO_SIZE])?; |
| if required != MIN_FRAME_INFO_SIZE && required != MAGIC_NUMBER_SIZE { |
| self.r |
| .read_exact(&mut buffer[MIN_FRAME_INFO_SIZE..required])?; |
| } |
| |
| let frame_info = FrameInfo::read(&buffer[..required])?; |
| if frame_info.dict_id.is_some() { |
| // Unsupported right now so it must be None |
| return Err(Error::DictionaryNotSupported.into()); |
| } |
| |
| let max_block_size = frame_info.block_size.get_size(); |
| let dst_size = if frame_info.block_mode == BlockMode::Linked { |
            // In linked mode we consume the output (bumping dst_start) but leave the
            // beginning of dst to be used as a prefix in subsequent blocks.
            // That is, until we have at least `max_block_size + WINDOW_SIZE` bytes
            // in dst; then we set up an ext_dict with the last WINDOW_SIZE bytes
            // and the output goes to the beginning of dst again.
            // Since we always want to be able to write a full block (up to max_block_size),
            // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes.
| max_block_size * 2 + WINDOW_SIZE |
| } else { |
| max_block_size |
| }; |
| self.src.clear(); |
| self.dst.clear(); |
| self.src.reserve_exact(max_block_size); |
| self.dst.reserve_exact(dst_size); |
| self.current_frame_info = Some(frame_info); |
| self.content_hasher = XxHash32::with_seed(0); |
| self.content_len = 0; |
| self.ext_dict_len = 0; |
| self.dst_start = 0; |
| self.dst_end = 0; |
| Ok(required) |
| } |
| |
| #[inline] |
| fn read_checksum(r: &mut R) -> Result<u32, io::Error> { |
| let mut checksum_buffer = [0u8; size_of::<u32>()]; |
| r.read_exact(&mut checksum_buffer[..])?; |
| let checksum = u32::from_le_bytes(checksum_buffer); |
| Ok(checksum) |
| } |
| |
| #[inline] |
| fn check_block_checksum(data: &[u8], expected_checksum: u32) -> Result<(), io::Error> { |
| let mut block_hasher = XxHash32::with_seed(0); |
| block_hasher.write(data); |
| let calc_checksum = block_hasher.finish() as u32; |
| if calc_checksum != expected_checksum { |
| return Err(Error::BlockChecksumError.into()); |
| } |
| Ok(()) |
| } |
| |
| fn read_block(&mut self) -> io::Result<usize> { |
| debug_assert_eq!(self.dst_start, self.dst_end); |
| let frame_info = self.current_frame_info.as_ref().unwrap(); |
| |
| // Adjust dst buffer offsets to decompress the next block |
| let max_block_size = frame_info.block_size.get_size(); |
| if frame_info.block_mode == BlockMode::Linked { |
            // In linked mode we consume the output (bumping dst_start) but leave the
            // beginning of dst to be used as a prefix in subsequent blocks.
            // That is, until we have at least `max_block_size + WINDOW_SIZE` bytes
            // in dst; then we set up an ext_dict with the last WINDOW_SIZE bytes
            // and the output goes to the beginning of dst again.
| debug_assert_eq!(self.dst.capacity(), max_block_size * 2 + WINDOW_SIZE); |
| if self.dst_start + max_block_size > self.dst.capacity() { |
| // Output might not fit in the buffer. |
| // The ext_dict will become the last WINDOW_SIZE bytes |
| debug_assert!(self.dst_start >= max_block_size + WINDOW_SIZE); |
| self.ext_dict_offset = self.dst_start - WINDOW_SIZE; |
| self.ext_dict_len = WINDOW_SIZE; |
| // Output goes in the beginning of the buffer again. |
| self.dst_start = 0; |
| self.dst_end = 0; |
| } else if self.dst_start + self.ext_dict_len > WINDOW_SIZE { |
                // The prefix plus ext_dict amount to more than WINDOW_SIZE bytes of lookback.
                // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
                // so that we can fit up to max_block_size bytes between dst_start and the start
                // of ext_dict.
| let delta = self |
| .ext_dict_len |
| .min(self.dst_start + self.ext_dict_len - WINDOW_SIZE); |
| self.ext_dict_offset += delta; |
| self.ext_dict_len -= delta; |
| debug_assert!(self.dst_start + self.ext_dict_len >= WINDOW_SIZE) |
| } |
| } else { |
| debug_assert_eq!(self.ext_dict_len, 0); |
| debug_assert_eq!(self.dst.capacity(), max_block_size); |
| self.dst_start = 0; |
| self.dst_end = 0; |
| } |
| |
| // Read and decompress block |
| let block_info = { |
| let mut buffer = [0u8; 4]; |
| if let Err(err) = self.r.read_exact(&mut buffer) { |
| if err.kind() == ErrorKind::UnexpectedEof { |
| return Ok(0); |
| } else { |
| return Err(err); |
| } |
| } |
| BlockInfo::read(&buffer)? |
| }; |
| match block_info { |
| BlockInfo::Uncompressed(len) => { |
| let len = len as usize; |
| if len > max_block_size { |
| return Err(Error::BlockTooBig.into()); |
| } |
| // TODO: Attempt to avoid initialization of read buffer when |
| // https://github.com/rust-lang/rust/issues/42788 stabilizes |
| self.r.read_exact(vec_resize_and_get_mut( |
| &mut self.dst, |
| self.dst_start, |
| self.dst_start + len, |
| ))?; |
| if frame_info.block_checksums { |
| let expected_checksum = Self::read_checksum(&mut self.r)?; |
| Self::check_block_checksum( |
| &self.dst[self.dst_start..self.dst_start + len], |
| expected_checksum, |
| )?; |
| } |
| |
| self.dst_end += len; |
| self.content_len += len as u64; |
| } |
| BlockInfo::Compressed(len) => { |
| let len = len as usize; |
| if len > max_block_size { |
| return Err(Error::BlockTooBig.into()); |
| } |
| // TODO: Attempt to avoid initialization of read buffer when |
| // https://github.com/rust-lang/rust/issues/42788 stabilizes |
| self.r |
| .read_exact(vec_resize_and_get_mut(&mut self.src, 0, len))?; |
| if frame_info.block_checksums { |
| let expected_checksum = Self::read_checksum(&mut self.r)?; |
| Self::check_block_checksum(&self.src[..len], expected_checksum)?; |
| } |
| |
| let with_dict_mode = |
| frame_info.block_mode == BlockMode::Linked && self.ext_dict_len != 0; |
| let decomp_size = if with_dict_mode { |
| debug_assert!(self.dst_start + max_block_size <= self.ext_dict_offset); |
| let (head, tail) = self.dst.split_at_mut(self.ext_dict_offset); |
| let ext_dict = &tail[..self.ext_dict_len]; |
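                    // `head` receives the new output (preceded by any prefix
                    // below dst_start); `ext_dict` is the window of previously
                    // output bytes that still lives further along the buffer.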
| |
| debug_assert!(head.len() - self.dst_start >= max_block_size); |
| crate::block::decompress::decompress_internal::<true, _>( |
| &self.src[..len], |
| &mut SliceSink::new(head, self.dst_start), |
| ext_dict, |
| ) |
| } else { |
| // Independent blocks OR linked blocks with only prefix data |
| debug_assert!(self.dst.capacity() - self.dst_start >= max_block_size); |
| crate::block::decompress::decompress_internal::<false, _>( |
| &self.src[..len], |
| &mut vec_sink_for_decompression( |
| &mut self.dst, |
| 0, |
| self.dst_start, |
| self.dst_start + max_block_size, |
| ), |
| b"", |
| ) |
| } |
| .map_err(Error::DecompressionError)?; |
| |
| self.dst_end += decomp_size; |
| self.content_len += decomp_size as u64; |
| } |
| |
| BlockInfo::EndMark => { |
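                // The EndMark closes the frame: verify the content size and
                // checksum if the frame declared them, then clear the frame
                // state so a subsequent frame (if any) can be decoded.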
| if let Some(expected) = frame_info.content_size { |
| if self.content_len != expected { |
| return Err(Error::ContentLengthError { |
| expected, |
| actual: self.content_len, |
| } |
| .into()); |
| } |
| } |
| if frame_info.content_checksum { |
| let expected_checksum = Self::read_checksum(&mut self.r)?; |
| let calc_checksum = self.content_hasher.finish() as u32; |
| if calc_checksum != expected_checksum { |
| return Err(Error::ContentChecksumError.into()); |
| } |
| } |
| self.current_frame_info = None; |
| return Ok(0); |
| } |
| } |
| |
| // Content checksum, if applicable |
| if frame_info.content_checksum { |
| self.content_hasher |
| .write(&self.dst[self.dst_start..self.dst_end]); |
| } |
| |
| Ok(self.dst_end - self.dst_start) |
| } |
| |
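    /// Reads a new frame header if needed, then decompresses the next block.
    /// Returns the number of freshly decompressed bytes in `dst`, or 0 at the
    /// end of a frame or of the input.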
| fn read_more(&mut self) -> io::Result<usize> { |
| if self.current_frame_info.is_none() && self.read_frame_info()? == 0 { |
| return Ok(0); |
| } |
| self.read_block() |
| } |
| } |
| |
| impl<R: io::Read> io::Read for FrameDecoder<R> { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| loop { |
| // Fill read buffer if there's uncompressed data left |
| if self.dst_start < self.dst_end { |
| let read_len = std::cmp::min(self.dst_end - self.dst_start, buf.len()); |
| let dst_read_end = self.dst_start + read_len; |
| buf[..read_len].copy_from_slice(&self.dst[self.dst_start..dst_read_end]); |
| self.dst_start = dst_read_end; |
| return Ok(read_len); |
| } |
| if self.read_more()? == 0 { |
| return Ok(0); |
| } |
| } |
| } |
| |
| fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { |
| let mut written = 0; |
| loop { |
| match self.fill_buf() { |
| Ok(b) if b.is_empty() => return Ok(written), |
| Ok(b) => { |
| let s = std::str::from_utf8(b).map_err(|_| { |
| io::Error::new( |
| io::ErrorKind::InvalidData, |
| "stream did not contain valid UTF-8", |
| ) |
| })?; |
| buf.push_str(s); |
| let len = s.len(); |
| self.consume(len); |
| written += len; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, |
| Err(e) => return Err(e), |
| } |
| } |
| } |
| |
| fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { |
| let mut written = 0; |
| loop { |
| match self.fill_buf() { |
| Ok(b) if b.is_empty() => return Ok(written), |
| Ok(b) => { |
| buf.extend_from_slice(b); |
| let len = b.len(); |
| self.consume(len); |
| written += len; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, |
| Err(e) => return Err(e), |
| } |
| } |
| } |
| } |
| |
| impl<R: io::Read> io::BufRead for FrameDecoder<R> { |
| fn fill_buf(&mut self) -> io::Result<&[u8]> { |
| if self.dst_start == self.dst_end { |
| self.read_more()?; |
| } |
| Ok(&self.dst[self.dst_start..self.dst_end]) |
| } |
| |
| fn consume(&mut self, amt: usize) { |
| assert!(amt <= self.dst_end - self.dst_start); |
| self.dst_start += amt; |
| } |
| } |
| |
| impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("FrameDecoder") |
| .field("r", &self.r) |
| .field("content_hasher", &self.content_hasher) |
| .field("content_len", &self.content_len) |
| .field("src", &"[...]") |
| .field("dst", &"[...]") |
| .field("dst_start", &self.dst_start) |
| .field("dst_end", &self.dst_end) |
| .field("ext_dict_offset", &self.ext_dict_offset) |
| .field("ext_dict_len", &self.ext_dict_len) |
| .field("current_frame_info", &self.current_frame_info) |
| .finish() |
| } |
| } |
| |
/// Similar to `v.get_mut(start..end)` but will adjust the len if needed.
| #[inline] |
| fn vec_resize_and_get_mut(v: &mut Vec<u8>, start: usize, end: usize) -> &mut [u8] { |
| if end > v.len() { |
| v.resize(end, 0) |
| } |
| &mut v[start..end] |
| } |