blob: 50c6387738d2b9d7ef2fe0d309ed7c3c0cdd8734 [file] [log] [blame]
#include <torch/csrc/distributed/c10d/ProcessGroupWrapper.hpp>
#ifdef USE_C10D_GLOO
#include <c10/core/Allocator.h>
#include <c10/core/DeviceType.h>
#include <c10/core/ScalarType.h>
#include <c10/core/TensorOptions.h>
#include <c10/util/Exception.h>
#include <c10/util/Optional.h>
#include <c10/util/intrusive_ptr.h>
#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include <stdexcept>
#include <utility>
namespace c10d {
namespace {
// A container for information about a particular collective, including optype
// and input tensors (if applicable.)
struct CollectiveFingerPrint {
// Current collective's operation type.
OpType op_type_;
// Number of input tensors
std::size_t num_tensors_{};
// input tensor data types
std::vector<int8_t> tensor_dtypes_;
// input tensor device types
std::vector<int8_t> tensor_device_types_;
// input tensor sizes
std::vector<std::vector<int64_t>> tensor_sizes_;
int sequence_number_;
CollectiveFingerPrint(
OpType op_type,
const std::vector<at::Tensor>& input_tensors,
int sequence_number)
: op_type_(op_type),
num_tensors_(input_tensors.size()),
sequence_number_(sequence_number) {
tensor_dtypes_.reserve(num_tensors_);
tensor_device_types_.reserve(num_tensors_);
tensor_sizes_.reserve(num_tensors_);
for (const at::Tensor& t : input_tensors) {
tensor_dtypes_.push_back(static_cast<int8_t>(t.dtype().toScalarType()));
tensor_device_types_.push_back(static_cast<int8_t>(t.device().type()));
tensor_sizes_.push_back(t.sizes().vec());
}
}
// Constructor for the data received from deserialized fingerprint
CollectiveFingerPrint(
OpType op_type,
size_t num_tensors,
std::vector<int8_t> tensor_dtypes,
std::vector<int8_t> tensor_device_types,
std::vector<std::vector<int64_t>> tensor_sizes,
int sequence_number)
: op_type_(op_type),
num_tensors_(num_tensors),
tensor_dtypes_(std::move(tensor_dtypes)),
tensor_device_types_(std::move(tensor_device_types)),
tensor_sizes_(std::move(tensor_sizes)),
sequence_number_(sequence_number) {}
// Logs collective information in case of a failure.
friend std::ostream& operator<<(
std::ostream& output,
const CollectiveFingerPrint& collective_fingerprint);
// Executes and verifies the collective fingerprint.
void verify(c10::intrusive_ptr<Backend> backend) {
at::Tensor serialized_tensor = serialize_fingerprint();
std::vector<at::Tensor> inp{serialized_tensor};
// First verify tensor shapes. This is needed because if e.g. tensor dim
// does not match across processes, directly verifying tensors will result
// in a crash during allgather, but we'd actually like to report a
// description about the inconsistency. Since the input is just a 1D tensor
// the shape will be a single int k_i and we need to make sure k_i is
// consistent across the whole world.
std::vector<at::Tensor> sp = c10d::getTensorShapes(inp);
verify_tensors(sp, backend);
// Now verify consistency for the actual tensor.
verify_tensors(inp, backend);
}
// Takes a serialized fingerprint from
// CollectiveFingerPrint::serialize_fingerprint and deserializes it back to a
// CollectiveFingerPrint struct
CollectiveFingerPrint deserialize_fingerprint(at::Tensor serialized_tensor) {
OpType optype;
auto dtypes = std::vector<int8_t>();
auto device_types = std::vector<int8_t>();
auto sizes = std::vector<std::vector<int64_t>>();
int index = 0;
int seq = 0;
// 1. OpType
optype = OpType(serialized_tensor[index].item<int>());
index++;
int num_tensors = 0;
if (index < serialized_tensor.size(0)) {
seq = serialized_tensor[index].item<int64_t>();
index++;
// 2. Num tensors
num_tensors = serialized_tensor[index].item<int>();
index++;
dtypes.reserve(num_tensors);
device_types.reserve(num_tensors);
sizes.reserve(num_tensors);
// 3. Tensor dtypes
for (int i = 0; i < num_tensors; i++) {
dtypes.push_back(serialized_tensor[index].item<int8_t>());
index++;
}
// 4. Device types
for (int i = 0; i < num_tensors; i++) {
device_types.push_back(serialized_tensor[index].item<int8_t>());
index++;
}
// 5. Tensor shapes
for (int i = 0; i < num_tensors; i++) {
// 5a. Shape size
int size = serialized_tensor[index].item<int>();
index++;
// 5b. Shape
auto shapeVec = std::vector<int64_t>();
shapeVec.reserve(size);
for (int j = 0; j < size; j++) {
shapeVec.push_back(serialized_tensor[index].item<int64_t>());
index++;
}
sizes.push_back(shapeVec);
}
}
return CollectiveFingerPrint(
optype, num_tensors, dtypes, device_types, sizes, seq);
}
private:
void verify_tensors(
std::vector<at::Tensor>& tensors_to_verify,
c10::intrusive_ptr<Backend>& backend) {
// Create output tensor data structure to pass into allgather.
std::vector<std::vector<at::Tensor>> output_tensors;
// output tensors: [<tensor 0 outputs>, <tensor 1 outputs>, ..., <tensor n
// outputs>]
output_tensors.reserve(tensors_to_verify.size());
for (const auto& tensor_shape : tensors_to_verify) {
// Each rank has its own outputs shape, e.g.
// <tensor 0 outputs>: [<rank 0 tensor>, <rank 1 tensor>, ..., <rank n
// tensor>]
std::vector<at::Tensor> outputs;
outputs.reserve(backend->getSize());
for (const auto i : c10::irange(backend->getSize())) {
std::ignore = i; // Suppress unused variable warning
outputs.emplace_back(at::zeros_like(tensor_shape));
}
output_tensors.emplace_back(outputs);
}
// Allgather tensor shapes.
backend->allgather(output_tensors, tensors_to_verify)->wait();
// Verify equivalence
for (const auto i : c10::irange(output_tensors.size())) {
const std::vector<at::Tensor> gathered_tensors = output_tensors[i];
const at::Tensor reference_tensor = tensors_to_verify[i];
for (const auto rank : c10::irange(gathered_tensors.size())) {
const auto& rank_tensor = gathered_tensors[rank];
if (!rank_tensor.equal(reference_tensor)) {
CollectiveFingerPrint rank_fingerprint =
deserialize_fingerprint(rank_tensor);
std::stringstream ss;
ss << "Detected mismatch between collectives on ranks. Rank "
<< backend->getRank() << " is running collective: " << *this
<< ", but Rank " << rank
<< " is running collective: " << rank_fingerprint << ".";
auto diff_result = compute_collective_diff(rank_fingerprint);
if (std::get<0>(diff_result)) {
ss << std::get<1>(diff_result);
}
TORCH_CHECK(false, ss.str());
}
}
}
}
static std::vector<std::string> get_size_strs(
const CollectiveFingerPrint& collective_fingerprint) {
std::vector<std::string> size_strs;
if (!collective_fingerprint.tensor_sizes_.empty()) {
for (const auto& single_tensor_shape_num :
collective_fingerprint.tensor_sizes_[0]) {
size_strs.emplace_back(std::to_string(single_tensor_shape_num));
}
}
return size_strs;
}
static std::vector<std::string> get_dtype_strs(
const CollectiveFingerPrint& collective_fingerprint) {
std::vector<std::string> dtype_strs;
dtype_strs.reserve(collective_fingerprint.tensor_dtypes_.size());
for (const auto& tensor_dtype : collective_fingerprint.tensor_dtypes_) {
dtype_strs.emplace_back(
c10::toString(static_cast<at::ScalarType>(tensor_dtype)));
}
return dtype_strs;
}
static std::vector<std::string> get_device_type_strs(
const CollectiveFingerPrint& collective_fingerprint) {
std::vector<std::string> device_type_strs;
device_type_strs.reserve(
collective_fingerprint.tensor_device_types_.size());
for (const auto& tensor_device_type :
collective_fingerprint.tensor_device_types_) {
device_type_strs.emplace_back(
c10::toString(static_cast<at::DeviceType>(tensor_device_type)));
}
return device_type_strs;
}
std::pair<bool, std::string> compute_collective_diff(
CollectiveFingerPrint& other) {
// Computes the difference between two collectives (seq num, tensor shapes,
// collective type, etc) for easier understanding of how mismatched
// collectives across ranks differ.
bool found_diff = false;
std::stringstream ss;
ss << "Collectives differ in the following aspects: ";
// Check seq_num
if (other.sequence_number_ != sequence_number_) {
found_diff = true;
ss << c10::str(
"\t Sequence number: ",
sequence_number_,
"vs ",
other.sequence_number_);
}
// Check op type
auto other_op = opTypeToString(other.op_type_);
auto this_op = opTypeToString(op_type_);
if (other_op.compare(this_op) != 0) {
found_diff = true;
ss << c10::str(" Op type: ", this_op, "vs ", other_op);
}
auto check = [&ss, &found_diff](
const char* arg,
std::vector<std::string> other,
std::vector<std::string> curr) {
if (other.size() != curr.size()) {
found_diff = true;
ss << c10::str(" Tensor ", arg, ": ", curr, "vs ", other);
return;
}
for (size_t i = 0; i < other.size(); ++i) {
if (other[i].compare(curr[i]) != 0) {
found_diff = true;
ss << c10::str(" Tensor ", arg, ": ", curr, "vs ", other);
return;
}
}
};
// check tensor sizes
auto other_sizes = get_size_strs(other);
auto this_sizes = get_size_strs(*this);
check("Tensor shapes", other_sizes, this_sizes);
// check tensor dtypes
auto other_dtypes = get_dtype_strs(other);
auto this_dtypes = get_dtype_strs(*this);
check("Tensor dtypes", other_dtypes, this_dtypes);
// check tensor devices
auto other_devices = get_device_type_strs(other);
auto this_devices = get_device_type_strs(*this);
check("Tensor devices", other_devices, this_devices);
if (!found_diff) {
return std::make_pair(false, ss.str());
} else {
return std::make_pair(true, ss.str());
}
}
// Serializes the information (op type, input shapes, data types, device
// types) about the collective fingerprint into a tensor
at::Tensor serialize_fingerprint() {
auto data = std::make_unique<std::vector<int64_t>>();
// std::vector<int64_t> data;
// 1. OpType
data->push_back(static_cast<int64_t>(op_type_));
// sequence number
data->push_back(sequence_number_);
// 2. Num tensors
data->push_back(static_cast<int64_t>(num_tensors_));
// 3. Tensor dtypes
for (const auto& type : tensor_dtypes_) {
data->push_back(type);
}
// 4. Device types
for (const auto& d : tensor_device_types_) {
data->push_back(d);
}
// 5. Shapes
for (const auto& sizes : tensor_sizes_) {
data->push_back(sizes.size());
for (const auto& s : sizes) {
data->push_back(s);
}
}
// Serialize data into tensor
int64_t data_size = data->size();
// Need to release here and get the ptr due to C++ parameter evaluation
// order.
auto d = data.release();
at::Tensor serialized_tensor =
at::for_blob(d->data(), {data_size})
.context(
d,
[](void* ctx) {
delete static_cast<std::vector<int64_t>*>(ctx);
})
.options(at::TensorOptions().dtype(at::kLong))
.make_tensor();
return serialized_tensor;
}
};
std::ostream& operator<<(
std::ostream& output,
const CollectiveFingerPrint& collective_fingerprint) {
std::string collectiveInfo;
auto op_type_str = opTypeToString(collective_fingerprint.op_type_);
if (collective_fingerprint.num_tensors_ != 0) {
// Convert dtype and device type info to string.
std::vector<std::string> dtype_strs =
CollectiveFingerPrint::get_dtype_strs(collective_fingerprint);
std::vector<std::string> device_type_strs =
CollectiveFingerPrint::get_device_type_strs(collective_fingerprint);
std::vector<std::string> size_strs =
CollectiveFingerPrint::get_size_strs(collective_fingerprint);
collectiveInfo = c10::str(
"CollectiveFingerPrint(",
"SequenceNumber=",
collective_fingerprint.sequence_number_,
", OpType=",
op_type_str,
", TensorShape=[",
c10::Join(", ", size_strs),
"], TensorDtypes=",
(dtype_strs),
", TensorDeviceTypes=",
(device_type_strs),
")");
} else {
collectiveInfo = c10::str(
"CollectiveFingerPrint(",
"SequenceNumber=",
collective_fingerprint.sequence_number_,
"OpType=",
op_type_str,
")");
}
return output << collectiveInfo;
}
} // namespace
ProcessGroupWrapper::ProcessGroupWrapper(
c10::intrusive_ptr<Backend> backend,
c10::intrusive_ptr<Backend> glooBackend)
: Backend(backend->getRank(), backend->getSize()),
backend_(backend),
glooBackend_(std::move(glooBackend)) {
// Set the sequence number for the underlying process group.
backend_->setSequenceNumberForGroup();
}
const std::string ProcessGroupWrapper::getBackendName() const {
return backend_->getBackendName();
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::broadcast(
std::vector<at::Tensor>& data,
const BroadcastOptions& opts) {
runCollectiveChecks(OpType::BROADCAST, data);
return backend_->broadcast(data, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::allreduce(
std::vector<at::Tensor>& data,
const AllreduceOptions& opts) {
runCollectiveChecks(OpType::ALLREDUCE, data);
return backend_->allreduce(data, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::allreduce_coalesced(
std::vector<at::Tensor>& tensors,
const AllreduceCoalescedOptions& opts) {
// NOTE: We don't enforce shape checking for allreduce_coalesced because
// the implementation itself does not enforce it we have tests that use
// inconsistent shapes, see python implementation in distributed_c10d for
// details.
runCollectiveChecks(OpType::ALLREDUCE_COALESCED, {});
return backend_->allreduce_coalesced(tensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::reduce(
std::vector<at::Tensor>& tensors,
const ReduceOptions& opts) {
runCollectiveChecks(OpType::REDUCE, tensors);
return backend_->reduce(tensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts) {
runCollectiveChecks(OpType::ALLGATHER, inputTensors);
return backend_->allgather(outputTensors, inputTensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::_allgather_base(
at::Tensor& outputBuffer,
at::Tensor& inputBuffer,
const AllgatherOptions& opts) {
std::vector<at::Tensor> inputTensors({inputBuffer});
runCollectiveChecks(OpType::_ALLGATHER_BASE, inputTensors);
return backend_->_allgather_base(outputBuffer, inputBuffer, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::allgather_coalesced(
std::vector<std::vector<at::Tensor>>& outputTensorLists,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts) {
// NOTE: We don't enforce shape checking for allgather_coalesced because
// the implementation itself does not enforce it we have tests that use
// inconsistent shapes, see python implementation in distributed_c10d for
// details.
runCollectiveChecks(OpType::ALLGATHER_COALESCED, {});
return backend_->allgather_coalesced(outputTensorLists, inputTensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::gather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const GatherOptions& opts) {
runCollectiveChecks(OpType::GATHER, inputTensors);
return backend_->gather(outputTensors, inputTensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::scatter(
std::vector<at::Tensor>& outputTensors,
std::vector<std::vector<at::Tensor>>& inputTensors,
const ScatterOptions& opts) {
runCollectiveChecks(OpType::SCATTER, outputTensors);
return backend_->scatter(outputTensors, inputTensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::reduce_scatter(
std::vector<at::Tensor>& outputTensors,
std::vector<std::vector<at::Tensor>>& inputTensors,
const ReduceScatterOptions& opts) {
runCollectiveChecks(OpType::REDUCE_SCATTER, outputTensors);
return backend_->reduce_scatter(outputTensors, inputTensors, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::alltoall_base(
at::Tensor& outputTensor,
at::Tensor& inputTensor,
std::vector<int64_t>& outputSplitSizes,
std::vector<int64_t>& inputSplitSizes,
const AllToAllOptions& opts) {
// alltoall supports uneven split, so don't enforce shape checking.
runCollectiveChecks(OpType::ALLTOALL_BASE, {});
return backend_->alltoall_base(
outputTensor, inputTensor, outputSplitSizes, inputSplitSizes, opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::alltoall(
std::vector<at::Tensor>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllToAllOptions& opts) {
// alltoall supports uneven split, so don't enforce shape checking.
runCollectiveChecks(OpType::ALLTOALL, {});
return backend_->alltoall(outputTensors, inputTensors, opts);
}
void ProcessGroupWrapper::monitoredBarrier(
const BarrierOptions& opts,
bool waitAllRanks) {
return backend_->monitoredBarrier(opts, waitAllRanks);
}
void ProcessGroupWrapper::setSequenceNumberForGroup() {
// Set underlying pg's sequence number if it is not set.
if (backend_->getSequenceNumberForGroup() == 0) {
// Set the sequence number for the underlying process group.
backend_->setSequenceNumberForGroup();
}
}
uint64_t ProcessGroupWrapper::getSequenceNumberForGroup() {
return backend_->getSequenceNumberForGroup();
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::send(
std::vector<at::Tensor>& tensors,
int dstRank,
int tag) {
return backend_->send(tensors, dstRank, tag);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::recv(
std::vector<at::Tensor>& tensors,
int srcRank,
int tag) {
return backend_->recv(tensors, srcRank, tag);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::recvAnysource(
std::vector<at::Tensor>& tensors,
int tag) {
return backend_->recvAnysource(tensors, tag);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::barrier(
const BarrierOptions& opts) {
runCollectiveChecks(OpType::BARRIER, {});
return backend_->barrier(opts);
}
c10::intrusive_ptr<Work> ProcessGroupWrapper::_reduce_scatter_base(
at::Tensor& outputBuffer,
at::Tensor& inputBuffer,
const ReduceScatterOptions& opts) {
runCollectiveChecks(
OpType::_REDUCE_SCATTER_BASE, {inputBuffer, outputBuffer});
return backend_->_reduce_scatter_base(outputBuffer, inputBuffer, opts);
}
c10::intrusive_ptr<Backend> ProcessGroupWrapper::getWrappedPg() const {
return backend_;
}
void ProcessGroupWrapper::runCollectiveChecks(
OpType op_type,
const std::vector<at::Tensor>& tensors) {
// first perform a monitored barrier to ensure all ranks can synchronize.
c10d::BarrierOptions options;
// TODO: we should use wrapped backend_'s timeout here, but C++ ProcessGroup
// API does not expose timeout.
auto seq = getSequenceNumberForGroup();
auto finger_print = CollectiveFingerPrint(op_type, tensors, seq);
LOG(INFO) << "[Rank " << getRank() << "] "
<< "Running collective: " << finger_print;
try {
glooBackend_->monitoredBarrier(options, /* waitAllRanks */ true);
} catch (const std::runtime_error& e) {
// Attach collective info to the exception and re-raise.
std::stringstream ss;
ss << finger_print;
auto collective_info = ss.str();
auto err_msg = c10::str(
"ProcessGroupWrapper: Monitored Barrier encountered error running collective: ",
collective_info,
". Error: \n",
e.what());
TORCH_CHECK(false, err_msg);
}
// Will throw if an ill-formed collective is detected.
finger_print.verify(glooBackend_);
}
} // namespace c10d
#endif // USE_C10D_GLOO