blob: c745fde88fa1de4edc33511dcb670e678b442d40 [file] [log] [blame]
"""Handle the details of subprocess calls and retries for a given benchmark run."""
import dataclasses
import json
import os
import pickle
import signal
import subprocess
import time
import uuid
from typing import List, Optional, TYPE_CHECKING, Union
from core.api import AutoLabels
from core.types import Label
from core.utils import get_temp_dir
from worker.main import (
WORKER_PATH,
WorkerFailure,
WorkerOutput,
WorkerTimerArgs,
WorkerUnpickler,
)
if TYPE_CHECKING:
PopenType = subprocess.Popen[bytes]
else:
PopenType = subprocess.Popen
# Mitigate https://github.com/pytorch/pytorch/issues/37377
_ENV = "MKL_THREADING_LAYER=GNU"
_PYTHON = "python"
PYTHON_CMD = f"{_ENV} {_PYTHON}"
# We must specify `bash` so that `source activate ...` always works
SHELL = "/bin/bash"
@dataclasses.dataclass(frozen=True)
class WorkOrder:
"""Spec to schedule work with the benchmark runner."""
label: Label
autolabels: AutoLabels
timer_args: WorkerTimerArgs
source_cmd: Optional[str] = None
timeout: Optional[float] = None
retries: int = 0
def __hash__(self) -> int:
return id(self)
def __str__(self) -> str:
return json.dumps(
{
"label": self.label,
"autolabels": self.autolabels.as_dict,
"num_threads": self.timer_args.num_threads,
}
)
class _BenchmarkProcess:
"""Wraps subprocess.Popen for a given WorkOrder."""
_work_order: WorkOrder
_cpu_list: Optional[str]
_proc: PopenType
# Internal bookkeeping
_communication_file: str
_start_time: float
_end_time: Optional[float] = None
_retcode: Optional[int]
_result: Optional[Union[WorkerOutput, WorkerFailure]] = None
def __init__(self, work_order: WorkOrder, cpu_list: Optional[str]) -> None:
self._work_order = work_order
self._cpu_list = cpu_list
self._start_time = time.time()
self._communication_file = os.path.join(get_temp_dir(), f"{uuid.uuid4()}.pkl")
with open(self._communication_file, "wb") as f:
pickle.dump(self._work_order.timer_args, f)
self._proc = subprocess.Popen(
self.cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
executable=SHELL,
)
def clone(self) -> "_BenchmarkProcess":
return _BenchmarkProcess(self._work_order, self._cpu_list)
@property
def cmd(self) -> str:
cmd: List[str] = []
if self._work_order.source_cmd is not None:
cmd.extend([self._work_order.source_cmd, "&&"])
cmd.append(_ENV)
if self._cpu_list is not None:
cmd.extend(
[
f"GOMP_CPU_AFFINITY={self._cpu_list}",
"taskset",
"--cpu-list",
self._cpu_list,
]
)
cmd.extend(
[
_PYTHON,
WORKER_PATH,
"--communication-file",
self._communication_file,
]
)
return " ".join(cmd)
@property
def duration(self) -> float:
return (self._end_time or time.time()) - self._start_time
@property
def result(self) -> Union[WorkerOutput, WorkerFailure]:
self._maybe_collect()
assert self._result is not None
return self._result
def poll(self) -> Optional[int]:
self._maybe_collect()
return self._retcode
def interrupt(self) -> None:
"""Soft interrupt. Allows subprocess to cleanup."""
self._proc.send_signal(signal.SIGINT)
def terminate(self) -> None:
"""Hard interrupt. Immediately SIGTERM subprocess."""
self._proc.terminate()
def _maybe_collect(self) -> None:
if self._result is not None:
# We've already collected the results.
return
self._retcode = self._proc.poll()
if self._retcode is None:
# `_proc` is still running
return
with open(self._communication_file, "rb") as f:
result = WorkerUnpickler(f).load_output()
if isinstance(result, WorkerOutput) and self._retcode:
# Worker managed to complete the designated task, but worker
# process did not finish cleanly.
result = WorkerFailure("Worker failed silently.")
if isinstance(result, WorkerTimerArgs):
# Worker failed, but did not write a result so we're left with the
# original TimerArgs. Grabbing all of stdout and stderr isn't
# ideal, but we don't have a better way to determine what to keep.
proc_stdout = self._proc.stdout
assert proc_stdout is not None
result = WorkerFailure(failure_trace=proc_stdout.read().decode("utf-8"))
self._result = result
self._end_time = time.time()
# Release communication file.
os.remove(self._communication_file)
class InProgress:
"""Used by the benchmark runner to track outstanding jobs.
This class handles bookkeeping and timeout + retry logic.
"""
_proc: _BenchmarkProcess
_timeouts: int = 0
def __init__(self, work_order: WorkOrder, cpu_list: Optional[str]):
self._work_order = work_order
self._proc = _BenchmarkProcess(work_order, cpu_list)
@property
def work_order(self) -> WorkOrder:
return self._proc._work_order
@property
def cpu_list(self) -> Optional[str]:
return self._proc._cpu_list
@property
def proc(self) -> _BenchmarkProcess:
# NB: For cleanup only.
return self._proc
@property
def duration(self) -> float:
return self._proc.duration
def check_finished(self) -> bool:
if self._proc.poll() is not None:
return True
timeout = self.work_order.timeout
if timeout is None or self._proc.duration < timeout:
return False
self._timeouts += 1
max_attempts = (self._work_order.retries or 0) + 1
if self._timeouts < max_attempts:
print(
f"\nTimeout: {self._work_order.label}, {self._work_order.autolabels} "
f"(Attempt {self._timeouts} / {max_attempts})"
)
self._proc.interrupt()
self._proc = self._proc.clone()
return False
raise subprocess.TimeoutExpired(cmd=self._proc.cmd, timeout=timeout)
@property
def result(self) -> Union[WorkerOutput, WorkerFailure]:
return self._proc.result
def __hash__(self) -> int:
return id(self)