blob: 38951ac5091f26ca395df730ce51b9a465ad97a1 [file] [log] [blame]
import contextlib
import json
import os
import time
import numpy as np
import torch
from . import tensor_engine
class Benchmark:
    """Base class for a single tensor-expression microbenchmark.

    A subclass describes one operator pattern by implementing ``forward`` (the
    computation), ``config``/``default_configs`` (shape parameters),
    ``module`` (a short name), and ``memory_workload``; ``run`` then times the
    computation under the selected JIT/fuser configuration and reports
    throughput via ``dump_result``.

    NOTE(review): subclasses are also expected to set ``self.inputs`` and
    ``self.jit_mode`` before ``run`` is called — neither is initialized here;
    confirm against the concrete benchmark classes.
    """

    def __init__(self, mode, device, dtype):
        # mode selects what is timed: "fwd" = forward pass only,
        # "both" = forward plus backward (gradients required).
        self.mode = mode
        self.deterministic = False
        self.device = device
        self.dtype = dtype
        # "stdout" prints a human-readable line; "json" prints a JSON dict.
        self.output_type = "stdout"
        self.print_ir = False
        self.print_kernel = False
        if mode == "both":
            self.requires_grad = True
        elif mode == "fwd":
            self.requires_grad = False
        else:
            raise ValueError(f"invalid mode: {mode}")
        # Gradient seed for backward; created lazily in run_impl.
        self.result_grad = None
        # Tensors created with requires_grad=True (see rand/nchw_rand).
        self.grad_variables = []
        self.engine = tensor_engine.get_engine()
        self.engine.reset(device)

        # forward all member functions in self.engine to self, so subclasses
        # can call engine ops directly as self.<op>(...)
        for method in dir(self.engine):
            if not callable(getattr(self.engine, method)):
                continue
            # don't forward if this function is overridden here
            if hasattr(self, method):
                continue
            # don't forward if it is an internal function
            if method.startswith("_"):
                continue
            method_engine = getattr(self.engine, method)
            setattr(self, method, method_engine)

    def forward(self):
        """do one step worth of computation"""
        raise ValueError("this method should be reimplemented by subclass")

    def check(self):
        # Validate the computed result against the subclass-provided
        # reference() implementation; only meaningful when deterministic.
        if not self.deterministic:
            return
        np.testing.assert_allclose(
            self.reference(), self.numpy(self.compute()), atol=1e-2
        )

    def config(self):
        """returns an array for the current benchmark configs"""
        raise ValueError("this method should be reimplemented by subclass")

    def desc(self):
        """return the description of the current benchmark"""
        config = self.config()
        config_str = "_".join([str(x) for x in config])
        device = self.device
        if "NNC_NUM_THREADS" in os.environ:
            # Tag the device with the thread count so multi-threaded CPU runs
            # are distinguishable in the output.
            num_threads_str = os.environ["NNC_NUM_THREADS"]
            device += num_threads_str
        return f"{self.engine.mode}: {self.module()}_{self.mode}_{device}_{config_str}"

    @staticmethod
    def module():
        raise ValueError("this method should be reimplemented by subclass")

    def memory_workload(self):
        raise ValueError("this method should be reimplemented by subclass")

    def compute_workload(self):
        """return the number of scalar operations it takes to finish the tensor op"""
        return None

    @staticmethod
    def input_iterable():
        """A benchmark child class should return true if it utilizes the input iter arg"""
        return False

    def dtype_to_bytes(self):
        # Size in bytes of one element of self.dtype.
        return torch.tensor(0, dtype=self.dtype).element_size()

    @staticmethod
    def default_configs():
        """return a list of default configs for this benchmark"""
        raise ValueError("this method should be reimplemented by subclass")

    def is_supported(self):
        return True

    def rand(self, shape, device=None, dtype=None, requires_grad=False):
        # Engine-backed random tensor; tracked in grad_variables when it
        # participates in the backward pass.
        v = self.engine.rand(
            shape, device=device, dtype=dtype, requires_grad=requires_grad
        )
        if requires_grad:
            self.grad_variables.append(v)
        return v

    def nchw_rand(self, shape, device=None, requires_grad=False):
        # Like rand(), but produced by the engine in NCHW layout.
        v = self.engine.nchw_rand(shape, device=device, requires_grad=requires_grad)
        if requires_grad:
            self.grad_variables.append(v)
        return v

    def compute(self):
        # Prefer the traced/JITed callable once run_impl has produced one.
        if self.bm_jit:
            return self.bm_jit(*self.inputs)
        else:
            return self.forward(*self.inputs)

    def run(self, args):
        """Configure the requested CUDA fuser from CLI args, then time the benchmark."""
        self.print_ir = args.print_ir
        if args.cuda_fuser == "old":
            torch._C._jit_override_can_fuse_on_gpu(True)
            if args.print_kernel:
                os.environ["PYTORCH_FUSION_DEBUG"] = "1"
            return self.run_impl(True)
        elif args.cuda_fuser == "te":
            torch._C._jit_set_texpr_fuser_enabled(True)
            with cuda_pointwise_context(
                args.cuda_pointwise_loop_levels,
                args.cuda_pointwise_block_count,
                args.cuda_pointwise_block_size,
            ):
                return self.run_impl(True)
        elif args.cuda_fuser == "nvf":
            torch._C._jit_set_nvfuser_enabled(True)
            torch._C._jit_set_profiling_executor(True)
            torch._C._jit_set_profiling_mode(True)
            torch._C._jit_override_can_fuse_on_cpu(False)
            torch._C._jit_override_can_fuse_on_gpu(False)
            torch._C._jit_set_bailout_depth(20)
            if args.print_kernel:
                os.environ["PYTORCH_CUDA_FUSER_DEBUG"] = "1"
            return self.run_impl(True)
        else:
            # No recognized fuser requested: run without tracing/fusion.
            return self.run_impl(False)

    def run_impl(self, use_fuser):
        """Warm up, time `iters` calls of compute(), and report the result."""
        warmups = 10
        if self.device == "cuda":
            iters = 1000
        else:
            iters = 10
        engine = tensor_engine.get_engine()

        self.bm_jit = None
        for i in range(warmups + iters):
            if i == warmups:
                # Warmup done: synchronize first (CUDA kernels are async),
                # then start the timer.
                if self.device == "cuda":
                    engine.sync_cuda()
                time_start = time.time()

            if i == 0:
                # First iteration: trace the model (when requested) and
                # validate the result against the reference implementation.
                # NOTE(review): self.jit_mode is expected to be set by the
                # subclass before run() is called.
                if self.jit_mode == "trace" and use_fuser:
                    self.bm_jit = torch.jit.trace(
                        self.forward, example_inputs=self.inputs, check_trace=False
                    )
                if callable(getattr(self, "reference", None)):
                    self.check()
                else:
                    print("Warning: no reference result for ", self.module())
            elif i == 1:
                # The fusion graph is visible after the first iter is executed
                if self.jit_mode == "trace" and use_fuser and self.print_ir:
                    print(self.bm_jit.graph_for(*self.inputs))
            z = self.compute()
            if self.mode == "both":
                if self.result_grad is None:
                    self.result_grad = engine.rand_like(z)
                engine.backward([z], [self.result_grad], self.grad_variables)

        if self.device == "cuda":
            engine.sync_cuda()

        duration = time.time() - time_start
        iter_time = duration / iters
        memory_workload = self.memory_workload()
        compute_workload = self.compute_workload()

        result_dict = {
            "desc": self.desc(),
            "us": iter_time * 1e6,
            # "sol": bandwidth in GB/s assuming the minimal ("speed of light")
            # traffic; "algorithmic": bandwidth counting algorithm-level bytes.
            "sol": memory_workload["sol"] * self.dtype_to_bytes() / iter_time / 1e9,
            "algorithmic": memory_workload["algorithmic"]
            * self.dtype_to_bytes()
            / iter_time
            / 1e9,
        }
        if compute_workload:
            result_dict["compute_workload"] = compute_workload / iter_time / 1e9
        self.dump_result(result_dict)

    def dump_result(self, result_dict):
        """Emit one benchmark result in the configured output format."""
        if self.output_type == "json":
            print(json.dumps(result_dict))
        elif self.output_type == "stdout":
            msg = "{}: {:.2f} us, SOL {:.2f} GB/s, algorithmic {:.2f} GB/s".format(
                result_dict["desc"],
                result_dict["us"],
                result_dict["sol"],
                result_dict["algorithmic"],
            )
            if "compute_workload" in result_dict:
                msg += f", compute {result_dict['compute_workload']:.2f} Gops/s"
            print(msg)
        else:
            raise Exception("Unknown output_type " + self.output_type)  # noqa: TRY002
@contextlib.contextmanager
def cuda_pointwise_context(loop_levels, block_count, block_size):
    """Temporarily override the TE CUDA pointwise codegen settings.

    A falsy argument leaves the corresponding setting untouched.  Every
    setting that was overridden is restored to its previous value on exit,
    even if the managed block raises.
    """
    restore_actions = []
    if loop_levels:
        previous = torch._C._jit_get_te_cuda_pointwise_loop_levels()
        torch._C._jit_set_te_cuda_pointwise_loop_levels(loop_levels)
        restore_actions.append(
            (torch._C._jit_set_te_cuda_pointwise_loop_levels, previous)
        )
    if block_count:
        previous = torch._C._jit_get_te_cuda_pointwise_block_count()
        torch._C._jit_set_te_cuda_pointwise_block_count(block_count)
        restore_actions.append(
            (torch._C._jit_set_te_cuda_pointwise_block_count, previous)
        )
    if block_size:
        previous = torch._C._jit_get_te_cuda_pointwise_block_size()
        torch._C._jit_set_te_cuda_pointwise_block_size(block_size)
        restore_actions.append(
            (torch._C._jit_set_te_cuda_pointwise_block_size, previous)
        )
    try:
        yield
    finally:
        # Undo the overrides in the same order they were applied.
        for setter, previous in restore_actions:
            setter(previous)
# Auxiliary class to facilitate dynamic input shape
class DynamicShape:
    r"""
    An auxiliary mixin class for dynamic-shape benchmarks.

    Pre-computes a pool of inputs with random shapes and advances through the
    pool on every ``compute`` call, so the fuser sees a different input tensor
    shape each time.  Intended to be mixed in ahead of a Benchmark subclass;
    ``compute``/``run`` cooperate with the base class via ``super()``.
    """

    # Number of pre-generated random inputs held by an instance.
    SAMPLE_SIZE = 100

    def __init__(self, dynamic_range=1.2):
        self._input_samples = []
        self._input_sample_index = 0
        # Store the range as a ratio <= 1.0 so rand_shape can scale each
        # dimension by a factor drawn from [ratio, 1.0).
        self._dynamic_range = (
            1.0 / dynamic_range if dynamic_range > 1.0 else dynamic_range
        )
        self._enable_dynamic_shapes = True

    # Returns the input test case that the current index points to
    @property
    def inputs(self):
        return self._input_samples[self._input_sample_index]

    # An inputs assignment actually adds a test case in the class buffer
    @inputs.setter
    def inputs(self, val):
        self._input_samples.append(val)

    def compute(self):
        """Run the normal computation, then advance to the next input sample.

        Bug fix: the original discarded the value of ``super().compute()``
        and implicitly returned None, which breaks any caller that uses the
        result (e.g. the backward path in ``Benchmark.run_impl`` where
        ``z = self.compute()`` feeds ``engine.backward``).
        """
        result = super().compute()
        self._input_sample_index = (self._input_sample_index + 1) % self.SAMPLE_SIZE
        return result

    # Defined by benchmark, the benchmark needs to specify the input
    # tensor construction in this method, essentially the same way
    # a benchmark creates the inputs list in the initializer
    def instantiate_input(self):
        raise NotImplementedError

    # Instantiate random shaped inputs and start the benchmark run
    def run(self, args):
        # force disable dynamic shape from command line
        if args.no_dynamic_shape:
            self._enable_dynamic_shapes = False
        self.load_inputs()
        super().run(args)

    def load_inputs(self):
        """Pre-compute inputs so random-tensor creation does not add to the
        measured compute time.

        NOTE(review): only SAMPLE_SIZE - 1 samples are generated here —
        presumably the subclass initializer contributes the first sample;
        confirm against callers.
        """
        for _ in range(self.SAMPLE_SIZE - 1):
            self.instantiate_input()

    def rand_shape(self, shape):
        """Return *shape* with each dimension independently scaled down.

        When dynamic shapes are disabled, *shape* is returned unchanged;
        otherwise each dimension is multiplied by a ratio drawn uniformly
        from [self._dynamic_range, 1.0) and truncated to int.
        """
        if not self._enable_dynamic_shapes:
            return shape
        ratios = np.random.uniform(self._dynamic_range, 1.0, len(shape))
        dyn_shape = list(np.multiply(shape, ratios).astype(int))
        return dyn_shape
# Global registry of benchmark classes; populated via register_benchmark_class.
benchmark_classes = []


def register_benchmark_class(benchmark_cls):
    """Record *benchmark_cls* so the harness can discover and run it."""
    benchmark_classes.append(benchmark_cls)