#include <c10/cuda/CUDAGuard.h>
#include <c10/util/irange.h>
#include <ATen/cuda/CUDAContext.h>
#include <gtest/gtest.h>
#include <memory>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"
using namespace c10d::test;
using at::cuda::CUDAStream;
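// Creates N test fixtures that rendezvous on the same FileStore path and
// starts them from separate threads: ProcessGroupGloo construction waits
// for all ranks to join, so the ranks must start concurrently.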
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
std::vector<T> tests;
for (C10_UNUSED const auto i : c10::irange(N)) {
// Construct each fixture in place; forwarding args inside a loop could
// move from them on the first iteration, so pass them as lvalues.
tests.emplace_back(path, args...);
}
std::vector<std::thread> threads;
for (const auto i : c10::irange(N)) {
threads.push_back(std::thread([i, N, &tests] { tests[i].start(i, N); }));
}
for (auto& thread : threads) {
thread.join();
}
return tests;
}
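// Base fixture: stores the FileStore path and constructs the
// ProcessGroupGloo for a given (rank, size) in start().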
class AsyncTest {
public:
AsyncTest(std::string path) : path_(std::move(path)) {}
AsyncTest(AsyncTest&&) noexcept = default;
::c10d::ProcessGroupGloo& getProcessGroup() {
return *pg_;
}
void start(int rank, int size) {
auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);
// Use tiny timeout to make this test run fast
auto options = ::c10d::ProcessGroupGloo::Options::create();
options->timeout = std::chrono::milliseconds(50);
options->devices.push_back(
::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));
pg_ = std::make_unique<::c10d::ProcessGroupGloo>(
store, rank, size, options);
}
protected:
std::string path_;
std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};
class AsyncInputIsOutputTest : public AsyncTest {
public:
AsyncInputIsOutputTest(const std::string& path, int numTensors)
: AsyncTest(path),
numTensors_(numTensors),
numDevices_(cudaNumDevices()) {
// Allocate inputs on available devices in a round-robin fashion.
::at::globalContext().lazyInitCUDA();
inputs_.resize(numTensors_);
for (const auto i : c10::irange(numTensors_)) {
inputs_[i] = at::empty(
{16, 16},
at::device(
{at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
}
// Allocate a stream per device.
//
// The "current stream" is set globally per device in THC, so we
// can't make two tensors on the same device use different streams
// and pass this along to the collective (since it uses the THC
// getters to retrieve the current stream).
//
at::cuda::OptionalCUDAGuard deviceGuard;
streams_.reserve(numDevices_);
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
streams_.push_back(at::cuda::getStreamFromPool());
}
}
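// Wait under our dedicated streams so that any stream synchronization
// performed by work->wait() targets them instead of the default streams.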
void wait(c10::intrusive_ptr<c10d::Work>& work) {
c10::cuda::CUDAMultiStreamGuard guard(streams_);
work->wait();
}
std::vector<at::Tensor> getCpuTensors(
const std::vector<at::Tensor>& gpu_tensors) {
std::vector<at::Tensor> outputs(gpu_tensors.size());
// For the duration of this function, make ATen use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
// Copy inputs to outputs
for (const auto i : c10::irange(gpu_tensors.size())) {
outputs[i] = gpu_tensors[i].cpu();
}
return outputs;
}
std::vector<at::Tensor> getTensors() {
return getCpuTensors(inputs_);
}
protected:
const int numTensors_;
const int numDevices_;
std::vector<at::Tensor> inputs_;
std::vector<CUDAStream> streams_;
};
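// Allreduce test: rank r fills its i-th input with (r * numTensors_ + i)
// before launching the collective, so the fill values are distinct
// across all ranks and tensors.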
class AsyncAllreduceTest : public AsyncInputIsOutputTest {
public:
AsyncAllreduceTest(const std::string& path, int numTensors)
: AsyncInputIsOutputTest(path, numTensors) {}
c10::intrusive_ptr<c10d::Work> run() {
// For the duration of this function, make ATen use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
// Launch a spin kernel on every stream so the fills and the collective
// below are queued behind pending device work and complete asynchronously
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
cudaSleep(streams_[i], 10 * 1000 * 1000);
}
// Launch value initialization for every tensor
for (const auto i : c10::irange(numTensors_)) {
deviceGuard.set_index(i % numDevices_);
inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
}
return pg_->allreduce(inputs_);
}
};
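// Broadcast test: tensors are filled as in the allreduce test, then the
// content of tensor `rootTensor` on rank `rootRank` is broadcast to
// every tensor on every rank.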
class AsyncBroadcastTest : public AsyncInputIsOutputTest {
public:
AsyncBroadcastTest(const std::string& path, int numTensors)
: AsyncInputIsOutputTest(path, numTensors) {}
c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
// For the duration of this function, make ATen use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
// Launch a spin kernel on every stream so the fills and the collective
// below are queued behind pending device work and complete asynchronously
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
cudaSleep(streams_[i], 10 * 1000 * 1000);
}
// Launch value initialization for every tensor
for (const auto i : c10::irange(numTensors_)) {
deviceGuard.set_index(i % numDevices_);
inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
}
::c10d::BroadcastOptions options;
options.rootRank = rootRank;
options.rootTensor = rootTensor;
return pg_->broadcast(inputs_, options);
}
};
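// Drives numProcesses in-process ranks through a single allreduce and
// verifies both the in-place inputs and the tensors returned by
// work->result().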
void runAsyncAllreduceTest(
const std::string& path,
size_t numProcesses = 4,
size_t numTensors = 2) {
auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
for (const auto i : c10::irange(numProcesses)) {
work[i] = tests[i].run();
}
// Wait for work to complete
for (const auto i : c10::irange(numProcesses)) {
tests[i].wait(work[i]);
}
// Check results
for (const auto i : c10::irange(numProcesses)) {
const auto size = numProcesses * numTensors;
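// Rank r fills its j-th tensor with (r * numTensors + j), so the fill
// values cover 0..size-1 exactly once across all ranks and tensors;
// the allreduce sum is therefore size * (size - 1) / 2 (e.g. 4
// processes x 2 tensors -> 0 + 1 + ... + 7 = 28).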
const auto expected = (size * (size - 1)) / 2;
auto tensors = tests[i].getTensors();
auto results = tests[i].getCpuTensors(work[i]->result());
EXPECT_EQ(tensors.size(), results.size());
for (const auto j : c10::irange(tensors.size())) {
auto& tensor = tensors[j];
auto data = tensor.data_ptr<float>();
auto& result_tensor = results[j];
auto result_data = result_tensor.data_ptr<float>();
EXPECT_EQ(tensor.numel(), result_tensor.numel());
for (const auto k : c10::irange(tensor.numel())) {
EXPECT_EQ(data[k], expected);
EXPECT_EQ(result_data[k], expected);
}
}
}
}
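// Runs the broadcast for every (rootRank, rootTensor) pair and verifies
// that every rank observes the root's fill value.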
void runAsyncBroadcastTest(
const std::string& path,
size_t numProcesses = 4,
size_t numTensors = 1) {
auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);
// Try every combination of root rank and root tensor
for (const auto rootRank : c10::irange(numProcesses)) {
for (const auto rootTensor : c10::irange(numTensors)) {
std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
for (const auto i : c10::irange(numProcesses)) {
work[i] = tests[i].run(rootRank, rootTensor);
}
// Wait for work to complete
for (const auto i : c10::irange(numProcesses)) {
tests[i].wait(work[i]);
}
// Check results
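// The root filled tensor `rootTensor` with (rootRank * numTensors +
// rootTensor), and the broadcast copies that value into every tensor
// on every rank.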
const auto expected = (rootRank * numTensors + rootTensor);
for (const auto i : c10::irange(numProcesses)) {
auto tensors = tests[i].getTensors();
for (const auto& tensor : tensors) {
const auto* const data = tensor.const_data_ptr<float>();
for (const auto k : c10::irange(tensor.numel())) {
EXPECT_EQ(data[k], expected);
}
}
}
}
}
}
#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
if (!at::cuda::is_available()) {
GTEST_SKIP() << "CUDA not available, skipping testAsyncAllreduce";
}
TemporaryFile file;
runAsyncAllreduceTest(file.path);
}
TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
if (!at::cuda::is_available()) {
GTEST_SKIP() << "CUDA not available, skipping testAsyncBroadcast";
}
TemporaryFile file;
runAsyncBroadcastTest(file.path);
}
#endif