// blob: 2b3358d24c783fed809b1aa29039c65e1bbdc706 [file] [log] [blame]
#pragma once
#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <sys/types.h>
#include <cstdlib>
#include <string>
#include <system_error>
#include <vector>
namespace c10d {
// Builds the store key under which a rank's "trace start" record is kept:
// "<pgName>_<rank>_trace_start".
inline std::string getTraceStartKey(const std::string& pgName, int rank) {
  std::string key = pgName;
  key += "_";
  key += std::to_string(rank);
  key += "_trace_start";
  return key;
}
// Builds the store key under which a rank's "trace end" record is kept:
// "<pgName>_<rank>_trace_end".
inline std::string getTraceEndKey(const std::string& pgName, int rank) {
  std::string key = pgName;
  key += "_";
  key += std::to_string(rank);
  key += "_trace_end";
  return key;
}
// Writes a trace record for `key` into the store.
//
// The stored value is laid out as: the raw bytes of `seq`, then the bytes of
// `col`, then one trailing zero byte.
//
// Returns true when store->set() completes, and false when it throws; the
// failure is logged and swallowed rather than propagated.
inline bool traceUpdate(
    c10::intrusive_ptr<Store>& store,
    const std::string& key,
    uint64_t seq,
    const std::string& col) {
  // Zero-filled buffer, so the final byte acts as the string terminator.
  std::vector<uint8_t> payload(sizeof(seq) + col.size() + 1, 0);
  uint8_t* cursor = payload.data();
  memcpy(cursor, &seq, sizeof(seq));
  cursor += sizeof(seq);
  memcpy(cursor, col.data(), col.size());

  try {
    store->set(key, payload);
  } catch (...) {
    LOG(ERROR) << "Store is down while updating #" << seq << " with key "
               << key;
    return false;
  }
  return true;
}
// Marks which side of a collective a trace record describes.
enum TraceDebugEvent {
  // The rank has a trace-start record for the collective.
  kEventStart = 0,
  // The rank also has a trace-end record for the same collective.
  kEventEnd = 1,
};
// Trace records grouped by sequence number, then by rank:
//   <seq, <rank, <col, start/end>>>
// - outer key: the sequence number decoded from a rank's trace record;
// - inner key: the rank that reported it;
// - value: the `col` string stored alongside the sequence number (presumably
//   the collective's name/description) and whether the rank's latest state
//   for that sequence is kEventStart or kEventEnd.
// Both levels are std::map, so iteration is ordered by seq and then by rank.
using TraceMap =
std::map<uint64_t, std::map<int, std::pair<std::string, TraceDebugEvent>>>;
// Joins the given ranks into a ", "-separated list, e.g. {0, 1, 2} becomes
// "0, 1, 2". An empty vector yields an empty string.
inline std::string ranksToString(const std::vector<int>& ranks) {
  std::string joined;
  const char* separator = "";
  for (const int rank : ranks) {
    joined += separator;
    joined += std::to_string(rank);
    // Every element after the first is preceded by the delimiter.
    separator = ", ";
  }
  return joined;
}
// Joins the rank component (the `first` member) of each item into a
// ", "-separated list. The string component of each pair is ignored.
inline std::string ranksFromTrace(
    const std::vector<std::pair<int, std::string>>& items) {
  std::string joined;
  for (auto it = items.begin(); it != items.end(); ++it) {
    if (it != items.begin()) {
      joined += ", ";
    }
    joined += std::to_string(it->first);
  }
  return joined;
}
// Formats the report line that blames `missingRanks` for the timeout, listing
// them as ranks that never joined any collective.
inline std::string analyzeMissingRanks(const std::vector<int>& missingRanks) {
  const std::string rankList = ranksToString(missingRanks);
  std::string analysis = c10::str(
      "\n\t - To our best knowledge, ranks [",
      rankList,
      "] are the lagging ranks that caused this timeout. "
      "They never joined any collectives");
  return analysis;
}
inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
uint64_t lagSeq = traceMap.begin()->first;
std::vector<int> startRanks;
std::vector<int> endRanks;
for (auto& p : traceMap.begin()->second) {
if (p.second.second == kEventStart) {
startRanks.push_back(p.first);
} else {
endRanks.push_back(p.first);
}
}
std::string report =
"\n\t - To our best knowledge, the lagging/dead/mismatched ranks "
"that caused the desync are:";
if (startRanks.size()) {
report += c10::str(
"\n\t - [",
ranksToString(startRanks),
"] joined but didn't finish collective #",
lagSeq,
" (count from 1)");
}
if (endRanks.size()) {
report += c10::str(
"\n\t [",
ranksToString(endRanks),
"] finished collective #",
lagSeq,
", but didn't join collective #",
lagSeq + 1,
" (count from 1)");
}
return report;
}
// Renders every entry of `traceMap` as text: for each sequence number, the
// ranks are grouped by their `col` string, separately for ranks whose latest
// event is kEventStart ("started") and for the rest ("finished").
inline std::string dumpSnapshot(TraceMap& traceMap) {
  using RanksByCollective = std::unordered_map<std::string, std::vector<int>>;

  std::string report = "\n\t - Snapshot of ranks' latest states:";
  for (auto& seqEntry : traceMap) {
    const uint64_t seq = seqEntry.first;

    // Bucket the ranks of this sequence number by event kind and collective.
    RanksByCollective started;
    RanksByCollective finished;
    for (auto& rankEntry : seqEntry.second) {
      const auto& record = rankEntry.second;
      RanksByCollective& bucket =
          (record.second == kEventStart) ? started : finished;
      bucket[record.first].push_back(rankEntry.first);
    }

    if (!started.empty()) {
      report += c10::str("\n\t #", seq, " started ranks:");
      for (auto& group : started) {
        report += c10::str(
            "\n\t [", ranksToString(group.second), "] started ", group.first);
      }
    }
    if (!finished.empty()) {
      report += c10::str("\n\t #", seq, " finished ranks:");
      for (auto& group : finished) {
        report += c10::str(
            "\n\t [", ranksToString(group.second), "] finished ", group.first);
      }
    }
  }
  return report;
}
// Reads the trace record stored under `key` and decodes it.
//
// The record is expected to start with the raw bytes of the sequence number,
// followed by the collective string. The string ends at the first zero byte,
// or at the end of the payload if no zero byte is present.
//
// On success, writes the decoded values to `seq` and `col` and returns true.
// Returns false after logging when:
//   - the value is shorter than sizeof(seq) (neither output is modified), or
//   - anything in the lookup/decoding throws (e.g. the store is unreachable).
inline bool parseTraceValue(
    c10::intrusive_ptr<Store>& store,
    const std::string& key,
    uint64_t& seq,
    std::string& col) {
  try {
    const std::vector<uint8_t> traceValue = store->get(key);

    // Never read past the end of what the store actually returned.
    if (traceValue.size() < sizeof(seq)) {
      LOG(ERROR) << "Malformed trace value for key " << key << ": got "
                 << traceValue.size() << " bytes, expected at least "
                 << sizeof(seq);
      return false;
    }
    memcpy(&seq, traceValue.data(), sizeof(seq));

    // Bound the string by the payload length instead of trusting that a
    // terminating zero byte exists, then cut at the first zero byte if any.
    const char* colBegin =
        reinterpret_cast<const char*>(traceValue.data()) + sizeof(seq);
    const auto colCapacity = traceValue.size() - sizeof(seq);
    col.assign(colBegin, colCapacity);
    const auto terminator = col.find('\0');
    if (terminator != std::string::npos) {
      col.resize(terminator);
    }
    return true;
  } catch (...) {
    LOG(ERROR) << "Store is down while getting key " << key;
    return false;
  }
}
// Builds a textual desync report for the timed-out rank `myRank` from the
// trace records that ranks [0, worldSize) of process group `pgName` have
// written to `store`.
//
// For every rank the trace-start record is read first; ranks without one are
// collected as "missing". If a trace-end record exists with the same sequence
// number as the start record, the rank's state is flipped to kEventEnd.
//
// Returns:
//   - an empty string if any store read fails while collecting records;
//   - otherwise the timeout line for `myRank`, followed either by the
//     missing-ranks analysis (when at least one rank has no start record) or
//     by the lagging-ranks analysis plus a full snapshot.
//
// Fails a TORCH_INTERNAL_ASSERT if no trace information was found at all, if
// `myRank` has no start record, or if `myRank`'s latest state is not
// kEventStart.
inline std::string retrieveDesyncReport(
    c10::intrusive_ptr<Store>& store,
    const std::string& pgName,
    int myRank,
    int worldSize) {
  std::string report;

  // Zero-initialized so the value is never indeterminate; it is only
  // meaningful once `thisCol` has been filled in for `myRank`.
  uint64_t thisSeq = 0;
  std::string thisCol;

  std::vector<int> missingRanks;
  TraceMap traceMap;

  for (const auto rank : c10::irange(worldSize)) {
    // Build traceMapStart.
    uint64_t seqStart = 0;
    {
      std::string traceKeyStart = getTraceStartKey(pgName, rank);
      if (!store->check({traceKeyStart})) {
        // No start record at all: this rank never reported a collective.
        missingRanks.push_back(rank);
        continue;
      }
      std::string col;
      if (!parseTraceValue(store, traceKeyStart, seqStart, col)) {
        return report;
      }
      traceMap[seqStart].emplace(rank, std::make_pair(col, kEventStart));
      if (rank == myRank) {
        thisSeq = seqStart;
        thisCol = std::move(col);
      }
    }

    // Build traceMapEnd.
    {
      std::string traceKeyEnd = getTraceEndKey(pgName, rank);
      if (!store->check({traceKeyEnd})) {
        continue;
      }
      uint64_t seq = 0;
      std::string col;
      if (!parseTraceValue(store, traceKeyEnd, seq, col)) {
        return report;
      }
      // Only an end record for the same collective as the start record means
      // the rank has finished it.
      if (seq == seqStart) {
        traceMap[seq][rank].second = kEventEnd;
      }
    }
  }

  TORCH_INTERNAL_ASSERT(
      !missingRanks.empty() || !traceMap.empty(),
      "Trace shouldn't be empty while enabled GLOO_ASYNC_TIMEOUT_DEBUG");
  TORCH_INTERNAL_ASSERT(
      !thisCol.empty(),
      "Timeout rank [",
      myRank,
      "] must have collective tracking iteam in c10::Store trace");
  TORCH_INTERNAL_ASSERT(
      traceMap[thisSeq][myRank].second == kEventStart,
      "Timeout rank [",
      myRank,
      "] last trace item must be kEventStart. thisSeq = ",
      thisSeq,
      ", col = ",
      thisCol);

  report += c10::str(
      "\n\t - [", myRank, "] Timeout at collective: ", thisCol, ", #", thisSeq);

  if (!missingRanks.empty()) {
    report += analyzeMissingRanks(missingRanks);
  } else {
    report += analyzeLaggingRanks(traceMap);
    report += dumpSnapshot(traceMap);
  }
  return report;
}
} // namespace c10d