blob: 6616bd14a85f5ae5e66f3ac7319d0b79dab79cdc [file] [log] [blame]
use std::io::Write;
use gix_features::hash;
use crate::data::output;
/// The ways in which a single `next()` call on the [`FromEntriesIter`] iterator can fail.
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum Error<E: std::error::Error + 'static> {
    /// An I/O error, converted automatically from [`std::io::Error`] when propagated with `?`.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// The error yielded by the input iterator in place of a chunk of entries.
    #[error(transparent)]
    Input(E),
}
/// An implementation of [`Iterator`] that writes [encoded entries][output::Entry] to an inner writer each time
/// `next()` is called.
///
/// The pack header is written as part of the first call. Once the input is exhausted, the trailing hash is
/// written as well and becomes available through `digest()`.
pub struct FromEntriesIter<I, W> {
/// An iterator yielding chunks of input [`output::Entry`] instances.
pub input: I,
/// The writer receiving all encoded bytes, wrapped in [`hash::Write`] whose `hash` state is used to
/// produce the trailing digest.
output: hash::Write<W>,
/// Our trailing hash, set once all input entries were written. `None` until then.
trailer: Option<gix_hash::ObjectId>,
/// The version of the packfile to be written along with the amount of objects in the iteration.
/// Will be `None` to signal the header was written already.
header_info: Option<(crate::data::Version, u32)>,
/// The pack data version with which pack entries should be written.
entry_version: crate::data::Version,
/// The amount of bytes written thus far, including the pack header and, once done, the trailing hash.
written: u64,
/// Required to quickly find offsets by entry index, as future objects may refer to those in the past to use them
/// as delta offset base.
/// It stores the pack offsets at which objects begin.
/// Additionally we store whether an object is valid. Invalid objects are recorded as `(0, false)`, are not written,
/// and delta objects must not refer to them.
pack_offsets_and_validity: Vec<(u64, bool)>,
/// If `true`, we are done (or failed) and no additional writes will occur.
is_done: bool,
}
impl<I, W, E> FromEntriesIter<I, W>
where
    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
    W: std::io::Write,
    E: std::error::Error + 'static,
{
    /// Create a new instance reading [entries][output::Entry] from an `input` iterator and writing pack data bytes to
    /// the `output` writer, resembling a pack of `version` with exactly `num_entries` objects contained in it.
    /// `object_hash` is the kind of hash to use for the pack checksum and maybe other places, depending on the version.
    ///
    /// The input chunks are expected to be sorted already. You can use the [`InOrderIter`][gix_features::parallel::InOrderIter] to assure
    /// this happens on the fly holding entire chunks in memory as long as needed for them to be dispensed in order.
    ///
    /// # Panics
    ///
    /// Not all combinations of `object_hash` and `version` are supported currently. In particular, any `version`
    /// other than `V2` triggers an assertion error.
    pub fn new(
        input: I,
        output: W,
        num_entries: u32,
        version: crate::data::Version,
        object_hash: gix_hash::Kind,
    ) -> Self {
        assert!(
            matches!(version, crate::data::Version::V2),
            "currently only pack version 2 can be written",
        );
        FromEntriesIter {
            input,
            output: hash::Write::new(output, object_hash),
            trailer: None,
            entry_version: version,
            pack_offsets_and_validity: Vec::with_capacity(num_entries as usize),
            written: 0,
            header_info: Some((version, num_entries)),
            is_done: false,
        }
    }

    /// Consume this instance and return the `output` implementation.
    ///
    /// _Note_ that the `input` iterator can be moved out of this instance beforehand.
    pub fn into_write(self) -> W {
        self.output.inner
    }

    /// Returns the trailing hash over all written entries once done.
    /// It's `None` if we are not yet done writing.
    pub fn digest(&self) -> Option<gix_hash::ObjectId> {
        self.trailer
    }

    /// Perform a single step of the iteration and return the amount of bytes written during that step.
    ///
    /// On the first call, the pack header is written before anything else. Afterwards, either the next chunk of
    /// input entries is encoded and written, or, if the input is exhausted, the trailing hash is written, the
    /// output is flushed and this instance is marked as done.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Input`] if the input iterator yields an error, and [`Error::Io`] if an I/O operation fails.
    ///
    /// # Panics
    ///
    /// If an entry refers to a base entry that was marked invalid, or to an index that was not seen yet.
    fn next_inner(&mut self) -> Result<u64, Error<E>> {
        let previous_written = self.written;
        // `take()` leaves `None` behind, so the header is written exactly once.
        if let Some((version, num_entries)) = self.header_info.take() {
            let header_bytes = crate::data::header::encode(version, num_entries);
            self.output.write_all(&header_bytes[..])?;
            self.written += header_bytes.len() as u64;
        }
        match self.input.next() {
            Some(entries) => {
                for entry in entries.map_err(Error::Input)? {
                    if entry.is_invalid() {
                        // Keep the index-to-offset mapping aligned with the input, but never write the entry.
                        self.pack_offsets_and_validity.push((0, false));
                        continue;
                    }
                    self.pack_offsets_and_validity.push((self.written, true));
                    let header = entry.to_entry_header(self.entry_version, |index| {
                        let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index];
                        if !is_valid_object {
                            unreachable!("if you see this the object database is corrupt as a delta refers to a non-existing object")
                        }
                        // `self.written` is still the offset at which the current entry begins,
                        // so this is the distance from the current entry back to its base.
                        self.written - base_offset
                    });
                    self.written += header.write_to(entry.decompressed_size as u64, &mut self.output)? as u64;
                    self.written += std::io::copy(&mut &*entry.compressed_data, &mut self.output)?;
                }
            }
            None => {
                let digest = self.output.hash.clone().digest();
                // The trailer goes straight to the inner writer, bypassing the hashing wrapper.
                self.output.inner.write_all(&digest[..])?;
                self.written += digest.len() as u64;
                self.output.inner.flush()?;
                self.is_done = true;
                self.trailer = Some(gix_hash::ObjectId::from(digest));
            }
        };
        Ok(self.written - previous_written)
    }
}
impl<I, W, E> Iterator for FromEntriesIter<I, W>
where
    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
    W: std::io::Write,
    E: std::error::Error + 'static,
{
    /// The amount of bytes written during this step if `Ok`, or the error that stopped the iteration,
    /// which is either the error `E` received from the input or an I/O error.
    type Item = Result<u64, Error<E>>;

    /// Write the next chunk of entries, or the trailer once the input is exhausted.
    ///
    /// Returns `None` once the trailer was written or after the first error was returned.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_done {
            return None;
        }
        let step = self.next_inner();
        // A failed step ends the iteration for good: the next call yields `None`.
        if step.is_err() {
            self.is_done = true;
        }
        Some(step)
    }
}