# Owner(s): ["oncall: distributed"]
# This test file contains positive tests for c10d with the NCCL backend.
# During the test, it is expected that the ProcessGroup will not be aborted, destroyed, or incur a fatal error.
# Please be mindful of this when adding tests here.
# If you need to add tests for group creation, abort or destroy, please add tests in test_c10d_nccl.py.
# There are two ways to launch tests in this file:
# 1. Run this file directly with `python test_c10d_ops_nccl.py`
# 2. Use multi-process launcher, e.g. `torchrun --standalone --nproc-per-node 2 test_c10d_ops_nccl.py`
import math
import os
import sys
import tempfile
import torch
import torch.distributed as c10d
if not c10d.is_available() or not c10d.is_nccl_available():
print("c10d NCCL not available, skipping tests", file=sys.stderr)
sys.exit(0)
import torch.distributed as dist
from torch.testing._internal.common_cuda import TEST_MULTIGPU
from torch.testing._internal.common_distributed import (
init_multigpu_helper,
MultiProcContinousTest,
requires_nccl,
)
from torch.testing._internal.common_utils import (
skip_but_pass_in_sandcastle_if,
skipIfRocm,
TEST_WITH_DEV_DBG_ASAN,
)
if TEST_WITH_DEV_DBG_ASAN:
print(
"Skip ASAN as torch + multiprocessing spawn have known issues", file=sys.stderr
)
sys.exit(0)
class ProcessGroupNCCLOpTest(MultiProcContinousTest):
@classmethod
def backend_str(cls) -> str:
return "nccl"
@classmethod
def opts(cls, high_priority_stream=False):
opts = c10d.ProcessGroupNCCL.Options()
opts.is_high_priority_stream = high_priority_stream
return opts
@property
def rank_to_GPU(self):
        # return a map of rank -> list of GPU indices assigned to that rank
return init_multigpu_helper(self.world_size, "nccl")
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_empty_tensors(self):
pg = self.pg
local_device_idx = self.rank_to_GPU[self.rank][0]
xs = [torch.FloatTensor([]).cuda(local_device_idx)]
pg.broadcast(xs).wait()
self.assertEqual(0, xs[0].numel())
pg.allreduce(xs).wait()
self.assertEqual(0, xs[0].numel())
pg.reduce(xs).wait()
self.assertEqual(0, xs[0].numel())
ys = [
[
torch.FloatTensor([]).cuda(local_device_idx)
for _ in range(self.world_size)
]
]
pg.allgather(ys, xs).wait()
for y in ys[0]:
self.assertEqual(0, y.numel())
ys = [torch.FloatTensor([]).cuda(local_device_idx)]
xs = [
[
torch.FloatTensor([]).cuda(local_device_idx)
for _ in range(self.world_size)
]
]
pg.reduce_scatter(ys, xs).wait()
self.assertEqual(0, ys[0].numel())
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_broadcast_ops(self):
pg = self.pg
def broadcast(xs, rootRank, rootTensor):
opts = c10d.BroadcastOptions()
opts.rootRank = rootRank
opts.rootTensor = rootTensor
work = pg.broadcast(xs, opts)
work.wait()
return xs
# Every rank is root once
for i in range(self.world_size):
# Run with 1 input tensor
x = torch.tensor([self.rank]).cuda(self.rank_to_GPU[self.rank][0])
output = broadcast([x], i, 0)
self.assertEqual(torch.tensor([i]), output[0])
expected_tensor = torch.empty([i + 1, i + 1]).fill_(i + 1)
xs = [
torch.empty([i + 1, i + 1]).fill_(-1).cuda(device=device_idx)
for device_idx in self.rank_to_GPU[self.rank]
]
            # test with multiple input tensors (multiple GPUs in one rank)
for j in range(len(xs)):
if self.rank == i:
xs[j] = expected_tensor.cuda(device=self.rank_to_GPU[self.rank][j])
broadcast(xs, i, j)
for tensor in xs:
self.assertEqual(tensor, expected_tensor)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_sparse_allreduce_ops(self):
pg = self.pg
indices = torch.tensor([[0, 1]])
values = torch.tensor([[1, 2, 0], [4, 0, 6]])
sparse_tensor = torch.sparse_coo_tensor(indices, values, size=(2, 3)).to(
self.rank
)
        # The sparse allreduce call is wrapped in a try/except since the c10d API is only available in the NCCL experimental branch
try:
tensor_list = [sparse_tensor]
work = pg.allreduce(tensor_list)
work.wait()
# tensor_list is a list of size 1, with the allreduce output as a dense tensor
a = torch.tensor([[2, 4, 0], [8, 0, 12]]).to(self.rank)
self.assertEqual(tensor_list[0], a)
except RuntimeError as e:
if "NCCL does not support all_reduce with sparse tensors" in str(e):
pass
else:
                # Re-raise the exception if it's a different error
raise
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_allreduce_ops(self):
device_count = torch.cuda.device_count()
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def allreduce(tensors, op):
opts = c10d.AllreduceOptions()
opts.reduceOp = op
work = pg.allreduce(tensors, opts)
work.wait()
# Sum
tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]
allreduce(tensors, c10d.ReduceOp.SUM)
ndev = self.world_size
self.assertEqual(
torch.tensor([ndev * (ndev + 1) // 2]),
tensors[0],
)
# Avg (only available for NCCL 2.10+)
if torch.cuda.nccl.version() >= (2, 10, 0):
tensors = [torch.tensor([self.rank + 1.0]).cuda(local_device_id)]
allreduce(tensors, c10d.ReduceOp.AVG)
ndev = self.world_size
self.assertEqual(
torch.tensor([ndev * (ndev + 1.0) / (2.0 * ndev)]),
tensors[0],
)
# Premul Sum
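        # PreMulSum multiplies each rank's contribution by `factor` before the sum,
        # so the expected result is factor * world_size * (world_size + 1) / 2.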
if torch.cuda.nccl.version() >= (2, 11, 1):
for dtype in torch.half, torch.float, torch.double:
for factor in (
3.0,
torch.tensor([5.0], device=local_device_id, dtype=dtype),
):
tensors = [
torch.tensor([self.rank + 1])
.cuda(local_device_id)
.to(dtype=dtype)
]
allreduce(tensors, c10d._make_nccl_premul_sum(factor))
self.assertEqual(
factor
* torch.tensor(
[self.world_size * (self.world_size + 1) / 2],
dtype=dtype,
device=local_device_id,
),
tensors[0],
)
# Product
tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]
allreduce(tensors, c10d.ReduceOp.PRODUCT)
self.assertEqual(torch.tensor([math.factorial(self.world_size)]), tensors[0])
# Min
tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]
allreduce(tensors, c10d.ReduceOp.MIN)
self.assertEqual(torch.tensor([1]), tensors[0])
# Max
tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]
allreduce(tensors, c10d.ReduceOp.MAX)
self.assertEqual(torch.tensor([self.world_size]), tensors[0])
for op, err in zip(
(c10d.ReduceOp.BAND, c10d.ReduceOp.BOR, c10d.ReduceOp.BXOR),
("ReduceOp.BAND", "ReduceOp.BOR", "ReduceOp.BXOR"),
):
with self.assertRaisesRegex(ValueError, "Cannot use " + err + " with NCCL"):
allreduce(tensors, op)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_alltoall_ops_with_cudafree_race(self):
pg = self.pg
opts = c10d.AllToAllOptions()
local_device = f"cuda:{self.rank_to_GPU[self.rank][0]}"
torch.cuda.set_device(local_device)
input = torch.rand(1000, 1000, device=local_device)
output = torch.rand(1000, 1000, device=local_device)
race_tensors = []
        # create some tensors to race with the alltoall collective
for _ in range(10):
tmp = []
for i in range(5):
tmp.append(torch.rand(10 ** (3 + i), device=local_device))
race_tensors.append(tmp)
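        # Drop the references to the race tensors so that empty_cache() below can
        # actually free their memory while the alltoall is in flight.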
for i in range(10):
race_tensors.pop()
work = pg.alltoall_base(output, input, [], [], opts)
# this triggers cudaFree
torch.cuda.empty_cache()
work.wait()
torch.cuda.synchronize(device=local_device)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_allreduce_in_cudagraph(self):
pg = self.pg
local_device_idx = self.rank_to_GPU[self.rank][0]
with torch.cuda.device(local_device_idx):
xs = [torch.FloatTensor([1]).cuda(local_device_idx)]
# single warmup
pg.allreduce(xs).wait()
self.assertEqual(xs[0].item(), 2)
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph):
pg.allreduce(xs).wait()
self.assertEqual(xs[0].item(), 2)
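            # Capture records the allreduce without running it, so the value is still 2.
            # Each replay executes the captured allreduce, doubling it (world_size == 2): 2 -> 4 -> 8.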
graph.replay()
graph.replay()
self.assertEqual(xs[0].item(), 8)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@skipIfRocm()
def test_nccl_watchdog_cudagraph(self):
        # test that the watchdog does not crash graph capture with a disallowed event query
pg = self.pg
rank = self.rank_to_GPU[self.rank][0]
with torch.cuda.device(rank):
for i in range(10):
xs = [torch.FloatTensor([1]).cuda(rank)]
ys = [torch.FloatTensor([4]).cuda(rank)]
for _ in range(30):
pg.allreduce(xs[0]).wait()
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph):
xs[0] += 0.0
pg.allreduce(xs[0]).wait()
pg.allreduce(xs[0]).wait()
pg.allreduce(xs[0]).wait()
xs[0] += 0.0
for _ in range(100):
graph.replay()
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_reduce_ops(self):
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def reduce(xs, rootRank, rootTensor, op=None):
opts = c10d.ReduceOptions()
opts.rootRank = rootRank
opts.rootTensor = rootTensor
if op:
opts.reduceOp = op
work = pg.reduce(xs, opts)
work.wait()
        # for every root rank
for rt in range(self.world_size):
tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]
reduce(tensors, rt, 0)
if self.rank == rt:
self.assertEqual(
torch.tensor([self.world_size * (self.world_size + 1) // 2]),
tensors[0],
)
else:
self.assertEqual(
torch.tensor([self.rank + 1]),
tensors[0],
)
for op, err in zip(
(c10d.ReduceOp.BAND, c10d.ReduceOp.BOR, c10d.ReduceOp.BXOR),
("ReduceOp.BAND", "ReduceOp.BOR", "ReduceOp.BXOR"),
):
with self.assertRaisesRegex(
ValueError, "Cannot use " + err + " with NCCL"
):
reduce(tensors, self.rank, rt, op)
# Premul sum
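        # Compare PreMulSum(factor) against a plain SUM over inputs pre-scaled by the factor.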
if torch.cuda.nccl.version() >= (2, 11, 1):
for factor in (3.0, torch.tensor([5.0], device=local_device_id)):
if isinstance(factor, torch.Tensor):
factor_ref = factor.cpu().item()
else:
factor_ref = factor
float_tensors = [
torch.tensor(
[self.rank + 1.0], device=f"cuda:{local_device_id}"
)
]
float_tensors_ref = [
torch.tensor(
[(self.rank + 1.0) * factor_ref],
device=f"cuda:{local_device_id}",
)
]
reduce(float_tensors_ref, rt, 0)
reduce(float_tensors, rt, 0, c10d._make_nccl_premul_sum(factor))
if self.rank == rt:
self.assertEqual(float_tensors_ref[0], float_tensors[0])
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_allgather_ops(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
def allgather(output_ts, input_ts):
work = pg.allgather(output_ts, input_ts)
return work.wait()
tensors = [torch.empty(2, 2).fill_(2).cuda(device=i) for i in local_device_ids]
output_tensors = []
expected_output = []
output_per_gpu = (
[torch.empty(2, 2).fill_(-1)] * len(local_device_ids) * self.world_size
)
expected_per_gpu = (
[torch.empty(2, 2).fill_(2)] * len(local_device_ids) * self.world_size
)
for gpu in local_device_ids:
output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
result = allgather(output_tensors, tensors)
# Verification
self.assertEqual(output_tensors, expected_output)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_allgather_base_ops(self):
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def allgather_base(output_t, input_t):
work = pg._allgather_base(output_t, input_t)
work.wait()
        # allgather_base is GPU-count agnostic.
        # Each rank contributes one tensor regardless of GPU count
tensor = torch.tensor([self.rank]).cuda(local_device_id)
output_t = torch.empty((self.world_size), dtype=tensor.dtype).cuda(
local_device_id
)
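        # Rank r contributes tensor([r]), so the gathered output is arange(world_size).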
allgather_base(output_t, tensor)
# Verification
self.assertEqual(torch.arange(self.world_size), output_t)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_allgather_base_basics(self):
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def allgather_base(output_t, input_t):
work = pg._allgather_base(output_t, input_t)
work.wait()
# anticipate an error
with self.assertRaisesRegex(
ValueError,
"output tensor size must be equal to world_size times input tensor size",
):
tensor = torch.tensor([self.rank]).cuda(local_device_id)
output_t = torch.empty((self.world_size + 1), dtype=tensor.dtype).cuda(
local_device_id
)
# fails the check because output_t is not correctly sized
allgather_base(output_t, tensor)
# anticipate an error
with self.assertRaisesRegex(
TypeError, "output tensor must have the same type as input tensor"
):
tensor = torch.tensor([self.rank], dtype=torch.float).cuda(local_device_id)
output_t = torch.empty((self.world_size + 1), dtype=torch.long).cuda(
local_device_id
)
# fails the check because the dtype is different
allgather_base(output_t, tensor)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_gather_ops(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
num_gpus = len(local_device_ids)
def gather(output_t, input_t, rootRank):
opts = c10d.GatherOptions()
opts.rootRank = rootRank
if rootRank == self.rank:
work = pg.gather(output_t, input_t, opts)
else:
work = pg.gather([], input_t, opts)
work.wait()
# init input
tensors = []
for device_id in local_device_ids:
tensors.append(torch.tensor([self.rank]).cuda(device_id))
# init output
output_ts = []
for idx in range(num_gpus):
gpu_idx = local_device_ids[idx]
output_ts.append([])
for rank in range(self.world_size):
output_ts[idx].append(torch.tensor([-1]).cuda(gpu_idx))
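        # The root should gather tensor([r]) from every rank r.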
expected = [[torch.tensor([rank]) for rank in range(self.world_size)]]
for rank in range(self.world_size):
gather(output_ts, tensors, rank)
if rank == self.rank:
self.assertEqual(expected, output_ts)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_gather_stress(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
num_gpus = len(local_device_ids)
def gather(output_t, input_t, rootRank):
opts = c10d.GatherOptions()
opts.rootRank = rootRank
if rootRank == self.rank:
work = pg.gather(output_t, input_t, opts)
else:
work = pg.gather([], input_t, opts)
work.wait()
stress_length = 1000
# init input
tensors = []
for i in range(stress_length):
tensors.append([])
for device_id in local_device_ids:
tensors[i].append(torch.tensor([self.rank]).cuda(device_id))
# init output
output_ts = []
for i in range(stress_length):
output_ts.append([[] for _ in range(num_gpus)])
for idx, ls in enumerate(output_ts[i]):
gpu_idx = local_device_ids[idx]
for _ in range(self.world_size):
ls.append(torch.tensor([-1]).cuda(gpu_idx))
expected = [[torch.tensor([rank]) for rank in range(self.world_size)]]
for i in range(stress_length):
for rank in range(self.world_size):
gather(output_ts[i], tensors[i], rank)
# Verification
if rank == self.rank:
self.assertEqual(output_ts[i], expected)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_gather_checks(self):
pg = self.pg
device_id = self.rank_to_GPU[self.rank][0]
# init input
tensor = torch.tensor([self.rank]).cuda(device_id)
# init output
output_ts = []
for rank in range(self.world_size):
output_ts.append(torch.tensor([-1]).cuda(device_id))
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.GatherOptions()
opts.rootRank = -1
pg.gather([output_ts], [tensor], opts)
with self.assertRaisesRegex(TypeError, "incompatible function arguments"):
pg.gather([output_ts], [tensor], 0)
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.GatherOptions()
opts.rootRank = self.world_size
pg.gather([output_ts], [tensor], opts)
with self.assertRaisesRegex(
# throws error message from dispatcher
RuntimeError,
"There were no tensor arguments to this function",
):
opts = c10d.GatherOptions()
opts.rootRank = 0
pg.gather([output_ts], [], opts)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_scatter_ops(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
num_gpus = len(local_device_ids)
def scatter(output_t, input_t, rootRank):
opts = c10d.ScatterOptions()
opts.rootRank = rootRank
if rootRank == self.rank:
work = pg.scatter(output_t, input_t, opts)
else:
work = pg.scatter(output_t, [], opts)
work.wait()
# init output
tensors = []
for device_id in local_device_ids:
tensors.append(torch.tensor([-1]).cuda(device_id))
# init input
scatter_list = []
for idx in range(num_gpus):
gpu_idx = local_device_ids[idx]
scatter_list.append([])
for rank in range(self.world_size):
scatter_list[idx].append(torch.tensor([rank]).cuda(gpu_idx))
        # test scatter with each rank as the root
expected = [torch.tensor([self.rank])]
for rank in range(self.world_size):
scatter(tensors, scatter_list, rank)
self.assertEqual(expected, tensors)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_scatter_stress(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
num_gpus = len(local_device_ids)
def scatter(output_t, input_t, rootRank):
opts = c10d.ScatterOptions()
opts.rootRank = rootRank
if rootRank == self.rank:
work = pg.scatter(output_t, input_t, opts)
else:
work = pg.scatter(output_t, [], opts)
work.wait()
stress_length = 1000
# init output
tensors = []
for i in range(stress_length):
tensors.append([])
for device_id in local_device_ids:
tensors[i].append(torch.tensor([-1]).cuda(device_id))
# init input
scatter_list = []
for i in range(stress_length):
scatter_list.append([[] for _ in range(num_gpus)])
for idx, ls in enumerate(scatter_list[i]):
gpu_idx = local_device_ids[idx]
for rank in range(self.world_size):
ls.append(torch.tensor([rank]).cuda(gpu_idx))
        # test scatter with each rank as the root
expected = [torch.tensor([self.rank])]
for i in range(stress_length):
for rank in range(self.world_size):
scatter(tensors[i], scatter_list[i], rank)
# Verification
self.assertEqual(tensors[i], expected)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_scatter_checks(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
num_gpus = len(local_device_ids)
# init output
tensors = []
for device_id in local_device_ids:
tensors.append(torch.tensor([-1]).cuda(device_id))
# init input
scatter_list = []
for idx in range(num_gpus):
gpu_idx = local_device_ids[idx]
scatter_list.append([])
for rank in range(self.world_size):
scatter_list[idx].append(torch.tensor([rank]).cuda(gpu_idx))
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.ScatterOptions()
opts.rootRank = -1
pg.scatter(tensors, scatter_list, opts)
with self.assertRaisesRegex(TypeError, "incompatible function arguments"):
pg.scatter(tensors, scatter_list, 0)
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.ScatterOptions()
opts.rootRank = self.world_size
pg.scatter(tensors, scatter_list, opts)
with self.assertRaisesRegex(
# throws error message from dispatcher
RuntimeError,
"There were no tensor arguments to this function",
):
opts = c10d.ScatterOptions()
opts.rootRank = 0
pg.scatter([], scatter_list, opts)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_reduce_scatter_base_basics(self):
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def reduce_scatter_base(output_t, input_t):
work = pg._reduce_scatter_base(output_t, input_t)
work.wait()
# anticipate an error
with self.assertRaisesRegex(
ValueError,
"input tensor must be the same size as output size times world size",
):
input_t = torch.tensor([self.rank]).cuda(local_device_id)
output_t = torch.empty((self.world_size + 1), dtype=input_t.dtype).cuda(
local_device_id
)
# fails the check because output_t is not correctly sized
reduce_scatter_base(output_t, input_t)
# anticipate an error
with self.assertRaisesRegex(
TypeError, "input tensor must be the same type as the output tensor."
):
tensor = torch.tensor([self.rank], dtype=torch.float).cuda(local_device_id)
output_t = torch.empty((self.world_size + 1), dtype=torch.long).cuda(
local_device_id
)
# fails the check because the dtype is different
reduce_scatter_base(output_t, tensor)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_reduce_scatter_ops(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
num_gpus = len(local_device_ids)
def reduce_scatter(outputs, input_lists, op):
opts = c10d.ReduceScatterOptions()
opts.reduceOp = op
work = pg.reduce_scatter(outputs, input_lists, opts)
work.wait()
output = [torch.tensor([0]).cuda(i) for i in local_device_ids]
        # Example inputs when world_size = 4 (row = rank, columns = per-rank slots):
        #   rank 0: [1], [2], [3], [4]
        #   rank 1: [2], [3], [4], [5]
        #   rank 2: [3], [4], [5], [6]
        #   rank 3: [4], [5], [6], [7]
# Sum
tensor_lists = []
input_per_gpu = []
for i in range(self.world_size):
input_per_gpu.append(torch.tensor([self.rank + i + 1]))
for gpu in local_device_ids:
tensor_lists.append([t.cuda(device=gpu) for t in input_per_gpu])
reduce_scatter(output, tensor_lists, c10d.ReduceOp.SUM)
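        # Rank s contributes (s + self.rank + 1) to this rank's slot; summing over s gives
        # world_size * (world_size + 1) / 2 + world_size * self.rank.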
for i in range(num_gpus):
expected = torch.tensor(
[
(1 + self.world_size) * self.world_size // 2
+ self.world_size * self.rank
]
)
self.assertEqual(expected, output[i])
# Min
reduce_scatter(output, tensor_lists, c10d.ReduceOp.MIN)
for i in range(num_gpus):
expected = torch.tensor([self.rank + 1 + i])
self.assertEqual(expected, output[i])
# Max
reduce_scatter(output, tensor_lists, c10d.ReduceOp.MAX)
for i in range(num_gpus):
expected = torch.tensor([self.rank + self.world_size + i])
self.assertEqual(expected, output[i])
# Product
reduce_scatter(output, tensor_lists, c10d.ReduceOp.PRODUCT)
        # The math package doesn't have math.perm until Python 3.8, so
        # we implement a naive version here.
def perm(n, k):
prod_val = n
for val in range(n - k + 1, n):
prod_val *= val
return prod_val
for i in range(num_gpus):
prod_val = perm(self.rank + self.world_size, self.world_size)
expected = torch.tensor([prod_val])
self.assertEqual(expected, output[i])
        # Test the overloaded signature, i.e., when the input is
        # a list of tensors and the output is a single tensor.
# Sum
output_tensor = torch.empty_like(input_per_gpu[0][0]).cuda(self.rank)
input_list = [tensor[0].cuda(self.rank) for tensor in input_per_gpu]
pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.SUM).wait()
expected = torch.tensor(
(1 + self.world_size) * self.world_size // 2 + self.world_size * self.rank
)
self.assertEqual(expected, output_tensor)
# Min
pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.MIN).wait()
expected = torch.tensor(self.rank + 1)
self.assertEqual(expected, output_tensor)
# Max
pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.MAX).wait()
expected = torch.tensor(self.rank + self.world_size)
self.assertEqual(expected, output_tensor)
# Product
pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.PRODUCT).wait()
prod_val = self.rank + 1
for k in range(1, self.world_size):
prod_val = prod_val * (self.rank + 1 + k)
expected = torch.tensor(prod_val)
self.assertEqual(expected, output_tensor)
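        # PreMulSum check: compare against a plain SUM over inputs pre-scaled by the factor.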
if torch.cuda.nccl.version() >= (2, 11, 1):
for factor in (3.0, torch.tensor([5.0], device=self.rank)):
if isinstance(factor, torch.Tensor):
factor_ref = factor.cpu().item()
else:
factor_ref = factor
output = [t.float() for t in output]
tensor_lists = [[t.float() for t in tl] for tl in tensor_lists]
output_ref = [t.float() for t in output]
tensor_lists_ref = [
[t.float() * factor_ref for t in tl] for tl in tensor_lists
]
reduce_scatter(output, tensor_lists, c10d._make_nccl_premul_sum(factor))
reduce_scatter(output_ref, tensor_lists_ref, c10d.ReduceOp.SUM)
self.assertEqual(output_ref, output)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_reduce_scatter_base_ops(self):
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def reduce_scatter_base(output_t, input_t):
work = pg._reduce_scatter_base(output_t, input_t)
work.wait()
        # reduce_scatter_base is GPU-count agnostic.
        # Each rank contributes one tensor regardless of GPU count
output_t = torch.empty([1]).cuda(local_device_id)
tensor = torch.arange(self.world_size, dtype=output_t.dtype).cuda(
local_device_id
)
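        # Each rank's input slot r holds the value r; summing across world_size ranks
        # gives rank * world_size in each rank's output.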
reduce_scatter_base(output_t, tensor)
# Verification
self.assertEqual(output_t[0], self.rank * self.world_size)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_barrier(self):
pg = self.pg
local_device_ids = self.rank_to_GPU[self.rank]
def allreduce(tensors):
opts = c10d.AllreduceOptions()
work = pg.allreduce(tensors, opts)
return work
        # Make the collectives operate on
        # 1, 2, 3, ..., len(local_device_ids) GPUs
tensors_list = [[] for _ in range(len(local_device_ids))]
for i in range(1, len(local_device_ids) + 1):
for j in range(i):
tensors_list[i - 1].append(
torch.tensor([j + 1]).cuda(local_device_ids[j])
)
works = []
for tensors in tensors_list:
work = allreduce(tensors)
works.append(work)
# Barrier will ensure that all previous work is completed
pg.barrier().wait()
for i in range(1, len(local_device_ids) + 1):
for j in range(i):
self.assertEqual(
torch.tensor([(j + 1) * self.world_size]), tensors_list[i - 1][j]
)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_send_recv(self):
pg = self.pg
device = self.rank_to_GPU[self.rank][0]
# Generate the same random tensor
torch.manual_seed(0)
send_tensor = torch.rand(10, 10, device=device)
if self.rank == 0:
dist.send(send_tensor, 1)
if self.rank == 1:
recv_tensor = torch.rand(10, 10, device=device)
dist.recv(recv_tensor, 0)
self.assertEqual(send_tensor, recv_tensor)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_send_recv_complex(self):
pg = self.pg
device = self.rank_to_GPU[self.rank][0]
# Generate the same random tensor
torch.manual_seed(0)
send_tensor = torch.rand(10, 10, dtype=torch.cfloat, device=device)
if self.rank == 0:
dist.send(send_tensor, 1)
if self.rank == 1:
recv_tensor = torch.rand(10, 10, dtype=torch.cfloat, device=device)
dist.recv(recv_tensor, 0)
self.assertEqual(send_tensor, recv_tensor)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_send_recv_object_list(self):
device = self.rank_to_GPU[self.rank][0]
val = 99 if self.rank == 0 else None
object_list = [val] * self.world_size
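        # Rank 0 sends a list of 99s; rank 1's None placeholders are overwritten
        # in place by recv_object_list.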
if self.rank == 0:
dist.send_object_list(object_list, 1, device=device)
if self.rank == 1:
dist.recv_object_list(object_list, 0, device=device)
self.assertEqual(object_list[0], 99)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_tensor_register_hook(self):
os.environ["TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"] = "1"
pg = self.pg
local_device_id = self.rank_to_GPU[self.rank][0]
def allgather_base(output_t, input_t):
work = pg._allgather_base(output_t, input_t)
work.wait()
        # allgather_base is GPU-count agnostic.
        # Each rank contributes one tensor regardless of GPU count
tensor = torch.tensor([self.rank]).cuda(local_device_id)
output_t = torch.empty((self.world_size), dtype=tensor.dtype).cuda(
local_device_id
)
allgather_base(output_t, tensor)
# Verification
self.assertEqual(torch.arange(self.world_size), output_t)
# Unset env
del os.environ["TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"]
if __name__ == "__main__":
rank = int(os.getenv("RANK", -1))
world_size = int(os.getenv("WORLD_SIZE", 2))
if rank != -1:
# Launched with torchrun or other multi-proc launchers. Directly run the test.
ProcessGroupNCCLOpTest.run_rank(rank, world_size)
else:
        # Launched as a single process. Spawn subprocesses to run the tests.
        # Also need a rendezvous file for `init_process_group` purposes.
rdvz_file = tempfile.NamedTemporaryFile(delete=False).name
torch.multiprocessing.spawn(
ProcessGroupNCCLOpTest.run_rank,
nprocs=world_size,
args=(world_size, rdvz_file),
)