blob: 0165e4e01dba0a8b630a4778db69fde97c2c7664 [file] [log] [blame]
use std::sync::atomic::{AtomicBool, Ordering};
use gix_features::{
parallel::{self, in_parallel_if},
progress::{self, Progress},
threading::{lock, Mutable, OwnShared},
};
use super::{Error, Reducer};
use crate::{
data, index,
index::{traverse::Outcome, util},
};
/// Traversal options for [`index::File::traverse_with_lookup()`]
pub struct Options<F> {
/// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on
/// the amount of available logical cores.
pub thread_limit: Option<usize>,
/// The kinds of safety checks to perform.
pub check: index::traverse::SafetyCheck,
/// A function to create a pack cache. It is invoked once per worker thread to produce that thread's own cache instance.
pub make_pack_lookup_cache: F,
}
/// The default configuration uses no thread limit, default safety checks, and a cache that never stores anything.
impl Default for Options<fn() -> crate::cache::Never> {
    fn default() -> Self {
        Self {
            thread_limit: None,
            check: Default::default(),
            make_pack_lookup_cache: || crate::cache::Never,
        }
    }
}
/// The progress ids used in [`index::File::traverse_with_lookup()`].
///
/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
///
/// Each variant maps to a stable four-byte `gix_features::progress::Id` via the `From` implementation below.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
/// The amount of bytes currently processed to generate a checksum of the *pack data file*.
HashPackDataBytes,
/// The amount of bytes currently processed to generate a checksum of the *pack index file*.
HashPackIndexBytes,
/// Collect all object hashes into a vector and sort it by their pack offset.
CollectSortedIndexEntries,
/// The amount of objects which were decoded by brute-force.
DecodedObjects,
}
impl From<ProgressId> for gix_features::progress::Id {
fn from(v: ProgressId) -> Self {
match v {
ProgressId::HashPackDataBytes => *b"PTHP",
ProgressId::HashPackIndexBytes => *b"PTHI",
ProgressId::CollectSortedIndexEntries => *b"PTCE",
ProgressId::DecodedObjects => *b"PTRO",
}
}
}
/// Verify and validate the content of the index file
impl index::File {
/// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor` using a cache to reduce the amount of
/// waste while decoding objects.
///
/// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method.
pub fn traverse_with_lookup<P, C, Processor, E, F>(
&self,
mut processor: Processor,
pack: &data::File,
mut progress: P,
should_interrupt: &AtomicBool,
Options {
thread_limit,
check,
make_pack_lookup_cache,
}: Options<F>,
) -> Result<Outcome<P>, Error<E>>
where
P: Progress,
C: crate::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn gix_features::progress::RawProgress) -> Result<(), E>
+ Send
+ Clone,
F: Fn() -> C + Send + Clone,
{
let (verify_result, traversal_result) = parallel::join(
{
let pack_progress = progress.add_child_with_id(
format!(
"Hash of pack '{}'",
pack.path().file_name().expect("pack has filename").to_string_lossy()
),
ProgressId::HashPackDataBytes.into(),
);
let index_progress = progress.add_child_with_id(
format!(
"Hash of index '{}'",
self.path.file_name().expect("index has filename").to_string_lossy()
),
ProgressId::HashPackIndexBytes.into(),
);
move || {
let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt);
if res.is_err() {
should_interrupt.store(true, Ordering::SeqCst);
}
res
}
},
|| {
let index_entries = util::index_entries_sorted_by_offset_ascending(
self,
progress.add_child_with_id("collecting sorted index", ProgressId::CollectSortedIndexEntries.into()),
);
let (chunk_size, thread_limit, available_cores) =
parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
let input_chunks = index_entries.chunks(chunk_size.max(chunk_size));
let reduce_progress = OwnShared::new(Mutable::new({
let mut p = progress.add_child_with_id("Traversing", ProgressId::DecodedObjects.into());
p.init(Some(self.num_objects() as usize), progress::count("objects"));
p
}));
let state_per_thread = {
let reduce_progress = reduce_progress.clone();
move |index| {
(
make_pack_lookup_cache(),
Vec::with_capacity(2048), // decode buffer
lock(&reduce_progress)
.add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress
)
}
};
in_parallel_if(
there_are_enough_entries_to_process,
input_chunks,
thread_limit,
state_per_thread,
move |entries: &[index::Entry],
(cache, buf, progress)|
-> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
progress.init(
Some(entries.len()),
gix_features::progress::count_with_decimals("objects", 2),
);
let mut stats = Vec::with_capacity(entries.len());
progress.set(0);
for index_entry in entries.iter() {
let result = self.decode_and_process_entry(
check,
pack,
cache,
buf,
progress,
index_entry,
&mut processor,
);
progress.inc();
let stat = match result {
Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
progress.info(format!("Ignoring decode error: {err}"));
continue;
}
res => res,
}?;
stats.push(stat);
if should_interrupt.load(Ordering::Relaxed) {
break;
}
}
Ok(stats)
},
Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
)
},
);
Ok(Outcome {
actual_index_checksum: verify_result?,
statistics: traversal_result?,
progress,
})
}
}