| """Run benchmarks while handling parallelism, isolation, and fault tolerance.""" |
| import math |
| import multiprocessing |
| import subprocess |
| import textwrap |
| import threading |
| import time |
| from typing import Dict, List, Optional, Set, Tuple, Union |
| |
| from execution.work import PYTHON_CMD, SHELL, InProgress, WorkOrder |
| from worker.main import WorkerFailure, WorkerOutput |
| |
| |
| CPU_COUNT: int = multiprocessing.cpu_count() |
| |
| |
| class WorkerFailed(Exception): |
| """Raised in the main process when a worker failure is detected.""" |
| def __init__(self, cmd: str, wrapped_trace: Optional[str] = None) -> None: |
| self.cmd: str = cmd |
| self.wrapped_trace: Optional[str] = wrapped_trace |
| super().__init__() |
| |
| |
| class CorePool: |
| """Allocator style helper class to assign individual tasks to a core range. |
| |
| Pinning tasks to separate cores (or core ranges if `num_threads` > 1) |
| serves two purposes. First, it prevents the machine from being overloaded, |
| which can result in OOMs or Callgrind crashes. Second, it helps reduce |
| noise in the wall times, which are collected as a secondary metric. For |
| multi-threaded workloads, adjacency is important. Often pairs of cores |
| share silicon (e.g. cache), while far away cores may lie on separate NUMA |
| nodes. For this reason, CorePool will only allocate contiguous core ranges. |
| This falls short of full architecture awareness, and instead tries to find |
| a balance between rigor and engineering complexity. |
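
    A minimal usage sketch (illustrative; actual core IDs depend on the host):

        pool = CorePool(0, 15)   # manage cores 0 through 15
        key = pool.reserve(2)    # e.g. "0-1", ready for `taskset --cpu-list`
        ...                      # run the pinned job
        pool.release(key)        # return the cores when the job completes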
| """ |
| def __init__(self, min_core_id: int, max_core_id: int) -> None: |
| assert min_core_id >= 0 |
| assert max_core_id >= min_core_id |
| assert max_core_id < CPU_COUNT |
| |
| self._min_core_id: int = min_core_id |
| self._max_core_id: int = max_core_id |
| self._num_cores = max_core_id - min_core_id + 1 |
| print(f"Core pool created: cores {self._min_core_id}-{self._max_core_id}") |
| |
| self._available: List[bool] = [ |
| True for _ in range(min_core_id, min_core_id + self._num_cores)] |
| |
| self._reservations: Dict[str, Tuple[int, ...]] = {} |
| self._lock = threading.Lock() |
| |
| def reserve(self, n: int) -> Optional[str]: |
| """Simple first-fit policy. |
| |
| If successful, return a string for `taskset`. Otherwise, return None. |
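
        For example (illustrative), `reserve(2)` might return "0-1", which can
        be passed to `taskset --cpu-list 0-1 ...` when launching a worker.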
| """ |
| with self._lock: |
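            # `indices` below are offsets into `self._available`; the returned
            # key is expressed in absolute core IDs (offset by `_min_core_id`).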
            for lower_index in range(self._num_cores - n + 1):
                indices = tuple(range(lower_index, lower_index + n))
                if all(self._available[i] for i in indices):
                    for i in indices:
                        self._available[i] = False

                    lower_core = indices[0] + self._min_core_id
                    upper_core = indices[-1] + self._min_core_id
                    key = f"{lower_core}-{upper_core}" if n > 1 else f"{lower_core}"
                    self._reservations[key] = indices
                    return key
        return None

    def release(self, key: str) -> None:
        with self._lock:
            for i in self._reservations[key]:
                self._available[i] = True
            self._reservations.pop(key)


class Runner:
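    """Orchestrate the execution of a set of WorkOrders.

    A minimal usage sketch (assuming `work_items` is a tuple of WorkOrders):

        runner = Runner(work_items)
        results = runner.run()  # blocks until all jobs finish or one fails

    Jobs are pinned to cores via a CorePool, polled on a fixed cadence, and
    torn down as a group if any worker fails.
    """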
    def __init__(
        self,
        work_items: Tuple[WorkOrder, ...],
        core_pool: Optional[CorePool] = None,
        cadence: float = 1.0,
    ) -> None:
        self._work_items: Tuple[WorkOrder, ...] = work_items
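        # Note: if no pool is given, the default leaves the last few cores
        # unreserved (presumably for the main process and system tasks).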
        self._core_pool: CorePool = core_pool or CorePool(0, CPU_COUNT - 4)
        self._cadence: float = cadence

        # Working state.
        self._work_queue: List[WorkOrder] = list(work_items)
        self._active_jobs: List[InProgress] = []
        self._results: Dict[WorkOrder, WorkerOutput] = {}

        # Debug information for ETA and error messages.
        self._start_time: float = -1
        self._durations: Dict[WorkOrder, float] = {}
        self._currently_processed: Optional[WorkOrder] = None

        if len(work_items) != len(set(work_items)):
            raise ValueError('Duplicate work items.')

    def run(self) -> Dict[WorkOrder, WorkerOutput]:
        try:
            return self._run()

        except KeyboardInterrupt:
            print("\n\nKeyboardInterrupt (ctrl-c) detected. Shutting down children.")
            self._force_shutdown(verbose=False)
            raise

        except subprocess.TimeoutExpired:
            print("\n\nJob timed out. Shutting down children.")
            self._force_shutdown(verbose=True)
            raise

        except WorkerFailed as e:
            print('Shutting down all outstanding jobs before re-raising.')
            self._force_shutdown(verbose=True)
            print(f"Cmd: {e.cmd}")
            if e.wrapped_trace:
                print(e.wrapped_trace)
            else:
                print('Unknown failure. (Worker did not report exception contents.)')
            raise

        except BaseException:
            print("\n\nUnknown exception. Shutting down jobs before re-raising.")
            self._force_shutdown(verbose=True)
            raise

    def _run(self) -> Dict[WorkOrder, WorkerOutput]:
        self._start_time = time.time()
        self._canary_import()
        while self._work_queue or self._active_jobs:
            t0 = time.time()
            self._update_active_jobs()
            self._enqueue_new_jobs()
            self._print_progress()
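            # Sleep off the remainder of the cadence so the loop polls at a
            # steady rate, regardless of how long this iteration took.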
            time.sleep(max(self._cadence - (time.time() - t0), 0.0))
        print(f"\nTotal time: {time.time() - self._start_time:.0f} seconds")
        return self._results.copy()

    def _update_active_jobs(self) -> None:
        active_jobs: List[InProgress] = []
        for job in self._active_jobs:
            self._currently_processed = job.work_order
            if not job.check_finished():
                active_jobs.append(job)
                continue

            result: Union[WorkerOutput, WorkerFailure] = job.result
            if isinstance(result, WorkerOutput):
                self._results[job.work_order] = result
                assert job.cpu_list is not None
                self._core_pool.release(job.cpu_list)
                self._durations[job.work_order] = job.duration

            else:
                assert isinstance(result, WorkerFailure)
                raise WorkerFailed(cmd=job.proc.cmd, wrapped_trace=result.failure_trace)
        self._currently_processed = None
        self._active_jobs.clear()
        self._active_jobs.extend(active_jobs)
    def _enqueue_new_jobs(self) -> None:
        work_queue: List[WorkOrder] = []
        for work_order in self._work_queue:
            self._currently_processed = work_order
            cpu_list = self._core_pool.reserve(work_order.timer_args.num_threads)

            if cpu_list is None:
                work_queue.append(work_order)
            else:
                self._active_jobs.append(InProgress(work_order, cpu_list))

                # Stagger creation. This helps with contention.
                time.sleep(0.5)
        self._currently_processed = None
        self._work_queue.clear()
        self._work_queue.extend(work_queue)

    def _print_progress(self) -> None:
        fraction = f"{len(self._results)} / {len(self._work_items)}"
        elapsed = f"{time.time() - self._start_time:.0f} seconds"
        if len(self._results) < 5:
            eta = "Unknown"
        else:
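            # Rough estimate: model execution as sequential "waves" of jobs,
            # one job per core, each wave taking the mean observed duration.
            # (This assumes single-threaded jobs, so it is only approximate.)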
            remaining = len(self._work_items) - len(self._results)
            iters_remaining = math.ceil(remaining / self._core_pool._num_cores)
            mean_time = sum(self._durations.values()) / len(self._durations)
            eta_minutes = math.ceil(iters_remaining * mean_time / 60)
            eta = f"~{eta_minutes} minute{'s' if eta_minutes > 1 else ''}"
        print(f"\r{fraction} ({elapsed}), ETA: {eta}", end="")
    def _force_shutdown(self, verbose: bool = False) -> None:
        """Try to interrupt jobs, and kill if need be.

        We would prefer to softly terminate jobs so that they have a chance
        to clean up before shutting down.
        """
        for job in self._active_jobs:
            job.proc.interrupt()

        if verbose and self._currently_processed is not None:
            print(textwrap.dedent(f"""
                Failed when processing the following Job:
                  Label:      {self._currently_processed.label}
                  AutoLabels: {self._currently_processed.autolabels}
                  Source cmd: {self._currently_processed.source_cmd}
            """).strip() + "\n")

        if self._active_jobs:
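            # Give workers a moment to handle the interrupt before polling.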
            time.sleep(0.5)

        remaining_jobs = [j for j in self._active_jobs if j.proc.poll() is None]
        if remaining_jobs:
            print(
                f'SIGINT sent to {len(self._active_jobs)} jobs, '
                f'{len(remaining_jobs)} have not yet exited.\n'
                'Entering short cleanup loop, after which stragglers will '
                'be forcibly terminated.'
            )

            for _ in range(5):
                time.sleep(2.0)
                remaining_jobs = [j for j in remaining_jobs if j.proc.poll() is None]
                if remaining_jobs:
                    print(f'{len(remaining_jobs)} still remain.')
                else:
                    print('All remaining jobs have gracefully terminated.')
                    return

            print(f'{len(remaining_jobs)} jobs refused to exit. Forcibly terminating.')
            for j in remaining_jobs:
                j.proc.terminate()

    def _canary_import(self) -> None:
        """Make sure we can import torch before launching a slew of workers."""
        source_cmds: Set[str] = set()
        for w in self._work_items:
            if w.source_cmd is not None:
                source_cmds.add(f"{w.source_cmd} && ")
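        # If no work item specifies a source cmd, fall back to checking the
        # bare interpreter.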
        for source_cmd in (source_cmds or {""}):
            cmd = f'{source_cmd}{PYTHON_CMD} -c "import torch"'
            proc = subprocess.run(
                cmd,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf-8",
                executable=SHELL,
            )

            if proc.returncode:
                raise ImportError(
                    f'Failed to import torch in subprocess: {cmd}\n{proc.stdout}')