# Owner(s): ["oncall: distributed"]
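# Tests for the low-level torch.cuda.nccl wrappers (unique_id, broadcast,
# reduce, all_reduce, all_gather, reduce_scatter), run in a single process
# across all visible GPUs.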
import re
import sys
import torch
import torch.cuda
import torch.cuda.nccl as nccl
import torch.distributed as c10d
from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU
from torch.testing._internal.common_device_type import (
dtypes,
instantiate_device_type_tests,
)
from torch.testing._internal.common_utils import (
IS_WINDOWS,
load_tests,
NoTest,
run_tests,
skip_but_pass_in_sandcastle_if,
TEST_WITH_ROCM,
TestCase,
)

HIP_VERSION = (
0.0
if torch.version.hip is None
else float(re.search(r"^\d+\.\d+", torch.version.hip)[0])
)

# load_tests from common_utils is used to automatically filter tests for
# sharding on sandcastle. This line silences flake warnings
load_tests = load_tests

nGPUs = torch.cuda.device_count()
if not TEST_CUDA:
print("CUDA not available, skipping tests", file=sys.stderr)
TestCase = NoTest # noqa: F811

datatypes = [torch.float]
if (
TEST_CUDA and c10d.is_nccl_available() and nccl.version() >= (2, 10)
) or TEST_WITH_ROCM:
datatypes.append(torch.bfloat16)


class TestNCCL(TestCase):
@skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
def test_unique_id(self, device):
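        # nccl.unique_id() should return a non-empty opaque byte string.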
uid = nccl.unique_id()
self.assertIsInstance(uid, bytes)
self.assertGreater(len(uid), 1)

    @skip_but_pass_in_sandcastle_if(
TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm"
)
@skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected")
@dtypes(*datatypes)
def test_broadcast(self, device, dtype):
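        # Broadcast the tensor on GPU 0 to every other GPU and check that all
        # copies match the original, for both list and tuple inputs.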
expected = torch.zeros(128).uniform_().to(dtype=dtype)
tensors = [expected.cuda()]
        for gpu in range(1, torch.cuda.device_count()):
            tensors.append(torch.zeros(128, dtype=dtype, device=gpu))
nccl.broadcast(tensors)
for i in range(torch.cuda.device_count()):
self.assertEqual(tensors[i], expected)
# Test with tuple
tensors = [expected.cuda()]
        for gpu in range(1, torch.cuda.device_count()):
            tensors.append(torch.zeros(128, dtype=dtype, device=gpu))
nccl.broadcast(tuple(tensors))
for i in range(torch.cuda.device_count()):
self.assertEqual(tensors[i], expected)

    @skip_but_pass_in_sandcastle_if(
TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm"
)
@skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected")
@dtypes(*datatypes)
def test_reduce(self, device, dtype):
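        # Reduce (sum) one tensor per GPU onto GPU 0 and compare against a CPU
        # reference, for both list and tuple inputs.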
cpu_tensors = [
torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs)
]
expected = torch.zeros(128, dtype=dtype)
for t in cpu_tensors:
expected.add_(t)
tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)]
nccl.reduce(tensors)
self.assertEqual(tensors[0], expected)
# Test with tuple
tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)]
nccl.reduce(tuple(tensors))
self.assertEqual(tensors[0], expected)

    @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected")
    @dtypes(*datatypes)
    def test_all_reduce(self, device, dtype):
        # `dtype` is not defined when decorator arguments are evaluated, so
        # referencing it there can raise NameError at import time on ROCm < 3.5.
        # Skip bfloat16 on old ROCm builds inside the test body instead.
        if TEST_WITH_ROCM and HIP_VERSION < 3.5 and dtype == torch.bfloat16:
            self.skipTest("Skip bfloat16 test for ROCm < 3.5")
cpu_tensors = [
torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs)
]
expected = torch.zeros(128, dtype=dtype)
for t in cpu_tensors:
expected.add_(t)
tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)]
nccl.all_reduce(tensors)
for tensor in tensors:
self.assertEqual(tensor, expected)
# Test with tuple.
tensors = tuple(cpu_tensors[i].cuda(i) for i in range(nGPUs))
nccl.all_reduce(tensors)
for tensor in tensors:
self.assertEqual(tensor, expected)
# Test with set.
tensors = {cpu_tensors[i].cuda(i) for i in range(nGPUs)}
nccl.all_reduce(tensors)
for tensor in tensors:
self.assertEqual(tensor, expected)

    @skip_but_pass_in_sandcastle_if(
TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm"
)
@skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
def test_collective_errors(self, device):
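        # Each collective should raise TypeError when given a bare tensor
        # instead of a collection of tensors.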
t = torch.rand(10).cuda(0)
with self.assertRaisesRegex(
TypeError, "Inputs should be a collection of tensors"
):
nccl.all_reduce(t)
with self.assertRaisesRegex(
TypeError, "Inputs should be a collection of tensors"
):
nccl.reduce(t)
with self.assertRaisesRegex(
TypeError, "Inputs should be a collection of tensors"
):
nccl.broadcast(t)
with self.assertRaisesRegex(
TypeError, "Inputs should be a collection of tensors"
):
nccl.all_gather(t, t)
with self.assertRaisesRegex(
TypeError, "Inputs should be a collection of tensors"
):
nccl.reduce_scatter(t, t)

    @skip_but_pass_in_sandcastle_if(
TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm"
)
@skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected")
@dtypes(*datatypes)
def test_all_gather(self, device, dtype):
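        # Gather one 128-element tensor per GPU so that every GPU ends up with
        # the concatenation of all inputs, for both list and tuple arguments.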
cpu_inputs = [torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs)]
expected = torch.cat(cpu_inputs, 0)
inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)]
outputs = [
torch.zeros(128 * nGPUs, device=i, dtype=dtype) for i in range(nGPUs)
]
nccl.all_gather(inputs, outputs)
for tensor in outputs:
self.assertEqual(tensor, expected)
# Test with tuple.
inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)]
outputs = [
torch.zeros(128 * nGPUs, device=i, dtype=dtype) for i in range(nGPUs)
]
nccl.all_gather(tuple(inputs), tuple(outputs))
for tensor in outputs:
self.assertEqual(tensor, expected)

    @skip_but_pass_in_sandcastle_if(
TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm"
)
@skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows")
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected")
@dtypes(*datatypes)
def test_reduce_scatter(self, device, dtype):
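        # Sum the per-GPU inputs element-wise and scatter the result so that
        # GPU i receives the i-th 32-element chunk, for both list and tuple
        # arguments.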
in_size = 32 * nGPUs
out_size = 32
cpu_inputs = [
torch.zeros(in_size).uniform_().to(dtype=dtype) for i in range(nGPUs)
]
expected = torch.zeros(in_size, dtype=dtype)
for t in cpu_inputs:
expected.add_(t)
expected = expected.view(nGPUs, 32)
inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)]
outputs = [torch.zeros(out_size, device=i, dtype=dtype) for i in range(nGPUs)]
nccl.reduce_scatter(inputs, outputs)
for i in range(nGPUs):
self.assertEqual(outputs[i], expected[i])
# Test with tuple
inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)]
outputs = [torch.zeros(out_size, device=i, dtype=dtype) for i in range(nGPUs)]
nccl.reduce_scatter(tuple(inputs), tuple(outputs))
for i in range(nGPUs):
self.assertEqual(outputs[i], expected[i])


instantiate_device_type_tests(TestNCCL, globals(), only_for="cuda")

if __name__ == "__main__":
run_tests()