#include <unistd.h>
#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/ProcessGroupMPI.hpp>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
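// End-to-end test for c10d::ProcessGroupMPI: each test below queues a batch
// of asynchronous collectives, waits on the returned Work or Future handles,
// and verifies the resulting tensor values.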
// Wait for work to complete
std::vector<std::vector<at::Tensor>> waitWork(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    outputTensors.emplace_back(work->result());
  }
  return outputTensors;
}
// Wait using Futures
std::vector<std::vector<at::Tensor>> waitFuture(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return outputTensors;
}
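// Allreduce: every rank contributes a tensor filled with i; with the default
// SUM reduction, every rank should end up with worldSize * i.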
void testAllreduce(int iter = 1000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    std::vector<at::Tensor> tensors = {tensor};

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allreduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    const auto expected = worldSize * i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}
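// Broadcast: rank 0 (the default root) fills its tensor with i; every other
// rank starts from zeros and should hold i after the broadcast.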
void testBroadcast(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensors = std::vector<at::Tensor>();
    if (pg->getRank() == 0) {
      auto tensor = at::ones({16, 16}) * i;
      tensors = std::vector<at::Tensor>({tensor});
    } else {
      auto tensor = at::zeros({16, 16});
      tensors = std::vector<at::Tensor>({tensor});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->broadcast(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}
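// Reduce: every rank contributes i; the SUM result lands only on the default
// root (rank 0), so only rank 0 verifies worldSize * i.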
void testReduce(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    auto tensors = std::vector<at::Tensor>({tensor});

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->reduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  if (pg->getRank() == 0) {
    // Verify outputs
    for (const auto i : c10::irange(iter)) {
      const auto expected = worldSize * i;
      auto data = outputTensors[i][0].data_ptr<float>();
      for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
        if (data[j] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}
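// Allgather: rank j contributes a tensor filled with i * j; every rank should
// receive the full set, so slot j of the gathered output holds i * j.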
void testAllgather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(1);
    outputs[0].resize(worldSize);
    for (const auto j : c10::irange(worldSize)) {
      outputs[0][j] = at::zeros({16, 16});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allgather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    for (const auto j : c10::irange(worldSize)) {
      const auto expected = i * j;
      auto data = outputTensors[i][j].data_ptr<float>();
      for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
        if (data[k] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}
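// Gather: only the root (rank 0) supplies output buffers and ends up with all
// contributions; non-root ranks should get back an empty result.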
void testGather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      outputs = std::vector<std::vector<at::Tensor>>(1);
      outputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        outputs[0][j] = at::zeros({16, 16});
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->gather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  if (rank == 0) {
    for (const auto i : c10::irange(iter)) {
      for (const auto j : c10::irange(worldSize)) {
        const auto expected = i * j;
        auto data = outputTensors[i][j].data_ptr<float>();
        for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
          if (data[k] != expected) {
            TORCH_CHECK(false, "BOOM!");
          }
        }
      }
    }
  } else {
    for (const auto i : c10::irange(iter)) {
      if (outputTensors[i].size() != 0) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}
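// Scatter: rank 0 prepares one tensor per destination (filled with i * j for
// rank j) and scatters them; every rank checks the single tensor it receives.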
void testScatter(int iter = 1) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::zeros({16, 16});
    auto tensors = std::vector<at::Tensor>({tensor});
    auto inputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      inputs = std::vector<std::vector<at::Tensor>>(1);
      inputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        inputs[0][j] = at::ones({16, 16}) * i * j;
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->scatter(tensors, inputs);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs: rank j receives inputs[0][j], so each rank should see a
  // tensor filled with i * rank.
  for (const auto i : c10::irange(iter)) {
    const auto expected = i * rank;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto k = 0; k < outputTensors[i][0].numel(); ++k) {
      if (data[k] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}
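// Point-to-point: rank 0 sends a tensor filled with i to rank 1 (the test
// runs with two ranks); rank 1 receives it either from rank 0 explicitly or,
// when recvAnysource is set, from any source and then checks the source rank.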
void testSendRecv(bool recvAnysource, int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // pg->send does not keep sent tensors alive, so we need to.
  std::vector<std::vector<at::Tensor>> sendTensors(iter);
  auto rank = pg->getRank();
  for (const auto i : c10::irange(iter)) {
    if (rank == 0) {
      auto tensor = at::ones({16, 16}) * i;
      sendTensors[i] = std::vector<at::Tensor>({tensor});

      // Queue the work.
      c10::intrusive_ptr<::c10d::Work> work = pg->send(sendTensors[i], 1, 0);
      works.push_back(std::move(work));
    } else {
      auto tensor = at::zeros({16, 16});
      auto recvTensors = std::vector<at::Tensor>({tensor});

      // Queue the work.
      if (!recvAnysource) {
        c10::intrusive_ptr<::c10d::Work> work = pg->recv(recvTensors, 0, 0);
        works.push_back(std::move(work));
      } else {
        c10::intrusive_ptr<::c10d::Work> work =
            pg->recvAnysource(recvTensors, 0);
        works.push_back(std::move(work));
      }
    }
  }

  auto outputTensors = waitWork(pg, works);

  if (rank == 0) {
    return;
  }

  std::vector<int> srcRanks;
  if (recvAnysource) {
    for (const auto& work : works) {
      srcRanks.push_back(work->sourceRank());
    }
  }

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    if (recvAnysource && srcRanks[i] != 0) {
      TORCH_CHECK(false, "src rank is wrong for recvAnysource");
    }
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}
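// Sanity check: the process group reports the MPI backend name.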
void testBackendName() {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  if (pg->getBackendName() != std::string(c10d::MPI_BACKEND_NAME)) {
    TORCH_CHECK(false, "BOOM!");
  }
}
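// MPIEXEC is expected to be defined at build time with the path to an mpiexec
// binary. When the test is not already running under Open MPI (detected via
// OMPI_COMM_WORLD_SIZE), main() re-execs itself through mpiexec with 2 ranks;
// if execl succeeds it replaces the current process, so the tests below run
// in the ranks launched by mpiexec.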
int main(int argc, char** argv) {
#ifdef MPIEXEC
  // If we are within an openmpi mpirun, then skip the exec
  if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
    std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
    execl(STR(MPIEXEC), "-np 2", argv[0], (char*)nullptr);
  }

  testAllreduce();
  testBroadcast();
  testReduce();
  testAllgather();
  testGather();
  testScatter();
  testSendRecv(false);
  testSendRecv(true);
  testBackendName();

  std::cout << "Test successful" << std::endl;
#else
  std::cout << "MPI executable not found, skipping test" << std::endl;
#endif
  return EXIT_SUCCESS;
}