| use std::{ |
| io, |
| io::Write, |
| marker::PhantomData, |
| path::{Path, PathBuf}, |
| sync::{atomic::AtomicBool, Arc}, |
| }; |
| |
| use gix_features::{interrupt, progress, progress::Progress}; |
| use gix_tempfile::{AutoRemove, ContainingDirectory}; |
| |
| use crate::data; |
| |
| mod error; |
| pub use error::Error; |
| |
| mod types; |
| use types::{LockWriter, PassThrough}; |
| pub use types::{Options, Outcome}; |
| |
| use crate::bundle::write::types::SharedTempFile; |
| |
| type ThinPackLookupFn = Box<dyn for<'a> FnMut(gix_hash::ObjectId, &'a mut Vec<u8>) -> Option<gix_object::Data<'a>>>; |
| type ThinPackLookupFnSend = |
| Box<dyn for<'a> FnMut(gix_hash::ObjectId, &'a mut Vec<u8>) -> Option<gix_object::Data<'a>> + Send + 'static>; |
| |
| /// The progress ids used in [`write_to_directory()`][crate::Bundle::write_to_directory()]. |
| /// |
| /// Use this information to selectively extract the progress of interest in case the parent application has custom visualization. |
| #[derive(Debug, Copy, Clone)] |
| pub enum ProgressId { |
| /// The amount of bytes read from the input pack data file. |
| ReadPackBytes, |
| /// A root progress counting logical steps towards an index file on disk. |
| /// |
| /// Underneath will be more progress information related to actually producing the index. |
| IndexingSteps(PhantomData<crate::index::write::ProgressId>), |
| } |
| |
| impl From<ProgressId> for gix_features::progress::Id { |
| fn from(v: ProgressId) -> Self { |
| match v { |
| ProgressId::ReadPackBytes => *b"BWRB", |
| ProgressId::IndexingSteps(_) => *b"BWCI", |
| } |
| } |
| } |
| |
| impl crate::Bundle { |
| /// Given a `pack` data stream, write it along with a generated index into the `directory` if `Some` or discard all output if `None`. |
| /// |
| /// In the latter case, the functionality provided here is more a kind of pack data stream validation. |
| /// |
| /// * `progress` provides detailed progress information which can be discarded with [`gix_features::progress::Discard`]. |
| /// * `should_interrupt` is checked regularly and when true, the whole operation will stop. |
| /// * `thin_pack_base_object_lookup_fn` If set, we expect to see a thin-pack with objects that reference their base object by object id which is |
| /// expected to exist in the object database the bundle is contained within. |
| /// `options` further configure how the task is performed. |
| /// |
| /// # Note |
| /// |
| /// * the resulting pack may be empty, that is, contains zero objects in some situations. This is a valid reply by a server and should |
| /// be accounted for. |
| /// - Empty packs always have the same name and not handling this case will result in at most one superfluous pack. |
| pub fn write_to_directory( |
| pack: impl io::BufRead, |
| directory: Option<impl AsRef<Path>>, |
| mut progress: impl Progress, |
| should_interrupt: &AtomicBool, |
| thin_pack_base_object_lookup_fn: Option<ThinPackLookupFn>, |
| options: Options, |
| ) -> Result<Outcome, Error> { |
| let _span = gix_features::trace::coarse!("gix_pack::Bundle::write_to_directory()"); |
| let mut read_progress = progress.add_child_with_id("read pack", ProgressId::ReadPackBytes.into()); |
| read_progress.init(None, progress::bytes()); |
| let pack = progress::Read { |
| inner: pack, |
| progress: progress::ThroughputOnDrop::new(read_progress), |
| }; |
| |
| let object_hash = options.object_hash; |
| let data_file = Arc::new(parking_lot::Mutex::new(io::BufWriter::with_capacity( |
| 64 * 1024, |
| match directory.as_ref() { |
| Some(directory) => gix_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?, |
| None => gix_tempfile::new(std::env::temp_dir(), ContainingDirectory::Exists, AutoRemove::Tempfile)?, |
| }, |
| ))); |
| let (pack_entries_iter, pack_version): ( |
| Box<dyn Iterator<Item = Result<data::input::Entry, data::input::Error>>>, |
| _, |
| ) = match thin_pack_base_object_lookup_fn { |
| Some(thin_pack_lookup_fn) => { |
| let pack = interrupt::Read { |
| inner: pack, |
| should_interrupt, |
| }; |
| let buffered_pack = io::BufReader::new(pack); |
| let pack_entries_iter = data::input::LookupRefDeltaObjectsIter::new( |
| data::input::BytesToEntriesIter::new_from_header( |
| buffered_pack, |
| options.iteration_mode, |
| data::input::EntryDataMode::KeepAndCrc32, |
| object_hash, |
| )?, |
| thin_pack_lookup_fn, |
| ); |
| let pack_version = pack_entries_iter.inner.version(); |
| let pack_entries_iter = data::input::EntriesToBytesIter::new( |
| pack_entries_iter, |
| LockWriter { |
| writer: data_file.clone(), |
| }, |
| pack_version, |
| gix_hash::Kind::Sha1, // Thin packs imply a pack being transported, and there we only ever know SHA1 at the moment. |
| ); |
| (Box::new(pack_entries_iter), pack_version) |
| } |
| None => { |
| let pack = PassThrough { |
| reader: interrupt::Read { |
| inner: pack, |
| should_interrupt, |
| }, |
| writer: Some(data_file.clone()), |
| }; |
| // This buf-reader is required to assure we call 'read()' in order to fill the (extra) buffer. Otherwise all the counting |
| // we do with the wrapped pack reader doesn't work as it does not expect anyone to call BufRead functions directly. |
| // However, this is exactly what's happening in the ZipReader implementation that is eventually used. |
| // The performance impact of this is probably negligible, compared to all the other work that is done anyway :D. |
| let buffered_pack = io::BufReader::new(pack); |
| let pack_entries_iter = data::input::BytesToEntriesIter::new_from_header( |
| buffered_pack, |
| options.iteration_mode, |
| data::input::EntryDataMode::Crc32, |
| object_hash, |
| )?; |
| let pack_version = pack_entries_iter.version(); |
| (Box::new(pack_entries_iter), pack_version) |
| } |
| }; |
| let WriteOutcome { |
| outcome, |
| data_path, |
| index_path, |
| keep_path, |
| } = crate::Bundle::inner_write( |
| directory, |
| progress, |
| options, |
| data_file, |
| pack_entries_iter, |
| should_interrupt, |
| pack_version, |
| )?; |
| |
| Ok(Outcome { |
| index: outcome, |
| object_hash, |
| pack_version, |
| data_path, |
| index_path, |
| keep_path, |
| }) |
| } |
| |
| /// Equivalent to [`write_to_directory()`][crate::Bundle::write_to_directory()] but offloads reading of the pack into its own thread, hence the `Send + 'static'` bounds. |
| /// |
| /// # Note |
| /// |
| /// As it sends portions of the input to a thread it requires the 'static lifetime for the interrupt flags. This can only |
| /// be satisfied by a static `AtomicBool` which is only suitable for programs that only run one of these operations at a time |
| /// or don't mind that all of them abort when the flag is set. |
| pub fn write_to_directory_eagerly<P>( |
| pack: impl io::Read + Send + 'static, |
| pack_size: Option<u64>, |
| directory: Option<impl AsRef<Path>>, |
| mut progress: P, |
| should_interrupt: &'static AtomicBool, |
| thin_pack_base_object_lookup_fn: Option<ThinPackLookupFnSend>, |
| options: Options, |
| ) -> Result<Outcome, Error> |
| where |
| P: Progress, |
| P::SubProgress: 'static, |
| { |
| let _span = gix_features::trace::coarse!("gix_pack::Bundle::write_to_directory_eagerly()"); |
| let mut read_progress = progress.add_child_with_id("read pack", ProgressId::ReadPackBytes.into()); /* Bundle Write Read pack Bytes*/ |
| read_progress.init(pack_size.map(|s| s as usize), progress::bytes()); |
| let pack = progress::Read { |
| inner: pack, |
| progress: progress::ThroughputOnDrop::new(read_progress), |
| }; |
| |
| let data_file = Arc::new(parking_lot::Mutex::new(io::BufWriter::new(match directory.as_ref() { |
| Some(directory) => gix_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?, |
| None => gix_tempfile::new(std::env::temp_dir(), ContainingDirectory::Exists, AutoRemove::Tempfile)?, |
| }))); |
| let object_hash = options.object_hash; |
| let eight_pages = 4096 * 8; |
| let (pack_entries_iter, pack_version): ( |
| Box<dyn Iterator<Item = Result<data::input::Entry, data::input::Error>> + Send + 'static>, |
| _, |
| ) = match thin_pack_base_object_lookup_fn { |
| Some(thin_pack_lookup_fn) => { |
| let pack = interrupt::Read { |
| inner: pack, |
| should_interrupt, |
| }; |
| let buffered_pack = io::BufReader::with_capacity(eight_pages, pack); |
| let pack_entries_iter = data::input::LookupRefDeltaObjectsIter::new( |
| data::input::BytesToEntriesIter::new_from_header( |
| buffered_pack, |
| options.iteration_mode, |
| data::input::EntryDataMode::KeepAndCrc32, |
| object_hash, |
| )?, |
| thin_pack_lookup_fn, |
| ); |
| let pack_kind = pack_entries_iter.inner.version(); |
| (Box::new(pack_entries_iter), pack_kind) |
| } |
| None => { |
| let pack = PassThrough { |
| reader: interrupt::Read { |
| inner: pack, |
| should_interrupt, |
| }, |
| writer: Some(data_file.clone()), |
| }; |
| let buffered_pack = io::BufReader::with_capacity(eight_pages, pack); |
| let pack_entries_iter = data::input::BytesToEntriesIter::new_from_header( |
| buffered_pack, |
| options.iteration_mode, |
| data::input::EntryDataMode::Crc32, |
| object_hash, |
| )?; |
| let pack_kind = pack_entries_iter.version(); |
| (Box::new(pack_entries_iter), pack_kind) |
| } |
| }; |
| let num_objects = pack_entries_iter.size_hint().0; |
| let pack_entries_iter = |
| gix_features::parallel::EagerIterIf::new(move || num_objects > 25_000, pack_entries_iter, 5_000, 5); |
| |
| let WriteOutcome { |
| outcome, |
| data_path, |
| index_path, |
| keep_path, |
| } = crate::Bundle::inner_write( |
| directory, |
| progress, |
| options, |
| data_file, |
| pack_entries_iter, |
| should_interrupt, |
| pack_version, |
| )?; |
| |
| Ok(Outcome { |
| index: outcome, |
| object_hash, |
| pack_version, |
| data_path, |
| index_path, |
| keep_path, |
| }) |
| } |
| |
| fn inner_write( |
| directory: Option<impl AsRef<Path>>, |
| mut progress: impl Progress, |
| Options { |
| thread_limit, |
| iteration_mode: _, |
| index_version: index_kind, |
| object_hash, |
| }: Options, |
| data_file: SharedTempFile, |
| pack_entries_iter: impl Iterator<Item = Result<data::input::Entry, data::input::Error>>, |
| should_interrupt: &AtomicBool, |
| pack_version: data::Version, |
| ) -> Result<WriteOutcome, Error> { |
| let indexing_progress = progress.add_child_with_id( |
| "create index file", |
| ProgressId::IndexingSteps(Default::default()).into(), |
| ); |
| Ok(match directory { |
| Some(directory) => { |
| let directory = directory.as_ref(); |
| let mut index_file = gix_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?; |
| |
| let outcome = crate::index::File::write_data_iter_to_stream( |
| index_kind, |
| { |
| let data_file = Arc::clone(&data_file); |
| move || new_pack_file_resolver(data_file) |
| }, |
| pack_entries_iter, |
| thread_limit, |
| indexing_progress, |
| &mut index_file, |
| should_interrupt, |
| object_hash, |
| pack_version, |
| )?; |
| |
| let data_path = directory.join(format!("pack-{}.pack", outcome.data_hash.to_hex())); |
| let index_path = data_path.with_extension("idx"); |
| let keep_path = data_path.with_extension("keep"); |
| |
| std::fs::write(&keep_path, b"")?; |
| Arc::try_unwrap(data_file) |
| .expect("only one handle left after pack was consumed") |
| .into_inner() |
| .into_inner() |
| .map_err(|err| Error::from(err.into_error()))? |
| .persist(&data_path)?; |
| index_file |
| .persist(&index_path) |
| .map_err(|err| { |
| progress.info(format!( |
| "pack file at {} is retained despite failing to move the index file into place. You can use plumbing to make it usable.", |
| data_path.display() |
| )); |
| err |
| })?; |
| WriteOutcome { |
| outcome, |
| data_path: Some(data_path), |
| index_path: Some(index_path), |
| keep_path: Some(keep_path), |
| } |
| } |
| None => WriteOutcome { |
| outcome: crate::index::File::write_data_iter_to_stream( |
| index_kind, |
| move || new_pack_file_resolver(data_file), |
| pack_entries_iter, |
| thread_limit, |
| indexing_progress, |
| io::sink(), |
| should_interrupt, |
| object_hash, |
| pack_version, |
| )?, |
| data_path: None, |
| index_path: None, |
| keep_path: None, |
| }, |
| }) |
| } |
| } |
| |
| fn resolve_entry(range: data::EntryRange, mapped_file: &memmap2::Mmap) -> Option<&[u8]> { |
| mapped_file.get(range.start as usize..range.end as usize) |
| } |
| |
| fn new_pack_file_resolver( |
| data_file: SharedTempFile, |
| ) -> io::Result<( |
| impl Fn(data::EntryRange, &memmap2::Mmap) -> Option<&[u8]> + Send + Clone, |
| memmap2::Mmap, |
| )> { |
| let mut guard = data_file.lock(); |
| guard.flush()?; |
| let mapped_file = crate::mmap::read_only(&guard.get_mut().with_mut(|f| f.path().to_owned())?)?; |
| Ok((resolve_entry, mapped_file)) |
| } |
| |
| struct WriteOutcome { |
| outcome: crate::index::write::Outcome, |
| data_path: Option<PathBuf>, |
| index_path: Option<PathBuf>, |
| keep_path: Option<PathBuf>, |
| } |