blob: 00c20d15bce65b81015b59fb52b2bee663a43b70 [file] [log] [blame] [edit]
use std::borrow::Cow;
use std::io;
#[cfg(feature = "multithreaded")]
use std::mem;
#[cfg(feature = "multithreaded")]
use std::sync::Arc;
use ahash::AHashMap;
#[cfg(feature = "multithreaded")]
use dashmap::DashMap;
use once_cell::sync::Lazy;
macro_rules! invalid_data_error {
($($arg:tt)*) => {{
const CAPACITY_HASHMAP: usize = 512;
pub(crate) const CAPACITY_READER: usize = 128 * 1024;
/// Internal parameter (not exposed to users) that determines how many stacks of
/// input data make up a "chunk" (unit that is sent to the threadpool for
/// processing). Chosen by benchmarking various values using the following tests:
/// * cargo test bench_nstacks_dtrace --release -- --ignored --nocapture
/// * cargo test bench_nstacks_perf --release -- --ignored --nocapture
pub(crate) const DEFAULT_NSTACKS_PER_JOB: usize = 100;
/// A guess at the number of bytes contained in any given stack of any given format.
/// Used to calculate the initial capacity of the vector used for sending input
/// data across threads.
#[cfg(feature = "multithreaded")]
const NBYTES_PER_STACK_GUESS: usize = 1024;
const RUST_HASH_LENGTH: usize = 17;
#[cfg(feature = "multithreaded")]
pub static DEFAULT_NTHREADS: Lazy<usize> =
Lazy::new(|| std::thread::available_parallelism().unwrap().into());
#[cfg(not(feature = "multithreaded"))]
pub static DEFAULT_NTHREADS: Lazy<usize> = Lazy::new(|| 1);
/// Sealed trait for internal library authors.
/// If you implement this trait, your type will implement the public-facing
/// `Collapse` trait as well. Implementing this trait gives you parallelism
/// for free as long as you adhere to the requirements described in the
/// comments below.
pub trait CollapsePrivate: Send + Sized {
// *********************************************************** //
// ********************* REQUIRED METHODS ******************** //
// *********************************************************** //
/// Process any header lines that precede the main body of samples.
/// Some formats, such as `dtrace`, contain a header or other non-stack
/// information at the beginning of their input files. If header information
/// is present, this method **must** consume it (i.e. advance the provided
/// reader past it).
/// This method also provides an opportunity to do processing of actual
/// stack data on the main thread before worker threads are spun up. For
/// example, `perf` requires reading the first stack in order to know how to
/// process the rest; so this method is used for that "upfront" processing.
/// If the format you are working with does not contain header information
/// or does not need any special, up-front processing, just have this method
/// return `Ok(())` immediately.
fn pre_process<R>(&mut self, reader: &mut R, occurrences: &mut Occurrences) -> io::Result<()>
R: io::BufRead;
/// Process all samples in a chunk of input (the primary method).
/// This method receives a reader whose header has already been consumed (see above),
/// as well as a mutable reference to an `Occurences` instance (just a hashamp that
/// works across multiple threads). Implementers should parse the stack data
/// contained in the reader and write output to the provided `Occurrences` map.
/// This method may be called multiple times to process batches of incoming samples.
/// Therefore, make sure that when end-of-file is reached, the collapser now considers
/// itself back at the top-level context (e.g., not in the middle of a stack). This
/// means that some internal state, e.g. stack buffers, must be reset by the time this
/// method returns. Other internal state, e.g. caches, however, may be kept.
fn collapse_single_threaded<R>(
&mut self,
reader: R,
occurrences: &mut Occurrences,
) -> io::Result<()>
R: io::BufRead;
/// Determine the end of a stack.
/// Worker threads **must** receive full stacks (as opposed to partial stacks); so this method
/// determines, for your specific format, when the end of a stack has been reached.
/// This method should return `true` if the provided line represents the end of a stack;
/// `false` otherwise.
/// If your format requires more information than merely a line of the input data in order
/// to determine whether or not you are at the end of a stack, you can retrieve/store
/// information on the `self` instance, which is also available to you in this method. This
/// method will be called for every line of input data (excluding those consumed by the
/// `pre_process` method).
fn would_end_stack(&mut self, line: &[u8]) -> bool;
/// Creates a copy and prepares it to be sent to a different thread.
/// This method creates a copy of `self` in order to send it to a different thread.
/// As such, it should clone all the internal fields of `self` **except** those that
/// should be reset because the collapser will now operate in a different stack context.
/// For example, any options should be cloned, but any stack buffers or similar "stack state"
/// should be reset to, for example, an empty vector before this method returns.
fn clone_and_reset_stack_context(&self) -> Self;
/// Determine if this format corresponds to the input data.
/// This method, used by the `guess` collapser, should return whether or not the
/// implementation corresponds with the given input string, i.e. if the input data
/// matches the collapser.
/// - `None` means "not sure -- need more input"
/// - `Some(true)` means "yes, this implementation should work with this string"
/// - `Some(false)` means "no, this implementation definitely won't work"
fn is_applicable(&mut self, input: &str) -> Option<bool>;
/// Returns the number of stacks per job to send to the threadpool.
fn nstacks_per_job(&self) -> usize;
/// Sets the number of stacks per job to send to the threadpool.
fn set_nstacks_per_job(&mut self, n: usize);
/// Returns the number of threads to use.
fn nthreads(&self) -> usize;
/// Sets the number of threads to use.
fn set_nthreads(&mut self, n: usize);
// *********************************************************** //
// ******************** PROVIDED METHODS ********************* //
// *********************************************************** //
fn collapse<R, W>(&mut self, mut reader: R, writer: W) -> io::Result<()>
R: io::BufRead,
W: io::Write,
let mut occurrences = Occurrences::new(self.nthreads());
// Consume the header, if any, and do any other pre-processing
// that needs to occur.
self.pre_process(&mut reader, &mut occurrences)?;
// Do collapsing.
if occurrences.is_concurrent() {
self.collapse_multi_threaded(reader, &mut occurrences)?;
} else {
self.collapse_single_threaded(reader, &mut occurrences)?;
// Write results.
#[cfg(not(feature = "multithreaded"))]
fn collapse_multi_threaded<R>(&mut self, _: R, _: &mut Occurrences) -> io::Result<()>
R: io::BufRead,
#[cfg(feature = "multithreaded")]
fn collapse_multi_threaded<R>(
&mut self,
mut reader: R,
occurrences: &mut Occurrences,
) -> io::Result<()>
R: io::BufRead,
let nstacks_per_job = self.nstacks_per_job();
let nthreads = self.nthreads();
assert_ne!(nstacks_per_job, 0);
assert!(nthreads > 1);
crossbeam_utils::thread::scope(|scope| {
// Channel for sending an error from the worker threads to the main thread
// in the event a worker has failed.
let (tx_error, rx_error) = crossbeam_channel::bounded::<io::Error>(1);
// Channel for sending input data from the main thread to the worker threads.
// We choose `2 * nthreads` as the channel size here in order to limit memory
// usage in the case of particularly large input files.
let (tx_input, rx_input) = crossbeam_channel::bounded::<Vec<u8>>(2 * nthreads);
// Channel for worker threads that have errored to signal to all the other
// worker threads that they should stop work immediately and return.
let (tx_stop, rx_stop) = crossbeam_channel::bounded::<()>(nthreads - 1);
let mut handles = Vec::with_capacity(nthreads);
for _ in 0..nthreads {
let tx_error = tx_error.clone();
let rx_input = rx_input.clone();
let (tx_stop, rx_stop) = (tx_stop.clone(), rx_stop.clone());
let mut folder = self.clone_and_reset_stack_context();
let mut occurrences = occurrences.clone();
// Launch the worker thread...
let handle = scope.spawn(move |_| loop {
crossbeam_channel::select! {
recv(rx_input) -> input => {
// Receive input from the main thread.
let data = match input {
Ok(data) => data,
// The main threads drops it's handle to the input sender once it's
// finished sending data; so if we get an error here, it means
// there is no more data to be sent and we should exit.
Err(_) => return,
// If there is input data, process it.
if let Err(e) = folder.collapse_single_threaded(&data[..], &mut occurrences) {
// In the event of an error...
// We notify all the threads about it here, rather than wait for the main input
// loop to see the error, so that we can also stop the input loop from iterating
// through the rest of the file.
// If the channel is full, it means another thread has also errored
// and already sent a stop signal to the other threads; so there is
// no need to wait or to check for a `SendError` here.
for _ in 0..(nthreads - 1) {
let _ = tx_stop.try_send(());
// Then, send the error produced to the main thread for
// propagation. If the channel is full, it means another thread
// has also errored and already sent its error back to the
// main thread; so there is no need to wait or to check for a
// `SendError` here.
let _ = tx_error.try_send(e);
// Finally, return.
// If successful, return to the top of the loop and continue to poll
// the input and stop channels.
recv(rx_stop) -> _ => {
// Received a signal from another worker thread that it has errored;
// so should cease work immediately and return.
// On the main thread, we're about to start sending data to the worker threads,
// but we only want to send data to the worker threads **if** they're still alive!
// (if one of them produces an error, all of them will exit early). To ensure we don't try
// to send data to dead worker threads, drop the main thread's handle to the input receiver
// here. This way, if all the workers die, every handle to the input receiver will have
// been dropped and we'll get an error when trying to send data on the input sender,
// which will tell us (the main thread) to stop trying to send data and, instead,
// skip to trying to pull an error off the error channel.
// Now that we've dropped the main thread's handle to the input sender, start
// trying to send data to the worker threads...
let buf_capacity = usize::next_power_of_two(NBYTES_PER_STACK_GUESS * nstacks_per_job);
let mut buf = Vec::with_capacity(buf_capacity);
let (mut index, mut nstacks) = (0, 0);
loop {
let n = reader.read_until(b'\n', &mut buf)?;
if n == 0 {
// If we've reached the end of the data, send the final chunk to the worker
// threads and break from the loop, The worker threads may or may not still
// be alive (depending on if one errored in between the sending of the last
// chunk and the sending of this one), but either way we should break the loop;
// so there's no need to check for a `SendError` here.
let _ = tx_input.send(buf);
let line = &buf[index..index + n];
index += n;
if self.would_end_stack(line) {
// If we've reached the end of a stack, count it.
nstacks += 1;
if nstacks == nstacks_per_job {
// If we've accumulated enough stacks to make up a chunk to send to the
// worker threads, try to send it.
let buf_capacity = usize::next_power_of_two(buf.capacity());
let chunk = mem::replace(&mut buf, Vec::with_capacity(buf_capacity));
if tx_input.send(chunk).is_err() {
// If sending the chunk produces a `SendError`, this means that one
// of the worker threads has errored, sent a signal to all the other
// worker threads to shut down, and they have all shutdown, in which
// case we know there will be an error waiting for us on the error
// channel; so we should stop parsing input data (i.e. break).
index = 0;
nstacks = 0;
// The main thread needs to drop its handle to the input sender here because
// that's how we signal to the worker threads that there is no more data coming
// on the input channel, in which case they should exit.
// The main thread needs to drop its handle to the error sender here because we
// are about to poll the error receiver for errors, which will block until all
// the error senders have been dropped (including ours).
// Now we poll the error channel, which will block until either:
// * all work has been completely successfully,
// in which case the expression below will evaluate to `None`, or
// * an error has occurred on one of the worker theads,
// in which case the expression below will evaluate to `Some(<io::Error>)`.
if let Some(e) = rx_error.iter().next() {
return Err(e);
for handle in handles {
/// Occurrences is a HashMap, which uses:
/// * AHashMap if single-threaded
/// * DashMap if multi-threaded
/// This is public because it is part of the sealed `CollapsePrivate` trait's API, but it
/// is in a crate-private module so is not nameable by downstream library users.
#[derive(Clone, Debug)]
pub enum Occurrences {
SingleThreaded(AHashMap<String, usize>),
#[cfg(feature = "multithreaded")]
MultiThreaded(Arc<DashMap<String, usize, ahash::RandomState>>),
impl Occurrences {
#[cfg(feature = "multithreaded")]
pub(crate) fn new(nthreads: usize) -> Self {
assert_ne!(nthreads, 0);
if nthreads == 1 {
} else {
#[cfg(not(feature = "multithreaded"))]
pub(crate) fn new(nthreads: usize) -> Self {
assert_ne!(nthreads, 0);
fn new_single_threaded() -> Self {
let map =
AHashMap::with_capacity_and_hasher(CAPACITY_HASHMAP, ahash::RandomState::default());
#[cfg(feature = "multithreaded")]
fn new_multi_threaded() -> Self {
let map =
DashMap::with_capacity_and_hasher(CAPACITY_HASHMAP, ahash::RandomState::default());
/// Inserts a key-count pair into the map. If the map did not have this key
/// present, `None` is returned. If the map did have this key present, the
/// value is updated, and the old value is returned.
pub(crate) fn insert(&mut self, key: String, count: usize) -> Option<usize> {
use self::Occurrences::*;
match self {
SingleThreaded(map) => map.insert(key, count),
#[cfg(feature = "multithreaded")]
MultiThreaded(arc) => arc.insert(key, count),
/// Inserts a key-count pair into the map if the key does not already exist.
/// If the key does already exist, adds count to the current value of the
/// existing key.
pub(crate) fn insert_or_add(&mut self, key: String, count: usize) {
use self::Occurrences::*;
match self {
SingleThreaded(map) => *map.entry(key).or_insert(0) += count,
#[cfg(feature = "multithreaded")]
MultiThreaded(arc) => *arc.entry(key).or_insert(0) += count,
pub(crate) fn is_concurrent(&self) -> bool {
use self::Occurrences::*;
match self {
SingleThreaded(_) => false,
#[cfg(feature = "multithreaded")]
MultiThreaded(_) => true,
pub(crate) fn write_and_clear<W>(&mut self, mut writer: W) -> io::Result<()>
W: io::Write,
use self::Occurrences::*;
match self {
SingleThreaded(ref mut map) => {
let mut contents: Vec<_> = map.drain().collect();
for (key, value) in contents {
writeln!(writer, "{} {}", key, value)?;
#[cfg(feature = "multithreaded")]
MultiThreaded(ref mut arc) => {
let map = match Arc::get_mut(arc) {
Some(map) => map,
None => panic!(
"Attempting to drain the contents of a concurrent HashMap \
when more than one thread has access to it, which is \
not allowed."
let map = mem::replace(
let contents = map.iter().collect::<Vec<_>>();
let mut pairs = contents.iter().map(|pair| pair.pair()).collect::<Vec<_>>();
for (key, value) in pairs {
writeln!(writer, "{} {}", key, value)?;
/// Demangles partially demangled Rust symbols that were demangled incorrectly by profilers like
/// `sample` and `DTrace`.
/// For example:
/// `_$LT$grep_searcher..searcher..glue..ReadByLine$LT$$u27$s$C$$u20$M$C$$u20$R$C$$u20$S$GT$$GT$::run::h30ecedc997ad7e32`
/// becomes
/// `<grep_searcher::searcher::glue::ReadByLine<'s, M, R, S>>::run`
/// Non-Rust symobols, or Rust symbols that are already demangled, will be returned unchanged.
/// Based on code in
pub(crate) fn fix_partially_demangled_rust_symbol(symbol: &str) -> Cow<str> {
// Rust hashes are hex digits with an `h` prepended.
let is_rust_hash =
|s: &str| s.starts_with('h') && s[1..].chars().all(|c| c.is_ascii_hexdigit());
// If there's no trailing Rust hash just return the symbol as is.
if symbol.len() < RUST_HASH_LENGTH || !is_rust_hash(&symbol[symbol.len() - RUST_HASH_LENGTH..])
return Cow::Borrowed(symbol);
// Strip off trailing hash.
let mut rest = &symbol[..symbol.len() - RUST_HASH_LENGTH];
if rest.ends_with("::") {
rest = &rest[ - 2];
if rest.starts_with("_$") {
rest = &rest[1..];
let mut demangled = String::new();
while !rest.is_empty() {
if rest.starts_with('.') {
if let Some('.') = rest[1..].chars().next() {
rest = &rest[2..];
} else {
rest = &rest[1..];
} else if rest.starts_with('$') {
macro_rules! demangle {
($($pat:expr => $demangled:expr,)*) => ({
$(if rest.starts_with($pat) {
rest = &rest[$pat.len()..];
} else)*
demangle! {
"$SP$" => "@",
"$BP$" => "*",
"$RF$" => "&",
"$LT$" => "<",
"$GT$" => ">",
"$LP$" => "(",
"$RP$" => ")",
"$C$" => ",",
"$u7e$" => "~",
"$u20$" => " ",
"$u27$" => "'",
"$u3d$" => "=",
"$u5b$" => "[",
"$u5d$" => "]",
"$u7b$" => "{",
"$u7d$" => "}",
"$u3b$" => ";",
"$u2b$" => "+",
"$u21$" => "!",
"$u22$" => "\"",
} else {
let idx = match rest.char_indices().find(|&(_, c)| c == '$' || c == '.') {
None => rest.len(),
Some((i, _)) => i,
rest = &rest[idx..];
pub(crate) mod testing {
use std::collections::HashMap;
use std::fmt;
use std::fs::File;
use std::io::Write;
use std::io::{self, BufRead, Read};
use std::path::{Path, PathBuf};
use std::time::Instant;
use libflate::gzip::Decoder;
use super::*;
use crate::collapse::Collapse;
pub(crate) fn read_inputs<P>(inputs: &[P]) -> io::Result<HashMap<PathBuf, Vec<u8>>>
P: AsRef<Path>,
let mut map = HashMap::default();
for path in inputs.iter() {
let path = path.as_ref();
let bytes = {
let mut buf = Vec::new();
let mut file = File::open(path)?;
if path.to_str().unwrap().ends_with(".gz") {
let mut reader = Decoder::new(file)?;
reader.read_to_end(&mut buf)?;
} else {
file.read_to_end(&mut buf)?;
map.insert(path.to_path_buf(), bytes);
pub(crate) fn test_collapse_multi<C, P>(folder: &mut C, inputs: &[P]) -> io::Result<()>
C: Collapse + CollapsePrivate,
P: AsRef<Path>,
const MAX_THREADS: usize = 16;
for (path, bytes) in read_inputs(inputs)? {
let mut writer = Vec::new();
<C as Collapse>::collapse(folder, &bytes[..], &mut writer)?;
let expected = std::str::from_utf8(&writer[..]).unwrap();
for n in 2..=MAX_THREADS {
let mut writer = Vec::new();
<C as Collapse>::collapse(folder, &bytes[..], &mut writer)?;
let actual = std::str::from_utf8(&writer[..]).unwrap();
"Collapsing with {} threads does not produce the same output as collapsing with 1 thread for {}",
pub(crate) fn bench_nstacks<C, P>(folder: &mut C, inputs: &[P]) -> io::Result<()>
C: CollapsePrivate,
P: AsRef<Path>,
const MIN_LINES: usize = 2000;
const NSAMPLES: usize = 100;
const WARMUP_SECS: usize = 3;
let _stdout = io::stdout();
let _stderr = io::stdout();
let mut stdout = _stdout.lock();
let _stderr = _stderr.lock();
struct Foo<'a> {
default: usize,
nlines: usize,
nstacks: usize,
path: &'a Path,
results: HashMap<usize, u64>,
impl<'a> Foo<'a> {
fn new<C>(
folder: &mut C,
path: &'a Path,
bytes: &[u8],
stdout: &mut io::StdoutLock,
) -> io::Result<Option<Self>>
C: CollapsePrivate,
let default = folder.nstacks_per_job();
let (nlines, nstacks) = count_lines_and_stacks(bytes);
if nlines < MIN_LINES {
return Ok(None);
let mut results = HashMap::default();
let iter = vec![default]
for nstacks_per_job in iter {
let mut durations = Vec::new();
for _ in 0..NSAMPLES {
let now = Instant::now();
folder.collapse(bytes, io::sink())?;
let avg_duration =
(durations.iter().sum::<u128>() as f64 / durations.len() as f64) as u64;
results.insert(nstacks_per_job, avg_duration);
Ok(Some(Self {
impl<'a> fmt::Display for Foo<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
"{} (nstacks: {}, lines: {})",
let default_duration = self.results[&self.default];
let mut results = self.results.iter().collect::<Vec<_>>();
results.sort_by(|(_, d1), (_, d2)| (**d1).cmp(*d2));
for (nstacks_per_job, duration) in results.iter().take(10) {
" nstacks_per_job: {:>4} (% of total: {:>3.0}%) | time: {:.0}% of default",
(**nstacks_per_job as f32 / self.nstacks as f32) * 100.0,
**duration as f64 / default_duration as f64 * 100.0,
fn count_lines_and_stacks(bytes: &[u8]) -> (usize, usize) {
let mut reader = io::BufReader::new(bytes);
let mut line = Vec::new();
let (mut nlines, mut nstacks) = (0, 0);
loop {
let n = reader.read_until(0x0A, &mut line).unwrap();
if n == 0 {
nstacks += 1;
let l = String::from_utf8_lossy(&line);
nlines += 1;
if l.trim().is_empty() {
nstacks += 1;
(nlines, nstacks)
let inputs = read_inputs(inputs)?;
// Warmup
let now = Instant::now();
"# Warming up for approximately {} seconds.\n",
while now.elapsed() < std::time::Duration::from_secs(WARMUP_SECS as u64) {
for (_, bytes) in inputs.iter() {
folder.collapse(&bytes[..], io::sink())?;
// Time
let mut foos = Vec::new();
for (path, bytes) in &inputs {
stdout.write_fmt(format_args!("# {} ", path.display()))?;
if let Some(foo) = Foo::new(folder, path, bytes, &mut stdout)? {
foos.sort_by(|a, b| b.nstacks.cmp(&a.nstacks));
for foo in foos {
stdout.write_fmt(format_args!("{}", foo))?;
mod tests {
macro_rules! t {
($a:expr, $b:expr) => {
assert!(ok($a, $b))
macro_rules! t_unchanged {
($a:expr) => {
fn ok(sym: &str, expected: &str) -> bool {
let result = super::fix_partially_demangled_rust_symbol(sym);
if result == expected {
} else {
println!("\n{}\n!=\n{}\n", result, expected);
fn ok_unchanged(sym: &str) -> bool {
let result = super::fix_partially_demangled_rust_symbol(sym);
if result == sym {
} else {
println!("{} should have been unchanged, but got {}", sym, result);
fn fix_partially_demangled_rust_symbols() {
t!("_$LT$std..fs..ReadDir$u20$as$u20$core..iter..traits..iterator..Iterator$GT$::next::hc14f1750ca79129b", "<std::fs::ReadDir as core::iter::traits::iterator::Iterator>::next");
t!("rg::search_parallel::_$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h6e849b55a66fcd85", "rg::search_parallel::_{{closure}}::_{{closure}}");
"<F as alloc::boxed::FnBox<A>>::call_box"
"<&std::fs::File as std::io::Read>::read"
t!("crossbeam_utils::thread::ScopedThreadBuilder::spawn::_$u7b$$u7b$closure$u7d$$u7d$::h8fdc7d4f74c0da05", "crossbeam_utils::thread::ScopedThreadBuilder::spawn::_{{closure}}");
fn fix_partially_demangled_rust_symbol_on_fully_mangled_symbols() {
fn fix_partially_demangled_rust_symbol_on_fully_demangled_symbols() {
t_unchanged!("<F as alloc::boxed::FnBox<A>>::call_box");
t_unchanged!("<std::fs::ReadDir as core::iter::traits::iterator::Iterator>::next");
t_unchanged!("<grep_searcher::searcher::glue::ReadByLine<'s, M, R, S>>::run");
t_unchanged!("<alloc::raw_vec::RawVec<T, A>>::reserve_internal");