#!/usr/bin/env python3
#
# Measure distributed training iteration time.
#
# This program performs a sweep over a) a number of model architectures, and
# b) an increasing number of processes. This produces a 1-GPU baseline,
# an 8-GPU baseline (if applicable), as well as measurements for however
# many processes can participate in training.
#
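# Example invocation (the host name, port, and script name below are
# placeholders; launch one process per GPU on every machine, so --world-size
# is 8x the number of machines and RANK is the process's global rank):
#
#   RANK=<global rank> python3 <this script> \
#       --world-size 16 \
#       --master-addr machine0.example.com \
#       --master-port 29500
#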
import argparse
import itertools
import json
import os
import shlex
import subprocess
import sys
import time

import numpy as np
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torchvision

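
# Gather a picklable Python object from every rank over the default (gloo)
# process group; returns a world_size-length list indexed by rank.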
def allgather_object(obj):
    out = [None for _ in range(dist.get_world_size())]
    dist.all_gather_object(out, obj)
    return out

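
# Run a shell command locally and gather every rank's stdout, e.g. to check
# that all machines report the same "nvidia-smi topo -m" output.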
def allgather_run(cmd):
    proc = subprocess.run(shlex.split(cmd), capture_output=True)
    assert proc.returncode == 0
    return allgather_object(proc.stdout.decode("utf-8"))

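
# Return True if every element produced by `iterator` compares equal
# (vacuously True for an empty iterator).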
def allequal(iterator):
    iterator = iter(iterator)
    try:
        first = next(iterator)
    except StopIteration:
        return True
    return all(first == rest for rest in iterator)

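
# Time complete training iterations (forward, loss, backward, optimizer step)
# for the given benchmark on process group `pg`, optionally wrapping the model
# in DistributedDataParallel. Returns per-iteration wall-clock times in
# seconds, with warmup iterations discarded.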
def benchmark_process_group(pg, benchmark, use_ddp_for_single_rank=True):
    torch.manual_seed(pg.rank())
    torch.cuda.manual_seed(pg.rank())

    model = benchmark.create_model()
    # A single synthetic batch, reused for every iteration.
    data = [(benchmark.generate_inputs(), benchmark.generate_target())]
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        model.parameters(),
        0.001,
        momentum=0.9,
        weight_decay=1e-4)
    if use_ddp_for_single_rank or pg.size() > 1:
        model = torch.nn.parallel.DistributedDataParallel(
            model,
            device_ids=[torch.cuda.current_device()],
            broadcast_buffers=False,
            process_group=pg,
            bucket_cap_mb=benchmark.bucket_size)

    measurements = []
    warmup_iterations = 5
    measured_iterations = 10
    for (inputs, target) in (data * (warmup_iterations + measured_iterations)):
        start = time.time()
        output = model(*inputs)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        # Wait for queued CUDA work so the measurement covers the full iteration.
        torch.cuda.synchronize()
        measurements.append(time.time() - start)

    # Throw away measurements for warmup iterations
    return measurements[warmup_iterations:]

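
# Run the benchmark on the subset of processes in `ranks` using a dedicated
# process group with the requested backend, then allgather the measurements so
# every process returns the combined list.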
def run_benchmark(benchmark, ranks, opts):
    group = dist.new_group(ranks=ranks, backend=benchmark.distributed_backend)
    measurements = []
    if dist.get_rank() in set(ranks):
        if not opts:
            opts = {}
        measurements = benchmark_process_group(group, benchmark, **opts)
    dist.destroy_process_group(group)
    dist.barrier()

    # Aggregate measurements for better estimation of percentiles
    return list(itertools.chain(*allgather_object(measurements)))

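
# Sweep one benchmark over an increasing number of GPUs: a per-process warmup,
# single-machine baselines (with and without DDP), and one multi-machine entry
# per group of 8 GPUs. Prints sec/iter and ex/sec percentiles on rank 0 and
# returns the raw measurements for each configuration.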
def sweep(benchmark):
    # Synthesize the set of benchmarks to run.
    # This list contains tuples of ("string prefix", [rank...]).
    benchmarks = []

    def append_benchmark(prefix, ranks, opts=None):
        prefix = "%4d GPUs -- %s" % (len(ranks), prefix)
        benchmarks.append((prefix, ranks, opts))

    def local_print(msg):
        if dist.get_rank() == 0:
            print(msg, end='', flush=True)  # noqa: E999

    def print_header():
        local_print("\n")
        local_print("%22s" % "")
        for p in [50, 75, 90, 95]:
            local_print("%14s%10s" % ("sec/iter", "ex/sec"))
        local_print("\n")

    def print_measurements(prefix, nelem, measurements):
        measurements = sorted(measurements)
        local_print("%8s:" % prefix)
        for p in [50, 75, 90, 95]:
            v = np.percentile(measurements, p)
            local_print(" p%02d: %1.3fs %6d/s" % (p, v, nelem / v))
        local_print("\n")

    # Every process runs once by itself to warm up (CUDA init, etc).
    append_benchmark(" warmup", [dist.get_rank()], {"use_ddp_for_single_rank": False})

    # Single machine baselines
    append_benchmark(" no ddp", range(1), {"use_ddp_for_single_rank": False})
    append_benchmark(" 1M/1G", range(1))
    append_benchmark(" 1M/2G", range(2))
    append_benchmark(" 1M/4G", range(4))

    # Multi-machine benchmarks
    for i in range(1, (dist.get_world_size() // 8) + 1):
        append_benchmark(" %dM/8G" % i, range(i * 8))

    # Run benchmarks in order of increasing number of GPUs
    print_header()
    results = []
    for prefix, ranks, opts in sorted(benchmarks, key=lambda tup: len(tup[1])):
        # Turn range into materialized list.
        ranks = list(ranks)
        measurements = run_benchmark(benchmark, ranks, opts)
        if "warmup" not in prefix:
            print_measurements(prefix, benchmark.batch_size, measurements)
            results.append({"ranks": ranks, "measurements": measurements})

    return results

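
# Base class for a benchmark configuration: subclasses supply the model and
# synthetic inputs/targets for a fixed batch size of 32.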
class Benchmark(object):
    def __init__(self, device, distributed_backend, bucket_size):
        self.device = device
        self.batch_size = 32
        self.distributed_backend = distributed_backend
        self.bucket_size = bucket_size

    def __str__(self):
        raise NotImplementedError

    def create_model(self):
        raise NotImplementedError

    def generate_inputs(self):
        raise NotImplementedError

    def generate_target(self):
        raise NotImplementedError

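
# Benchmark that trains a torchvision classification model on random
# 224x224 RGB images with a constant target class.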
class TorchvisionBenchmark(Benchmark):
    def __init__(self, device, distributed_backend, bucket_size, model):
        super(TorchvisionBenchmark, self).__init__(
            device,
            distributed_backend,
            bucket_size,
        )
        self.model = model

    def __str__(self):
        return "{} with batch size {}".format(self.model, self.batch_size)

    def create_model(self):
        return torchvision.models.__dict__[self.model]().to(self.device)

    def generate_inputs(self):
        return [torch.rand([self.batch_size, 3, 224, 224], device=self.device)]

    def generate_target(self):
        return torch.tensor([1] * self.batch_size, dtype=torch.long, device=self.device)

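
# Entry point: parse arguments, initialize a global gloo process group that is
# used only to exchange benchmark metadata, verify that "nvidia-smi topo -m"
# agrees across machines, then run the sweep for each torchvision model and
# optionally dump the results as JSON.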
def main():
    parser = argparse.ArgumentParser(description='PyTorch distributed benchmark suite')
    parser.add_argument("--rank", type=int, default=os.environ["RANK"])
    parser.add_argument("--world-size", type=int, required=True)
    parser.add_argument("--distributed-backend", type=str, default="nccl")
    parser.add_argument("--bucket-size", type=int, default=25)
    parser.add_argument("--master-addr", type=str, required=True)
    parser.add_argument("--master-port", type=str, required=True)
    parser.add_argument("--model", type=str)
    parser.add_argument("--json", type=str, metavar="PATH", help="Write file with benchmark results")
    args = parser.parse_args()

    num_gpus_per_node = torch.cuda.device_count()
    assert num_gpus_per_node == 8, "Expected 8 GPUs per machine"

    # The global process group used only for communicating benchmark
    # metadata, like measurements. Not for benchmarking itself.
    dist.init_process_group(
        backend="gloo",
        init_method="tcp://{}:{}".format(args.master_addr, args.master_port),
        rank=args.rank,
        world_size=args.world_size,
    )

    output = allgather_run("nvidia-smi topo -m")
    if not allequal(output):
        print('Output of "nvidia-smi topo -m" differs between machines')
        sys.exit(1)

    if args.rank == 0:
        print("-----------------------------------")
        print("PyTorch distributed benchmark suite")
        print("-----------------------------------")
        print("")
        print("* PyTorch version: {}".format(torch.__version__))
        print("* CUDA version: {}".format(torch.version.cuda))
        print("* Distributed backend: {}".format(args.distributed_backend))
        print("* Maximum bucket size: {}MB".format(args.bucket_size))
        print("")
        print("--- nvidia-smi topo -m ---")
        print("")
        print(output[0])
        print("--------------------------")
        print("")

    torch.cuda.set_device(dist.get_rank() % 8)
    device = torch.device('cuda:%d' % (dist.get_rank() % 8))

    benchmarks = []
    if args.model:
        benchmarks.append(
            TorchvisionBenchmark(
                device=device,
                distributed_backend=args.distributed_backend,
                bucket_size=args.bucket_size,
                model=args.model))
    else:
        for model in ["resnet50", "resnet101", "resnext50_32x4d", "resnext101_32x8d"]:
            benchmarks.append(
                TorchvisionBenchmark(
                    device=device,
                    distributed_backend=args.distributed_backend,
                    bucket_size=args.bucket_size,
                    model=model))

    benchmark_results = []
    for benchmark in benchmarks:
        if args.rank == 0:
            print("\nBenchmark: {}".format(str(benchmark)))
        result = sweep(benchmark)
        benchmark_results.append({
            "model": benchmark.model,
            "batch_size": benchmark.batch_size,
            "result": result,
        })

    # Write file with benchmark results if applicable
    if args.rank == 0 and args.json:
        report = {
            "pytorch_version": torch.__version__,
            "cuda_version": torch.version.cuda,
            "distributed_backend": args.distributed_backend,
            "bucket_size": args.bucket_size,
            "benchmark_results": benchmark_results,
        }
        with open(args.json, 'w') as f:
            json.dump(report, f)


if __name__ == '__main__':
    main()