// MPI_* functions are called directly below, so include <mpi.h> explicitly
// rather than relying on a transitive include from mpi_common.h.
#include <mpi.h>
#include <gtest/gtest.h>

#include "caffe2/core/context_gpu.h"
#include "caffe2/core/init.h"
#include "caffe2/core/net.h"
#include "caffe2/core/operator.h"
#include "caffe2/mpi/mpi_common.h"

C10_DEFINE_string(
    caffe_test_root,
    "gen/",
    "The root of the caffe test folder.");

namespace caffe2 {
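
// kBcastNet: creates an MPI common world, fills a length-10 tensor X with a
// constant, and broadcasts X from the root rank. device_type: 1 selects the
// CUDA device option, so all ops run on the GPU.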
const char kBcastNet[] = R"NET(
  name: "bcast"
  op {
    output: "comm"
    type: "MPICreateCommonWorld"
  }
  op {
    output: "X"
    type: "ConstantFill"
    arg {
      name: "shape"
      ints: 10
    }
    arg {
      name: "value"
      f: 0.0
    }
  }
  op {
    input: "comm"
    input: "X"
    output: "X"
    type: "MPIBroadcast"
    arg {
      name: "root"
      i: 0
    }
  }
  device_option {
    device_type: 1
  }
)NET";
TEST(MPITest, TestMPIBroadcast) {
  NetDef net_def;
  CHECK(TextFormat::ParseFromString(string(kBcastNet), &net_def));
  // Let's set the network's constant fill value to be the mpi rank.
  auto* arg = net_def.mutable_op(1)->mutable_arg(1);
  CAFFE_ENFORCE_EQ(arg->name(), "value");
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  arg->set_f(rank);
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  // Run the broadcast once with every rank acting as the root.
  for (int root = 0; root < size; ++root) {
    net_def.mutable_op(2)->mutable_arg(0)->set_i(root);
    Workspace ws;
    unique_ptr<NetBase> net(CreateNet(net_def, &ws));
    EXPECT_NE(nullptr, net.get());
    EXPECT_TRUE(net->Run());
    // Let's test the value.
    auto& X = ws.GetBlob("X")->Get<TensorCUDA>();
    Tensor X_cpu(X, CPU);
    EXPECT_EQ(X.numel(), 10);
    for (int i = 0; i < X.numel(); ++i) {
      EXPECT_EQ(X_cpu.data<float>()[i], root);
    }
  }
}
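
// kReduceNet: same setup as kBcastNet, but MPIReduce sums X across all ranks
// into X_reduced, which the test only checks on the root rank.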
const char kReduceNet[] = R"NET(
  name: "reduce"
  op {
    output: "comm"
    type: "MPICreateCommonWorld"
  }
  op {
    output: "X"
    type: "ConstantFill"
    arg {
      name: "shape"
      ints: 10
    }
    arg {
      name: "value"
      f: 0.0
    }
  }
  op {
    input: "comm"
    input: "X"
    output: "X_reduced"
    type: "MPIReduce"
    arg {
      name: "root"
      i: 0
    }
  }
  device_option {
    device_type: 1
  }
)NET";
TEST(MPITest, TestMPIReduce) {
  NetDef net_def;
  CHECK(TextFormat::ParseFromString(string(kReduceNet), &net_def));
  // Let's set the network's constant fill value to be the mpi rank.
  auto* arg = net_def.mutable_op(1)->mutable_arg(1);
  CAFFE_ENFORCE_EQ(arg->name(), "value");
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  arg->set_f(rank);
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  // Run the reduce once with every rank acting as the root.
  for (int root = 0; root < size; ++root) {
    net_def.mutable_op(2)->mutable_arg(0)->set_i(root);
    Workspace ws;
    unique_ptr<NetBase> net(CreateNet(net_def, &ws));
    EXPECT_NE(nullptr, net.get());
    EXPECT_TRUE(net->Run());
    // Only the root rank checks the reduced result.
    if (rank == root) {
      // Let's test the value.
      auto& X = ws.GetBlob("X_reduced")->Get<TensorCUDA>();
      EXPECT_EQ(X.numel(), 10);
      int expected_result = size * (size - 1) / 2;
      Tensor X_cpu(X, CPU);
      for (int i = 0; i < X.numel(); ++i) {
        EXPECT_EQ(X_cpu.data<float>()[i], expected_result);
      }
    }
  }
}
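
// kMPIAllgatherNet: every rank fills a 2 x 10 tensor X; MPIAllgather
// concatenates the per-rank tensors along the first dimension into X_gathered.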
const char kMPIAllgatherNet[] = R"NET(
  name: "allgather"
  op {
    output: "comm"
    type: "MPICreateCommonWorld"
  }
  op {
    output: "X"
    type: "ConstantFill"
    arg {
      name: "shape"
      ints: 2
      ints: 10
    }
    arg {
      name: "value"
      f: 0.0
    }
  }
  op {
    input: "comm"
    input: "X"
    output: "X_gathered"
    type: "MPIAllgather"
  }
  device_option {
    device_type: 1
  }
)NET";
TEST(MPITest, TestMPIAllgather) {
  NetDef net_def;
  CHECK(TextFormat::ParseFromString(string(kMPIAllgatherNet), &net_def));
  // Let's set the network's constant fill value to be the mpi rank.
  auto* arg = net_def.mutable_op(1)->mutable_arg(1);
  CAFFE_ENFORCE_EQ(arg->name(), "value");
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  arg->set_f(rank);
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  Workspace ws;
  unique_ptr<NetBase> net(CreateNet(net_def, &ws));
  EXPECT_NE(nullptr, net.get());
  EXPECT_TRUE(net->Run());
  // Let's test the value.
  auto& X = ws.GetBlob("X")->Get<TensorCUDA>();
  Tensor X_cpu(X, CPU);
  EXPECT_EQ(X.numel(), 20);
  for (int i = 0; i < X.numel(); ++i) {
    EXPECT_EQ(X_cpu.data<float>()[i], rank);
  }
  auto& X_gathered = ws.GetBlob("X_gathered")->Get<TensorCUDA>();
  EXPECT_EQ(X_gathered.numel(), 20 * size);
  EXPECT_EQ(X_gathered.size(0), 2 * size);
  EXPECT_EQ(X_gathered.size(1), 10);
  Tensor X_gathered_cpu(X_gathered, CPU);
  for (int i = 0; i < X_gathered.numel(); ++i) {
    EXPECT_EQ(X_gathered_cpu.data<float>()[i], i / 20);
  }
}
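
// kMPIAllreduceNet: MPIAllreduce sums X across all ranks and writes the result
// to X_reduced on every rank, leaving the input X untouched.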
const char kMPIAllreduceNet[] = R"NET(
  name: "allreduce"
  op {
    output: "comm"
    type: "MPICreateCommonWorld"
  }
  op {
    output: "X"
    type: "ConstantFill"
    arg {
      name: "shape"
      ints: 10
    }
    arg {
      name: "value"
      f: 0.0
    }
  }
  op {
    input: "comm"
    input: "X"
    output: "X_reduced"
    type: "MPIAllreduce"
  }
  device_option {
    device_type: 1
  }
)NET";
TEST(MPITest, TestMPIAllreduce) {
  NetDef net_def;
  CHECK(TextFormat::ParseFromString(string(kMPIAllreduceNet), &net_def));
  // Let's set the network's constant fill value to be the mpi rank.
  auto* arg = net_def.mutable_op(1)->mutable_arg(1);
  CAFFE_ENFORCE_EQ(arg->name(), "value");
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  arg->set_f(rank);
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  Workspace ws;
  unique_ptr<NetBase> net(CreateNet(net_def, &ws));
  EXPECT_NE(nullptr, net.get());
  EXPECT_TRUE(net->Run());
  // Let's test the value.
  auto& X = ws.GetBlob("X")->Get<TensorCUDA>();
  EXPECT_EQ(X.numel(), 10);
  Tensor X_cpu(X, CPU);
  for (int i = 0; i < X.numel(); ++i) {
    EXPECT_EQ(X_cpu.data<float>()[i], rank);
  }
  auto& X_reduced = ws.GetBlob("X_reduced")->Get<TensorCUDA>();
  EXPECT_EQ(X_reduced.numel(), 10);
  int expected_result = size * (size - 1) / 2;
  Tensor X_reduced_cpu(X_reduced, CPU);
  for (int i = 0; i < X_reduced.numel(); ++i) {
    EXPECT_EQ(X_reduced_cpu.data<float>()[i], expected_result);
  }
}
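
// kInPlaceMPIAllreduceNet: identical to kMPIAllreduceNet except that the
// output blob is X itself, exercising the in-place path of MPIAllreduce.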
const char kInPlaceMPIAllreduceNet[] = R"NET(
  name: "allreduce"
  op {
    output: "comm"
    type: "MPICreateCommonWorld"
  }
  op {
    output: "X"
    type: "ConstantFill"
    arg {
      name: "shape"
      ints: 10
    }
    arg {
      name: "value"
      f: 0.0
    }
  }
  op {
    input: "comm"
    input: "X"
    output: "X"
    type: "MPIAllreduce"
  }
  device_option {
    device_type: 1
  }
)NET";
TEST(MPITest, TestInPlaceMPIAllreduce) {
  NetDef net_def;
  CHECK(TextFormat::ParseFromString(string(kInPlaceMPIAllreduceNet), &net_def));
  // Let's set the network's constant fill value to be the mpi rank.
  auto* arg = net_def.mutable_op(1)->mutable_arg(1);
  CAFFE_ENFORCE_EQ(arg->name(), "value");
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  arg->set_f(rank);
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  Workspace ws;
  unique_ptr<NetBase> net(CreateNet(net_def, &ws));
  EXPECT_NE(nullptr, net.get());
  EXPECT_TRUE(net->Run());
  // X is reduced in place; every rank should now hold the sum of all ranks.
  auto& X_reduced = ws.GetBlob("X")->Get<TensorCUDA>();
  EXPECT_EQ(X_reduced.numel(), 10);
  int expected_result = size * (size - 1) / 2;
  Tensor X_reduced_cpu(X_reduced, CPU);
  for (int i = 0; i < X_reduced.numel(); ++i) {
    EXPECT_EQ(X_reduced_cpu.data<float>()[i], expected_result);
  }
}
} // namespace caffe2
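
// MPI_THREAD_MULTIPLE is requested so that MPI calls may be issued from more
// than one thread; the last argument of MPI_Init_thread receives the thread
// support level actually provided (despite the name, mpi_ret is not an error
// code).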
GTEST_API_ int main(int argc, char** argv) {
  int mpi_ret;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_ret);
  testing::InitGoogleTest(&argc, argv);
  caffe2::GlobalInit(&argc, &argv);
  int test_result = RUN_ALL_TESTS();
  MPI_Finalize();
  return test_result;
}