| #pragma once |
| |
| #include <c10/util/irange.h> |
| #include <torch/csrc/distributed/c10d/Store.hpp> |
| #include <torch/csrc/distributed/c10d/Types.hpp> |
| |
| #include <sys/types.h> |
| |
| #include <cstdlib> |
| #include <string> |
| #include <system_error> |
| #include <vector> |
| |
| namespace c10d { |
| inline std::string getTraceStartKey(const std::string& pgName, int rank) { |
| return pgName + "_" + std::to_string(rank) + "_trace_start"; |
| } |
| |
| inline std::string getTraceEndKey(const std::string& pgName, int rank) { |
| return pgName + "_" + std::to_string(rank) + "_trace_end"; |
| } |
| |
| inline bool traceUpdate( |
| c10::intrusive_ptr<Store>& store, |
| const std::string& key, |
| uint64_t seq, |
| const std::string& col) { |
| std::vector<uint8_t> value(col.size() + sizeof(seq) + 1); |
| memcpy(value.data(), &seq, sizeof(seq)); |
| memcpy(value.data() + sizeof(seq), col.data(), col.size()); |
| try { |
| store->set(key, value); |
| return true; |
| } catch (...) { |
| LOG(ERROR) << "Store is down while updating #" << seq << " with key " |
| << key; |
| return false; |
| } |
| return true; |
| } |
| |
| enum TraceDebugEvent { |
| kEventStart, |
| kEventEnd, |
| }; |
| // <seq, <rank, <col, start/end>>> |
| using TraceMap = |
| std::map<uint64_t, std::map<int, std::pair<std::string, TraceDebugEvent>>>; |
| |
| inline std::string ranksToString(const std::vector<int>& ranks) { |
| std::string str; |
| for (int rank : ranks) { |
| if (str.empty()) { |
| str = std::to_string(rank); |
| } else { |
| str += ", " + std::to_string(rank); |
| } |
| } |
| return str; |
| } |
| |
| inline std::string ranksFromTrace( |
| const std::vector<std::pair<int, std::string>>& items) { |
| std::string ranks; |
| for (auto& p : items) { |
| if (ranks.empty()) { |
| ranks = std::to_string(p.first); |
| } else { |
| ranks += ", " + std::to_string(p.first); |
| } |
| } |
| return ranks; |
| } |
| |
| inline std::string analyzeMissingRanks(const std::vector<int>& missingRanks) { |
| return c10::str( |
| "\n\t - To our best knowledge, ranks [", |
| ranksToString(missingRanks), |
| "] are the lagging ranks that caused this timeout. " |
| "They never joined any collectives"); |
| } |
| |
| inline std::string analyzeLaggingRanks(const TraceMap& traceMap) { |
| uint64_t lagSeq = traceMap.begin()->first; |
| std::vector<int> startRanks; |
| std::vector<int> endRanks; |
| for (auto& p : traceMap.begin()->second) { |
| if (p.second.second == kEventStart) { |
| startRanks.push_back(p.first); |
| } else { |
| endRanks.push_back(p.first); |
| } |
| } |
| std::string report = |
| "\n\t - To our best knowledge, the lagging/dead/mismatched ranks " |
| "that caused the desync are:"; |
| if (startRanks.size()) { |
| report += c10::str( |
| "\n\t - [", |
| ranksToString(startRanks), |
| "] joined but didn't finish collective #", |
| lagSeq, |
| " (count from 1)"); |
| } |
| if (endRanks.size()) { |
| report += c10::str( |
| "\n\t [", |
| ranksToString(endRanks), |
| "] finished collective #", |
| lagSeq, |
| ", but didn't join collective #", |
| lagSeq + 1, |
| " (count from 1)"); |
| } |
| return report; |
| } |
| |
| inline std::string dumpSnapshot(TraceMap& traceMap) { |
| std::string report = "\n\t - Snapshot of ranks' latest states:"; |
| for (auto& tracePair : traceMap) { |
| uint64_t seq = tracePair.first; |
| std::map<int, std::pair<std::string, TraceDebugEvent>>& subMap = |
| tracePair.second; |
| |
| std::unordered_map<std::string, std::vector<int>> collectivesStart; |
| std::unordered_map<std::string, std::vector<int>> collectivesEnd; |
| for (auto& p : subMap) { |
| int rank = p.first; |
| const std::string& col = p.second.first; |
| if (p.second.second == kEventStart) { |
| collectivesStart[col].push_back(rank); |
| } else { |
| collectivesEnd[col].push_back(rank); |
| } |
| } |
| |
| if (collectivesStart.size()) { |
| report += c10::str("\n\t #", seq, " started ranks:"); |
| for (auto& mapPair : collectivesStart) { |
| report += c10::str( |
| "\n\t [", |
| ranksToString(mapPair.second), |
| "] started ", |
| mapPair.first); |
| } |
| } |
| if (collectivesEnd.size()) { |
| report += c10::str("\n\t #", seq, " finished ranks:"); |
| for (auto& mapPair : collectivesEnd) { |
| report += c10::str( |
| "\n\t [", |
| ranksToString(mapPair.second), |
| "] finished ", |
| mapPair.first); |
| } |
| } |
| } |
| return report; |
| } |
| |
| inline bool parseTraceValue( |
| c10::intrusive_ptr<Store>& store, |
| const std::string& key, |
| uint64_t& seq, |
| std::string& col) { |
| try { |
| std::vector<uint8_t> traceValue = store->get(key); |
| memcpy(&seq, traceValue.data(), sizeof(seq)); |
| std::string colName((char*)traceValue.data() + sizeof(seq)); |
| col = colName; |
| return true; |
| } catch (...) { |
| LOG(ERROR) << "Store is down while getting key " << key; |
| return false; |
| } |
| return true; |
| } |
| |
| inline std::string retrieveDesyncReport( |
| c10::intrusive_ptr<Store>& store, |
| const std::string& pgName, |
| int myRank, |
| int worldSize) { |
| std::string report; |
| |
| uint64_t thisSeq; |
| std::string thisCol; |
| |
| std::vector<int> missingRanks; |
| TraceMap traceMap; |
| |
| for (const auto rank : c10::irange(worldSize)) { |
| // Build traceMapStart. |
| uint64_t seqStart; |
| { |
| std::string traceKeyStart = getTraceStartKey(pgName, rank); |
| if (!store->check({traceKeyStart})) { |
| missingRanks.push_back(rank); |
| continue; |
| } |
| std::string col; |
| if (!parseTraceValue(store, traceKeyStart, seqStart, col)) { |
| return report; |
| } |
| traceMap[seqStart].emplace(rank, std::make_pair(col, kEventStart)); |
| if (rank == myRank) { |
| thisSeq = seqStart; |
| thisCol = std::move(col); |
| } |
| } |
| |
| // Build traceMapEnd. |
| { |
| std::string traceKeyEnd = getTraceEndKey(pgName, rank); |
| if (!store->check({traceKeyEnd})) { |
| continue; |
| } |
| uint64_t seq; |
| std::string col; |
| if (!parseTraceValue(store, traceKeyEnd, seq, col)) { |
| return report; |
| } |
| if (seq == seqStart) { |
| traceMap[seq][rank].second = kEventEnd; |
| } |
| } |
| } |
| |
| TORCH_INTERNAL_ASSERT( |
| !missingRanks.empty() || !traceMap.empty(), |
| "Trace shouldn't be empty while enabled GLOO_ASYNC_TIMEOUT_DEBUG"); |
| TORCH_INTERNAL_ASSERT( |
| !thisCol.empty(), |
| "Timeout rank [", |
| myRank, |
| "] must have collective tracking iteam in c10::Store trace"); |
| TORCH_INTERNAL_ASSERT( |
| traceMap[thisSeq][myRank].second == kEventStart, |
| "Timeout rank [", |
| myRank, |
| "] last trace item must be kEventStart. thisSeq = ", |
| thisSeq, |
| ", col = ", |
| thisCol); |
| |
| report += c10::str( |
| "\n\t - [", myRank, "] Timeout at collective: ", thisCol, ", #", thisSeq); |
| |
| if (!missingRanks.empty()) { |
| report += analyzeMissingRanks(missingRanks); |
| } else { |
| report += analyzeLaggingRanks(traceMap); |
| report += dumpSnapshot(traceMap); |
| } |
| |
| return report; |
| } |
| |
| } // namespace c10d |