| //! LineReader |
| //! |
| //! A fast byte-delimiter-oriented buffered reader, offering a faster alternative |
| //! to `read_until` that returns byte slices into its internal buffer rather than |
| //! copying them out to one you provide. |
| //! |
| //! Because the internal buffer is fixed, lines longer than the buffer will be |
| //! split. |
| |
| /* |
| 128k blocks: 0 lines 31603121046 bytes in 36.85s (817.92 MB/s) |
| LineReader: 501636842 lines 31603121046 bytes in 73.96s (407.52 MB/s) |
| read_until: 501636842 lines 31603121046 bytes in 119.30s (252.62 MB/s) |
| read_line: 501636842 lines 31603121046 bytes in 139.14s (216.61 MB/s) |
| lines(): 501636842 lines 30599847362 bytes in 167.17s (174.57 MB/s) |
| */ |
| |
| use std::cmp; |
| use std::io; |
| use std::io::ErrorKind; |
| |
| extern crate memchr; |
| use memchr::{memchr, memrchr}; |
| |
| const NEWLINE: u8 = b'\n'; |
| const DEFAULT_CAPACITY: usize = 1024 * 64; |
| |
| /// The `LineReader` struct adds buffered, byte-delimited (default: `\n`) |
| /// reading to any io::Reader. |
| pub struct LineReader<R> { |
| inner: R, |
| delimiter: u8, |
| buf: Vec<u8>, |
| pos: usize, |
| end_of_complete: usize, |
| end_of_buffer: usize, |
| } |
| |
| use std::fmt; |
| |
| impl<R: io::Read> fmt::Debug for LineReader<R> { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| write!( |
| f, |
| "LineReader {{ delimiter: {:?}, pos: {}, end_of_complete: {}, end_of_buffer: {} }}", |
| self.delimiter, self.pos, self.end_of_complete, self.end_of_buffer |
| ) |
| } |
| } |
| |
| impl<R: io::Read> LineReader<R> { |
| /// Create a new `LineReader` around the reader with a default capacity of |
| /// 64 KiB and delimiter of `\n`. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// let reader = LineReader::new(File::open("myfile.txt")?); |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn new(inner: R) -> Self { |
| Self::with_delimiter_and_capacity(NEWLINE, DEFAULT_CAPACITY, inner) |
| } |
| |
| /// Create a new `LineReader` around the reader with a given capacity and |
| /// delimiter of `\n`. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// let mut reader = LineReader::with_capacity(1024*512, File::open("myfile.txt")?); |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn with_capacity(capacity: usize, inner: R) -> Self { |
| Self::with_delimiter_and_capacity(NEWLINE, capacity, inner) |
| } |
| |
| /// Create a new `LineReader` around the reader with a default capacity of |
| /// 64 KiB and the given delimiter. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// let mut reader = LineReader::with_delimiter(b'\t', File::open("myfile.txt")?); |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn with_delimiter(delimiter: u8, inner: R) -> Self { |
| Self::with_delimiter_and_capacity(delimiter, DEFAULT_CAPACITY, inner) |
| } |
| |
| /// Create a new `LineReader` around the reader with a given capacity and |
| /// delimiter. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// let mut reader = LineReader::with_delimiter_and_capacity(b'\t', 1024*512, File::open("myfile.txt")?); |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn with_delimiter_and_capacity(delimiter: u8, capacity: usize, inner: R) -> Self { |
| Self { |
| inner, |
| delimiter, |
| buf: vec![0; capacity], |
| pos: 0, |
| end_of_complete: 0, |
| end_of_buffer: 0, |
| } |
| } |
| |
| /// Run the given closure for each line while while the closure returns `Ok(true)`. |
| /// |
| /// If either the reader or the closure return an error, iteration ends and the error is returned. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// let buf: &[u8] = b"foo\nbar\nbaz"; |
| /// let mut reader = LineReader::new(buf); |
| /// let mut lines = vec![]; |
| /// reader.for_each(|line| { |
| /// lines.push(line.to_vec()); |
| /// Ok(true) |
| /// })?; |
| /// assert_eq!(lines.len(), 3); |
| /// assert_eq!(lines[0], b"foo\n"); |
| /// assert_eq!(lines[1], b"bar\n"); |
| /// assert_eq!(lines[2], b"baz"); |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn for_each<F: FnMut(&[u8]) -> io::Result<bool>>(&mut self, mut f: F) -> io::Result<()> { |
| while let Some(line) = self.next_line() { |
| if !f(line?)? { |
| break; |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| /// Get the next line from the reader, an IO error, or `None` on EOF. The delimiter |
| /// is included in any returned slice, unless the file ends without one or a line was |
| /// truncated to the buffer size due to length. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// # let mut reader = LineReader::new(File::open("myfile.txt")?); |
| /// while let Some(line) = reader.next_line() { |
| /// let line = line?; // unwrap io::Result to &[u8] |
| /// } |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn next_line(&mut self) -> Option<io::Result<&[u8]>> { |
| if self.pos < self.end_of_complete { |
| let lastpos = self.pos; |
| self.pos = cmp::min( |
| 1 + lastpos |
| + memchr(self.delimiter, &self.buf[lastpos..self.end_of_complete]) |
| .unwrap_or(self.end_of_complete), |
| self.end_of_complete, |
| ); |
| |
| return Some(Ok(&self.buf[lastpos..self.pos])); |
| } |
| |
| match self.refill() { |
| Ok(true) => self.next_line(), |
| Ok(false) => { |
| if self.end_of_buffer == self.pos { |
| None |
| } else { |
| self.pos = self.end_of_buffer; |
| Some(Ok(&self.buf[..self.end_of_buffer])) |
| } |
| } |
| Err(e) => Some(Err(e)), |
| } |
| } |
| |
| /// Return a slice of complete lines, up to the size of the internal buffer. |
| /// |
| /// This is functionally identical to next_line, only instead of getting up |
| /// to the *first* instance of the delimiter, you get up to the *last*. |
| /// |
| /// ```no_run |
| /// # use linereader::LineReader; |
| /// # use std::fs::File; |
| /// # use std::io; |
| /// # fn x() -> io::Result<()> { |
| /// # let mut reader = LineReader::new(File::open("myfile.txt")?); |
| /// while let Some(lines) = reader.next_batch() { |
| /// let lines = lines?; // unwrap io::Result to &[u8] |
| /// } |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn next_batch(&mut self) -> Option<io::Result<&[u8]>> { |
| if self.pos < self.end_of_complete { |
| let ret = &self.buf[self.pos..self.end_of_complete]; |
| self.pos = self.end_of_complete; |
| return Some(Ok(ret)); |
| } |
| |
| match self.refill() { |
| Ok(true) => self.next_batch(), |
| Ok(false) => { |
| if self.end_of_buffer == self.pos { |
| None |
| } else { |
| self.pos = self.end_of_buffer; |
| Some(Ok(&self.buf[..self.end_of_buffer])) |
| } |
| } |
| Err(e) => Some(Err(e)), |
| } |
| } |
| |
| fn refill(&mut self) -> io::Result<bool> { |
| assert!(self.pos == self.end_of_complete); |
| assert!(self.end_of_complete <= self.end_of_buffer); |
| |
| self.pos = 0; |
| |
| // Move the start of the next line, if any, to the start of buf |
| let fragment_len = self.end_of_buffer - self.end_of_complete; |
| if fragment_len > 0 { |
| // unsafe variants of these using ptr::copy/copy_nonoverlapping can |
| // be found in 5ccea2c - they made no appreciable difference. |
| if fragment_len > self.end_of_complete { |
| self.buf.drain(..self.end_of_complete); |
| self.buf.extend(vec![0_u8; self.end_of_complete]); |
| } else { |
| let (start, rest) = self.buf.split_at_mut(self.end_of_complete); |
| start[0..fragment_len].copy_from_slice(&rest[0..fragment_len]); |
| } |
| self.end_of_buffer = fragment_len; |
| } else { |
| self.end_of_buffer = 0; |
| } |
| |
| // Fill the rest of buf from the underlying IO |
| while self.end_of_buffer < self.buf.len() { |
| // Loop until we find a delimiter or read zero bytes. |
| match self.inner.read(&mut self.buf[self.end_of_buffer..]) { |
| Ok(0) => { |
| self.end_of_complete = self.end_of_buffer; |
| return Ok(false); |
| } |
| Ok(n) => { |
| let lastpos = self.end_of_buffer; |
| self.end_of_buffer += n; |
| if let Some(nl) = |
| memrchr(self.delimiter, &self.buf[lastpos..self.end_of_buffer]) |
| { |
| self.end_of_complete = cmp::min(self.end_of_buffer, 1 + lastpos + nl); |
| return Ok(true); |
| } else { |
| // No delimiter - see if we can read any more. |
| self.end_of_complete = self.end_of_buffer; |
| } |
| } |
| Err(ref e) if e.kind() == ErrorKind::Interrupted => {} |
| Err(e) => return Err(e), |
| } |
| } |
| |
| // We read through until the end of the buffer. |
| Ok(true) |
| } |
| |
| /// Reset the internal state of the buffer. Next lines are read from wherever |
| /// the reader happens to be. |
| pub fn reset(&mut self) { |
| self.pos = 0; |
| self.end_of_buffer = 0; |
| self.end_of_complete = 0; |
| } |
| |
| /// Get a reference to the reader. |
| pub fn get_ref(&self) -> &R { |
| &self.inner |
| } |
| |
| /// Get a mutable reference to the reader. |
| pub fn get_mut(&mut self) -> &mut R { |
| &mut self.inner |
| } |
| |
| /// Unwrap this `LineReader`, returning the underlying reader and discarding any |
| /// unread buffered lines. |
| pub fn into_inner(self) -> R { |
| self.inner |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use LineReader; |
| |
| #[test] |
| fn test_next_line() { |
| let buf: &[u8] = b"0a0\n1bb1\n2ccc2\n3dddd3\n4eeeee4\n5ffffffff5\n6ggggg6\n7hhhhhh7"; |
| let mut reader = LineReader::with_capacity(8, buf); |
| |
| assert_eq!(b"0a0\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"1bb1\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"2ccc2\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"3dddd3\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"4eeeee4\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"5fffffff", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"f5\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"6ggggg6\n", reader.next_line().unwrap().unwrap()); |
| assert_eq!(b"7hhhhhh7", reader.next_line().unwrap().unwrap()); |
| assert!(reader.next_line().is_none()); |
| } |
| |
| #[test] |
| fn test_next_batch() { |
| let buf: &[u8] = b"0a0\n1bb1\n2ccc2\n3dddd3\n4eeeee4\n5ffffffff5\n6ggggg6\n7hhhhhh7"; |
| let mut reader = LineReader::with_capacity(19, buf); |
| |
| assert_eq!(b"0a0\n1bb1\n2ccc2\n", reader.next_batch().unwrap().unwrap()); |
| assert_eq!(b"3dddd3\n4eeeee4\n", reader.next_batch().unwrap().unwrap()); |
| assert_eq!( |
| b"5ffffffff5\n6ggggg6\n", |
| reader.next_batch().unwrap().unwrap() |
| ); |
| assert_eq!(b"7hhhhhh7", reader.next_batch().unwrap().unwrap()); |
| } |
| |
| #[test] |
| fn test_for_each() { |
| let buf: &[u8] = b"f\nba\nbaz\n"; |
| let mut reader = LineReader::new(buf); |
| |
| let mut len = 2; |
| reader.for_each(|l| { assert_eq!(len, l.len()); len += 1; Ok(true) }).unwrap(); |
| |
| let buf: &[u8] = b"f\nba\nbaz\n"; |
| let mut reader = LineReader::new(buf); |
| |
| reader.for_each(|l| { assert_eq!(l.len(), 2); Ok(false) }).unwrap(); |
| } |
| |
| extern crate rand; |
| use std::io::BufRead; |
| use std::io::{Cursor, Read}; |
| use tests::rand::prelude::*; |
| |
| #[test] |
| fn test_next_line_randomly() { |
| let mut rng = thread_rng(); |
| |
| for _ in 1..128 { |
| let mut buf = [0u8; 65535]; |
| rng.fill(&mut buf[..]); |
| let delimiter = rng.gen::<u8>(); |
| let max_line = rng.gen::<u8>().saturating_add(8) as usize; |
| |
| let mut reader = |
| LineReader::with_delimiter_and_capacity(delimiter, max_line, Cursor::new(&buf[..])); |
| let mut cursor = Cursor::new(&buf[..]); |
| let mut expected = vec![]; |
| |
| while cursor |
| .by_ref() |
| .take(max_line as u64) |
| .read_until(delimiter, &mut expected) |
| .unwrap() > 0 |
| { |
| assert_eq!(expected, reader.next_line().unwrap().unwrap()); |
| expected.clear(); |
| } |
| |
| assert!(reader.next_line().is_none()); |
| } |
| } |
| } |