| """Handle the details of subprocess calls and retries for a given benchmark run.""" |
| import dataclasses |
| import json |
| import os |
| import pickle |
| import signal |
| import subprocess |
| import time |
| import uuid |
| from typing import List, Optional, TYPE_CHECKING, Union |
| |
| from core.api import AutoLabels |
| from core.types import Label |
| from core.utils import get_temp_dir |
| from worker.main import ( |
| WORKER_PATH, |
| WorkerFailure, |
| WorkerOutput, |
| WorkerTimerArgs, |
| WorkerUnpickler, |
| ) |
| |
| if TYPE_CHECKING: |
| PopenType = subprocess.Popen[bytes] |
| else: |
| PopenType = subprocess.Popen |
| |
| |
| # Mitigate https://github.com/pytorch/pytorch/issues/37377 |
| _ENV = "MKL_THREADING_LAYER=GNU" |
| _PYTHON = "python" |
| PYTHON_CMD = f"{_ENV} {_PYTHON}" |
| |
| # We must specify `bash` so that `source activate ...` always works |
| SHELL = "/bin/bash" |
| |
| |
@dataclasses.dataclass(frozen=True)
class WorkOrder:
    """Spec to schedule work with the benchmark runner."""

    label: Label
    autolabels: AutoLabels
    timer_args: WorkerTimerArgs
    source_cmd: Optional[str] = None
    timeout: Optional[float] = None
    retries: int = 0

    def __hash__(self) -> int:
        # Hash by identity: every WorkOrder is a distinct unit of scheduled
        # work, even if two orders happen to carry equal fields.
        return id(self)

    def __str__(self) -> str:
        # Compact JSON summary used for logging / progress display.
        summary = {
            "label": self.label,
            "autolabels": self.autolabels.as_dict,
            "num_threads": self.timer_args.num_threads,
        }
        return json.dumps(summary)
| |
| |
class _BenchmarkProcess:
    """Wraps subprocess.Popen for a given WorkOrder.

    Communication with the worker is file based: the timer args are pickled
    to a temp file before launch, and the worker overwrites that same file
    with its output (or leaves the original args in place if it dies before
    producing a result).
    """

    _work_order: WorkOrder
    _cpu_list: Optional[str]
    _proc: PopenType

    # Internal bookkeeping
    _communication_file: str
    _start_time: float
    _end_time: Optional[float] = None
    _retcode: Optional[int] = None  # None until the subprocess has exited.
    _result: Optional[Union[WorkerOutput, WorkerFailure]] = None

    def __init__(self, work_order: WorkOrder, cpu_list: Optional[str]) -> None:
        self._work_order = work_order
        self._cpu_list = cpu_list
        self._start_time = time.time()
        self._communication_file = os.path.join(get_temp_dir(), f"{uuid.uuid4()}.pkl")
        with open(self._communication_file, "wb") as f:
            pickle.dump(self._work_order.timer_args, f)

        # stderr is folded into stdout so a failure trace captures everything
        # the worker printed, in order.
        self._proc = subprocess.Popen(
            self.cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            executable=SHELL,
        )

    def clone(self) -> "_BenchmarkProcess":
        """Create a fresh process for the same WorkOrder (used for retries)."""
        return _BenchmarkProcess(self._work_order, self._cpu_list)

    @property
    def cmd(self) -> str:
        """Shell command which runs the worker, including env setup and
        optional CPU pinning."""
        cmd: List[str] = []
        if self._work_order.source_cmd is not None:
            cmd.extend([self._work_order.source_cmd, "&&"])

        cmd.append(_ENV)

        if self._cpu_list is not None:
            cmd.extend(
                [
                    f"GOMP_CPU_AFFINITY={self._cpu_list}",
                    "taskset",
                    "--cpu-list",
                    self._cpu_list,
                ]
            )

        cmd.extend(
            [
                _PYTHON,
                WORKER_PATH,
                "--communication-file",
                self._communication_file,
            ]
        )
        return " ".join(cmd)

    @property
    def duration(self) -> float:
        """Elapsed wall time; frozen once results have been collected."""
        return (self._end_time or time.time()) - self._start_time

    @property
    def result(self) -> Union[WorkerOutput, WorkerFailure]:
        self._maybe_collect()
        assert self._result is not None
        return self._result

    def poll(self) -> Optional[int]:
        """Return the subprocess return code, or None if still running."""
        self._maybe_collect()
        return self._retcode

    def interrupt(self) -> None:
        """Soft interrupt. Allows subprocess to cleanup."""
        self._proc.send_signal(signal.SIGINT)

    def terminate(self) -> None:
        """Hard interrupt. Immediately SIGTERM subprocess."""
        self._proc.terminate()

    def _maybe_collect(self) -> None:
        """Collect results once (and only once) the subprocess has exited."""
        if self._result is not None:
            # We've already collected the results.
            return

        self._retcode = self._proc.poll()
        if self._retcode is None:
            # `_proc` is still running
            return

        with open(self._communication_file, "rb") as f:
            result = WorkerUnpickler(f).load_output()

        if isinstance(result, WorkerOutput) and self._retcode:
            # Worker managed to complete the designated task, but worker
            # process did not finish cleanly.
            result = WorkerFailure("Worker failed silently.")

        if isinstance(result, WorkerTimerArgs):
            # Worker failed, but did not write a result so we're left with the
            # original TimerArgs. Grabbing all of stdout and stderr isn't
            # ideal, but we don't have a better way to determine what to keep.
            proc_stdout = self._proc.stdout
            assert proc_stdout is not None
            result = WorkerFailure(failure_trace=proc_stdout.read().decode("utf-8"))

        # Close the stdout pipe regardless of which branch ran; otherwise
        # each completed benchmark leaks a pipe file descriptor until GC.
        # (The process has exited, so closing cannot block the child.)
        if self._proc.stdout is not None:
            self._proc.stdout.close()

        self._result = result
        self._end_time = time.time()

        # Release communication file.
        os.remove(self._communication_file)
| |
| |
class InProgress:
    """Used by the benchmark runner to track outstanding jobs.
    This class handles bookkeeping and timeout + retry logic.
    """

    _proc: _BenchmarkProcess
    _timeouts: int = 0

    def __init__(self, work_order: WorkOrder, cpu_list: Optional[str]):
        self._work_order = work_order
        self._proc = _BenchmarkProcess(work_order, cpu_list)

    @property
    def work_order(self) -> WorkOrder:
        return self._proc._work_order

    @property
    def cpu_list(self) -> Optional[str]:
        return self._proc._cpu_list

    @property
    def proc(self) -> _BenchmarkProcess:
        # NB: For cleanup only.
        return self._proc

    @property
    def duration(self) -> float:
        return self._proc.duration

    def check_finished(self) -> bool:
        """Poll the worker, restarting it on timeout until retries run out.

        Returns True once the subprocess has exited; raises TimeoutExpired
        when the job times out with no retries remaining.
        """
        if self._proc.poll() is not None:
            return True

        timeout = self.work_order.timeout
        if timeout is None or self._proc.duration < timeout:
            # No deadline, or still within it: keep waiting.
            return False

        self._timeouts += 1
        max_attempts = (self._work_order.retries or 0) + 1
        if self._timeouts >= max_attempts:
            # Retry budget exhausted.
            raise subprocess.TimeoutExpired(cmd=self._proc.cmd, timeout=timeout)

        print(
            f"\nTimeout: {self._work_order.label}, {self._work_order.autolabels} "
            f"(Attempt {self._timeouts} / {max_attempts})"
        )
        self._proc.interrupt()
        self._proc = self._proc.clone()
        return False

    @property
    def result(self) -> Union[WorkerOutput, WorkerFailure]:
        return self._proc.result

    def __hash__(self) -> int:
        # Identity hashing: each in-flight job is tracked as a unique entry.
        return id(self)